Friday, December 12, 2014

Introducing Atlas: Netflix's Primary Telemetry Platform

Various previous Tech Blog posts have referred to our centralized monitoring system, and we've presented at least one talk about it previously. Today, we want to both discuss the platform and ecosystem we built for time-series telemetry and its capabilities and announce the open-sourcing of its underlying foundation.

How We Got Here

While working in the datacenter, telemetry was split between an IT-provisioned commercial product and a tool a Netflix engineer wrote that allowed engineers to send in arbitrary time-series data and then query that data. This tool's flexibility was very attractive to engineers, so it became the primary system of record for time series data. Sadly, even in the datacenter we found that we had significant problems scaling it to about two million distinct time series. Our global expansion, increase in platforms and customers and desire to improve our production systems' visibility required us to scale much higher, by an order of magnitude (to 20M metrics) or more. In 2012, we started building Atlas, our next-generation monitoring platform. In late 2012, it started being phased into production, with production deployment completed in early 2013. 

Our Goals

Common API

Our previous home-grown tool, internally known as Epic, did a number of things really well that we didn't want to lose when transitioning. In particular:
  • Normalization and consolidation 
  • Flexible legends that scale independently of the chart 
  • Math, especially handling of NaN values representing no data 
  • Holt-Winters used for alerting 
  • Visualization options 
  • Deep linking 
Many of these are capabilities that are provided by the RRDTool library Epic was using, but most alternatives we looked at fell short in these categories. In addition, we have uses for other 3rd party services like CloudWatch and it is desirable to have common query capability for that data.


Scale

As noted above, metrics volume was growing and we needed a system that could keep up. For a long time our biggest concern was write volume; however, we also wanted to scale in terms of the amount of data we could read or aggregate as part of a graph request. Since then, we've scaled up Atlas significantly:

This graph is substantially smoothed; during failover exercises, when we send all North American traffic to one AWS region, we can easily see Atlas sustain greater than 1.2 billion time series (corresponding to publishing of billions of data points per minute).


Dimensionality

Most time-series systems rely on metric identity which is, essentially, a string. As users of the system want the metric name to be more meaningful, they make it longer, more complicated, and include more data. For example, one metric name we had in the old system is:
This metric name encodes in itself some information:
Key                           Value
nccprt (Request Type)         authorization
devtypid (Device Type ID)     101
clver (Client Version)        PHL_0AB
uiver (UI Version)            UI_169_mid
geo (Geographic location)     US

This created two types of problems:
  • Since it was all mangled into a name with different conventions by team, users would have to resort to complex regular expressions to try and slice/dice the data based on the dimensions.
  • It provided us with still-insufficient dimensionality. As we were running into metric length limits (255 characters in the old system), we could not, for example, differentiate metrics coming from one AWS availability zone vs another.
We wanted the ability to support many, many more dimensions, in a way that was predictable, easy for users to discover, and possible to scale for us, the engineering team responsible for the care and feeding of this system.

In Atlas, a metric's identity is an arbitrary unique set of key-value pairs; a few of the keys are set automatically by the telemetry client library (e.g. server name, AWS zone, ASG, cluster, application, region, etc), with significant flexibility for the user to specify the keys that make sense for their use case, and essentially-unlimited unique values supported for any key.
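
To make the contrast with name-encoded metrics concrete, here is a minimal sketch of tag-based identity in JavaScript. It is not Atlas code: the `idOf` and `select` helpers and the `requests` metric name are hypothetical, and the tag keys are borrowed from the example table above.

```javascript
'use strict';

// Hypothetical helper: a canonical identity string built from sorted key-value
// pairs, so the same set of tags always maps to the same time series.
function idOf(tags) {
  return Object.keys(tags).sort().map(function (k) {
    return k + '=' + tags[k];
  }).join(',');
}

// Hypothetical helper: keep the series whose tags match every key in the query.
function select(series, query) {
  return series.filter(function (s) {
    return Object.keys(query).every(function (k) {
      return s.tags[k] === query[k];
    });
  });
}

const series = [
  { tags: { name: 'requests', nccprt: 'authorization', devtypid: '101', geo: 'US' }, value: 12 },
  { tags: { name: 'requests', nccprt: 'authorization', devtypid: '202', geo: 'US' }, value: 7 },
  { tags: { name: 'requests', nccprt: 'authorization', devtypid: '101', geo: 'CA' }, value: 3 }
];

// Slice by one dimension without any regular expression over a mangled name.
const us = select(series, { geo: 'US' });
const usTotal = us.reduce(function (sum, s) { return sum + s.value; }, 0);

console.log(usTotal);              // 19
console.log(idOf(series[0].tags)); // devtypid=101,geo=US,name=requests,nccprt=authorization
```

Because every dimension is a separate key, adding a new one (an availability zone, say) does not change how existing queries are written.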

What We Built

The Query Layer

In order to get a common API, have flexibility for backend implementations, and provide merged views across backends we built a query layer that can be hierarchically composed. The diagram below shows the main Netflix setup: We have isolated regional deployments in each operational region as well as a global deployment that can combine the results from multiple regions. The query and aggregation operations can be performed on the fan-out so most of the big summarization operations will distribute the computation across the tree and typically to an optimized storage layer at some point.
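
The idea of distributing an aggregation across the tree can be sketched in a few lines. This is only an illustration under simplifying assumptions, not the Atlas query layer: the tree shape, the `sum` function and the sample values are all hypothetical.

```javascript
'use strict';

// A leaf holds raw values; an inner node fans the query out to its children.
function sum(node) {
  if (node.values) {
    return node.values.reduce(function (acc, v) { return acc + v; }, 0);
  }
  // Each child returns one partial sum, so only small results travel up the tree.
  return node.children.reduce(function (acc, child) { return acc + sum(child); }, 0);
}

const tree = {
  children: [
    { children: [{ values: [1, 2, 3] }, { values: [4, 5] }] }, // hypothetical region A
    { children: [{ values: [10, 20] }] }                        // hypothetical region B
  ]
};

console.log(sum(tree)); // 45
```

The global node never sees the raw values, only one number per region, which is why summarization operations compose well in this kind of hierarchy.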

Allowing the query and rendering layer to work on multiple backends also makes it easier for us to consider transitioning to other backends in the future such as OpenTSDB or InfluxDB. Compatibility was one of the biggest hurdles we had in switching to Atlas, and this approach allows us to abstract parts of the system and avoid further transitioning pain (as experienced by our users).

The Stack Language

One of the key requirements for us was to be able to have deep links to a particular chart and be able to reliably pass around or embed these images via email, wikis, html pages, etc. In addition, the user who receives it should be able to tweak the result. Atlas uses a simple stack language that has minimal punctuation and allows arbitrarily complex graph expressions to be encoded in a URL friendly way. This means that all images can be accessed using a GET request. It is also simple to parse and interpret so it can be easily consumed from a variety of tools. The core features:
  • Embedding and linking using GET request
  • URL friendly stack language
    • Few special symbols (comma, colon, parentheses)
    • Easy to extend
  • Basic operations
    • Query: and, or, equal, regex, has key, not
    • Aggregation: sum, count, min, max, group by
    • Consolidation: aggregate across time
    • Math: add, subtract, multiply, etc
    • Boolean: and, or, lt, gt, etc
    • Graph settings: legends, area, transparency
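
As a rough illustration of why a comma-separated stack language is easy to parse, the sketch below evaluates a tiny expression over in-memory data. It is a toy, not the Atlas interpreter: the words `:eq`, `:and` and `:sum` are modeled on the operations listed above, and the exact syntax and data layout are assumptions.

```javascript
'use strict';

// Toy data: each series has tags and one value per time step.
const data = [
  { tags: { name: 'requests', geo: 'US' }, values: [1, 2, 3] },
  { tags: { name: 'requests', geo: 'CA' }, values: [4, 5, 6] },
  { tags: { name: 'errors', geo: 'US' }, values: [9, 9, 9] }
];

// Each word pops its operands from the stack and pushes its result.
const words = {
  ':eq': function (stack) {
    const value = stack.pop();
    const key = stack.pop();
    stack.push(function (tags) { return tags[key] === value; });
  },
  ':and': function (stack) {
    const right = stack.pop();
    const left = stack.pop();
    stack.push(function (tags) { return left(tags) && right(tags); });
  },
  ':sum': function (stack) {
    const query = stack.pop();
    const matches = data.filter(function (s) { return query(s.tags); });
    const length = matches.length > 0 ? matches[0].values.length : 0;
    const out = [];
    for (let i = 0; i < length; i++) {
      out.push(matches.reduce(function (acc, s) { return acc + s.values[i]; }, 0));
    }
    stack.push(out);
  }
};

// Tokens starting with ':' are words; everything else is pushed as a literal.
function evaluate(expr) {
  const stack = [];
  expr.split(',').forEach(function (token) {
    if (token.charAt(0) === ':') {
      if (!words[token]) { throw new Error('unknown word: ' + token); }
      words[token](stack);
    } else {
      stack.push(token);
    }
  });
  return stack;
}

console.log(evaluate('name,requests,:eq,:sum'));                 // [ [ 5, 7, 9 ] ]
console.log(evaluate('name,requests,:eq,geo,US,:eq,:and,:sum')); // [ [ 1, 2, 3 ] ]
```

The whole parser is a `split(',')`, and because the expression contains only commas, colons and plain words, it can be placed in a URL query parameter with little or no escaping.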

In-Memory Storage

Finding the right storage solution for Atlas -- and attempting to identify the best balance of cost with the necessary capabilities for speed and scale -- has in some respects been our biggest challenge. We tried many backends and ended up moving more and more to a model where pretty much all data available for querying is stored in memory either in or off the JVM heap.

Engineering for Performance

The primary goal for Atlas is to support queries over dimensional time series data so we can slice and dice to drill down into problems. This means we frequently need to perform large aggregations that involve many data points even though the final result set might be small.

As an example consider a simple graph showing the number of requests per second hitting a service for the last 3 hours. Assuming minute resolution that is 180 datapoints for the final output. On a typical service we would get one time series per node showing the number of requests so if we have 100 nodes the intermediate result set is around 18k datapoints. For one service users went hog-wild with dimensions breaking down requests by device (~1000s) and country (~50) leading to about 50k time series per node. If we still assume 100 nodes that is about 900M datapoints for the same 3h line.
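
The arithmetic in this example is easy to check. The short script below simply restates the numbers from the paragraph above; the variable names are ours.

```javascript
'use strict';

const minutes = 3 * 60;      // 3 hours at one datapoint per minute
const nodes = 100;           // nodes in the service
const seriesPerNode = 50000; // ~1000s of devices x ~50 countries, as quoted above

const finalOutput = minutes;                             // one aggregated line
const onePerNode = minutes * nodes;                      // one series per node
const highCardinality = minutes * nodes * seriesPerNode; // dimensional breakdown

console.log(finalOutput);     // 180
console.log(onePerNode);      // 18000
console.log(highCardinality); // 900000000
```

The final chart is the same 180 points in every case; only the amount of intermediate data that has to be read and aggregated changes.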

Though obviously we have to be mindful about the explosion of dimensions, we also want that where possible to be a decision based on cost and business value rather than a technical limitation.

We routinely see Atlas fetch and graph many billions of datapoints per second.

Engineering for Resilience

What has to be working in order for the monitoring system to work? If it falls over what is involved in getting it back up? Our focus is primarily operational insight so the top priority is to be able to determine what is going on right now. This leads to the following rules of thumb:
  • Data becomes exponentially less important as it gets older
  • Restoring service is more important than preventing data loss
  • Try to degrade gracefully
As a result the internal Atlas deployment breaks up the data into multiple clusters based on the window of data they contain:

With this setup we can show the last 6h of data as long as clients can successfully publish. The data is all in memory, sharded across machines in the 6h clusters. Because the data and index are all in memory on the local node, each instance is self-contained and doesn't need any external service to function. We typically run multiple mirrors of the 6h cluster so data is replicated and we can handle the loss of an instance. In AWS we run each mirror in a different zone so that a zone failure will only impact a single mirror.

The publish cluster needs to know all instances in the mirror cluster and takes care of splitting the traffic up so it goes to the correct shard. The set of mirror instances and shards are assigned based on slots from the Edda autoScalingGroups API. Since the set of instances for the mirrors changes rarely, the publish instances can cache the Edda response and still successfully publish most data if Edda fails. If an instance is replaced and we can't update data we would have partial loss for a single shard if the same shard was missing in another mirror.

Historical data can also fail, in which case graphs would not be able to show data for some older windows. This doesn't have to be fully continuous. For example, a common use case for us is to look at week-over-week (WoW) charts even though the span of the chart might only be a few hours. If the < 4d cluster fails but the < 16d cluster is functioning we could still serve that graph even though we couldn't show a continuous graph for the full week. A graph would still be shown but would be missing data in the middle.

After data is written to the mirrors, they will flush to a persistence layer that is responsible for writing the data to the long term storage in S3. The data at full resolution is kept in S3 and we use Hadoop (Elastic MapReduce) for processing the data to perform corrective merging of data from the mirrors, generate reports, and perform rollups into a form that can be loaded into the historical clusters.

As always, we're aided by the fact that engineers at Netflix are responsible for the operational care and feeding of their systems, eliminating potential impedance mismatch between engineering concerns and operational concerns.

Engineering for Cost

Keeping all data in memory is expensive -- in particular with the large growth rate of data. The combination of dimensionality and time based partitioning used for resilience gives us a way to help manage costs.
The first way is in controlling the number of replicas. In most cases we are using replicas for redundancy rather than for additional query capacity. For historical data that can be reloaded from stable storage we typically run only one replica as the duration of partial downtime was not deemed to be worth the cost for an additional replica.

The second way is to compute rollups as part of the Hadoop processing, so that we have a much smaller data volume to load in historical clusters. At Netflix the typical policy is roughly:
< 6 hours     Keep all data received
< 4 days      Keep most data; we do early rollup by dropping the node dimension on some business metrics
< 16 days     Rollup by dropping the node dimension on all metrics
older         Explicit whitelist; we generally recommend BI systems for longer-term use-cases
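
Expressed as code, the policy table is a lookup by data age. The sketch below only mirrors the windows listed above; the `policyFor` function and the shortened policy strings are hypothetical.

```javascript
'use strict';

const HOUR = 1;
const DAY = 24 * HOUR;

// Windows taken from the policy table above, ordered from newest to oldest.
const tiers = [
  { maxAgeHours: 6 * HOUR, policy: 'keep all data received' },
  { maxAgeHours: 4 * DAY, policy: 'keep most data; early rollup of some business metrics' },
  { maxAgeHours: 16 * DAY, policy: 'drop the node dimension on all metrics' }
];

// Return the policy for data of a given age; anything older falls through
// to the explicit whitelist.
function policyFor(ageHours) {
  for (let i = 0; i < tiers.length; i++) {
    if (ageHours < tiers[i].maxAgeHours) { return tiers[i].policy; }
  }
  return 'explicit whitelist';
}

console.log(policyFor(2));        // keep all data received
console.log(policyFor(7 * DAY));  // drop the node dimension on all metrics
console.log(policyFor(30 * DAY)); // explicit whitelist
```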

Using these policies we get greatly reduced index sizes for the number of distinct time series despite a significant amount of churn. With auto-scaling and red/black deployment models the set of instances changes frequently, so typically the intersection of distinct time series from one day to the next is less than 50%. Rollups target the dimensions which lead to that churn, giving us much smaller index sizes. Also, in many cases dimensions like node name that lead to this increase become less relevant after the node goes away. Deep-dive or investigative use-cases can still access the data using Hadoop if needed.
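
The node-dimension rollup can be illustrated with a small sketch. This is not the Hadoop job described above, only an in-memory approximation: the `rollup` function, the `node` tag key and the sample values are assumptions made for illustration, and all series are assumed to cover the same time steps.

```javascript
'use strict';

// Drop one tag key and sum the values of series that become identical afterwards.
function rollup(series, dropKey) {
  const merged = new Map();
  series.forEach(function (s) {
    const tags = {};
    Object.keys(s.tags).sort().forEach(function (k) {
      if (k !== dropKey) { tags[k] = s.tags[k]; }
    });
    const id = JSON.stringify(tags); // keys were sorted, so this is canonical
    const existing = merged.get(id);
    if (existing) {
      existing.values = existing.values.map(function (v, i) { return v + s.values[i]; });
    } else {
      merged.set(id, { tags: tags, values: s.values.slice() });
    }
  });
  return Array.from(merged.values());
}

const perNode = [
  { tags: { name: 'requests', cluster: 'api', node: 'i-001' }, values: [10, 12] },
  { tags: { name: 'requests', cluster: 'api', node: 'i-002' }, values: [11, 9] },
  { tags: { name: 'requests', cluster: 'web', node: 'i-003' }, values: [4, 5] }
];

const rolled = rollup(perNode, 'node');
console.log(rolled.length);    // 2
console.log(rolled[0].values); // [ 21, 21 ]
```

Three per-node series collapse into two per-cluster series, and the result no longer changes when instances are replaced, which is where the index-size savings come from.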

The Ecosystem

We've built an extensive ecosystem of products around Atlas; virtually every operational visibility system or dashboard used within Netflix utilizes data from it. In addition to the components described above, we've also built:
  • User interfaces
    • Main UI for browsing data and constructing queries.
    • Dashboards
    • Alerts
  • Platform
    • Inline aggregation of reported data before storage layer
    • Storage options using off-heap memory and lucene
    • Percentile backend
    • Publish and persistence applications
    • EMR processing for computing rollups and analysis
    • Poller for SNMP, healthchecks, etc
  • Client
    • Supports integrating Servo with Atlas
    • Local rollups and alerting
  • Real-Time Analytics
    • Metrics volume report
    • Automated Canary Analysis
    • Outlier and anomaly detection
    • Automated server culling based on outlier characteristics
Today, we are open-sourcing the query layer and some of the in-heap memory structure capabilities. Our intention is to open-source more of the ecosystem as soon as feasibly possible, focusing on the components most likely to be relevant to people outside Netflix first.

You can now find Atlas on GitHub; you can also find a more detailed overview there.

Brian Harrington and Roy Rapoport

Monday, December 8, 2014

Version 7: The Evolution of JavaScript

By Jafar Husain

We held our last Netflix JavaScript Talks event of 2014 on Dec. 2nd, where we discussed some of the interesting features already available for use in JavaScript 6, and where things are headed with JavaScript 7. As a member of the JavaScript TC39 ECMAScript committee, Netflix has been working with other committee members to explore powerful new features like Object.observe, async functions and async generators. Many of these features have already started appearing in modern browsers and it’s not too early to start playing with them.

The video from the event is now available to watch below or on our Netflix UI Engineering channel on YouTube along with other talks from past events.

It’s been a lot of fun hosting our Netflix JavaScript Talks series this past year and we have several exciting talks planned for 2015. If you’re interested in being notified of our future events, just sign up on our notification list. We’ll also be sharing some interesting work we’re doing with React at the React.js Conf at the end of January.

Wednesday, November 19, 2014

Node.js in Flames

We’ve been busy building our next-generation web application using Node.js. You can learn more about our approach from the presentation we delivered a few months ago. Today, I want to share some recent learnings from performance tuning this new application stack.

We were first clued in to a possible issue when we noticed that request latencies to our Node.js application would increase progressively with time. The app was also burning CPU more than expected, and closely correlated to the higher latency. While using rolling reboots as a temporary workaround, we raced to find the root cause using new performance analysis tools and techniques in our Linux EC2 environment.

Flames Rising

We noticed that request latencies to our Node.js application would increase progressively with time. Specifically, some of our endpoints’ latencies would start at 1ms and increase by 10ms every hour. We also saw a correlated increase in CPU usage.


This graph plots request latency in ms for each region against time. Each color corresponds to a different AWS AZ. You can see latencies steadily increase by 10 ms an hour and peak at around 60 ms before the instances are rebooted.

Dousing the Fire

Initially we hypothesized that there might be something faulty, such as a memory leak in our own request handlers, that was causing the rising latencies. We tested this hypothesis by load-testing the app in isolation, adding metrics that measured both the latency of only our request handlers and the total latency of a request, as well as increasing the Node.js heap size to 32 GB.

We saw that our request handlers’ latencies stayed constant across the lifetime of the process at 1 ms. We also saw that the process’s heap size stayed fairly constant at around 1.2 GB. However, overall request latencies and CPU usage continued to rise. This absolved our own handlers of blame, and pointed to problems deeper in the stack.

Something was taking an additional 60 ms to service the request. What we needed was a way to profile the application’s CPU usage and visualize where we’re spending most of our time on CPU. Enter CPU flame graphs and Linux Perf Events to the rescue.

For those unfamiliar with flame graphs, it’s best to read Brendan Gregg’s excellent article explaining what they are -- but here’s a quick summary (straight from the article).
  • Each box represents a function in the stack (a "stack frame").
  • The y-axis shows stack depth (number of frames on the stack). The top box shows the function that was on-CPU. Everything beneath that is ancestry. The function beneath a function is its parent, just like the stack traces shown earlier.
  • The x-axis spans the sample population. It does not show the passing of time from left to right, as most graphs do. The left to right ordering has no meaning (it's sorted alphabetically).
  • The width of the box shows the total time it was on-CPU or part of an ancestry that was on-CPU (based on sample count). Wider box functions may be slower than narrow box functions, or, they may simply be called more often. The call count is not shown (or known via sampling).
  • The sample count can exceed elapsed time if multiple threads were running and sampled concurrently.
  • The colors aren't significant, and are picked at random to be warm colors. It's called "flame graph" as it's showing what is hot on-CPU. And, it's interactive: mouse over the SVGs to reveal details.
Previously Node.js flame graphs had only been used on systems with DTrace, using Dave Pacheco’s Node.js jstack() support. However, the Google v8 team has more recently added perf_events support to v8, which allows similar stack profiling of JavaScript symbols on Linux. Brendan has written instructions for how to use this new support, which arrived in Node.js version 0.11.13, to create Node.js flame graphs on Linux.

Here’s the original SVG of the flame graph. Immediately, we see incredibly high stacks in the application (y-axis). We also see we’re spending quite a lot of time in those stacks (x-axis). On closer inspection, it seems the stack frames are full of references to Express.js’s router.handle and functions. The Express.js source code reveals a couple of interesting tidbits [1].
  • Route handlers for all endpoints are stored in one global array.
  • Express.js recursively iterates through and invokes all handlers until it finds the right route handler.
A global array is not the ideal data structure for this use case. It’s unclear why Express.js chose not to use a constant time data structure like a map to store its handlers. Each request requires an expensive O(n) look up in the route array in order to find its route handler. Compounding matters, the array is traversed recursively. This explains why we saw such tall stacks in the flame graphs. Interestingly, Express.js even allows you to set many identical route handlers for a route. You can unwittingly set a request chain like so.
[a, b, c, c, c, c, d, e, f, g, h]
Requests for route c would terminate at the first occurrence of the c handler (position 2 in the array). However, requests for d would only terminate at position 6 in the array, having needlessly spent time spinning through a, b and multiple instances of c. We verified this by running the following vanilla Express.js app.
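var express = require('express');
var app = express();
// first /foo route handler
app.get('/foo', function (req, res) {
  res.send('hi');
});
// add a second foo route handler
app.get('/foo', function (req, res) {
  res.send('hi2');
});
// dump the router's handler array
console.log('stack', app._router.stack);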
Running this Express.js app returns these route handlers.
stack [ { keys: [], regexp: /^\/?(?=/|$)/i, handle: [Function: query] },
 { keys: [],
   regexp: /^\/?(?=/|$)/i,
   handle: [Function: expressInit] },
 { keys: [],
   regexp: /^\/foo\/?$/i,
   handle: [Function],
   route: { path: '/foo', stack: [Object], methods: [Object] } },
 { keys: [],
   regexp: /^\/foo\/?$/i,
   handle: [Function],
   route: { path: '/foo', stack: [Object], methods: [Object] } } ]
Notice there are two identical route handlers for /foo. It would have been nice for Express.js to throw an error whenever there’s more than one route handler chain for a route.
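
The cost of the linear scan can be seen without Express.js at all. The sketch below is a simplified model, not Express.js's actual router: `handlersVisited` is a hypothetical helper that counts how many entries are examined before the first matching handler is reached, using the chain from the example above and assuming exact-match routes.

```javascript
'use strict';

// Count how many entries a linear scan examines before it reaches the first match.
function handlersVisited(chain, route) {
  for (let i = 0; i < chain.length; i++) {
    if (chain[i] === route) { return i + 1; }
  }
  return chain.length; // no match: the whole chain was scanned
}

const chain = ['a', 'b', 'c', 'c', 'c', 'c', 'd', 'e', 'f', 'g', 'h'];

console.log(handlersVisited(chain, 'c')); // 3 (match at index 2)
console.log(handlersVisited(chain, 'd')); // 7 (match at index 6)

// For exact-match routes, a map keyed by route finds the first handler in one
// step regardless of how long the chain is.
const byRoute = new Map();
chain.forEach(function (route, i) {
  if (!byRoute.has(route)) { byRoute.set(route, i); }
});
console.log(byRoute.get('d')); // 6
```

Every duplicate entry placed ahead of a route adds one more step to each request for that route, which is the effect the duplicated /foo handlers demonstrate.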

At this point the leading hypothesis was that the handler array was increasing in size with time, thus leading to the increase of latencies as each handler is invoked. Most likely we were leaking handlers somewhere in our code, possibly due to the duplicate handler issue. We added additional logging which periodically dumps out the route handler array, and noticed the array was growing by 10 elements every hour. These handlers happened to be identical to each other, mirroring the example from above.
{ handle: [Function: serveStatic],
   name: 'serveStatic',
   params: undefined,
   path: undefined,
   keys: [],
   regexp: { /^\/?(?=\/|$)/i fast_slash: true },
   route: undefined },
 { handle: [Function: serveStatic],
   name: 'serveStatic',
   params: undefined,
   path: undefined,
   keys: [],
   regexp: { /^\/?(?=\/|$)/i fast_slash: true },
   route: undefined },
 { handle: [Function: serveStatic],
   name: 'serveStatic',
   params: undefined,
   path: undefined,
   keys: [],
   regexp: { /^\/?(?=\/|$)/i fast_slash: true },
   route: undefined },
Something was adding the same Express.js-provided static route handler 10 times an hour. Further benchmarking revealed that merely iterating through each of these handler instances cost about 1 ms of CPU time. This matches the latency problem we had seen, where our response latencies increased by 10 ms every hour.

This turned out to be caused by a periodic (10/hour) function in our code. The main purpose of this was to refresh our route handlers from an external source. This was implemented by deleting old handlers and adding new ones to the array. Unfortunately, it was also inadvertently adding a static route handler with the same path each time it ran. Since Express.js allows for multiple route handlers given identical paths, these duplicate handlers were all added to the array. Making matters worse, they were added before the rest of the API handlers, which meant they all had to be invoked before we could service any requests to our service.

This fully explains why our request latencies were increasing by 10ms every hour. Indeed, when we fixed our code so that it stopped adding duplicate route handlers, our latency and CPU usage increases went away.
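
The shape of the bug and of the fix can be sketched with a plain array standing in for the router's handler list. This is a simplified model rather than our production code or Express.js internals; the handler names and the `refreshBuggy` and `refreshFixed` functions are hypothetical.

```javascript
'use strict';

// Simplified stand-in for a router's handler array; not Express.js internals.
function makeStack() {
  return [{ name: 'serveStatic' }, { name: 'apiHandler' }];
}

// Buggy refresh: replaces the API handlers but also prepends another static handler.
function refreshBuggy(stack) {
  const kept = stack.filter(function (h) { return h.name !== 'apiHandler'; });
  return [{ name: 'serveStatic' }].concat(kept, [{ name: 'apiHandler' }]);
}

// Fixed refresh: only adds the static handler when one is not already registered.
function refreshFixed(stack) {
  const kept = stack.filter(function (h) { return h.name !== 'apiHandler'; });
  const hasStatic = kept.some(function (h) { return h.name === 'serveStatic'; });
  const prefix = hasStatic ? [] : [{ name: 'serveStatic' }];
  return prefix.concat(kept, [{ name: 'apiHandler' }]);
}

let buggy = makeStack();
let fixed = makeStack();
for (let run = 0; run < 10; run++) { // ten refreshes, roughly one hour in the incident
  buggy = refreshBuggy(buggy);
  fixed = refreshFixed(fixed);
}

console.log(buggy.length); // 12
console.log(fixed.length); // 2
```

In the buggy version the array grows by one entry per refresh, and every extra entry sits in front of the API handler; making the refresh idempotent keeps the array length constant.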

Here we see our latencies drop down to 1 ms and remain there after we deployed our fix.

When the Smoke Cleared

What did we learn from this harrowing experience? First, we need to fully understand our dependencies before putting them into production. We made incorrect assumptions about the Express.js API without digging further into its code base. As a result, our misuse of the Express.js API was the ultimate root cause of our performance issue.

Second, given a performance problem, observability is of the utmost importance. Flame graphs gave us tremendous insight into where our app was spending most of its time on CPU. I can’t imagine how we would have solved this problem without being able to sample Node.js stacks and visualize them with flame graphs.

In our bid to improve observability even further, we are migrating to Restify, which will give us much better insights, visibility, and control of our applications [2]. This is beyond the scope of this article, so look out for future articles on how we’re leveraging Node.js at Netflix.

Interested in helping us solve problems like this? The Website UI team is hiring engineers to work on our Node.js stack.

Author: Yunong Xiao @yunongx

[1] Specifically, this snippet of code. Notice next() is invoked recursively to iterate through the global route handler array named stack.
[2] Restify provides many mechanisms to get visibility into your application, from DTrace support, to integration with the node-bunyan logging framework.

Wednesday, November 12, 2014

ZeroToDocker: An easy way to evaluate NetflixOSS through runtime packaging


The NetflixOSS platform and related ecosystem services are extensive.  While we make every attempt to document each project, the sheer breadth of the platform makes it a large challenge for most users to quickly evaluate NetflixOSS.  The challenge is even larger for anyone trying to understand individual parts of the platform.

Another part of the challenge relates to how NetflixOSS was designed for scale. Most services are intended to be set up in a multi-node, auto-recoverable cluster.  While this is great once you are ready for production, it is prohibitively complex for new users who want to try out a smaller-scale environment of NetflixOSS.

A final part of the challenge is that in order to keep the platform a collection of services and libraries that could be used wherever they make sense by users, the runtime artifacts are distributed in ways that can be later assembled by users in different ways.  This means many of the Java libraries are in Maven Central, some of the complete services are assembled as distribution zips and wars in our CI environment on CloudBees, and others through file distribution services.  None of these distributions gives you a single command line that is guaranteed to work across the many places people might want to run the NetflixOSS technologies.

A simple solution:

Recently it has become popular to be able to quickly demonstrate technology through the use of Docker containers.  If you search GitHub there are hundreds of projects that include Dockerfiles, which are the image build description file for Docker containers.  By including such Dockerfiles, the developer is showing you exactly how to not only build the code on GitHub, but also assemble and run the compiled artifacts as a full system.

ZeroToDocker is a project that solves the above problems.  Specifically, it allows anyone with a Docker host (on their laptop, on a VM in the cloud, etc.) to, with a single command, run a single node of any NetflixOSS technology.  If you have the network bandwidth to download 500-700 MB images, you can now run each part of the NetflixOSS platform with a single command.  For example, here is the command to run a single node of Zookeeper managed through NetflixOSS Exhibitor:
  • docker run -d --name exhibitor netflixoss/exhibitor:1.5.2
This command tells Docker to pull the image file “exhibitor” of version “1.5.2” from the official NetflixOSS account and run it as a daemon with a container name of “exhibitor”.  This will start up just as quickly as the Java processes would on a base OS due to the process model of Docker.  It will not start a separate OS instance.  It will also “containerize” the environment, meaning that the exhibitor and zookeeper processes will be isolated from other containers and processes running on the same Docker host.  Finally, if you examine the Dockerfile that builds this image you will see that it exposes the zookeeper and exhibitor ports of 2181, 2888, 3888, and 8080 to the network.  This means that you can access these ports via standard Docker networking.  In fact you can load up the following URL:
  • http://EXHIBITORIPADDRESS:8080/exhibitor/v1/ui/index.html
All of this can be done in seconds beyond the initial image download time with very little starting knowledge of NetflixOSS.  We expect this should reduce the learning curve of starting NetflixOSS by at least an order of magnitude.

Images so far:

We decided to focus on the platform foundation of NetflixOSS, but we are already in discussions with our chaos testing and big data teams to create docker images of other aspects of the NetflixOSS ecosystem.  For now, we have released:

  • Asgard
  • Eureka
  • A Karyon based Hello World Example
  • A Zuul Proxy used to proxy to the Karyon service
  • Exhibitor managed Zookeeper
  • Security Monkey

Can you trust these images:

Some of our great OSS community members have already created Docker images for aspects of NetflixOSS.  While we don’t want to take anything away from these efforts, we wanted to take them a step further.  Recently, Docker announced Docker Hub.  You can think of Docker Hub as a ready to run image repository similar to how you think of GitHub for your code, or CloudBees for your deployment artifacts.  Docker Hub creates an open community around images.

Additionally, Docker Hub has the concept of a trusted build.  What this means is anyone can point their Docker Hub account at GitHub and tell Docker Hub to build trusted images on their behalf.  After these builds are done, the images are exposed to the standard cloud registry from which anyone can pull and run.  Because the images are built by Docker in a trusted isolated environment, and because any user can trace the image build back to exact Dockerfiles and source on GitHub and Maven Central, you can see exactly where all the running code originated and make stronger decisions of trust.  With the exception of Oracle Java, Apache Tomcat, and Apache Zookeeper, all of the code on the images originates from trusted NetflixOSS builds.  Even Java (cloned from Feng Honglin’s Java 7 Dockerfile), Tomcat and Zookeeper are easy to trust as you can read the Dockerfile to trace exactly their origination.

Can you learn from these images:

If you go to the Docker Hub image you are now running, you can navigate back to the GitHub project that hosts the Dockerfiles.  Inside of the Dockerfile you will find the exact commands required to assemble this running image.  Also, files are included with the exact properties needed to have a functioning single node service.

This means you can get up and running very quickly on a simple NetflixOSS technology and then learn how those images work and then progress to your own production deployment using what you learned was under the covers of the running single instance.  While we tried to document this on the GitHub wiki in the past for each project, it is just so much easier to document through a running technology than document it fully in prose on a Wiki.

A final note on accelerated learning vs. production usage:

As noted on the ZeroToDocker Wiki, we are not recommending the use of these Docker images in production.  We designed these images to be as small as possible in scope so you can get the minimum function running as quickly as possible.  That means they do not consider production issues that matter like multi-host networking, security hardening, operational visibility, storage management, and high availability with automatic recovery.

We also want to make it clear that we do not run these images in production.  We continue to run almost all of our systems on the EC2 virtual machine based IaaS.  We do this as the EC2 environment along with Netflix additions provides all of the aforementioned production requirements.  We are starting to experiment with virtual machines running Docker hosting multiple containers in EC2, but those experiments are limited to classes of workloads that get unique value out of a container based deployment model while being managed globally by EC2 IaaS.

Because these images are not production ready, we have decided to keep ZeroToDocker on our Netflix-Skunkworks account on GitHub.  However, we believe that helping people get up and running on NetflixOSS is valuable, so we wanted to make the images available.


We started small.  Over time, you can expect more images that represent a larger slice of the NetflixOSS platform and ecosystem.  We also may expand the complexity showing how to set up clusters or more tightly secure the images.  We’ve built specific versions of each service, but in the future will need to create a continuous integration system for the building of our images.

If you enjoy helping with open source and want to build future technologies like the ones we’ve just demonstrated, check out some of our open jobs.  We are always looking for excellent engineers to extend our NetflixOSS platform and ecosystem.

Tuesday, November 11, 2014

Genie 2.0: Second Wish Granted!

By Tom Gianos and Amit Sharma @ Big Data Platform Team

A little over a year ago we announced Genie, a distributed job and resource management tool. Since then, Genie has operated in production at Netflix, servicing tens of thousands of ETL and analytics jobs daily. There were two main goals in the original design of Genie:

  • To abstract execution environment from the Hadoop, Hive and Pig job submissions.
  • To enable horizontal scaling of client resources based on demand.

Since the development of Genie 1.0, much has changed in both the big data ecosystem and here at Netflix. Hadoop 2 was officially released, enabling clusters to use execution engines beyond traditional MapReduce. Newer tools, such as interactive query engines like Presto and Spark, are quickly gaining in popularity. Other emerging technologies like Mesos and Docker are changing how applications are managed and deployed. Some changes to our big data platform in the last year include:

  • Upgrading our Hadoop clusters to Hadoop 2.
  • Moving to Parquet as the primary storage format for our data warehouse.
  • Integrating Presto into our big data platform.
  • Developing, deploying and open sourcing Inviso, to help users and admins gain insights into job and cluster performance.

Amidst all this change, we reevaluated Genie to determine what was needed to meet our evolving needs. Genie 2.0 is the result of this work and it provides a more flexible, extensible and feature rich distributed configuration and job execution engine.

Reevaluating Genie 1.0

Genie 1.0 accomplished its original goals well, but the narrow scope of those goals led to limitations including:

  • It only worked with Hadoop 1.
  • It had a fixed data model designed for a very specific use case. Code changes were required to accomplish minor changes in behavior.
    • As an example, the s3CoreSiteXml, s3HdfsSiteXml fields of the ClusterConfigElement entity stored the paths to the core-site and hdfs-site XML files of a Hadoop cluster rather than storing them as a generic collection field.
  • The execution environment selection criteria were very limited. The only way to select a cluster was by setting one of three types of schedules: SLA, ad hoc or bonus.

Genie 1.0 could not continue to meet our needs as the number of desired use cases increased and we continued to adopt new technologies. Therefore, we decided to take this opportunity to redesign Genie.

Designing and Developing Genie 2.0

The goals for Genie 2.0 were relatively straightforward:

  • Develop a generic data model, which would let jobs run on any multi-tenant distributed processing cluster.
  • Implement a flexible cluster and command selection algorithm for running a job.
  • Provide richer API support.
  • Implement a more flexible, extensible and robust codebase.

Each of these goals is explored below.

The Data Model

The new data model consists of the following entities:

Cluster: It stores all the details of an execution cluster including connection information, properties, etc. Some cluster examples are Hadoop 2, Spark, Presto, etc. Every cluster can be linked to a set of commands that it can run.

Command: It encapsulates the configuration details of an executable that is invoked by Genie to submit jobs to the clusters. This includes the path to the executable, the environment variables, configuration files, etc. Some examples are Hive, Pig, Presto and Sqoop. If the executable is already installed on the Genie node, configuring a command is all that is required. If the executable isn’t installed, a command can be linked to an application in order to install it at runtime.

Application: It provides all the components required to install a command executable on Genie instances at runtime. This includes the location of the jars and binaries, additional configuration files, an environment setup file, etc. Internally we have our Presto client binary configured as an application. A more thorough explanation is provided in the “Our Current Deployment” section below.

Job: It contains all the details of a job request and execution including any command line arguments. Based on the request parameters, a cluster and command combination is selected for execution. Job requests can also supply necessary files to Genie either as attachments or via the file dependencies field, if they already exist in an accessible file system. As a job executes, its details are recorded in the job record.

All the above entities support a set of tags that can provide additional metadata. The tags are used for cluster and command resolution as described in the next section.
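
To make the relationships between these entities concrete, here is a minimal sketch in Python. It is not Genie's actual schema: the class and field names, and the example S3 path, are assumptions chosen to mirror the descriptions above.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class Application:
    """Files needed to install a command's executable at runtime."""
    name: str
    jars: List[str] = field(default_factory=list)     # jars and binaries
    configs: List[str] = field(default_factory=list)  # extra configuration files
    tags: Set[str] = field(default_factory=set)


@dataclass
class Command:
    """An executable that is invoked to submit jobs to a cluster."""
    name: str
    executable: str
    configs: List[str] = field(default_factory=list)
    # Only set when the executable must be installed at runtime.
    application: Optional[Application] = None
    tags: Set[str] = field(default_factory=set)


@dataclass
class Cluster:
    """An execution cluster and the commands it can run."""
    name: str
    configs: List[str] = field(default_factory=list)
    commands: List[Command] = field(default_factory=list)
    tags: Set[str] = field(default_factory=set)


@dataclass
class Job:
    """A job request: arguments plus the tags used to pick where it runs."""
    name: str
    command_args: str
    command_tags: Set[str]
    cluster_tags: List[Set[str]]  # priority ordered list of tag sets
    file_dependencies: List[str] = field(default_factory=list)
    tags: Set[str] = field(default_factory=set)


# Hypothetical example: a Presto command backed by a runtime-installed application.
presto_app = Application(
    name="presto-client",
    jars=["s3://example-bucket/presto/presto-cli.jar"],  # made-up location
)
presto_cmd = Command(
    name="presto",
    executable="presto-cli",
    application=presto_app,
    tags={"type:presto"},
)
presto_cluster = Cluster(
    name="presto-prod",
    commands=[presto_cmd],
    tags={"type:presto", "env:prod"},
)
```

Note that configuration files are held in generic lists rather than in dedicated fields such as s3CoreSiteXml, which is the kind of change the new data model was meant to enable.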

Job Execution Environment Selection

Genie now supports a highly flexible method to select the cluster to run a job on and the command to execute, collectively known as the execution environment. A job request specifies two sets of tags to Genie:

  • Command Tags: A set of tags that maps to zero or more commands.
  • Cluster Tags: A priority ordered list of sets of tags that maps to zero or more clusters.

Genie iterates through the cluster tags list, and attempts to use each set of tags in combination with the command tags to find a viable execution environment. The ordered list allows clients to specify fallback options for cluster selection if a given cluster is not available.

At Netflix, nightly ETL jobs leverage this feature. Two sets of cluster tags are specified for these jobs. The first set matches our bonus clusters, which are spun up every night to help with our ETL load. These clusters use some of our excess, pre-reserved capacity available during lower traffic hours for Netflix. The other set of tags matches the production cluster and acts as the fallback option. If the bonus clusters are out of service when the ETL jobs are submitted, the jobs are routed to the main production cluster by Genie.
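
The fallback behavior described above can be sketched as follows. This is an illustration, not Genie's implementation: it assumes that a tag set "matches" when every requested tag is present on the cluster or command, that only clusters marked "available" are considered, and that the first match wins. The cluster names and tag strings are made up.

```python
from typing import Dict, List, Optional, Set, Tuple

# Hypothetical registry. Each cluster lists its own tags and, per linked
# command, that command's tags.
CLUSTERS: List[Dict] = [
    {
        "name": "bonus",
        "status": "out of service",
        "tags": {"sched:bonus", "type:yarn"},
        "commands": {"hive": {"type:hive"}, "pig": {"type:pig"}},
    },
    {
        "name": "prod",
        "status": "available",
        "tags": {"sched:sla", "type:yarn"},
        "commands": {"hive": {"type:hive"}, "pig": {"type:pig"}},
    },
]


def select_environment(
    command_tags: Set[str],
    cluster_tags_list: List[Set[str]],
    clusters: List[Dict],
) -> Optional[Tuple[str, str]]:
    """Return (cluster name, command name) for the first viable match.

    The cluster tag sets are tried in priority order, so later sets only
    act as fallbacks when earlier ones yield nothing usable.
    """
    for cluster_tags in cluster_tags_list:
        for cluster in clusters:
            if cluster["status"] != "available":
                continue
            if not cluster_tags <= cluster["tags"]:
                continue
            for command_name, tags in cluster["commands"].items():
                if command_tags <= tags:
                    return cluster["name"], command_name
    return None


# Nightly ETL style request: prefer the bonus cluster, fall back to production.
choice = select_environment(
    command_tags={"type:hive"},
    cluster_tags_list=[{"sched:bonus"}, {"sched:sla"}],
    clusters=CLUSTERS,
)
```

With the bonus cluster out of service, the first tag set finds no available cluster, so the second set routes the job to the production cluster.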

Richer API Support

Genie 1.0 exposed a limited set of REST APIs. Any updates to the contents of the resources had to be done by sending requests, containing the entire object, to the Genie service. In contrast, Genie 2.0 supports fine grained APIs, including the ability to directly manipulate the collections that are part of the entities. For a complete list of available APIs, please see the Genie API documentation.
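
The difference between the two update styles can be illustrated with a small in-memory sketch. It does not use Genie's REST endpoints or client; the store, function names and tag values are hypothetical and only show how a whole-object update differs from manipulating one collection.

```python
import copy
from typing import Dict, List

# Stand-in for server-side state, keyed by a made-up cluster id.
store: Dict[str, Dict] = {
    "cluster-1": {"name": "prod", "status": "available", "tags": ["type:yarn"]},
}


def replace_cluster(store: Dict[str, Dict], cluster_id: str, full_object: Dict) -> None:
    """Coarse grained update: the caller sends the entire object."""
    store[cluster_id] = copy.deepcopy(full_object)


def add_cluster_tags(store: Dict[str, Dict], cluster_id: str, new_tags: List[str]) -> List[str]:
    """Fine grained update: only the tags collection is touched."""
    tags = store[cluster_id]["tags"]
    for tag in new_tags:
        if tag not in tags:
            tags.append(tag)
    return list(tags)


# Whole-object style: read everything, change one field, send everything back.
updated = copy.deepcopy(store["cluster-1"])
updated["tags"].append("sched:sla")
replace_cluster(store, "cluster-1", updated)

# Collection style: send only the tags to add.
add_cluster_tags(store, "cluster-1", ["sched:adhoc"])
```

In the second style the caller does not need to know or resend the rest of the object, which avoids overwriting fields that another client changed in the meantime.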

Code Enhancements

An examination of the Genie 1.0 codebase revealed aspects that needed to be modified in order to provide the flexibility and standards compliance desired going forward. Some of the goals to improve the Genie codebase were to:

  • Decouple the layers of the application to follow a more traditional three tiered model.
  • Remove unnecessary boilerplate code.
  • Standardize and extend REST APIs.
  • Improve deployment flexibility.
  • Improve test coverage.

Tools such as Spring, JPA 2.0, Jersey, JUnit, Mockito, Swagger, etc. were leveraged to solve most of the known issues and better position the software to handle new ones in the future.

Genie 2.0 was completely rewritten to take advantage of these frameworks and tools. Spring features such as dependency injection, JPA support, transactions, profiles and more are utilized to produce a more dynamic and robust architecture. In particular, dependency injection for various components allows Genie to be more easily modified and deployed both inside and outside Netflix. Swagger based annotations on top of the REST APIs provide not only improved documentation, but also a mechanism for generating clients in various languages. We used Swagger codegen to generate the core of our Python client, which has been uploaded to PyPI. Almost six hundred tests have also been added to the Genie code base, making the code more reliable and maintainable.

Our Current Deployment

Genie 2.0 has been deployed at Netflix for a couple of months, and all Genie 1.0 jobs have been migrated over. Genie currently provides access to all the Hadoop and Presto clusters in our production, test and ad hoc environments. In production, Genie currently autoscales between twelve and twenty i2.2xlarge AWS instances, allowing several hundred jobs to run at any given time. This provides horizontal scaling of clients for our clusters with no additional configuration or overhead.

Presto and Sqoop commands are each configured with a corresponding application that points to locations in S3, where all the jars and binaries necessary to execute these commands are located. Every time one of these commands runs, the necessary files are downloaded and installed. This allows us to continuously deploy updates to our Presto and Sqoop clients without redeploying Genie. We’re planning to move our other commands, like Pig and Hive, to this pattern as well.

At Netflix, launching a new cluster is done via a configuration based launch script. After a cluster is up in AWS, the cluster configuration is registered with Genie. Commands are then linked to the cluster based on predefined configurations. After it is properly configured in Genie, the cluster is marked as “available”. When we need to take down a cluster, it is marked as “out of service” in Genie so the cluster can no longer accept new jobs. Once all running jobs are complete, the cluster is marked as “terminated” in Genie and instances are shut down in AWS.
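
The cluster lifecycle described above can be sketched as a small state machine. The class and method names are hypothetical, and the sketch assumes transitions only move forward (available, then out of service, then terminated); whether Genie permits other transitions is not covered here.

```python
from enum import Enum


class ClusterStatus(Enum):
    AVAILABLE = "available"
    OUT_OF_SERVICE = "out of service"
    TERMINATED = "terminated"


# Assumed forward-only transitions between the three statuses.
ALLOWED_TRANSITIONS = {
    ClusterStatus.AVAILABLE: {ClusterStatus.OUT_OF_SERVICE},
    ClusterStatus.OUT_OF_SERVICE: {ClusterStatus.TERMINATED},
    ClusterStatus.TERMINATED: set(),
}


class ManagedCluster:
    """Tracks a cluster's status and the number of jobs running on it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.status = ClusterStatus.AVAILABLE
        self.running_jobs = 0

    def _transition(self, new_status: ClusterStatus) -> None:
        if new_status not in ALLOWED_TRANSITIONS[self.status]:
            raise RuntimeError(
                f"cannot go from {self.status.value} to {new_status.value}"
            )
        self.status = new_status

    def submit_job(self) -> None:
        # Only available clusters accept new jobs.
        if self.status is not ClusterStatus.AVAILABLE:
            raise RuntimeError(f"{self.name} is {self.status.value}")
        self.running_jobs += 1

    def finish_job(self) -> None:
        if self.running_jobs == 0:
            raise RuntimeError("no running jobs to finish")
        self.running_jobs -= 1

    def take_out_of_service(self) -> None:
        self._transition(ClusterStatus.OUT_OF_SERVICE)

    def terminate(self) -> None:
        # Running jobs are allowed to drain before the cluster is terminated.
        if self.running_jobs > 0:
            raise RuntimeError("jobs still running")
        self._transition(ClusterStatus.TERMINATED)


cluster = ManagedCluster("bonus")
cluster.submit_job()
cluster.take_out_of_service()  # no new jobs from here on
cluster.finish_job()           # the running job drains
cluster.terminate()            # instances can now be shut down
```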

Going live with Genie 2.0 has allowed us to bring together all the new tools and services we’ve added to the big data platform over the last year. We have already seen many benefits from Genie 2.0. We were able to add Presto support to Genie in a few days and Sqoop in less than an hour. These changes would have required code modification and redeployment with Genie 1.0, but were merely configuration changes in Genie 2.0.
Below is our new big data platform architecture with Genie at its core.


Future Work

There is always more to be done. Some enhancements that can be made going forward include:

  • Improving the job execution and monitoring components for better fault tolerance, efficiency on hosts and more granular status feedback.
  • Abstracting Genie’s use of Netflix OSS components to allow adopters to implement their own functionality for certain components to ease adoption.
  • Improving the admin UI to expose more data to users, e.g. showing all clusters a given command is registered with.

We’re always looking for feedback and input from the community on how to improve and evolve Genie. If you have questions or want to share your experience with running Genie in your environment, you can join our discussion forum. If you’re interested in helping out, you can visit our GitHub page to fork the project or request features.