Tuesday, May 31, 2016

Meson: Workflow Orchestration for Netflix Recommendations

At Netflix, our goal is to predict what you want to watch before you watch it. To do this, we run a large number of machine learning (ML) workflows every day. In order to support the creation of these workflows and make efficient use of resources, we created Meson.

Meson is a general purpose workflow orchestration and scheduling framework that we built to manage ML pipelines that execute workloads across heterogeneous systems. It manages the lifecycle of several ML pipelines that build, train and validate personalization algorithms that drive video recommendations.

One of the primary goals of Meson is to increase the velocity, reliability and repeatability of algorithmic experiments while allowing engineers to use the technology of their choice for each of the steps themselves.

Powering Machine Learning Pipelines

Spark, MLlib, Python, R and Docker play an important role in several current generation machine learning pipelines within Netflix.

Let’s take a look at a typical machine learning pipeline that drives video recommendations and how it is represented and handled in Meson.

The workflow involves:
  • Selecting a set of users - This is done via a Hive query to select the cohort for analysis
  • Cleansing / preparing the data - A Python script that creates 2 sets of users for ensuring parallel paths
  • In the parallel paths, one uses Spark to build and analyze a global model with HDFS as temporary storage.
    The other uses R to build region (country) specific models. The number of regions is dynamic based on the cohort selected for analysis. The Build Regional Model and Validate Regional Model steps in the diagram are repeated for each region (country), expanded at runtime and executed with different set of parameters as shown below
  • Validation - Scala code that tests for the stability of the models when the two paths converge. In this step we also go back and repeat the whole process if the model is not stable.
  • Publish the new model - Fire off a Docker container to publish the new model to be picked up by other production systems

The above picture shows a run in progress for the workflow described above
  • The user set selection, and cleansing of the data has been completed as indicated by the steps in green.
  • The parallel paths are in progress
    • The Spark branch has completed the model generation and the validation
    • The for-each branch has kicked off 4 different regional models and all of them are in progress (Yellow)
  • The Scala step for model selection is activated (Blue). This indicates that one or more of the incoming branches have completed, but it is still not scheduled for execution because there are incoming branches that have either (a) not started or (b) are in progress
  • Runtime context and parameters are passed along the workflow for business decisions

Under the Hood

Let’s dive behind the scenes to understand how Meson orchestrates across disparate systems and look at the interplay within different components of the ecosystem. Workflows have a varying set of resource requirements and expectations on total run time. We rely on resource managers like Apache Mesos to satisfy these requirements. Mesos provides task isolation and excellent abstraction of CPU, memory, storage, and other compute resources. Meson leverages these features to achieve scale and fault tolerance for its tasks.

Meson Scheduler
Meson scheduler, which is registered as a Mesos framework, manages the launch, flow control and runtime of the various workflows. Meson delegates the actual resource scheduling to Mesos. Various requirements including memory and CPU are passed along to Mesos. While we do rely on Mesos for resource scheduling, the scheduler is designed to be pluggable, should one choose to use another framework for resource scheduling.

Once a step is ready to be scheduled, the Meson scheduler chooses the right resource offer from Mesos and ships off the task to the Mesos master.

Meson Executor

The Meson executor is a custom Mesos executor. Writing a custom executor allows us to  maintain a communication channel with Meson. This is especially useful for long running tasks where framework messages can be sent to the Meson scheduler. This also enables us to pass custom data that’s richer than just exit codes or status messages.

Once Mesos schedules a Meson task, it launches a Meson executor on a slave after downloading all task dependencies. While the core task is being executed, the executor does housekeeping chores like sending heartbeats, percent complete, status messages etc.


Meson offers a Scala based DSL that allows for easy authoring of workflows. This makes it very easy for developers to use and create customized workflows. Here is how the aforementioned workflow may be defined using the DSL.

val getUsers = Step("Get Users", ...)
val wrangleData = Step("Wrangle Data", ...)
val regionSplit = Step("For Each Region", ...)
val regionJoin = Step("End For Each", ...)
val regions = Seq("US", "Canada", "UK_Ireland", "LatAm", ...)
val wf = start -> getUsers -> wrangleData ==> (
  trainGlobalModel -> validateGlobalModel,
  regionSplit **(reg = regions) --< (trainRegModel, validateRegModel) >-- regionJoin
) >== selectModel -> validateModel -> end

