Monday, February 25, 2013

Announcing EVCache: Distributed in-memory datastore for Cloud

By Shashi Madappa

EVCache is a distributed in-memory caching solution based on memcached and spymemcached that is well integrated with Netflix OSS and the AWS EC2 infrastructure. Today we are announcing the open sourcing of the EVCache client library on GitHub.

EVCache is an abbreviation for:
Ephemeral - The data is stored for a short duration, as specified by its TTL (Time To Live).
Volatile - The data can disappear at any time (evicted).
Cache - An in-memory key-value store.
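
To make the first two properties concrete, here is a minimal in-process sketch. It is not memcached and not EVCache code, and the class name `EphemeralVolatileCache` and the injectable clock are illustrative assumptions. Every entry carries a TTL, which makes it ephemeral. Once a capacity limit is reached, the least recently used entry is evicted, which makes every entry volatile. Callers must therefore treat any read as a possible miss.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Illustrative only: TTL expiry ("ephemeral") plus LRU eviction ("volatile"). Not thread-safe. */
class EphemeralVolatileCache<K, V> {
    private static final class Item<T> {
        final T value;
        final long expiresAtMillis;

        Item(T value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final LongSupplier clock;   // injectable so expiry can be exercised without sleeping
    private final Map<K, Item<V>> map;

    EphemeralVolatileCache(int capacity, LongSupplier clock) {
        this.clock = clock;
        // accessOrder=true keeps least-recently-used entries first, so
        // removeEldestEntry evicts the LRU entry once capacity is exceeded.
        this.map = new LinkedHashMap<K, Item<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Item<V>> eldest) {
                return size() > capacity;
            }
        };
    }

    void set(K key, V value, long ttlMillis) {
        map.put(key, new Item<>(value, clock.getAsLong() + ttlMillis));
    }

    /** Returns null on a miss: never set, expired, or evicted. */
    V get(K key) {
        Item<V> item = map.get(key);
        if (item == null) {
            return null;
        }
        if (clock.getAsLong() >= item.expiresAtMillis) {
            map.remove(key);   // lazily drop the expired entry on read
            return null;
        }
        return item.value;
    }
}
```

For example, if a cache with capacity 2 receives three `set` calls, it silently drops whichever key was used least recently.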

The advantages of distributed caching are:

  • Faster response time compared to data being fetched from source/database
  • Reduces the load and number of servers needed to handle the requests as most of the requests are served by the cache
  • Increases the throughput of the services fronted by the cache

Please read our earlier blog post for more details about EVCache.

What is an EVCache App?
An EVCache App is a logical grouping of one or more memcached instances (servers). Each instance can be one of the following:

  • EVCache Server (to be open sourced soon) running memcached and a Java sidecar app
  • EC2 instance running memcached
  • ElastiCache instance
  • any instance that speaks the memcached protocol (e.g. Couchbase, MemcacheDB)

Each app is associated with a name. Though it is not recommended, a memcached instance can be shared across multiple EVCache Apps.

What is an EVCache Client?
The EVCache client manages all operations between a Java application and an EVCache App.

What is an EVCache Server?
An EVCache Server is an EC2 instance running an instance of memcached and a Java sidecar application. The sidecar is responsible for interacting with Eureka, monitoring the memcached process, and collecting and reporting performance data to Servo. This will be open sourced soon.

Generic EVCache Deployment

Figure 1 shows an EVCache App consisting of 3 memcached nodes with an EVCache client connecting to it.
Figure 1
The data is sharded across the memcached nodes using the Ketama consistent hashing algorithm. In this mode the memcached nodes can all be in the same availability zone or spread across multiple availability zones.
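
The sketch below illustrates consistent hashing in the style of Ketama. Each server is placed at many pseudo-random points on a 32-bit ring derived from MD5 digests, and a key belongs to the first server point at or after the key's own hash, wrapping around at the end of the ring. It is an illustrative assumption, not spymemcached's implementation. In particular, the `server-i` point naming and the 160 points per server are choices made here, and they are not guaranteed to reproduce the real client's key placement.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Ketama-style consistent hash ring (illustrative; not spymemcached's KetamaNodeLocator). */
class KetamaRing {
    private static final int POINTS_PER_SERVER = 160;   // 40 MD5 digests x 4 points each
    private final TreeMap<Long, String> ring = new TreeMap<>();

    KetamaRing(List<String> servers) {
        for (String server : servers) {
            for (int i = 0; i < POINTS_PER_SERVER / 4; i++) {
                byte[] digest = md5(server + "-" + i);   // point naming is an assumption
                // Each 16-byte digest yields four 32-bit points on the ring.
                for (int h = 0; h < 4; h++) {
                    ring.put(toUnsigned32(digest, h * 4), server);
                }
            }
        }
    }

    /** Walks clockwise from the key's hash to the first server point, wrapping around. */
    String serverFor(String key) {
        long hash = toUnsigned32(md5(key), 0);
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Reads 4 bytes little-endian as an unsigned 32-bit value, as the original ketama library does. */
    private static long toUnsigned32(byte[] d, int offset) {
        return ((long) (d[offset + 3] & 0xFF) << 24)
             | ((long) (d[offset + 2] & 0xFF) << 16)
             | ((long) (d[offset + 1] & 0xFF) << 8)
             | (d[offset] & 0xFF);
    }

    private static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 unavailable", e);
        }
    }
}
```

The property that matters for a cache is that adding a server only moves the keys that now land on the new server's points. Every other key keeps its old server, so most cached data stays valid.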

Multi-Cluster EVCache Deployment
Figure 2 shows an EVCache App in 2 clusters (A and B) with 3 memcached nodes in each cluster. Data is replicated between the two clusters. To achieve low latency, reliability and isolation, all the EC2 instances for a cluster should be in the same availability zone. This way, if one availability zone has issues, the performance of the other zone is not impacted. If we lose instances in one cluster, we can dynamically set that cluster to “write only” and direct all the read traffic to the other zone. This ensures that latency and cache hit rate are not impacted.

Figure 2
In the above scenario, the data is replicated across both clusters and is sharded across the 3 instances in each cluster using the Ketama consistent hashing algorithm. All reads (get, getBulk, getAndTouch) by a client are sent to the cluster in the client's own zone, whereas writes (set and delete) are done in both zones. Replicating the data across both clusters increases its availability, and because data is always read from the local zone, latency improves. This approach is best suited if you want better performance with higher reliability.

If some data is lost in one cluster due to an instance failure or eviction, it can be fetched from the other cluster. Having this fallback improves both availability and reliability. In most cases, fetching data from the other cluster (fallback) is much faster than getting it from the source.
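
The read and write routing described above can be sketched as follows. `ZoneCluster` and `ReplicatedCacheClient` are hypothetical names. Each cluster is modeled as a single in-memory map rather than a set of Ketama-sharded memcached nodes. Writes go to every cluster. Reads go to the client's own zone, or to another zone if the local cluster is marked write-only. On a miss, the client may fall back to the other clusters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for one zone's cluster; a real one shards keys across memcached nodes. */
class ZoneCluster {
    final String zone;
    volatile boolean writeOnly;   // set dynamically when the cluster loses instances
    private final Map<String, String> data = new ConcurrentHashMap<>();

    ZoneCluster(String zone) { this.zone = zone; }

    String get(String key) { return data.get(key); }
    void set(String key, String value) { data.put(key, value); }
    void delete(String key) { data.remove(key); }
}

/** Hypothetical client: write to all clusters, read locally, optionally fall back. */
class ReplicatedCacheClient {
    private final String localZone;
    private final List<ZoneCluster> clusters;
    private final boolean fallbackEnabled;

    ReplicatedCacheClient(String localZone, List<ZoneCluster> clusters, boolean fallbackEnabled) {
        this.localZone = localZone;
        this.clusters = new ArrayList<>(clusters);
        this.fallbackEnabled = fallbackEnabled;
    }

    /** Writes (set and delete) go to every cluster, including write-only ones. */
    void set(String key, String value) {
        for (ZoneCluster c : clusters) c.set(key, value);
    }

    void delete(String key) {
        for (ZoneCluster c : clusters) c.delete(key);
    }

    /**
     * Reads try the local zone first. Write-only clusters are skipped, and further
     * clusters are consulted after a miss only when fallback is enabled.
     * Returns null if no eligible cluster has the key.
     */
    String get(String key) {
        boolean consultedOne = false;
        for (ZoneCluster c : localFirst()) {
            if (c.writeOnly) continue;
            if (consultedOne && !fallbackEnabled) break;
            String value = c.get(key);
            if (value != null) return value;
            consultedOne = true;
        }
        return null;
    }

    private List<ZoneCluster> localFirst() {
        List<ZoneCluster> order = new ArrayList<>();
        for (ZoneCluster c : clusters) if (c.zone.equals(localZone)) order.add(c);
        for (ZoneCluster c : clusters) if (!c.zone.equals(localZone)) order.add(c);
        return order;
    }
}
```

A `null` from `get` means every eligible cluster missed. At that point the caller would load the value from the source of truth.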

EVCache Deployment using Eureka

Figure 3 shows an EVCache App in 3 clusters (A, B and C) with 3 EVCache servers in each cluster. Each cluster is in its own availability zone. An EVCache server (to be open sourced soon) consists of a memcached instance and a sidecar app. The sidecar app interacts with the Eureka server and monitors the memcached process.
Figure 3
In the above scenario, the EVCache client gets the list of servers from Eureka and creates clusters based on the availability zone of each EVCache server. If EVCache server instances are added or removed, the EVCache client reconfigures itself to reflect the change. This is transparent to the application using the client.
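
A rough sketch of the zone grouping follows. It assumes a hypothetical `RegisteredServer` type in place of the instance metadata that the real Eureka client returns. On each registry refresh, the server list is regrouped by availability zone and swapped in atomically. In-flight requests therefore see either the old layout or the new one, never a half-built map. In a real client, each per-zone list would then back its own consistent-hash ring.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical view of one registry entry; not the Eureka client's API. */
final class RegisteredServer {
    final String hostPort;
    final String zone;

    RegisteredServer(String hostPort, String zone) {
        this.hostPort = hostPort;
        this.zone = zone;
    }
}

/** Rebuilds the zone -> servers layout on each refresh and publishes it atomically. */
class ZoneAwareServerList {
    private final AtomicReference<Map<String, List<String>>> clustersByZone =
        new AtomicReference<>(Collections.emptyMap());

    void onRegistryRefresh(List<RegisteredServer> servers) {
        Map<String, List<String>> next = new TreeMap<>();
        for (RegisteredServer s : servers) {
            next.computeIfAbsent(s.zone, z -> new ArrayList<>()).add(s.hostPort);
        }
        // Sort for a stable order and freeze each list before publishing.
        next.replaceAll((zone, hosts) -> {
            Collections.sort(hosts);
            return Collections.unmodifiableList(hosts);
        });
        clustersByZone.set(Collections.unmodifiableMap(next));
    }

    List<String> serversIn(String zone) {
        return clustersByZone.get().getOrDefault(zone, Collections.emptyList());
    }

    Set<String> zones() {
        return clustersByZone.get().keySet();
    }
}
```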

As in the multi-cluster deployment, the data is sharded across the 3 instances within each zone using the Ketama consistent hashing algorithm. All reads by a client are performed in the client's own zone, whereas writes are done across all the zones. This ensures that data is replicated across all the zones, increasing its availability. Because data is always read from the local zone, latency improves, and data reliability improves as well.

If zone fallback is enabled and some data is lost due to an instance or zone failure or eviction, the data can be fetched from the clusters in other zones. This increases latency for those requests but improves reliability. In most cases, fetching data from another zone is much faster than getting it from the source.

Netflix OSS - All Netflix Open Source Software
EVCache - Sources & EVCache Wiki - Documentation & Configuration
memcached - A high-performance in-memory data store
spymemcached - Java Client to memcached
Archaius - Library for configuration management API
Eureka - Discovery and Managing EVCache servers
Servo - Application Monitoring Library

If you like building infrastructure components like this, for a service that millions of people use world wide, take a look at

Thursday, February 14, 2013

Netflix Queue: Data migration for a high volume web application

There comes a time in the life of most data-serving systems when data must be migrated to a more reliable, scalable and high-performance data store while maintaining or improving data consistency, latency and efficiency. This document explains the data migration technique we used at Netflix to migrate users' queue data between two different distributed NoSQL storage systems.

What is the Netflix Queue

The Netflix Queue lets you keep and maintain a list of the movies & TV shows you want to watch on your devices and computers. 

Previous Implementation 

Netflix embraces a Service Oriented Architecture (SOA) composed of many small, fine-grained services that each do one thing and do it well. In that vein, the Queue service is used to fetch and maintain the user's Queue. For every Netflix user, a list of ordered videos and other metadata about when and where each video was added to their Queue is persisted in the AWS cloud, with SimpleDB as the source of truth. Data in SimpleDB is sharded across multiple domains (similar to RDBMS tables) for performance and scalability. Queue data is used both for display purposes and to influence personalization ranking.
Queue RPS and Data Size 

The following graph shows the requests per second (RPS) served by the Queue service, with a maximum of 40K RPS. In total there are more than 150 million records in our data store, with a total size of 300 GB.


Back when Queue service was originally designed in 2009, SimpleDB was a good solution. However, since then, it has not kept pace with our subscriber growth both in terms of SLA and cost effectiveness. Our goal was to migrate data off of SimpleDB with the following requirements: 
  • High Data Consistency 
  • High Reliability and Availability 
  • No downtime for reads and writes 
  • No degradation in performance of the existing application 
After careful consideration and various performance benchmarks, we decided to use Cassandra as the new data store for the Queue service. It suits our high-volume, low-latency write requirements and our reads, which are primarily key-value lookups.

Data Migration 

Migrating data to an eventually consistent data store such as Cassandra for a high-volume, low-latency application, and verifying its consistency, is a multi-step process. It involves a one-time data forklift followed by applying further changes incrementally. There can be error cases where the incremental updates cannot be applied successfully, for reasons such as timeouts, throttling by the data stores, or temporary node unavailability. Running an end-to-end consistency checker and validating data with shadow reads helped us better evaluate the consistency of the migrated data. The following sections describe the steps we took to end-of-life SimpleDB for the Queue service.

Our migrator code base can be configured to run in one of three modes: Forklift, Incremental Replication and Consistency Checker.

a) Forklift 
The first step in the process of data migration is to forklift the data from the source data store into the target data store. In this mode, the current snapshot of the source data store is copied in its entirety to the target data store. SimpleDB throttles requests when the RPS to a domain is greater than a certain threshold value to impose fairness on all users of the system. Hence, it is imperative to not put too much load on a SimpleDB domain during the migration, as it would affect the SLA requirements of the existing Queue service. Depending on the data size, throttling of the source data store and the latency requirements for migration, we can choose the number of parallel instances and the number of worker threads within each instance that perform the forklift. 

Each migrator thread worked on a different data set within a domain, to avoid migrating the same data multiple times. Based on the configured number of threads, the migrator automatically chooses a different data set for each thread. The migrator is also time-aware: it pauses thread execution during peak hours of production traffic and continues forklifting during off-peak hours. The migrator instances periodically persisted the state of all forklift threads, so if an instance or the forklift application terminated, we could resume the migration from where it had stopped.
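
The partitioning, pausing and checkpointing described above might look roughly like the following sketch. Everything in it is hypothetical: the `ForkliftWorker` class, hashing keys into disjoint slices, and the in-memory checkpoint map. A real migrator would persist checkpoints durably and copy from SimpleDB domains rather than from Java maps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Hypothetical forklift worker; not Netflix's migrator code. */
class ForkliftWorker {
    /** Splits source keys into disjoint slices so no record is copied by two threads. */
    static List<List<String>> partition(List<String> keys, int threads) {
        List<List<String>> slices = new ArrayList<>();
        for (int i = 0; i < threads; i++) slices.add(new ArrayList<>());
        for (String key : keys) slices.get(Math.floorMod(key.hashCode(), threads)).add(key);
        return slices;
    }

    private final String workerId;
    private final List<String> slice;
    private final Map<String, Integer> checkpoints;   // workerId -> next index (persisted in a real system)

    ForkliftWorker(String workerId, List<String> slice, Map<String, Integer> checkpoints) {
        this.workerId = workerId;
        this.slice = slice;
        this.checkpoints = checkpoints;
    }

    /**
     * Copies records from the saved checkpoint onward and stops early once
     * isPeakHour returns true. Returns the number of records copied in this call.
     */
    int run(Map<String, String> source, Map<String, String> target, BooleanSupplier isPeakHour) {
        int next = checkpoints.getOrDefault(workerId, 0);
        int copied = 0;
        while (next < slice.size() && !isPeakHour.getAsBoolean()) {
            String key = slice.get(next);
            target.put(key, source.get(key));
            copied++;
            next++;
            checkpoints.put(workerId, next);   // checkpoint after every record, for simplicity
        }
        return copied;
    }
}
```

Calling `run` again after a pause or a restart picks up at the saved index, so no record in the slice is skipped.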

The forklift was run just once, as the initial step of the migration process. It took a little over 30 hours to forklift the entire data set.
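
As a rough sanity check, we can take the figures quoted earlier (150+ million records, 300 GB) and "a little over 30 hours" as exactly 150 million records, 300 × 10^9 bytes and 30 hours. On those assumptions, the average forklift rate works out to roughly:

```latex
\begin{aligned}
30\ \text{h} &= 30 \times 3600\ \text{s} = 108{,}000\ \text{s} \\
\frac{150 \times 10^{6}\ \text{records}}{108{,}000\ \text{s}} &\approx 1{,}389\ \text{records/s} \\
\frac{300 \times 10^{9}\ \text{B}}{108{,}000\ \text{s}} &\approx 2.8 \times 10^{6}\ \text{B/s} \approx 2.8\ \text{MB/s} \\
\frac{300 \times 10^{9}\ \text{B}}{150 \times 10^{6}\ \text{records}} &= 2{,}000\ \text{B/record} \approx 2\ \text{KB/record}
\end{aligned}
```

These are derived averages, not measured figures. Because the migrator paused during peak hours, its rate while actively copying would have been higher.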
b) Incremental Replication 
This phase started after the forklift was completed. At this stage, updates to users' Queues were still sent only to SimpleDB. The migration code was run in Incremental Replication mode to keep Cassandra in sync with the updates that happened after the forklift. In this mode, instead of copying all the data from SimpleDB, only the data changed since the previous Incremental Replication run was copied to Cassandra.

We had an attribute called Last_Updated_TS in SimpleDB that is updated on every mutation. This attribute was indexed to speed up fetching the source records updated since the last run. We only did soft deletes, setting a delete marker in SimpleDB; this mode would not have been able to handle hard deletes. In this mode, the migration code ran continuously.
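
Here is a minimal sketch of one Incremental Replication pass. It assumes a hypothetical `SourceRecord` carrying the `Last_Updated_TS` value and the soft-delete marker described above. The pass uses `>=` against the high-water mark, so records that share the boundary timestamp are re-applied rather than missed. That is safe because puts and removes are idempotent. The in-memory filter stands in for the indexed SimpleDB query.

```java
import java.util.Collection;
import java.util.Map;

/** Hypothetical source row mirroring the Last_Updated_TS attribute and soft-delete marker. */
final class SourceRecord {
    final String key;
    final String value;
    final long lastUpdatedTs;
    final boolean deleted;   // soft-delete marker

    SourceRecord(String key, String value, long lastUpdatedTs, boolean deleted) {
        this.key = key;
        this.value = value;
        this.lastUpdatedTs = lastUpdatedTs;
        this.deleted = deleted;
    }
}

class IncrementalReplicator {
    private long highWaterMark;   // latest Last_Updated_TS already applied; persisted in a real system

    IncrementalReplicator(long startTs) {
        this.highWaterMark = startTs;   // e.g. when the forklift snapshot began
    }

    /** One pass: applies records updated at or after the high-water mark; returns how many were applied. */
    int runOnce(Collection<SourceRecord> source, Map<String, String> target) {
        long newMark = highWaterMark;
        int applied = 0;
        for (SourceRecord r : source) {
            if (r.lastUpdatedTs < highWaterMark) continue;   // stands in for the indexed timestamp query
            if (r.deleted) target.remove(r.key); else target.put(r.key, r.value);
            newMark = Math.max(newMark, r.lastUpdatedTs);
            applied++;
        }
        highWaterMark = newMark;
        return applied;
    }
}
```

A hard-deleted row simply vanishes from the source, so a timestamp query can never return it. This is why the mode depends on soft deletes.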
c) Consistency Checker 
At this stage, Incremental Replication was running continuously. However, there could be error cases where the incremental updates could not be applied to Cassandra, for reasons such as timeouts, throttling by SimpleDB, or temporary node unavailability. To handle these cases, we ran an end-to-end Consistency Checker. This mode is similar to Forklift, except that instead of blindly copying the source data, we compared all the data in both the source and target data stores and updated the target data store only with the records that mismatched. We tracked the number of such mismatches for each run, along with related metadata about the mismatched records. The migration code ran continuously in this mode as well.
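
The Consistency Checker pass can be sketched as a full comparison that treats the source as the truth and repairs only the mismatched keys. This sketch rests on several assumptions: the `ConsistencyChecker` class is hypothetical, both stores are held in memory, and keys present only in the target count as mismatches and are removed. A real run over 150+ million records would page through both stores instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

/** Hypothetical full-pass checker; the source store is treated as the source of truth. */
class ConsistencyChecker {
    /** Compares every key in either store, repairs the target, and returns the mismatched keys in sorted order. */
    static List<String> repair(Map<String, String> source, Map<String, String> target) {
        TreeSet<String> keys = new TreeSet<>(source.keySet());
        keys.addAll(target.keySet());
        List<String> mismatched = new ArrayList<>();
        for (String key : keys) {
            String expected = source.get(key);
            if (Objects.equals(expected, target.get(key))) continue;
            mismatched.add(key);                         // kept for per-run mismatch reporting
            if (expected == null) target.remove(key);    // present only in the target
            else target.put(key, expected);              // missing or stale in the target
        }
        return mismatched;
    }
}
```

A second pass over stores that have just been repaired should report no mismatches, so the per-run mismatch count works as a convergence signal.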
d) Shadow Writes 
The following steps were taken, in chronological order, to update the Queue service to use Cassandra and eventually end-of-life SimpleDB.
  • Reads: Only from SimpleDB (Source of truth) 
  • Writes: SimpleDB and Cassandra 
At this stage, we updated the Queue service to do shadow writes to Cassandra. The source of truth for reads was still SimpleDB. For every user request to update their Queue, which earlier only updated SimpleDB, an additional asynchronous request to update Cassandra was submitted. We tracked the number of successful and unsuccessful updates to Cassandra. Any unsuccessful update would eventually be fixed by the Incremental Replicator or by the Consistency Checker. As with every other project at Netflix, we rolled out this feature incrementally to make sure our Cassandra cluster could handle the production write traffic, starting with 1% of our users, then 10%, and eventually 100%. This gave us a good indication of Cassandra write latencies before we made it the source of truth.
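
Below is a sketch of the shadow write path with a percentage rollout. The `ShadowWriter` class, the hash-based user bucketing, and the maps standing in for SimpleDB and Cassandra are all assumptions. The write to the source of truth stays synchronous. The shadow write is submitted asynchronously, and its outcome only updates counters, so a shadow failure never reaches the user.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical shadow-write wrapper; not the Queue service's actual code. */
class ShadowWriter {
    private final Map<String, String> primary;   // stands in for SimpleDB, the source of truth
    private final Map<String, String> shadow;    // stands in for Cassandra
    private final Executor executor;
    private volatile int rolloutPercent;         // 1, then 10, then 100
    final AtomicLong shadowSuccesses = new AtomicLong();
    final AtomicLong shadowFailures = new AtomicLong();

    ShadowWriter(Map<String, String> primary, Map<String, String> shadow, Executor executor, int rolloutPercent) {
        this.primary = primary;
        this.shadow = shadow;
        this.executor = executor;
        this.rolloutPercent = rolloutPercent;
    }

    void setRolloutPercent(int percent) { this.rolloutPercent = percent; }

    /** Deterministic bucketing: a given user stays in (or out of) the rollout as it grows. */
    boolean inRollout(String userId) {
        return Math.floorMod(userId.hashCode(), 100) < rolloutPercent;
    }

    /** Synchronous primary write; the returned future tracks only the best-effort shadow write. */
    CompletableFuture<Void> write(String userId, String queue) {
        primary.put(userId, queue);   // user-visible write; its failures still propagate
        if (!inRollout(userId)) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.runAsync(() -> shadow.put(userId, queue), executor)
            .<Void>handle((ignored, error) -> {
                if (error == null) shadowSuccesses.incrementAndGet();
                else shadowFailures.incrementAndGet();   // left for replication or the checker to fix
                return null;
            });
    }
}
```

Bucketing on a hash of the user ID keeps each user consistently in or out of the rollout as the percentage grows from 1 to 10 to 100.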
e) Shadow Writes and Shadow Reads for Validation 
  • Reads: SimpleDB (Source of truth) and Cassandra 
  • Writes: SimpleDB and Cassandra 
Once shadow writes, Incremental Replication and the Consistency Checker were up and running, the next step was to do shadow reads. SimpleDB remained the source of truth for reads. At this stage, for every request to fetch a user's Queue, an additional asynchronous request to fetch their Queue from Cassandra was submitted. Once the asynchronous request completed, the Queue data returned from SimpleDB and Cassandra was compared. We tracked the number of requests for which the data in the two stores mismatched. The mismatched records would eventually be fixed by Incremental Replication or by the Consistency Checker. Again, we rolled out this feature incrementally to make sure our Cassandra cluster could handle the production read traffic. This shadow read traffic also helped us evaluate Cassandra read latencies under production traffic patterns.
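
The shadow read comparison can be sketched in a similar way. `ShadowReader` is hypothetical, and the two lookup functions stand in for the SimpleDB and Cassandra reads. The user gets the SimpleDB result immediately, while the Cassandra result is fetched and compared on another executor.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical shadow-read wrapper: serve from the source of truth, compare asynchronously. */
class ShadowReader {
    private final Function<String, String> primary;   // stands in for SimpleDB
    private final Function<String, String> shadow;    // stands in for Cassandra
    private final Executor executor;
    final AtomicLong comparisons = new AtomicLong();
    final AtomicLong mismatches = new AtomicLong();
    final AtomicLong shadowErrors = new AtomicLong();

    ShadowReader(Function<String, String> primary, Function<String, String> shadow, Executor executor) {
        this.primary = primary;
        this.shadow = shadow;
        this.executor = executor;
    }

    String read(String userId) {
        String result = primary.apply(userId);   // the response the user actually receives
        CompletableFuture.supplyAsync(() -> shadow.apply(userId), executor)
            .thenAccept(shadowResult -> {
                comparisons.incrementAndGet();
                if (!Objects.equals(result, shadowResult)) mismatches.incrementAndGet();
            })
            .exceptionally(error -> {
                shadowErrors.incrementAndGet();   // shadow failures never affect the user
                return null;
            });
        return result;
    }
}
```

In production, the executor should be bounded so that a slow shadow store cannot pile up work. Passing `Runnable::run` as the executor makes the comparison synchronous, which is handy for testing.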
f) End of Life SimpleDB 
  • Reads: Cassandra (Source of truth) 
  • Writes: Cassandra 
Within a short span of time, the mismatch rate found by shadow reads, Incremental Replication and the Consistency Checker was minimal (<0.01%). At this stage, we flipped a flag to make Cassandra the source of truth. After that, all requests to fetch a user's Queue were served synchronously from Cassandra, and all updates to the Queue were written only to Cassandra. SimpleDB was finally laid to rest.
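
The three phases above differ only in where reads and writes are routed, so they can be captured as a single switchable flag. This is a sketch with hypothetical names (`QueueStore`, `MigrationPhase`, `MigrationFlag`), not the Queue service's actual configuration.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

enum QueueStore { SIMPLEDB, CASSANDRA }

/** Hypothetical encoding of phases d), e) and f): read source, shadow reads, write targets. */
enum MigrationPhase {
    SHADOW_WRITES(QueueStore.SIMPLEDB, false, EnumSet.of(QueueStore.SIMPLEDB, QueueStore.CASSANDRA)),
    SHADOW_READS(QueueStore.SIMPLEDB, true, EnumSet.of(QueueStore.SIMPLEDB, QueueStore.CASSANDRA)),
    END_OF_LIFE_SIMPLEDB(QueueStore.CASSANDRA, false, EnumSet.of(QueueStore.CASSANDRA));

    final QueueStore readSource;        // synchronous source of truth for reads
    final boolean shadowRead;           // also read the other store asynchronously and compare
    final Set<QueueStore> writeTargets; // every store that receives writes

    MigrationPhase(QueueStore readSource, boolean shadowRead, Set<QueueStore> writeTargets) {
        this.readSource = readSource;
        this.shadowRead = shadowRead;
        this.writeTargets = Collections.unmodifiableSet(writeTargets);
    }
}

/** The "flag" that is flipped at runtime; volatile so request threads see the change promptly. */
class MigrationFlag {
    private volatile MigrationPhase phase = MigrationPhase.SHADOW_WRITES;

    MigrationPhase current() { return phase; }

    void flip(MigrationPhase next) { phase = next; }
}
```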
Life at Netflix

When we started this project, the only requirement given to us was to remove SimpleDB as a dependency. It was up to us to choose the right persistence store. We chose Cassandra and designed the right data models for it. One of the things we loved about this project was the speed at which it was executed, which, by the way, was completely determined by us. We pushed code to production several times a week, and that comes with a big responsibility to make sure our code is well unit- and integration-tested. It is amazing to see ideas formed, implemented and pushed to production in a short span of time.

If these kinds of scalability problems coupled with our freedom and responsibility enthuse you, we are looking for Senior Software Engineers on the Product Infrastructure team. At Netflix, you’ll be working with some of the brightest minds in the industry. Visit to get started. 

Monday, February 11, 2013

First NetflixOSS Meetup

By Adrian Cockcroft and Ruslan Meshenberg

The inaugural Netflix Open Source Software (NetflixOSS) meetup was held at Netflix headquarters in Los Gatos, California, on February 6, 2013. We had over 200 attendees: individual developers, academics, vendors and companies ranging from small startups to the largest global corporations.

For those who missed it, a video of the presentations made at the event is included below. We also had visitors from TechCrunch and GigaOM writing stories about us.

How to host a Tech Meetup

We were very happy with the way the meeting itself worked out, but Paul Guth wrote a wonderful blog post on what he saw.

“Netflix has once again set the bar. Not with their technology this time – but with their organizing. I just got back from the first meetup of the NetflixOSS group - and it was spectacular.”

The inside story of how this meetup happened provides another example of how Netflix culture works. We don't have a recipe for a meetup; there was no master plan with Steve Jobs-like attention to getting the fine details exactly right. There was no process. Instead, we told people the high-level goals of what we were trying to do, got out of the way, and trusted them to figure it out or ask for clarification as needed. On the day of the event we had excellent facilities support setting everything up, making sure people knew where to go, and staying up very late to put it all away. We had wonderful Asian finger food (sushi!) and plenty of beer and wine. There was signage, and there were large monitors at 10 separate demo stations with the engineers who own the individual NetflixOSS projects. Ruslan led the event overall and structured the talks, Adrian worked on the what/why message and the how-it-works slides, and Joris from PR got journalists to come along. Betty coordinated the event setup. The Netflix design team came up with a NetflixOSS logo, a T-shirt design and TechTattoo stickers, and Leslie turned the designs into actual T-shirts and stickers. A lot of work from a lot of people went into the meetup, but it was fun and frictionless, with immediate buy-in. It sounds too good to be true, but good ideas “get legs” at Netflix and take off in a way that isn't often seen at other companies. Netflix gets out of the way of innovation. We get more done with fewer people in less time, and this is a key ingredient in maintaining high talent density: employees aren't frustrated by bureaucracy, because the default behavior is to trust their judgement. Just as with NetflixOSS components, we apply a similar philosophy to our people: the whole is greater than the sum of the parts. Together, the amazing people we have accomplish much more than all of their individual accomplishments put together.

NetflixOSS History

Ruslan opened by showing everyone an email exchange. In mid-2011 Jordan sent an email to some managers asking what the process was to open-source a project; the reply was that there was no process or policy, just go ahead. Jordan then asked if he should just put on Apache license headers and show it to legal, and the response was “If you think legal review is going to improve your code quality, go ahead!” When the code was ready, it was released in late 2011 as Curator. During 2012 another 15 projects were added, and three more have already been added in 2013.

We were releasing the platform that runs Netflix streaming one piece at a time, and other people started using bits and pieces individually. The transition we are making in 2013 is to put the puzzle pieces together as a coherent platform, brand it NetflixOSS, and make it easy to adopt as a complete Platform as a Service solution. NetflixOSS supports teams of developers who are deploying large-scale, globally distributed applications. Netflix has hundreds of developers deploying on tens of thousands of instances across several AWS Regions to support the global streaming product.

Why Open Source the Platform?

In 2009-2010 Netflix was figuring out the architecture from first principles, trying to follow AWS guidelines and building a platform based on the scalability experience some of our engineers had gained at places like Yahoo, eBay, and Google. In 2011 and 2012 we started talking about how we had built a large-scale cloud-native application, and other companies began to follow our patterns. What was bleeding-edge innovation in 2009 became accepted best practice by 2012, and is becoming a commodity in 2013-2014. Many competing cloud platforms have appeared, and by making it easy for people to adopt NetflixOSS we hope to become part of a larger ecosystem rather than having the industry pass us by.

When we started open sourcing pieces of our infrastructure, we had several goals for the program, outlined in this Techblog post. We're seeing great adoption and engagement across many developers and companies, and an increasing stream of feedback and external contributions. A growing number of third-party projects use NetflixOSS components; some of them are listed on our GitHub page.

Putting it all Together

We released NetflixOSS bit by bit, and we don't have a naming convention, so it can be hard to figure out how the projects fit together. Adrian presented a series of slides explaining how they combine to form a build system, a set of services that support the platform, a collection of libraries that applications build against, and testing and maintenance functionality.

The Portability Question

When we built our platform we focused on what mattered to Netflix: scalability so we could roll out our streaming product globally; functionality so that our developers could build faster; and availability by leveraging multiple availability zones in each region. We regard portability as an issue we can defer, since AWS is so far ahead of the market in terms of functionality and scale, and we don’t want to hobble ourselves in the short term. However as we share the NetflixOSS platform, there is demand from end users and vendors to begin porting some of the projects for use in datacenter and hybrid cloud configurations. For the foreseeable future Netflix is planning to stay on AWS, but in the long term portability may be useful to us as well.

A New Project - Denominator

We announced a brand new project at the Meetup; we are working on multi-region failover and traffic sharing patterns to provide higher availability for the streaming service during regional outages caused by our own bugs and AWS issues. To do this we need to directly control the DNS configuration that routes users to each region and each zone. When we looked at the features and vendors in this space we found that we were already using AWS Route53, which has a nice API but is missing some advanced features; Neustar UltraDNS, which has a SOAP based API; and DynECT, which has an awkwardly structured REST API. We couldn’t find a Java based API that abstracted all these vendors into the common set of capabilities that we are interested in, so we are creating one. The idea is that any feature that is supported by more than one vendor API is the highest common denominator, and that functionality can be switched between vendors as needed, or in the event of a DNS vendor outage.
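
To illustrate the "common denominator" idea, here is a hypothetical sketch. None of these types are Denominator's actual API, and the capability names and vendors are placeholders. A capability is treated as portable when more than one provider supports it. Failover picks the first healthy provider, in preference order, that supports the needed capability.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Placeholder capabilities; not Denominator's model. */
enum DnsCapability { A_RECORDS, CNAME_RECORDS, WEIGHTED_ROUTING, GEO_ROUTING }

/** Hypothetical vendor abstraction. */
interface DnsProvider {
    String name();
    Set<DnsCapability> capabilities();
    boolean healthy();
}

/** Simple fixed provider description, used here to model vendors. */
final class StaticDnsProvider implements DnsProvider {
    private final String name;
    private final Set<DnsCapability> capabilities;
    private final boolean healthy;

    StaticDnsProvider(String name, Set<DnsCapability> capabilities, boolean healthy) {
        this.name = name;
        this.capabilities = capabilities;
        this.healthy = healthy;
    }

    public String name() { return name; }
    public Set<DnsCapability> capabilities() { return capabilities; }
    public boolean healthy() { return healthy; }
}

final class DnsAbstraction {
    /** Capabilities offered by more than one provider, so they can be switched between vendors. */
    static Set<DnsCapability> portableCapabilities(List<DnsProvider> providers) {
        Map<DnsCapability, Integer> counts = new EnumMap<>(DnsCapability.class);
        for (DnsProvider p : providers) {
            for (DnsCapability c : p.capabilities()) counts.merge(c, 1, Integer::sum);
        }
        Set<DnsCapability> result = EnumSet.noneOf(DnsCapability.class);
        counts.forEach((c, n) -> { if (n > 1) result.add(c); });
        return result;
    }

    /** First healthy provider (in preference order) supporting the capability, e.g. during a vendor outage. */
    static DnsProvider choose(List<DnsProvider> preferred, DnsCapability needed) {
        for (DnsProvider p : preferred) {
            if (p.healthy() && p.capabilities().contains(needed)) return p;
        }
        throw new IllegalStateException("no healthy provider supports " + needed);
    }
}
```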

With most NetflixOSS projects, we are already running the code in production by the time we open source it on GitHub. In the case of Denominator, we are already sharing the code with DNS vendors who are helping us get the abstraction model right, and we will make it generally available during the development process. Denominator is a Java library for controlling DNS; we are building it to be as portable as possible, with few dependencies. We will embed it in services such as Edda, which collects the historical state of our cloud. This project is being led by Adrian Cole, who is well known as the author of the cross-platform jClouds open source project. He recently joined Netflix and is bringing a lot of valuable experience to our platform team.


In addition to announcing Denominator, we opened signup during the meeting for the next NetflixOSS Meetup, to be held on March 13th. We are planning some surprise announcements for that event, and within two days over 200 attendees had already registered.

Video and Slides

The video contains the introduction by Ruslan and Adrian, the lightning talks by each engineer and an extended demonstration of the Asgard console.

Monday, February 4, 2013

Reactive Programming in the Netflix API with RxJava

by Ben Christensen and Jafar Husain

Our recent post on optimizing the Netflix API introduced how our web service endpoints are implemented using a reactive programming model for composition of asynchronous callbacks from our service layer.

This post takes a closer look at how and why we use the reactive model and introduces our open source project RxJava – a Java implementation of Rx (Reactive Extensions).

Embrace Concurrency

Server-side concurrency is needed to effectively reduce network chattiness. Without concurrent execution on the server, a single "heavy" client request might not be much better than many "light" requests because each network request from a device naturally executes in parallel with other network requests. If the server-side execution of a collapsed "heavy" request does not achieve a similar level of parallel execution it may be slower than the multiple "light" requests even accounting for saved network latency.

Java Futures are Expensive to Compose

Java Futures are straightforward to use for a single level of asynchronous execution, but they start to add non-trivial complexity when they're nested (prior to Java 8 CompletableFuture).

Conditional asynchronous execution flows become difficult to optimally compose (particularly as latencies of each request vary at runtime) using Futures. It can be done of course, but it quickly becomes complicated (and thus error prone) or prematurely blocks on 'Future.get()', eliminating the benefit of asynchronous execution.
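
As an illustration, assume two hypothetical asynchronous lookups where the second depends on the first. With a plain `Future`, the dependent call cannot be issued until `get()` blocks the current thread. Java 8's `CompletableFuture.thenCompose` chains the two calls without blocking.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureComposition {
    // Hypothetical async lookups standing in for remote service calls.
    static CompletableFuture<String> userIdFor(String session) {
        return CompletableFuture.supplyAsync(() -> "user-" + session);
    }

    static CompletableFuture<List<String>> videosFor(String userId) {
        return CompletableFuture.supplyAsync(() -> List.of(userId + ":v1", userId + ":v2"));
    }

    /** Plain Future style: each dependent step waits in get(), tying up the calling thread. */
    static List<String> blockingStyle(String session) {
        try {
            Future<String> user = userIdFor(session);
            String id = user.get();                         // blocks
            Future<List<String>> videos = videosFor(id);
            return videos.get();                            // blocks again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Java 8 style: the dependent call is chained, and no thread waits in between. */
    static CompletableFuture<List<String>> composedStyle(String session) {
        return userIdFor(session).thenCompose(FutureComposition::videosFor);
    }
}
```

With two steps the blocking version is still readable. Each further conditional or parallel branch, however, adds another `get()` and another place where a thread sits idle.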

Callbacks Have Their Own Problems

Callbacks offer a solution to the tendency to block on Future.get() by not allowing anything to block. They are naturally efficient because they execute when the response is ready.

Similar to Futures though, they are easy to use with a single level of asynchronous execution but become unwieldy with nested composition.
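
The sketch below uses hypothetical callback-style methods that complete immediately. It shows how nesting grows with each dependent step. It also shows how joining several parallel results requires hand-written bookkeeping: a counter, an index-addressed array, and a special case for an empty list.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class CallbackNesting {
    // Hypothetical callback-style service calls; they complete immediately here for simplicity.
    static void getUser(String session, Consumer<String> onUser) {
        onUser.accept("user-" + session);
    }

    static void getVideos(String user, Consumer<List<String>> onVideos) {
        onVideos.accept(List.of(user + ":v1", user + ":v2"));
    }

    static void getRating(String video, Consumer<Integer> onRating) {
        onRating.accept(video.length() % 5 + 1);
    }

    /** Each dependent step nests one level deeper; the fan-in is managed by hand. */
    static void ratingsFor(String session, Consumer<List<Integer>> onDone) {
        getUser(session, user ->
            getVideos(user, videos -> {
                if (videos.isEmpty()) {                     // easy-to-forget edge case
                    onDone.accept(List.of());
                    return;
                }
                Integer[] ratings = new Integer[videos.size()];
                AtomicInteger remaining = new AtomicInteger(videos.size());
                for (int i = 0; i < videos.size(); i++) {
                    final int index = i;
                    getRating(videos.get(index), rating -> {
                        ratings[index] = rating;            // preserve input order
                        if (remaining.decrementAndGet() == 0) onDone.accept(List.of(ratings));
                    });
                }
            }));
    }
}
```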


Reactive programming offers efficient execution and composition by providing a collection of operators capable of filtering, selecting, transforming, combining and composing Observables.

The Observable data type can be thought of as a "push" equivalent to Iterable which is "pull". With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast with the Observable type, the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.

The Observable type adds two missing semantics to the Gang of Four's Observer pattern, which are available in the Iterable type:

  1. The ability for the producer to signal to the consumer that there is no more data available.
  2. The ability for the producer to signal to the consumer that an error has occurred.

With these two simple additions, we have unified the Iterable and Observable types. The only difference between them is the direction in which the data flows. This is very important because now any operation we perform on an Iterable can also be performed on an Observable. Let's take a look at an example ...
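
The following sketch shows the idea with deliberately tiny, hypothetical types (`PushObserver`, `PushSource`) rather than RxJava's own classes. The source pushes values through `onNext`, then signals completion with `onCompleted` or failure with `onError`. The same `filter` and `map` steps that one would write in a pull-based loop over an `Iterable` are applied to the push-based source.

```java
import java.util.function.Function;
import java.util.function.Predicate;

/** The three signals described above: values, completion, and error. */
interface PushObserver<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable error);
}

/** A deliberately tiny push-based source; RxJava's Observable is far richer. */
interface PushSource<T> {
    void subscribe(PushObserver<? super T> observer);

    static <T> PushSource<T> fromIterable(Iterable<T> values) {
        return observer -> {
            try {
                for (T v : values) observer.onNext(v);   // the producer pushes each value
            } catch (RuntimeException e) {
                observer.onError(e);                     // ...or signals failure
                return;
            }
            observer.onCompleted();                      // ...then signals "no more data"
        };
    }

    default <R> PushSource<R> map(Function<? super T, ? extends R> f) {
        return observer -> subscribe(new PushObserver<T>() {
            @Override public void onNext(T v) { observer.onNext(f.apply(v)); }
            @Override public void onCompleted() { observer.onCompleted(); }
            @Override public void onError(Throwable e) { observer.onError(e); }
        });
    }

    default PushSource<T> filter(Predicate<? super T> p) {
        return observer -> subscribe(new PushObserver<T>() {
            @Override public void onNext(T v) { if (p.test(v)) observer.onNext(v); }
            @Override public void onCompleted() { observer.onCompleted(); }
            @Override public void onError(Throwable e) { observer.onError(e); }
        });
    }
}
```

For example, subscribing to `PushSource.fromIterable(List.of(1, 2, 3, 4, 5)).filter(n -> n % 2 == 1).map(n -> n * 10)` delivers 10, 30 and 50 followed by `onCompleted`, the same values a `for` loop over the list would produce.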

Observable Service Layer

The Netflix API takes advantage of Rx by making the entire service layer asynchronous (or at least appear so) - all "service" methods return an Observable<T>.

Making all return types Observable combined with a functional programming style frees up the service layer implementation to safely use concurrency. It also enables the service layer implementation to:

  • conditionally return immediately from a cache
  • block instead of using threads if resources are constrained
  • use multiple threads
  • use non-blocking IO
  • migrate an underlying implementation from network based to in-memory cache

This can all happen without ever changing how client code interacts with or composes responses.

In short, client code treats all interactions with the API as asynchronous but the implementation chooses if something is blocking or non-blocking.

This next example code demonstrates how a service layer method can choose whether to synchronously return data from an in-memory cache or asynchronously retrieve data from a remote service and callback with the data once retrieved. In both cases the client code consumes it the same way.
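
A sketch of such a service-layer method follows. The post's service layer returns `Observable<T>`; to keep the example self-contained, this sketch swaps in the JDK's `CompletableFuture<T>`, which is single-valued and eager, so it only approximates the idea. `VideoService`, `getVideo` and the remote lookup function are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical service-layer class; every caller sees the same asynchronous return type. */
class VideoService {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> remoteLookup;   // stands in for a network call; assumed non-null results
    private final Executor ioExecutor;

    VideoService(Function<String, String> remoteLookup, Executor ioExecutor) {
        this.remoteLookup = remoteLookup;
        this.ioExecutor = ioExecutor;
    }

    CompletableFuture<String> getVideo(String id) {
        String cached = cache.get(id);
        if (cached != null) {
            // Synchronous path: the result is already complete and no thread is used.
            return CompletableFuture.completedFuture(cached);
        }
        // Asynchronous path: fetch on another thread, then populate the cache.
        return CompletableFuture.supplyAsync(() -> remoteLookup.apply(id), ioExecutor)
            .thenApply(title -> {
                cache.put(id, title);
                return title;
            });
    }
}
```

Callers write `service.getVideo(id).thenAccept(...)` in both cases, so the service can later switch between the cached and remote paths, or change how the remote path is executed, without touching client code.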

Retaining this level of control in the service layer is a major architectural advantage particularly for maintaining and optimizing functionality over time. Many different endpoint implementations can be coded against an Observable API and they work efficiently and correctly with the current thread or one or more worker threads backing their execution.

The following code demonstrates the consumption of an Observable API with a common Netflix use case – a grid of movies:

That code is declarative and lazy as well as functionally "pure" in that no mutation of state is occurring that would cause thread-safety issues.

The API Service Layer is now free to change the behavior of the methods 'getListOfLists', 'getVideos', 'getMetadata', 'getBookmark' and 'getRating', some blocking and others non-blocking, but all consumed the same way.

In the example, 'getListOfLists' pushes each 'VideoList' object via 'onNext()' and then 'getVideos()' operates on that same parent thread. The implementation of that method could however change from blocking to non-blocking and the code would not need to change.
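
Here is a rough equivalent of the movie-grid composition, again with `CompletableFuture` standing in for RxJava's `Observable`. The method names follow the post, but their signatures, return types and stub data are assumptions. Some stubs return already-completed futures and others run asynchronously, and the composition code is the same either way. Unlike the Observable version, this sketch is eager rather than lazy.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class MovieGrid {
    // Hypothetical service-layer stubs: some complete immediately, others asynchronously.
    static CompletableFuture<List<String>> getListOfLists(String user) {
        return CompletableFuture.completedFuture(List.of("Top Picks", "New Releases"));
    }
    static CompletableFuture<List<String>> getVideos(String list) {
        return CompletableFuture.supplyAsync(() -> List.of(list + "/1", list + "/2"));
    }
    static CompletableFuture<String> getMetadata(String video) {
        return CompletableFuture.supplyAsync(() -> "title of " + video);
    }
    static CompletableFuture<Integer> getBookmark(String video) {
        return CompletableFuture.completedFuture(0);
    }
    static CompletableFuture<Double> getRating(String video) {
        return CompletableFuture.supplyAsync(() -> 4.5);
    }

    /** Fetches metadata, bookmark and rating concurrently and merges them into one grid cell. */
    static CompletableFuture<String> cell(String video) {
        return getMetadata(video)
            .thenCombine(getBookmark(video), (title, bookmark) -> title + " @" + bookmark + "s")
            .thenCombine(getRating(video), (partial, rating) -> partial + " *" + rating);
    }

    static CompletableFuture<List<String>> row(String list) {
        return getVideos(list).thenCompose(videos -> {
            List<CompletableFuture<String>> cells = videos.stream().map(MovieGrid::cell).collect(Collectors.toList());
            return sequence(cells);
        });
    }

    /** One row per list; the caller never knows which calls blocked and which were asynchronous. */
    static CompletableFuture<List<List<String>>> grid(String user) {
        return getListOfLists(user).thenCompose(lists -> {
            List<CompletableFuture<List<String>>> rows = lists.stream().map(MovieGrid::row).collect(Collectors.toList());
            return sequence(rows);
        });
    }

    /** Turns a list of futures into a future of a list, preserving order. */
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }
}
```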


RxJava is our implementation of Rx for the JVM and is available in the ReactiveX repository on GitHub (prior to September 2014 it was in the Netflix repository).

It is not yet feature-complete compared with the .NET version of Rx, but what is implemented has been in use in production within the Netflix API for the past year.

We are open sourcing the code as version 0.5 to acknowledge that it's not yet feature-complete. The outstanding work is logged in the RxJava Issues.

(Update: As of August 2014 the project hit the 1.0.0 Release Candidate milestone.)

Documentation is available on the RxJava Wiki including links to material available on the internet.

Some of the goals of RxJava are:

  • Stay close to the original Rx.Net implementation while adjusting naming conventions and idioms to Java
  • All contracts of Rx should be the same
  • Target the JVM, not a language. The first languages supported (beyond Java itself) are Groovy, Clojure, Scala and JRuby. New language adapters can be contributed.
  • Support Java 6 (to include Android support) and higher with an eventual goal to target a build for Java 8 with its lambda support. (Update: Java 8 support was achieved without a separate build)

Here is an implementation of one of the examples above but using Clojure instead of Groovy:


Reactive programming with RxJava has enabled Netflix developers to leverage server-side concurrency without the typical thread-safety and synchronization concerns. The API service layer implementation has control over concurrency primitives, which enables us to pursue system performance improvements without fear of breaking client code.

RxJava is effective on the server for us and it spreads deeper into our code the more we use it.

We hope you find the RxJava project as useful as we have and look forward to your contributions.

If this type of work interests you we are always looking for talented engineers.

September 2014 Update

  • This blog post originally used the term "functional reactive programming" or FRP. This term was used in error. RxJava does not implement "continuous time" which is a requirement for FRP from previous literature.
  • Updated to new ReactiveX location for RxJava.