Monday, April 18, 2011

“More Like This…” Building a network of similarity

Ever wondered what makes “Inception” similar to “12 Monkeys”?

Looking at similarity between movies and TV shows is a useful way to find great titles to watch. At Netflix, we calculate levels of similarity between each movie and TV show and use these “similars”, as we call them, for a variety of customer-facing and internal systems.

Hi, there. Hans Granqvist, senior algorithm engineer, here to tell you more about how we build similars. I want to share some insights into how we lifted and improved this build as we moved it from our datacenter to the cloud.

The similars build process

First a little background.

To create a network of similars, we look at thousands of facets associated with each title. A future post will go into more detail about the similarity build, but it follows this somewhat simplified process:

First we discover sets of titles that may be similar to the source title, based on the algorithm used and facets associated with each title. We then refine these sets of titles and filter them to remove unwanted matches, depending on algorithm deployment types and audiences.

After this, we dynamically weight each facet and score it. The sum of these scores is the measure of similarity: the higher the sum, the more similar the titles.
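
To make the scoring step concrete, here is a minimal sketch in Python. It assumes each facet comparison yields a match strength between 0 and 1 and that each facet carries a weight. The facet names, the weights, the titles, and the `similarity_score` helper are all hypothetical and chosen only for illustration; the real facets and weighting are not described in this post.

```python
def similarity_score(weights, matches):
    """Sum the weighted facet matches; a higher sum means more similar titles."""
    return sum(weights.get(facet, 0.0) * strength
               for facet, strength in matches.items())

# Hypothetical facet weights and per-facet match strengths (0.0 to 1.0)
# between a source title and two candidate titles.
weights = {"genre": 3.0, "director": 2.0, "decade": 0.5}
candidates = {
    "Title A": {"genre": 1.0, "director": 0.0, "decade": 1.0},
    "Title B": {"genre": 0.5, "director": 1.0, "decade": 1.0},
}

scores = {title: similarity_score(weights, m) for title, m in candidates.items()}

# Order the candidates from most to least similar.
ranked = sorted(scores, key=scores.get, reverse=True)
```

With these made-up numbers, "Title B" scores 4.0 and ranks ahead of "Title A" at 3.5.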

Until last year, we built similars in our datacenter. As internal dependencies transferred to the cloud, its scaling capabilities became real. We wanted to use the cloud’s more flexible deployment structure, and its inherent parallelism would let us change the build to scale linearly with the number of machines. We could increase the number of algorithms by an order of magnitude.

Shortcomings of the old datacenter build. Opportunities and challenges of the cloud.

While the old build worked well, its shortcomings were several:
  • Algorithms were defined in the code, making them hard to change.
  • The datacenter was limited to a small set of machines, leading to long recalculation times (several days).
  • Longer push cycles due to code linkage and runtime dependencies.
  • Building directly on production DB structure with varying resource availability.
Moving to the cloud presented new opportunities:
  • A new architecture lets us define algorithms outside of code, using distributed stores to properly isolate and share newer versions of algorithms.
  • The cloud’s unlimited capacity (within reason) could be exercised to build massively in parallel.
  • Netflix components are now all re-architected as services. We can push new code much faster, almost instantaneously. Internal dependencies are just an API call away.
Of course, with this come challenges:
  • Remote service calls have latency.  Going from nanoseconds to milliseconds makes a huge difference when you repeat it millions of times.
  • The cloud persistence layers (SimpleDB and S3) have wildly varying performance characteristics. For some searches via SimpleDB, for example, there are surprisingly no SLAs.
  • With hundreds of machines building simultaneously, the need to partition and properly synchronize work becomes paramount.
  • The distributed nature of the cloud environment increases the risk of failures around data stores, message bus systems, and caching layers.

We based our new cloud architecture on a series of tasks distributed by a controller to a set of builder nodes that communicate through a set of message queues.

Each task contains information including source title and algorithm, with optional versioning. As a build node picks these tasks off the queue, it collects the definition of the algorithm from persistent storage, converts it into a sequence of executable steps, and starts executing.
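
As an illustration of what such a task might look like on the wire, the sketch below models it as a small JSON message. The field names (`source_title_id`, `algorithm`, `algorithm_version`) and the `BuildTask` class are assumptions made for this example; the actual message format is not shown here.

```python
import json
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass(frozen=True)
class BuildTask:
    """Hypothetical task message: which title to build, with which algorithm."""
    source_title_id: int
    algorithm: str
    algorithm_version: Optional[int] = None  # versioning is optional

    def to_message(self) -> str:
        # Serialize to a queue message body.
        return json.dumps(asdict(self), sort_keys=True)

    @staticmethod
    def from_message(body: str) -> "BuildTask":
        # Parse a queue message body back into a task.
        return BuildTask(**json.loads(body))


task = BuildTask(source_title_id=12345, algorithm="example-algorithm",
                 algorithm_version=2)
round_tripped = BuildTask.from_message(task.to_message())
```

Keeping the version in the message is what would let a node decide, per task, whether it knows how to run that algorithm.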

Technologies used
  • AWS Simple Queue Service (SQS) for communication between controller and nodes.
  • AWS SimpleDB, Amazon’s row database, to store the definitions of algorithms.
  • AWS S3, Amazon’s key/value store.
  • EV Cache, a Netflix-developed version of memcached to increase throughput.
  • A Netflix-developed persistent store mechanism that transparently chains various types of caching (local near-cache LRU cache and service-shared EV cache, for example) to S3.
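
The chained store in the last item can be pictured as a read-through chain of layers, fastest first. The sketch below is a simplified in-memory model, not the Netflix implementation: `LRUCache`, `DictStore`, and `ChainedStore` are hypothetical names, and plain dictionaries stand in for the shared cache and for S3.

```python
from collections import OrderedDict


class LRUCache:
    """Tiny in-process near cache with least-recently-used eviction."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the least recently used


class DictStore:
    """Stand-in for a shared cache or a durable store; counts reads."""

    def __init__(self, data=None):
        self._data = dict(data or {})
        self.reads = 0

    def get(self, key):
        self.reads += 1
        return self._data.get(key)

    def put(self, key, value):
        self._data[key] = value


class ChainedStore:
    """Read through the layers in order and back-fill the faster ones on a hit."""

    def __init__(self, layers):
        self.layers = layers  # fastest first, durable store last

    def get(self, key):
        for depth, layer in enumerate(self.layers):
            value = layer.get(key)
            if value is not None:
                for faster in self.layers[:depth]:
                    faster.put(key, value)
                return value
        return None


durable = DictStore({"algo:v1": "definition"})
shared = DictStore()
store = ChainedStore([LRUCache(capacity=100), shared, durable])
first = store.get("algo:v1")   # misses both caches, reads the durable store
second = store.get("algo:v1")  # served from the near cache
```

In this model the second read never reaches the shared cache or the durable store, which is the point of chaining.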
Build process

The following figure shows the various components in the build process. The Controller sends tasks on an SQS instruction queue. These tasks are read by a set of Build Nodes, which read the algorithms from SimpleDB and S3, and use various data sources to calculate the set of similars. When done, the node writes the result to persistent store and signals build status back to the controller via an SQS feedback queue.

Architecture of the Similars Build process. A ‘wrapped component’ indicates the component needs to be instrumented to handle network hiccups, failures and AWS API rate limitations.

Based on their availability to process new tasks, build nodes periodically read from the instruction queue. When a message has been seen and read, SQS guarantees other nodes will not read the same message until the message visibility window expires. 

The build process spins off independent task threads for each task parsed. The first time an algorithm is seen, a builder node reads the algorithm definition and decides whether it can process the task. Newer versions of build nodes with knowledge of newer sets of data sources can co-exist with older ones using versioned messages.

If a node cannot process the task, it drops the message on the floor and relies on the SQS visibility timeout to expire so that the message becomes visible to other nodes. The timeout has been tuned to give a node reasonable time to process the message.
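
The drop-and-wait behavior can be modeled with a small in-memory queue. This is a toy model of a visibility timeout, not the SQS API: the `VisibilityQueue` class, its method names, and the explicit `now` argument (used instead of a real clock) are assumptions made so the example is deterministic.

```python
class VisibilityQueue:
    """In-memory model of a queue with a visibility timeout (not the SQS API)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self._messages = {}  # message id -> (body, time it becomes visible)
        self._next_id = 0

    def send(self, body, now):
        self._messages[self._next_id] = (body, now)
        self._next_id += 1

    def receive(self, now):
        """Return (id, body) of a visible message and hide it for the timeout."""
        for msg_id, (body, visible_at) in self._messages.items():
            if visible_at <= now:
                self._messages[msg_id] = (body, now + self.visibility_timeout)
                return msg_id, body
        return None

    def delete(self, msg_id):
        """Acknowledge a message once its task has been fully processed."""
        self._messages.pop(msg_id, None)


queue = VisibilityQueue(visibility_timeout=30)
queue.send("task-1", now=0)

taken = queue.receive(now=0)         # node A reads the message, then drops it
hidden = queue.receive(now=10)       # node B sees nothing: still invisible
redelivered = queue.receive(now=30)  # timeout expired: visible again
queue.delete(redelivered[0])         # node B finishes and acknowledges
empty = queue.receive(now=100)
```

A node that cannot handle a message simply never deletes it, and the message reappears for another node once the timeout has passed.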

SQS guarantees only that messages arrive, not that they arrive in the order they were put on the queue. Care has to be taken to make each message independent and idempotent.
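
One common way to get idempotent handling, sketched here under assumptions, is to write each result under a key derived only from the task itself, so a replayed or reordered message overwrites the same entry. The key layout, the `handle` function, and the dictionary standing in for the persistent store are hypothetical.

```python
results = {}  # stand-in for the persistent store


def result_key(task):
    # Hypothetical key layout: one entry per (algorithm, version, source title).
    return "similars/{algorithm}/v{version}/{source_title}".format(**task)


def compute(task):
    # Placeholder for the real similarity calculation; deterministic per task.
    return [task["source_title"] + "-similar-1", task["source_title"] + "-similar-2"]


def handle(task):
    """Overwrite the result under a deterministic key, so replays are harmless."""
    results[result_key(task)] = compute(task)


t1 = {"source_title": "title-1", "algorithm": "example", "version": 1}
t2 = {"source_title": "title-9", "algorithm": "example", "version": 1}

# Messages arrive out of order, and one is delivered twice.
for task in (t2, t1, t2):
    handle(task)
after_replay = dict(results)

# The same tasks delivered once each, in order, give the same stored state.
results.clear()
for task in (t1, t2):
    handle(task)
same_outcome = results == after_replay
```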

The final step is to persist the now-calculated, score-ordered list of similars to S3.

Once the task has been performed, the node puts a feedback message on a feedback queue. The controller uses this feedback to measure build task progress and also to collect statistics on each node’s performance. Based on these statistics, the controller may change the number of builder threads for a node, how often it should read from the queue, or various other timeout and retry values for SQS, S3, and SimpleDB.
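
A feedback loop of this kind could look roughly like the sketch below. The policy shown (add one thread while a node is healthy, halve the threads when failures pile up) and every threshold in it are assumptions for illustration; the post does not describe the controller's actual rules.

```python
from dataclasses import dataclass


@dataclass
class NodeStats:
    """Per-node counters the controller could derive from feedback messages."""
    completed: int = 0
    failed: int = 0

    def record(self, ok):
        if ok:
            self.completed += 1
        else:
            self.failed += 1

    @property
    def failure_rate(self):
        total = self.completed + self.failed
        return self.failed / total if total else 0.0


def recommend_threads(current, stats, min_threads=1, max_threads=32):
    """Hypothetical policy: grow slowly when healthy, back off fast when not."""
    if stats.failure_rate > 0.2:
        return max(min_threads, current // 2)
    if stats.failure_rate < 0.05:
        return min(max_threads, current + 1)
    return current


healthy = NodeStats(completed=10, failed=0)
struggling = NodeStats(completed=6, failed=4)
borderline = NodeStats(completed=9, failed=1)

grow = recommend_threads(4, healthy)        # 5
shrink = recommend_threads(8, struggling)   # 4
hold = recommend_threads(8, borderline)     # 8
```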

Error situations and solutions

Building the system made us realize that we’re in a different reality in the cloud.

Some of the added complexity comes from writing a distributed system, where anything can fail at any given time. But some of the complexity was unexpected and we had to learn how to handle the following on a much larger scale than we initially envisioned.
  • Timeouts and slowness reading algorithms and weights from persistent store systems, each of which can rate-limit a client if it believes the client abuses the service. Once in such a restricted state, your code needs to quickly ease off. The only way to try to prevent AWS API rate limitation is to start out slow and gradually increase your activity. Restriction normally applies to the entire domain, so all clients on that domain will be restricted, not just the one client currently misusing it. We handled these issues via multiple levels of caching (using both a near cache on the builder node and an application-level cache to store partial results) with exponential backoff retries.
  • Timeouts and AWS API rate limitation writing to SQS. Putting messages on the queue can fail. We handle this via exponential backoff retries.
  • Inability of a node to read from SQS. Also handled via exponential backoff retries.
  • Inability of nodes to process all tasks in a message. We batch messages on SQS for both cost and performance reasons. When a node cannot process all tasks in a message, we drop the message on the floor and rely on SQS to resend it.
  • Inability of a node to process a task inside a batch message. A node may have occasional glitches or find it impossible to finish its tasks (e.g., data sources may have gone offline). We collate all failed tasks and retry them on the node until the set is empty, or fail the batch message after a number of tries.
  • Timeouts and AWS API rate limitation writing to persistence layers. We handle these with exponential backoff retries.
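
The collate-and-retry handling of tasks inside a batch message can be sketched as follows. The `process_batch` function, the limit of three attempts, and the simulated failures are assumptions for illustration only.

```python
def process_batch(tasks, handler, max_attempts=3):
    """Run every task; retry only the failed ones, up to max_attempts rounds.

    Returns (succeeded, still_failed). A non-empty still_failed list means
    the whole batch message should be treated as failed.
    """
    pending = list(tasks)
    succeeded = []
    for _ in range(max_attempts):
        still_failing = []
        for task in pending:
            try:
                handler(task)
                succeeded.append(task)
            except Exception:
                still_failing.append(task)  # collate failures for the next round
        pending = still_failing
        if not pending:
            break
    return succeeded, pending


# Simulated glitches: task-b fails once, task-c keeps failing.
failures_left = {"task-b": 1, "task-c": 99}


def handler(task):
    if failures_left.get(task, 0) > 0:
        failures_left[task] -= 1
        raise RuntimeError(task + " failed")


done, gave_up = process_batch(["task-a", "task-b", "task-c"], handler)
```

Here `task-b` succeeds on the second round, while `task-c` is still failing after three rounds and would cause the batch message to be failed.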
All exponential retries typically wait on the order of 500ms, 2000ms, 8000ms, and so on, with some randomness added to avoid nodes retrying at fixed intervals. Sometimes operations have to be retried up to dozens of times.
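
The 500ms, 2000ms, 8000ms progression corresponds to a base delay of 500ms multiplied by four on each retry. A minimal sketch of such a retry helper follows; the function names, the 25% jitter range, and the default of five retries are assumptions, not the values used in the build.

```python
import random
import time


def backoff_delay_ms(attempt, base_ms=500, factor=4, jitter=0.25, rng=random):
    """Delay before retry number `attempt` (0-based): about 500, 2000, 8000 ms."""
    nominal = base_ms * factor ** attempt
    # Randomize by +/- jitter so that nodes do not all retry at the same instant.
    return nominal * (1 + rng.uniform(-jitter, jitter))


def call_with_retries(operation, max_retries=5, sleep=time.sleep, rng=random):
    """Call operation(); on failure wait with exponential backoff and try again."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error to the caller
            sleep(backoff_delay_ms(attempt, rng=rng) / 1000.0)


# Usage: an operation that is rate-limited twice and then succeeds.
# A recording function replaces time.sleep so the example runs instantly.
calls = {"count": 0}


def flaky_write():
    calls["count"] += 1
    if calls["count"] <= 2:
        raise RuntimeError("rate limited")
    return "stored"


waits = []
result = call_with_retries(flaky_write, sleep=waits.append, rng=random.Random(42))
```

In this run the helper waits twice, once for roughly half a second and once for roughly two seconds, before the third call succeeds.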


By moving our build to the cloud, we managed to cut the time it takes to calculate a network of similars from up to two weeks down to mere hours. This means we can now experiment and A/B test new algorithms much more easily.

We also now have combinatorial algorithms (algorithms defined in terms of other algorithms) and the build nodes use this fact to execute builds in dependency order. Subsequent builds pick up cached results and we have seen exponential speed increases.
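
Executing builds in dependency order amounts to a topological sort over the algorithm definitions. The sketch below uses Python's standard `graphlib` module (Python 3.9 or later); the algorithm names and the `build_in_dependency_order` helper are hypothetical, and a dictionary stands in for the cached results.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical algorithms mapped to the algorithms they are defined in terms of.
dependencies = {
    "genre-match": set(),
    "cast-match": set(),
    "combined": {"genre-match", "cast-match"},
    "combined-regional": {"combined"},
}


def build_in_dependency_order(dependencies, build):
    """Build every algorithm after the ones it depends on, reusing cached results."""
    cache = {}
    for name in TopologicalSorter(dependencies).static_order():
        inputs = {dep: cache[dep] for dep in dependencies.get(name, ())}
        cache[name] = build(name, inputs)
    return cache


build_order = []


def build(name, inputs):
    # Placeholder for a real build; records the order and which inputs were reused.
    build_order.append(name)
    return {"algorithm": name, "used": sorted(inputs)}


results = build_in_dependency_order(dependencies, build)
```

Each algorithm is built exactly once, and the combined algorithms read their inputs from the cache instead of recomputing them.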

While network builds such as these are often embarrassingly parallel, it is worth noting that the error situations come courtesy of a distributed environment where SLAs are in many cases ill-defined or missing altogether.

One key insight is that the speed with which we can build new networks is now gated by how fast we can pump the result data to the receiving permanent store. CPU and RAM are cheap and predictable cloud commodities. I/O is not.

The lessons of building this system will be invaluable as we progress into more complex processes where we take in more factors, many of which are highly temporal and real-time driven or limited to specific countries, regions, languages, and cultures.

Let us know what you think… and thanks for reading!

PS. Want to work in our group? Send us your resume and let us know why you think, or don’t think, “Inception” is similar to “12 Monkeys.”