// If verbs are preferred over operators
val wf = sequence(start, getUsers, wrangleData) parallel {
  sequence(trainGlobalModel, validateGlobalModel)
           forEach(reg = regions) sequence(trainRegModel, validateRegModel) forEach,
} parallel sequence(selectModel, validateModel, end)

Extension architecture

Meson was built from the ground up to be extensible to make it easy to add custom steps and extensions. Spark Submit Step, Hive Query Step, Netflix specific extensions that allow us to reach out to microservices or other systems like Cassandra are a some examples.

In the above workflow, we built a Netflix specific extension to call out to our Docker execution framework that enables developers to specify the bare minimum parameters for their Docker images. The extension handles all communications like getting all the status URLs, the log messages and monitoring the state of the Docker process.


Outputs of steps can be treated as first class citizens within Meson and are stored as Artifacts. Retries of a workflow step can be skipped based on the presence or absence of an artifact id. We can also have custom visualization of artifacts within the Meson UI. For e.g. if we store feature importance as an artifact as part of a pipeline, we can plug in custom visualizations that allow us to compare the past n days of the feature importance.

Screen Shot 2016-05-27 at 4.01.02 PM.png

Mesos Master / Slave
Mesos is used for resource scheduling with Meson registered as the core framework. Meson’s custom Mesos executors are deployed across the slaves. These are responsible for  downloading all the jars and custom artifacts and send messages / context / heartbeats back to the Meson scheduler. Spark jobs submitted from Meson share the same Mesos slaves to run the tasks launched by the Spark job.

Native Spark Support

Supporting Spark natively within Meson was a key requirement and goal. The Spark Submit within Meson allows for monitoring of the Spark job progress from within Meson, has the ability to retry failed spark steps or kill Spark jobs that may have gone astray. Meson also supports the ability to target specific Spark versions - thus, supporting innovation for users that want to be on the latest version of Spark.

Supporting Spark in a multi-tenant environment via Meson came with an interesting set of challenges. Workflows have a varying set of resource requirements and expectations on total run time. Meson efficiently utilizes the available resources by matching the resource requirements and SLA expectation to a set of Mesos slaves that have the potential to meet the criteria. This is achieved by setting up labels for groups of Mesos slaves and using the Mesos resource attributes feature to target a job to a set of slaves.

ML Constructs

As adoption increased for Meson, a class of large scale parallelization problems like parameters sweeping, complex bootstraps and cross validation emerged.
Meson offers a simple ‘for-loop’ construct that allows data scientists and researchers to express parameter sweeps allowing them to run tens of thousands of docker containers across the parameter values. Users of this construct can monitor progress across the thousands of tasks in real time, find failed tasks via the UI and have logs streamed back to a single place within Meson making managing such parallel tasks simple.


Here are some screenshots of the Meson UI:

And here are couple of interesting workflows in production:


Meson has been powering hundreds of concurrent jobs across multiple ML pipelines for the past year. It has been a catalyst in enabling innovation for our algorithmic teams thus improving overall recommendations to our members.

We plan to open source Meson in the coming months and build a community around it. If you want to help accelerate the pace of innovation and the open source efforts, join us to help make Meson better.

Wednesday, May 25, 2016

Application data caching using SSDs

The Moneta project: Next generation EVCache for better cost optimization

With the global expansion of Netflix earlier this year came the global expansion of data. After the Active-Active project and now with the N+1 architecture, the latest personalization data needs to be everywhere at all times to serve any member from any region. Caching plays a critical role in the persistence story for member personalization as detailed in this earlier blog post.

There are two primary components to the Netflix architecture. The first is the control plane that runs on the AWS cloud for generic, scalable computing for member signup, browsing and playback experiences. The second is the data plane, called Open Connect, which is our global video delivery network. This blog is about how we are bringing the power and economy of SSDs to EVCache - the primary caching system in use at Netflix for applications running in the control plane on AWS.

