Friday, December 12, 2014

Introducing Atlas: Netflix's Primary Telemetry Platform

Various previous Tech Blog posts have referred to our centralized monitoring system, and we've presented at least one talk about it. Today, we want to discuss the platform and ecosystem we built for time-series telemetry, describe its capabilities, and announce the open-sourcing of its underlying foundation.

How We Got Here

While we were operating in the datacenter, telemetry was split between an IT-provisioned commercial product and a tool a Netflix engineer wrote that allowed engineers to send in arbitrary time-series data and then query that data. This tool's flexibility was very attractive to engineers, so it became the primary system of record for time-series data. Sadly, even in the datacenter we had significant problems scaling it to about two million distinct time series. Our global expansion, our increase in platforms and customers, and our desire to improve visibility into our production systems required us to scale much higher, by an order of magnitude (to 20M metrics) or more. In 2012, we started building Atlas, our next-generation monitoring platform. In late 2012, it started being phased into production, with production deployment completed in early 2013.

Our Goals

Common API

Our previous home-grown tool, internally known as Epic, did a number of things really well that we didn't want to lose when transitioning. In particular:
  • Normalization and consolidation 
  • Flexible legends that scale independently of the chart 
  • Math, especially handling of NaN values representing no data 
  • Holt-Winters used for alerting 
  • Visualization options 
  • Deep linking 
Many of these capabilities are provided by the RRDTool library Epic was using, but most alternatives we looked at fell short in these categories. In addition, we use other third-party services such as CloudWatch, and it is desirable to have a common query capability for that data.


Scale

As noted above, metrics volume was growing and we needed a system that could keep up. For a long time our biggest concern was write volume; however, we also wanted to scale in terms of the amount of data we could read or aggregate as part of a graph request. Since then, we've scaled up Atlas significantly. The growth graph that accompanies this post is substantially smoothed; during failover exercises, when we send all North American traffic to one AWS region, we can easily see Atlas sustain more than 1.2 billion time series (corresponding to the publishing of billions of data points per minute).


Dimensionality

Most time-series systems rely on metric identity, which is essentially a string. As users of the system want the metric name to be more meaningful, they make it longer and more complicated, and pack more data into it. For example, one metric name we had in the old system is:
This metric name encodes in itself some information:

Key                            Value
nccprt (Request Type)          authorization
devtypid (Device Type ID)      101
clver (Client Version)         PHL_0AB
uiver (UI Version)             UI_169_mid
geo (Geographic location)      US

This created two types of problems:
  • Since it was all mangled into a name, with conventions that differed by team, users had to resort to complex regular expressions to try to slice and dice the data based on the dimensions.
  • It still provided us with insufficient dimensionality. As we were running into metric length limits (255 characters in the old system), we could not, for example, differentiate metrics coming from one AWS availability zone vs another.
We wanted the ability to support many, many more dimensions, in a way that was predictable, easy for users to discover, and possible to scale for us, the engineering team responsible for the care and feeding of this system.

In Atlas, a metric's identity is an arbitrary unique set of key-value pairs. A few of the keys are set automatically by the telemetry client library (e.g. server name, AWS zone, ASG, cluster, application, and region), with significant flexibility for the user to specify the keys that make sense for their use case, and essentially unlimited unique values supported for any key.
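
To make the contrast with string-based names concrete, here is a minimal sketch of tag-based identity. It is not the Atlas data model itself: the `name` and `zone` keys, the zone values, and the numbers are assumptions chosen for illustration, and `metric_id` and `matches` are hypothetical helpers.

```python
from typing import Dict, FrozenSet, Tuple

TagSet = FrozenSet[Tuple[str, str]]


def metric_id(tags: Dict[str, str]) -> TagSet:
    """Identity is the set of key-value pairs; key order is irrelevant."""
    return frozenset(tags.items())


def matches(identity: TagSet, **wanted: str) -> bool:
    """True if every requested key carries the requested value."""
    return all((k, v) in identity for k, v in wanted.items())


# Hypothetical series: the same logical metric reported from two zones and two countries.
series = {
    metric_id({"name": "requests", "nccprt": "authorization", "geo": "US", "zone": "us-east-1a"}): 42.0,
    metric_id({"name": "requests", "nccprt": "authorization", "geo": "US", "zone": "us-east-1b"}): 17.0,
    metric_id({"name": "requests", "nccprt": "authorization", "geo": "CA", "zone": "us-east-1a"}): 5.0,
}

# Slicing by a dimension is an exact key lookup, not a regular expression over a name.
us_total = sum(v for ident, v in series.items() if matches(ident, geo="US"))
zone_a_total = sum(v for ident, v in series.items() if matches(ident, zone="us-east-1a"))
```

Because each dimension is addressed by key, adding a new one (such as the availability zone) does not change how existing queries are written.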

What We Built

The Query Layer

In order to get a common API, have flexibility for backend implementations, and provide merged views across backends, we built a query layer that can be hierarchically composed. In the main Netflix setup, we have isolated regional deployments in each operational region as well as a global deployment that can combine the results from multiple regions. The query and aggregation operations can be performed on the fan-out, so most of the big summarization operations will distribute the computation across the tree and typically to an optimized storage layer at some point.
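
The idea of pushing aggregation down the tree can be sketched as follows. This is a toy model under stated assumptions, not the Atlas query layer: the region names, node names, and data points are made up, and `regional_query` and `global_query` are hypothetical functions.

```python
from typing import Dict, List

# Hypothetical data: per-node request counts for two time steps in each region.
REGIONS: Dict[str, Dict[str, List[float]]] = {
    "us-east-1": {"node-a": [10.0, 12.0], "node-b": [5.0, 6.0]},
    "eu-west-1": {"node-c": [3.0, 4.0]},
}


def sum_series(series: List[List[float]]) -> List[float]:
    """Point-wise sum of equally long time series."""
    return [sum(points) for points in zip(*series)]


def regional_query(region: str) -> List[float]:
    """Each regional deployment aggregates its own nodes first."""
    return sum_series(list(REGIONS[region].values()))


def global_query() -> List[float]:
    """The global layer only combines one pre-aggregated series per region."""
    return sum_series([regional_query(region) for region in REGIONS])
```

With this shape, the global layer receives one series per region regardless of how many nodes each region contains, which is what keeps large summarizations cheap at the top of the tree.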

Allowing the query and rendering layer to work on multiple backends also makes it easier for us to consider transitioning to other backends in the future such as OpenTSDB or InfluxDB. Compatibility was one of the biggest hurdles we had in switching to Atlas, and this approach allows us to abstract parts of the system and avoid further transitioning pain (as experienced by our users).

The Stack Language

One of the key requirements for us was to be able to have deep links to a particular chart and to reliably pass around or embed these images via email, wikis, HTML pages, etc. In addition, the user who receives a link should be able to tweak the result. Atlas uses a simple stack language that has minimal punctuation and allows arbitrarily complex graph expressions to be encoded in a URL-friendly way. This means that all images can be accessed using a GET request. It is also simple to parse and interpret, so it can be easily consumed from a variety of tools. The core features:
  • Embedding and linking using GET request
  • URL friendly stack language
    • Few special symbols (comma, colon, parentheses)
    • Easy to extend
  • Basic operations
    • Query: and, or, equal, regex, has key, not
    • Aggregation: sum, count, min, max, group by
    • Consolidation: aggregate across time
    • Math: add, subtract, multiply, etc
    • Boolean: and, or, lt, gt, etc
    • Graph settings: legends, area, transparency
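
To show why a comma-separated stack language is simple to parse, here is a toy interpreter for the query operations listed above. It is a sketch only: the operator spellings (`:eq`, `:has`, `:and`, `:or`, `:not`) and the function `compile_query` are assumptions for this example, and the real Atlas grammar may differ in its details.

```python
from typing import Callable, Dict, List, Union

Tags = Dict[str, str]
Query = Callable[[Tags], bool]
Item = Union[str, Query]


def _eq(stack: List[Item]) -> None:
    value, key = stack.pop(), stack.pop()
    stack.append(lambda tags: tags.get(key) == value)


def _has(stack: List[Item]) -> None:
    key = stack.pop()
    stack.append(lambda tags: key in tags)


def _and(stack: List[Item]) -> None:
    q2, q1 = stack.pop(), stack.pop()
    stack.append(lambda tags: q1(tags) and q2(tags))


def _or(stack: List[Item]) -> None:
    q2, q1 = stack.pop(), stack.pop()
    stack.append(lambda tags: q1(tags) or q2(tags))


def _not(stack: List[Item]) -> None:
    q = stack.pop()
    stack.append(lambda tags: not q(tags))


OPS = {":eq": _eq, ":has": _has, ":and": _and, ":or": _or, ":not": _not}


def compile_query(expr: str) -> Query:
    """Tokens are split on commas; words starting with ':' operate on the stack."""
    stack: List[Item] = []
    for token in expr.split(","):
        if token.startswith(":"):
            OPS[token](stack)
        else:
            stack.append(token)  # plain words are pushed as literals
    if len(stack) != 1 or not callable(stack[0]):
        raise ValueError("expression must reduce to a single query")
    return stack[0]


# Keys and values are pushed first, then combined by the operators that follow.
query = compile_query("nccprt,authorization,:eq,geo,US,:eq,:and")
```

The expression contains only letters, digits, commas, and colons, so it can be placed in a URL query string largely as-is, which is what makes GET-based deep links practical.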

In-Memory Storage

Finding the right storage solution for Atlas -- and attempting to identify the best balance of cost with the necessary capabilities for speed and scale -- has in some respects been our biggest challenge. We tried many backends and ended up moving more and more to a model where pretty much all data available for querying is stored in memory, either on or off the JVM heap.

Engineering for Performance

The primary goal for Atlas is to support queries over dimensional time-series data so we can slice and dice to drill down into problems. This means we frequently need to perform large aggregations that involve many data points even though the final result set might be small.

As an example, consider a simple graph showing the number of requests per second hitting a service for the last 3 hours. Assuming minute resolution, that is 180 datapoints for the final output. On a typical service we would get one time series per node showing the number of requests, so if we have 100 nodes the intermediate result set is around 18k datapoints. For one service, users went hog-wild with dimensions, breaking down requests by device (~1000s) and country (~50), leading to about 50k time series per node. If we still assume 100 nodes, that is about 900M datapoints for the same 3h line.
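
The arithmetic behind these figures can be checked directly. The sketch below assumes exactly 1,000 device types and 50 countries, which is one reading of the approximate values given above.

```python
MINUTES = 3 * 60   # 3 hours at one datapoint per minute
NODES = 100

# Final output: a single line on the chart.
final_datapoints = MINUTES

# Typical service: one time series per node.
typical_intermediate = NODES * MINUTES

# Heavily dimensioned service: device x country series on every node (assumed counts).
DEVICES = 1_000
COUNTRIES = 50
series_per_node = DEVICES * COUNTRIES
heavy_intermediate = series_per_node * NODES * MINUTES

print(final_datapoints, typical_intermediate, series_per_node, heavy_intermediate)
```

The final chart still has 180 points in both cases; only the amount of intermediate data that must be read and aggregated changes, by a factor of 50,000.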

Though we obviously have to be mindful of the explosion of dimensions, we want that, where possible, to be a decision based on cost and business value rather than a technical limitation.

We routinely see Atlas fetch and graph many billions of datapoints per second.

Engineering for Resilience

What has to be working in order for the monitoring system to work? If it falls over, what is involved in getting it back up? Our focus is primarily operational insight, so the top priority is to be able to determine what is going on right now. This leads to the following rules of thumb:
  • Data becomes exponentially less important as it gets older
  • Restoring service is more important than preventing data loss
  • Try to degrade gracefully
As a result, the internal Atlas deployment breaks up the data into multiple clusters based on the window of data they contain (the last 6 hours, the last 4 days, the last 16 days, and older). With this setup we can show the last 6h of data as long as clients can successfully publish. The data is all in memory, sharded across machines in the 6h clusters. Because the data and index are all in memory on the local node, each instance is self-contained and doesn't need any external service to function. We typically run multiple mirrors of the 6h cluster so data is replicated and we can handle the loss of an instance. In AWS we run each mirror in a different zone so that a zone failure will only impact a single mirror.

The publish cluster needs to know all instances in the mirror cluster and takes care of splitting the traffic up so it goes to the correct shard. The set of mirror instances and shards is assigned based on slots from the Edda autoScalingGroups API. Since the set of instances for the mirrors changes rarely, the publish instances can cache the Edda response and still successfully publish most data if Edda fails. If an instance is replaced and we can't update the data, we would have partial loss for a single shard only if the same shard was also missing in another mirror.
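
A rough sketch of what publishing to sharded mirrors can look like is shown below. It rests on an assumption not made in this post: that the shard is chosen by hashing the metric's tags. The post only says shards are assigned from Edda slots, so `canonical`, `shard_for`, and `publish` are hypothetical, as are the tag values.

```python
import zlib
from typing import Dict, List

Tags = Dict[str, str]
Shard = Dict[str, float]   # canonical tag string -> latest value
Mirror = List[Shard]       # one mirror is an ordered list of shards


def canonical(tags: Tags) -> str:
    """Order-independent string form of a tag set."""
    return ",".join(f"{k}={v}" for k, v in sorted(tags.items()))


def shard_for(tags: Tags, shard_count: int) -> int:
    # crc32 is stable across processes, unlike Python's built-in hash() for str.
    return zlib.crc32(canonical(tags).encode("utf-8")) % shard_count


def publish(tags: Tags, value: float, mirrors: List[Mirror]) -> int:
    """Write the datapoint to the same shard index in every mirror."""
    shard = shard_for(tags, len(mirrors[0]))
    for mirror in mirrors:
        mirror[shard][canonical(tags)] = value
    return shard


# Three mirrors (for example, one per zone), each with four shards.
mirrors: List[Mirror] = [[{} for _ in range(4)] for _ in range(3)]
shard = publish({"name": "requests", "node": "i-123"}, 7.0, mirrors)
```

Since every mirror holds the same shard layout, a datapoint is lost only if the shard it maps to is unavailable in all mirrors at once.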

Historical clusters can also fail, in which case graphs would not be able to show data for some older windows. The data shown doesn't have to be fully continuous. For example, a common use case for us is to look at week-over-week (WoW) charts even though the span of the chart might only be a few hours. If the < 4d cluster fails but the < 16d cluster is functioning, we could still serve that graph even though we couldn't show a continuous graph for the full week. A graph spanning the full week would still be shown but would be missing data in the middle.
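
The week-over-week case can be illustrated with a small model of the time windows. It assumes each datapoint is served by the smallest window that covers its age, which is a simplification made for this example; `cluster_for` and `can_serve` are hypothetical names.

```python
from typing import List, Optional, Set, Tuple

# (cluster name, maximum data age in hours), smallest window first.
WINDOWS: List[Tuple[str, int]] = [("6h", 6), ("4d", 4 * 24), ("16d", 16 * 24)]


def cluster_for(age_hours: float) -> Optional[str]:
    """Smallest window covering data of this age, or None if it is too old."""
    for name, max_age in WINDOWS:
        if age_hours < max_age:
            return name
    return None


def can_serve(ages_hours: List[float], healthy: Set[str]) -> List[bool]:
    """For each requested age, report whether a healthy cluster holds it."""
    return [cluster_for(age) in healthy for age in ages_hours]


# The < 4d cluster is down; the 6h and < 16d clusters are up.
healthy = {"6h", "16d"}

# Week-over-week chart: the last hour and the same hour one week ago.
wow = can_serve([1, 7 * 24 + 1], healthy)

# Continuous week-long chart: also needs data that is two days old.
full_week = can_serve([1, 48, 7 * 24 + 1], healthy)
```

The week-over-week chart is fully served because it never touches the failed window, while the continuous chart has a gap where the two-day-old data would be.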

After data is written to the mirrors, they will flush to a persistence layer that is responsible for writing the data to the long term storage in S3. The data at full resolution is kept in S3 and we use Hadoop (Elastic MapReduce) for processing the data to perform corrective merging of data from the mirrors, generate reports, and perform rollups into a form that can be loaded into the historical clusters.

As always, we're aided by the fact that engineers at Netflix are responsible for the operational care and feeding of their systems, eliminating potential impedance mismatch between engineering concerns and operational concerns.

Engineering for Cost

Keeping all data in memory is expensive -- in particular with the large growth rate of data. The combination of dimensionality and the time-based partitioning used for resilience gives us a way to help manage costs.
The first way is in controlling the number of replicas. In most cases we are using replicas for redundancy rather than for additional query capacity. For historical data that can be reloaded from stable storage we typically run only one replica as the duration of partial downtime was not deemed to be worth the cost for an additional replica.

The second way is that, as part of the Hadoop processing, we can compute rollups so that we have a much smaller data volume to load into the historical clusters. At Netflix the typical policy is roughly:
< 6 hours      Keep all data received
< 4 days       Keep most data; we do early rollup by dropping the node dimension on some business metrics
< 16 days      Rollup by dropping the node dimension on all metrics
older          Explicit whitelist; we generally recommend BI systems for longer-term use-cases

Using these policies we get greatly reduced index sizes for the number of distinct time series despite a significant amount of churn. With auto-scaling and red/black deployment models the set of instances changes frequently, so typically the intersection of distinct time series from one day to the next is less than 50%. Rollups target the dimensions that lead to that churn, giving us much smaller index sizes. Also, in many cases dimensions like node name that lead to this increase become less relevant after the node goes away. Deep-dive or investigative use-cases can still access the data using Hadoop if needed.
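
Dropping a dimension during rollup can be sketched as below. This illustrates the idea rather than the actual Hadoop job: it assumes values are counters that can be summed, and the tag names, instance IDs, and numbers are made up.

```python
from collections import defaultdict
from typing import Dict, FrozenSet, Tuple

TagSet = FrozenSet[Tuple[str, str]]


def tagset(**tags: str) -> TagSet:
    return frozenset(tags.items())


def rollup(series: Dict[TagSet, float], drop_key: str = "node") -> Dict[TagSet, float]:
    """Sum together all series that differ only in the dropped dimension."""
    rolled: Dict[TagSet, float] = defaultdict(float)
    for tags, value in series.items():
        kept = frozenset((k, v) for k, v in tags if k != drop_key)
        rolled[kept] += value
    return dict(rolled)


# Hypothetical data: one instance is replaced between day 1 and day 2.
day1 = {
    tagset(name="requests", geo="US", node="i-aaa"): 10.0,
    tagset(name="requests", geo="US", node="i-bbb"): 5.0,
}
day2 = {
    tagset(name="requests", geo="US", node="i-bbb"): 6.0,
    tagset(name="requests", geo="US", node="i-ccc"): 9.0,
}

raw_index = set(day1) | set(day2)                          # grows with instance churn
rolled_index = set(rollup(day1)) | set(rollup(day2))       # stable across days
```

Here the raw index holds three distinct series over two days while the rolled-up index holds one, since the only thing that changed between days was the node identity.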

The Ecosystem

We've built an extensive ecosystem of products around Atlas; virtually every operational visibility system or dashboard used within Netflix utilizes data from it. In addition to the components described above, we've also built:
  • User interfaces
    • Main UI for browsing data and constructing queries.
    • Dashboards
    • Alerts
  • Platform
    • Inline aggregation of reported data before storage layer
    • Storage options using off-heap memory and Lucene
    • Percentile backend
    • Publish and persistence applications
    • EMR processing for computing rollups and analysis
    • Poller for SNMP, healthchecks, etc
  • Client
    • Supports integrating Servo with Atlas
    • Local rollups and alerting
  • Real-Time Analytics
    • Metrics volume report
    • Automated Canary Analysis
    • Outlier and anomaly detection
    • Automated server culling based on outlier characteristics
Today, we are open-sourcing the query layer and some of the in-heap memory structure capabilities. Our intention is to open-source more of the ecosystem as soon as feasible, focusing first on the components most likely to be relevant to people outside Netflix.

You can now find Atlas on GitHub; you can also find a more detailed overview there.

Brian Harrington and Roy Rapoport