Thursday, February 14, 2013

Netflix Queue: Data migration for a high volume web application



There comes a time in the life of most data-serving systems when the data must be migrated to a more reliable, scalable, and high-performance data store, while maintaining or improving data consistency, latency, and efficiency. This post explains the data migration technique we used at Netflix to move users’ Queue data between two different distributed NoSQL storage systems.

What Is the Netflix Queue?

The Netflix Queue lets you keep and maintain a list of the movies & TV shows you want to watch on your devices and computers. 

Previous Implementation 

Netflix embraces a Service Oriented Architecture (SOA) composed of many small, fine-grained services that each do one thing and do it well. In that vein, the Queue service is used to fetch and maintain each user’s Queue. For every Netflix user, an ordered list of videos, along with metadata about when and where each video was added to their Queue, is persisted in the AWS cloud, with SimpleDB as the source of truth. Data in SimpleDB is sharded across multiple domains (similar to RDBMS tables) for performance and scalability. Queue data is used both for display and to influence personalization ranking.
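The post does not say how Queue data was assigned to SimpleDB domains. One common scheme, sketched here purely as an illustration, hashes the user ID to pick a domain. The domain count (NUM_DOMAINS), the queue_NN naming, and the function domain_for_user are all hypothetical.

```python
import hashlib

NUM_DOMAINS = 16  # hypothetical shard count, not taken from the post


def domain_for_user(user_id: str, num_domains: int = NUM_DOMAINS) -> str:
    """Map a user ID to one of num_domains SimpleDB domains (hypothetical naming scheme)."""
    # A stable digest is used instead of Python's built-in hash(), which is salted per
    # process, so every service instance routes a given user to the same domain.
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    shard = int.from_bytes(digest[:8], "big") % num_domains
    return f"queue_{shard:02d}"


if __name__ == "__main__":
    for uid in ("user-1", "user-2", "user-3"):
        print(uid, "->", domain_for_user(uid))
```

With plain modulo placement, changing the number of domains moves most users to a different domain, so the shard count is effectively fixed once data has been written.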

Queue RPS and Data Size

The following graph shows the requests per second (RPS) served by the Queue service, which peak at 40K RPS. In total, our data store holds more than 150 million records, with a total size of 300 GB.
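As a rough back-of-the-envelope check (assuming decimal gigabytes), these figures imply an average record size of about 2 KB; since there are more than 150 million records, the true average is somewhat lower:

```latex
\bar{s} \lesssim \frac{300\ \text{GB}}{1.5 \times 10^{8}\ \text{records}}
  = \frac{3 \times 10^{11}\ \text{B}}{1.5 \times 10^{8}\ \text{records}}
  = 2 \times 10^{3}\ \text{B/record} \approx 2\ \text{KB/record}
```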


Goals

When the Queue service was originally designed in 2009, SimpleDB was a good solution. Since then, however, it has not kept pace with our subscriber growth, in terms of either SLA or cost effectiveness. Our goal was to migrate the data off SimpleDB with the following requirements:
  • High Data Consistency 
  • High Reliability and Availability 
  • No downtime for reads and writes 
  • No degradation in performance of the existing application 
After careful consideration and a range of performance benchmarks, we chose Cassandra as the new data store for the Queue service, as it suits both our high-volume, low-latency write requirements and our reads, which are primarily key-value lookups.

Data Migration 

Migrating data to an eventually consistent data store such as Cassandra, for a high-volume, low-latency application, and verifying its consistency is a multi-step process. It involves a one-time data forklift followed by incrementally applying further changes. Some incremental updates may fail to apply because of timeouts, throttling by the data stores, temporary node unavailability, and so on. Running an end-to-end consistency checker and validating data with shadow reads helped us better evaluate the consistency of the migrated data. The following sections describe the steps we took to retire SimpleDB for the Queue service.

Our migrator code base can be configured to run in one of three modes: Forklift, Incremental Replication, and Consistency Checker.
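A minimal sketch of how a single code base might select among the three modes from configuration. The enum MigratorMode, the lowercase config strings, and parse_mode are hypothetical names, not the migrator’s actual interface.

```python
from enum import Enum


class MigratorMode(Enum):
    """The three run modes described in the post; the config strings are hypothetical."""
    FORKLIFT = "forklift"
    INCREMENTAL_REPLICATION = "incremental_replication"
    CONSISTENCY_CHECKER = "consistency_checker"


def parse_mode(config_value: str) -> MigratorMode:
    """Convert a configuration value into a MigratorMode, rejecting unknown values."""
    try:
        return MigratorMode(config_value.strip().lower())
    except ValueError:
        valid = ", ".join(m.value for m in MigratorMode)
        raise ValueError(f"unknown migrator mode {config_value!r}; expected one of: {valid}") from None


# One-line summary of what each mode does, per the post.
DESCRIPTIONS = {
    MigratorMode.FORKLIFT: "copy the full source snapshot once",
    MigratorMode.INCREMENTAL_REPLICATION: "copy records changed since the previous run",
    MigratorMode.CONSISTENCY_CHECKER: "compare source and target, rewrite mismatches",
}

if __name__ == "__main__":
    mode = parse_mode("Incremental_Replication")
    print(mode.name, "-", DESCRIPTIONS[mode])
```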

a) Forklift 
The first step in the data migration is to forklift the data from the source data store into the target data store. In this mode, the current snapshot of the source data store is copied in its entirety to the target. SimpleDB throttles requests when the RPS to a domain exceeds a certain threshold, to ensure fairness among all users of the system. It is therefore imperative not to put too much load on a SimpleDB domain during the migration, as that would affect the SLA of the existing Queue service. Depending on the data size, the throttling limits of the source data store, and how quickly the migration needs to complete, we can choose the number of parallel instances and the number of worker threads within each instance that perform the forklift.
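The sizing decision above is simple arithmetic: with I instances, T threads per instance, and each thread issuing at most r requests per second against one domain, the total load I·T·r must stay below the domain’s throttling threshold. The sketch below assumes all threads target the same domain; the helper names and budget values are hypothetical, and the post does not state SimpleDB’s actual threshold. Each worker can then pace itself with a small thread-safe limiter.

```python
import math
import threading
import time


def max_threads_per_instance(domain_rps_budget: float, per_thread_rps: float,
                             instances: int) -> int:
    """Largest thread count per instance keeping instances * threads * per_thread_rps
    within one domain's RPS budget (may be 0 if the budget is too small)."""
    if domain_rps_budget <= 0 or per_thread_rps <= 0 or instances <= 0:
        raise ValueError("budget, per-thread rate and instance count must be positive")
    return math.floor(domain_rps_budget / (per_thread_rps * instances))


class RateLimiter:
    """Thread-safe pacing limiter: spaces successive acquire() calls >= 1/rate seconds apart."""

    def __init__(self, rate_per_sec: float):
        self._interval = 1.0 / rate_per_sec
        self._next_slot = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        # Reserve the next free slot under the lock, then sleep outside it
        # so other threads can reserve their own slots meanwhile.
        with self._lock:
            now = time.monotonic()
            wait = self._next_slot - now
            self._next_slot = max(now, self._next_slot) + self._interval
        if wait > 0:
            time.sleep(wait)
```

For example, a hypothetical budget of 100 RPS per domain, 5 RPS per thread, and 4 instances allows 5 threads per instance. A single limiter shared by all threads of an instance, configured with that instance’s share of the budget, would work equally well.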

Each migrator thread worked on a different data set within a domain, so that no data was migrated more than once. Based on the configured number of threads, the migrator automatically chose a different data set for each thread. The migrator is also time-aware: it pauses thread execution during peak production traffic hours and continues forklifting during off-peak hours. Migrator instances periodically persisted the state of all forklift threads, so if an instance or the forklift application terminated, we could resume the migration from where it had stopped.
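The three properties described here (disjoint data sets per thread, pausing during peak hours, and periodic checkpoints for resumption) can be sketched as follows. Partitioning by sorted key ranges, the 17:00 to 23:00 peak window, the per-thread JSON checkpoint file, and all function names are assumptions for illustration; in-memory dicts stand in for SimpleDB and Cassandra.

```python
import datetime as dt
import json
import os
import time
from typing import Callable, Dict, List, Optional

# Hypothetical peak window in local time (must not cross midnight for is_peak to work).
PEAK_START = dt.time(17, 0)
PEAK_END = dt.time(23, 0)


def is_peak(now: dt.datetime) -> bool:
    """True while production traffic is assumed to be at its peak."""
    return PEAK_START <= now.time() < PEAK_END


def partition(keys: List[str], num_threads: int) -> List[List[str]]:
    """Split keys into num_threads disjoint, sorted, contiguous slices (one per thread)."""
    keys = sorted(keys)
    size, extra = divmod(len(keys), num_threads)
    slices, start = [], 0
    for i in range(num_threads):
        end = start + size + (1 if i < extra else 0)
        slices.append(keys[start:end])
        start = end
    return slices


def load_checkpoint(path: str) -> Optional[str]:
    """Return the last key this thread finished, or None on a fresh start."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)["last_key"]


def save_checkpoint(path: str, last_key: str) -> None:
    """Write to a temp file, then atomically rename it, so a crash never leaves a torn file."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"last_key": last_key}, f)
    os.replace(tmp, path)


def forklift_slice(keys: List[str], source: Dict[str, dict], target: Dict[str, dict],
                   last_done: Optional[str], persist: Callable[[str], None],
                   now: Callable[[], dt.datetime] = dt.datetime.now,
                   sleep: Callable[[float], None] = time.sleep,
                   persist_every: int = 100) -> Optional[str]:
    """Copy one thread's sorted slice, skipping keys up to its last checkpoint."""
    pending = [k for k in keys if last_done is None or k > last_done]
    for i, key in enumerate(pending, start=1):
        while is_peak(now()):
            sleep(60)  # pause during peak production traffic, then re-check
        target[key] = dict(source[key])
        last_done = key
        if i % persist_every == 0:
            persist(last_done)
    if last_done is not None:
        persist(last_done)  # final checkpoint for this slice
    return last_done
```

Because each thread owns a contiguous, sorted key range and records only the last key it finished, resuming after a crash means skipping keys up to the checkpoint; at most persist_every records are copied twice, which is harmless since each copy is an overwrite.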

The forklift was run just once, as the initial step of the migration process. It took a little over 30 hours to forklift the entire data set.
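For a sense of scale, dividing the roughly 150 million records by 30 hours gives the average forklift throughput. If the 30 hours is wall-clock time that includes peak-hour pauses, the rate while threads were actually running was higher:

```latex
\text{average rate} \approx \frac{1.5 \times 10^{8}\ \text{records}}{30\ \text{h} \times 3600\ \text{s/h}}
  = \frac{1.5 \times 10^{8}\ \text{records}}{1.08 \times 10^{5}\ \text{s}}
  \approx 1.4 \times 10^{3}\ \text{records/s}
```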

b) Incremental Replication
This phase started after the forklift was complete. At this stage, updates to users’ Queues were still sent only to SimpleDB. The migration code was run in Incremental Replication mode to keep Cassandra in sync with the updates made after the forklift. In this mode, instead of copying all the data from SimpleDB, only the data changed since the previous Incremental Replication run was copied to Cassandra.

We had an attribute called Last_Updated_TS in SimpleDB that is updated on every mutation. This attribute was indexed for better performance when fetching the source records updated since the last run. We only performed soft deletes, setting a delete marker in SimpleDB; this mode could not have handled hard deletes, since a hard-deleted record leaves no updated Last_Updated_TS to fetch. In this mode, the migration code ran continuously.
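A minimal sketch of one Incremental Replication pass driven by Last_Updated_TS as a high-water mark. In-memory dicts stand in for SimpleDB and Cassandra, integer timestamps and the "deleted" attribute name are assumptions, and changed_since stands in for the indexed SimpleDB query; none of these names come from the post.

```python
from typing import Dict, List, Tuple

# Hypothetical record shape: attribute name -> value, always including
# "Last_Updated_TS" (an integer timestamp here) and a soft-delete marker "deleted".
Record = Dict[str, object]


def changed_since(source: Dict[str, Record], since: int) -> List[Tuple[str, Record]]:
    """Stand-in for the indexed query on Last_Updated_TS, returned oldest first."""
    rows = [(k, r) for k, r in source.items() if r["Last_Updated_TS"] >= since]
    return sorted(rows, key=lambda kr: kr[1]["Last_Updated_TS"])


def replicate_once(source: Dict[str, Record], target: Dict[str, Record],
                   watermark: int, overlap: int = 0) -> int:
    """Copy every record changed since (watermark - overlap) and return the new watermark.

    The comparison is >= rather than > so a record written later with the same timestamp
    as the watermark is not skipped; re-copying is harmless because each apply is an overwrite.
    """
    new_watermark = watermark
    for key, record in changed_since(source, watermark - overlap):
        # A soft delete is just another update carrying the delete marker, so it
        # replicates like any change. A hard delete leaves nothing for this query to find.
        target[key] = dict(record)
        new_watermark = max(new_watermark, record["Last_Updated_TS"])
    return new_watermark
```

Run continuously, as in the post, this becomes a loop that calls replicate_once with the returned watermark, sleeps briefly, and persists the watermark so a restart does not begin from zero. A non-zero overlap re-reads a small window behind the watermark to tolerate clock skew between writers.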

c) Consistency Checker
At this stage, Incremental Replication was running continuously. However, some incremental updates could still fail to apply to Cassandra because of timeouts, throttling by SimpleDB, temporary node unavailability, and so on. To catch these cases, we ran an end-to-end Consistency Checker. This mode is similar to Forklift, except that instead of blindly copying the source data, it compares all the data in the source and target data stores and updates the target only with the records that mismatch. We tracked the number of mismatches in each run, along with metadata about the mismatched records. The migration code ran continuously in this mode as well.
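The Consistency Checker’s compare-and-repair loop reduces to the sketch below, again with dicts standing in for both stores; CheckReport and check_and_repair are hypothetical names. It treats the source as the source of truth and, because the post says deletes were soft deletes, only needs to walk the source’s keys.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class CheckReport:
    """Per-run statistics, mirroring the mismatch counts the post says were tracked."""
    checked: int = 0
    repaired: List[str] = field(default_factory=list)  # keys whose target copy was missing or stale

    @property
    def mismatch_rate(self) -> float:
        return len(self.repaired) / self.checked if self.checked else 0.0


def check_and_repair(source: Dict[str, dict], target: Dict[str, dict]) -> CheckReport:
    """Compare every source record with the target and rewrite only the mismatches.

    Only source keys are walked: with soft deletes, a deleted record still exists in
    the source (carrying its marker), so no target-only keys are expected.
    """
    report = CheckReport()
    for key, src_record in source.items():
        report.checked += 1
        if target.get(key) != src_record:
            target[key] = dict(src_record)  # the source is the source of truth
            report.repaired.append(key)
    return report
```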

d) Shadow Writes
Following are the steps we took, in chronological order, to move the Queue service to Cassandra and eventually retire SimpleDB.
  • Reads: Only from SimpleDB (Source of truth) 
  • Writes: SimpleDB and Cassandra 
At this stage, we updated the Queue service to perform shadow writes to Cassandra, while SimpleDB remained the source of truth for reads. For every user request to update their Queue, which previously updated only SimpleDB, an additional asynchronous request to update Cassandra was submitted. We tracked the number of successful and unsuccessful Cassandra updates; any unsuccessful update would eventually be fixed by Incremental Replication or by the Consistency Checker. As with every other project at Netflix, to make sure our Cassandra cluster could handle production write traffic, we rolled this feature out incrementally: first to 1% of our users, then 10%, and eventually 100%. This gave us a good picture of Cassandra write latencies before we made it the source of truth.
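A minimal sketch of a shadow write with a percentage rollout, assuming a thread pool for the asynchronous Cassandra write and a stable hash of the user ID for bucketing. ShadowWriter, in_rollout, and the callable-based store interfaces are hypothetical; the post does not describe how users were bucketed.

```python
import hashlib
import threading
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Callable, List

# Hypothetical store interface: (user_id, ordered video IDs) -> None
QueueWriter = Callable[[str, List[int]], None]


def in_rollout(user_id: str, percent: int) -> bool:
    """Stable bucketing: a user's bucket never changes, so the 1% cohort stays inside the 10% one."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % 100 < percent


class ShadowWriter:
    """Writes synchronously to the primary store and asynchronously to the shadow store."""

    def __init__(self, write_primary: QueueWriter, write_shadow: QueueWriter,
                 rollout_percent: int, executor: Executor):
        self._write_primary = write_primary
        self._write_shadow = write_shadow
        self._percent = rollout_percent
        self._executor = executor
        self._lock = threading.Lock()
        self.succeeded = 0
        self.failed = 0

    def update_queue(self, user_id: str, queue: List[int]) -> None:
        # The primary (SimpleDB) write stays on the request path; its errors reach the caller.
        self._write_primary(user_id, queue)
        if in_rollout(user_id, self._percent):
            # The shadow (Cassandra) write never blocks or fails the user request.
            future = self._executor.submit(self._write_shadow, user_id, list(queue))
            future.add_done_callback(self._record)

    def _record(self, future: Future) -> None:
        with self._lock:
            if future.exception() is None:
                self.succeeded += 1
            else:
                self.failed += 1  # left for Incremental Replication / Consistency Checker


if __name__ == "__main__":
    simpledb, cassandra = {}, {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        writer = ShadowWriter(simpledb.__setitem__, cassandra.__setitem__, 10, pool)
        for i in range(1000):
            writer.update_queue(f"user-{i}", [i])
    print(len(simpledb), "primary writes,", writer.succeeded, "shadow writes")
```

Bucketing on a stable hash means growing the percentage only adds users; nobody who was already shadow-writing drops out when the rollout moves from 1% to 10%.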

e) Shadow Writes and Shadow Reads for Validation
  • Reads: SimpleDB (Source of truth) and Cassandra 
  • Writes: SimpleDB and Cassandra 
Once shadow writes, Incremental Replication, and the Consistency Checker were up and running, the next step was shadow reads. SimpleDB remained the source of truth for reads. At this stage, for every request to fetch a user’s Queue, an additional asynchronous request to fetch the same Queue from Cassandra was submitted. When the asynchronous request completed, the Queue data returned by SimpleDB and by Cassandra were compared, and we tracked the number of requests for which the two stores disagreed. Mismatched records would eventually be fixed by Incremental Replication or by the Consistency Checker. Again, to make sure our Cassandra cluster could handle production read traffic, we rolled this feature out incrementally. The shadow read traffic also let us evaluate Cassandra read latencies under production traffic patterns.
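The shadow read path can be sketched the same way: return the primary result immediately, read the shadow store asynchronously, and count agreements and disagreements when it completes. ShadowReader and its counters are hypothetical names, and the percentage rollout is omitted for brevity.

```python
import threading
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Callable, List, Optional

# Hypothetical store interface: user_id -> ordered video IDs (None if the user has no Queue)
QueueReader = Callable[[str], Optional[List[int]]]


class ShadowReader:
    """Serves reads from the primary store and compares them with the shadow store off the request path."""

    def __init__(self, read_primary: QueueReader, read_shadow: QueueReader, executor: Executor):
        self._read_primary = read_primary
        self._read_shadow = read_shadow
        self._executor = executor
        self._lock = threading.Lock()
        self.compared = 0
        self.mismatched = 0
        self.shadow_errors = 0

    def get_queue(self, user_id: str) -> Optional[List[int]]:
        result = self._read_primary(user_id)  # SimpleDB's answer is what the user sees
        future = self._executor.submit(self._read_shadow, user_id)
        future.add_done_callback(lambda f: self._compare(result, f))
        return result

    def _compare(self, primary_result: Optional[List[int]], future: Future) -> None:
        with self._lock:
            if future.exception() is not None:
                self.shadow_errors += 1  # a failed shadow read is not counted as a mismatch
                return
            self.compared += 1
            if future.result() != primary_result:
                self.mismatched += 1


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        reader = ShadowReader({"u1": [1]}.get, {"u1": [2]}.get, pool)
        reader.get_queue("u1")
    print(reader.compared, "compared,", reader.mismatched, "mismatched")
```

A write landing between the primary and shadow reads can register as a mismatch even when replication is working, so the counts are best read as an upper bound on real inconsistency.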

f) End of Life SimpleDB
  • Reads: Cassandra (Source of truth) 
  • Writes: Cassandra 
Within a short span of time, the data mismatch found by shadow reads, Incremental Replication, and the Consistency Checker was minimal (<0.01%). At this stage, we flipped a flag to make Cassandra the source of truth. From then on, all requests to fetch a user’s Queue were served synchronously from Cassandra, and all Queue updates were written only to Cassandra. SimpleDB was finally laid to rest.
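Because each stage differs only in where reads and writes go, the whole sequence can be modeled as a single configuration value, so the final cut-over is one flag change. The Phase type, the flag names, and the synchronous writes below are simplifications for illustration, not the Queue service’s actual code.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Tuple


class Store(Enum):
    SIMPLEDB = "SimpleDB"
    CASSANDRA = "Cassandra"


@dataclass(frozen=True)
class Phase:
    source_of_truth: Store            # store whose data is returned to the user
    shadow_read: bool                 # also read the other store and compare (logic not shown)
    write_targets: Tuple[Store, ...]  # stores every Queue update is written to


# Hypothetical flag values for stages d), e) and f) in the post.
PHASES: Dict[str, Phase] = {
    "shadow_writes": Phase(Store.SIMPLEDB, False, (Store.SIMPLEDB, Store.CASSANDRA)),
    "shadow_reads": Phase(Store.SIMPLEDB, True, (Store.SIMPLEDB, Store.CASSANDRA)),
    "cassandra_only": Phase(Store.CASSANDRA, False, (Store.CASSANDRA,)),
}

Stores = Dict[Store, Dict[str, List[int]]]


def read_queue(phase: Phase, stores: Stores, user_id: str) -> List[int]:
    """Serve the read from whichever store the current phase treats as the source of truth."""
    return list(stores[phase.source_of_truth].get(user_id, []))


def write_queue(phase: Phase, stores: Stores, user_id: str, queue: List[int]) -> None:
    # Synchronous here for brevity; in the post the Cassandra shadow writes were asynchronous.
    for store in phase.write_targets:
        stores[store][user_id] = list(queue)
```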

Life at Netflix

When we started this project, the only requirement given to us was to remove SimpleDB as a dependency. It was up to us to choose the right persistence store; we chose Cassandra and designed the appropriate data models for it. One of the things we loved about this project was the speed at which it was executed, which was entirely up to us. We pushed code to production several times a week, which comes with a huge responsibility to make sure our code is well unit- and integration-tested. It is amazing to see ideas formed, implemented, and pushed to production in such a short span of time.

If scalability problems like these, coupled with our culture of freedom and responsibility, excite you, we are looking for Senior Software Engineers on the Product Infrastructure team. At Netflix, you’ll be working with some of the brightest minds in the industry. Visit http://jobs.netflix.com to get started.