One of the main use cases of EVCache is to act as globally replicated storage for personalized data for each of our more than 81 million members. EVCache plays a variety of roles inside Netflix besides holding this data, including acting as a standard working-set cache for things like subscriber information. But its largest role is for personalization. Serving anyone from anywhere means that we must hold all of the personalized data for every member in each of the three regions that we operate in. This enables a consistent experience in all AWS regions and allows us to easily shift traffic during regional outages or during regular traffic shaping exercises to balance load. We have spoken at length about the replication system used to make this happen in a previous blog post.

During steady state, our regions tend to see the same members over and over again. Switching between regions is not a very common phenomenon for our members. Even though their data is in RAM in all three regions, only one region is being used regularly per member. Extrapolating from this, we can see that each region has a different working set for these types of caches. A small subset is hot data and the rest is cold.

Besides the hot/cold data separation, the cost of holding all of this data in memory is growing along with our member base. As well, different A/B tests and other internal changes can add even more data. For our working set of members, we have billions of keys already and that number will only grow. We have the challenge of continuing to support Netflix use cases while balancing cost. To meet this challenge, we are introducing a multi-level caching scheme using both RAM and SSDs.

The EVCache project to take advantage of this global request distribution and cost optimization is called Moneta, named for the Latin Goddess of Memory, and Juno Moneta, the Protectress of Funds for Juno.

Current Architecture

We will talk about the current architecture of the EVCache servers and then talk about how this is evolving to enable SSD support.

The picture below shows a typical deployment for EVCache and the relationship between a single client instance and the servers. A client of EVCache will connect to several clusters of EVCache servers. In a region we have multiple copies of the whole dataset, separated by AWS Availability Zone. The dashed boxes delineate the in-region replicas, each of which has a full copy of the data and acts as a unit. We manage these copies as separate AWS Auto Scaling groups. Some caches have 2 copies per region, and some have many. This high level architecture is still valid for us for the foreseeable future and is not changing. Each client connects to all of the servers in all zones in their own region. Writes are sent to all copies and reads prefer topologically close servers for read requests. To see more detail about the EVCache architecture, see our original announcement blog post.

Client - Server architecture.png

The server as it has evolved over the past few years is a collection of a few processes, with two main ones: stock Memcached, a popular and battle tested in-memory key-value store, and Prana, the Netflix sidecar process. Prana is the server's hook into the rest of Netflix's ecosystem, which is still primarily Java-based. Clients connect directly to the Memcached process running on each server. The servers are independent and do not communicate with one another.

Old Server Closeup.png


As one of the largest subsystems of the Netflix cloud, we're in a unique position to apply optimizations across a significant percentage of our cloud footprint. The cost of holding all of the cached data in memory is growing along with our member base. The output of a single stage of a single day's personalization batch process can load more than 5 terabytes of data into its dedicated EVCache cluster. The cost of storing this data is multiplied by the number of global copies of data that we store. As mentioned earlier, different A/B tests and other internal changes can add even more data. For just our working set of members, we have many billions of keys today, and that number will only grow.

To take advantage of the different data access patterns that we observe in different regions, we built a system to store the hot data in RAM and cold data on disk. This is a classic two-level caching architecture (where L1 is RAM and L2 is disk), however engineers within Netflix have come to rely on the consistent, low-latency performance of EVCache. Our requirements were to be as low latency as possible, use a more balanced amount of (expensive) RAM, and take advantage of lower-cost SSD storage while still delivering the low latency our clients expect.

In-memory EVCache clusters run on the AWS r3 family of instance types, which are optimized for large memory footprints. By moving to the i2 family of instances, we gain access to 10 times the amount of fast SSD storage as we had on the r3 family (80 → 800GB from r3.xlarge to i2.xlarge) with the equivalent RAM and CPU. We also downgraded instance sizes to a smaller amount of memory. Combining these two, we have a potential of substantial cost optimization across our many thousands of servers.

Moneta architecture

The Moneta project introduces two new processes to the EVCache server: Rend and Mnemonic. Rend is a high-performance proxy written in Go with Netflix use cases as the primary driver for development. Mnemonic is a disk-backed key-value store based on RocksDB. Mnemonic reuses the Rend server components that handle protocol parsing (for speaking the Memcached protocols), connection management, and parallel locking (for correctness). All three servers actually speak the Memcached text and binary protocols, so client interactions between any of the three have the same semantics. We use this to our advantage when debugging or doing consistency checking.

New Server Closeup.png

Where clients previously connected to Memcached directly, they now connect to Rend. From there, Rend will take care of the L1/L2 interactions between Memcached and Mnemonic. Even on servers that do not use Mnemonic, Rend still provides valuable server-side metrics that we could not previously get from Memcached, such as server-side request latencies. The latency introduced by Rend, in conjunction with Memcached only, averages only a few dozen microseconds.

As a part of this redesign, we could have integrated the three processes together. We chose to have three independent processes running on each server to maintain separation of concerns. This setup affords better data durability on the server. If Rend crashes, the data is still intact in Memcached and Mnemonic. The server is able to serve customer requests once they reconnect to a resurrected Rend process. If Memcached crashes, we lose the working set but the data in L2 (Mnemonic) is still available. Once the data is requested again, it will be back in the hot set and served as it was before. If Mnemonic crashes, it wouldn't lose all the data, but only possibly a small set that was written very recently. Even if it did lose the data, at least we have the hot data still in RAM and available for those users who are actually using the service. This resiliency to crashes is on top of the resiliency measures in the EVCache client.


Rend, as mentioned above, acts as a proxy in front of the two other processes on the server that actually store the data. It is a high-performance server that speaks the binary and text Memcached protocols. It is written in Go and relies heavily on goroutines and other language primitives to handle concurrency. The project is fully open source and available on Github. The decision to use Go was deliberate, because we needed something that had lower latency than Java (where garbage collection pauses are an issue) and is more productive for developers than C, while also handling tens of thousands of client connections. Go fits this space well.

Rend has the responsibility of managing the relationship between the L1 and L2 caches on the box. It has a couple of different policies internally that apply to different use cases. It also has a feature to cut data into fixed size chunks as the data is being inserted into Memcached to avoid pathological behavior of the memory allocation scheme inside Memcached. This server-side chunking is replacing our client-side version, and is already showing promise. So far, it's twice as fast for reads and up to 30 times faster for writes. Fortunately, Memcached, as of 1.4.25, has become much more resilient to the bad client behavior that caused problems before. We may drop the chunking feature in the future as we can depend on L2 to have the data if it is evicted from L1.


The design of Rend is modular to allow for configurable functionality. Internally, there are a few layers: Connection management, a server loop, protocol-specific code, request orchestration, and backend handlers. To the side is a custom metrics package that enables Prana, our sidecar, to poll for metrics information while not being too intrusive. Rend also comes with a testing client library that has a separate code base. This has helped immensely in finding protocol bugs or other errors such as misalignment, unflushed buffers, and unfinished responses.

Rend Internals.png

Rend's design allows different backends to be plugged in with the fulfillment of an interface and a constructor function. To prove this design out, an engineer familiar with the code base took less than a day to learn LMDB and integrate it as a storage backend. The code for this experiment can be found at https://github.com/Netflix/rend-lmdb.

Usage in Production

For the caches that Moneta serves best, there are a couple of different classes of clients that a single server serves. One class is online traffic in the hot path, requesting personalization data for a visiting member. The other is traffic from the offline and nearline systems that produce personalization data. These typically run in large batches overnight and continually write for hours on end.

The modularity allows our default implementation to optimize for our nightly batch compute by inserting data into L2 directly and smartly replacing hot data in L1, rather than letting those writes blow away our L1 cache during the nightly precompute. The replicated data coming from other regions can also be inserted directly into L2, since data replicated from another region is unlikely to be “hot” in its destination region. The diagram below shows the multiple open ports in one Rend process that both connect to the backing stores. With the modularity of Rend, it was easy to introduce another server on a different port for batch and replication traffic with only a couple more lines of code.

Rend Ports.png


Rend itself is very high throughput. While testing Rend separately, we consistently hit network bandwidth or packet processing limits before maxing CPU. A single server, for requests that do not need to hit the backing store, has been driven to 2.86 million requests per second. This is a raw, but unrealistic, number. With Memcached as the only backing storage, Rend can sustain 225k inserts per second and 200k reads per second simultaneously on the largest instance we tested. An i2.xlarge instance configured to use both L1 and L2 (memory and disk) and data chunking, which is used as a standard instance for our production clusters, can perform 22k inserts per second (with sets only), 21k reads per second (with gets only), and roughly 10k sets and 10k gets per second if both are done simultaneously. These are lower bounds for our production traffic, because the test load consisted of random keys thus affording no data locality benefits during access. Real traffic will hit the L1 cache much more frequently than random keys do.

As a server-side application, Rend unlocks all kinds of future possibilities for intelligence on the EVCache server. Also, the underlying storage is completely disconnected from the protocol used to communicate. Depending on Netflix needs, we could move L2 storage off-box, replace the L1 Memcached with another store, or change the server logic to add global locking or consistency. These aren't planned projects, but they are possible now that we have custom code running on the server.


Mnemonic is our RocksDB-based L2 solution. It stores data on disk. The protocol parsing, connection management, and concurrency control of Mnemonic are all managed by the same libraries that power Rend. Mnemonic is another backend that is plugged into a Rend server. The native libraries in the Mnemonic project expose a custom C API that is consumed by a Rend handler.
Mnemonic stack.png

The interesting parts of Mnemonic are in the C++ core layer that wraps RocksDB. Mnemonic handles the Memcached-style requests, implementing each of the needed operations to conform to Memcached behavior, including TTL support. It includes one more important feature: it shards requests across multiple RocksDB databases on a local system to reduce the work for each individual instance of RocksDB. The reasons why will be explored in the next section.


After looking at some options for efficiently accessing SSDs, we picked RocksDB, an embedded key-value store which uses a Log Structured Merge Tree data structure design. Write operations are first inserted into a in-memory data structure (a memtable) that is flushed to disk when full. When flushed to disk, the memtable becomes an immutable SST file. This makes most writes sequential to the SSD, which reduces the amount of internal garbage collection that the SSD must perform and thus improve latency on long running instances while also reducing wear.

One type of work that is done in the background by each separate instance of RocksDB includes compaction. We initially used the Level style compaction configuration, which was the main reason to shard the requests across multiple databases. However, while we were evaluating this compaction configuration with production data and production-like traffic, we found that compaction was causing a great deal of extra read/write traffic to the SSD, increasing latencies past what we found acceptable. The SSD read traffic surpassed 200MB/sec at times. Our evaluation traffic included a prolonged period where the number of write operations was high, simulating daily batch compute processes. During that period, RocksDB was constantly moving new L0 records into the higher levels, causing a very high write amplification.

To avoid this overhead, we switched to FIFO style compaction. In this configuration, no real compaction operation is done. Old SST files are deleted based on the maximum size of the database. Records stay on disk in level 0, so the records are only ordered by time across the multiple SST files. The downside of this configuration is that a read operation must check each SST file in reverse chronological order before a key is determined to be missing. This check does not usually require a disk read, as the RocksDB bloom filters prevent a high percentage of the queries from requiring a disk access on each SST. However, the number of SST files causes the overall effectiveness of the set of bloom filters to be less than the normal Level style compaction. The initial sharding of the incoming read and write requests across the multiple RocksDB instances helps lessen the negative impact of scanning so many files.


Re-running our evaluation test again with the final compaction configuration, we are able to achieve a 99th percentile latency of ~9ms for read queries during our precompute load. After the precompute load completed, the 99th percentile read latency reduced to ~600μs on the same level of read traffic. All of these tests were run without Memcached and without RocksDB block caching.

To allow this solution to work with more varied uses, we will need to reduce the number of SST files that needs to be checked per query. We are exploring options like RocksDB’s Universal style compaction or our own custom compaction where we could better control the compaction rate thereby lowering the amount of data transferred to and from the SSD.


We are rolling out our solution in phases in production. Rend is currently in production serving some of our most important personalization data sets. Early numbers show faster operations with increased reliability, as we are less prone to temporary network problems. We are in the process of deploying the Mnemonic (L2) backend to our early adopters. While we're still in the process of tuning the system, the results look promising with the potential for substantial cost savings while still allowing the ease of use and speed that EVCache has always afforded its users.

It has been quite a journey to production deployment, and there's still much to do: deploy widely, monitor, optimize, rinse and repeat. The new architecture for EVCache Server is allowing us to continue to innovate in ways that matter. If you want to help solve this or similar big problems in cloud architecture, join us.