Wednesday, January 28, 2015

Netflix Likes React


We are making big changes in the way we build the Netflix experience with Facebook’s React library. Today, we will share our thoughts on what makes React so compelling and how it is evolving our approach to UI development.

At the beginning of last year, Netflix UI engineers embarked on several ambitious projects to dramatically transform the user experience on our desktop and mobile platforms. Given a UI redesign on a scale similar to the one our TV and game console experiences had undergone, it was essential for us to re-evaluate our existing UI technology stack and determine whether to explore new solutions. Do we have the right building blocks to create best-in-class single-page web applications? And what specific problems are we looking to solve?
Much of our existing front-end infrastructure consists of hand-rolled components optimized for the current website and iOS application. Our decision to adopt React was influenced by a number of factors, most notably: 1) startup speed, 2) runtime performance, and 3) modularity.

Startup Speed

We want to reduce the initial load time needed to provide Netflix members with a much more seamless, dynamic way to browse and watch individualized content. However, we find that the cost to deliver and render the UI past login can be significant, especially on our mobile platforms where there is a lot more variability in network conditions.

In addition to the time required to bootstrap our single-page application (i.e. download and process initial markup, scripts, stylesheets), we need to fetch data, including movie and show recommendations, to create a personalized experience. While network latency tends to be our biggest bottleneck, another major factor affecting startup performance is the creation of DOM elements based on the parsed JSON payload containing the recommendations. Is there a way to minimize the network requests and processing time needed to render the home screen? We are looking for a hybrid solution that delivers above-the-fold static markup on first load via server-side rendering, thereby reducing the cost of the startup operations described above, while still enabling dynamic elements in the UI through client-side scripting.

Runtime Performance

To build our most visually-rich cinematic Netflix experience to date for the website and iOS platforms, efficient UI rendering is critical. While there are fewer hardware constraints on desktops (compared to TVs and set-top boxes), expensive operations can still compromise UI responsiveness. In particular, DOM manipulations that result in reflows and repaints are especially detrimental to user experience.

Modularity

Our front-end infrastructure must support the numerous A/B tests we run: we need to rapidly build out new features and designs whose code must co-exist with the control experience (against which the new experiences are tested). For example, an A/B test might compare 9 different design variations in the UI, which could mean maintaining code for 10 views for the duration of the test. Upon completion of the test, it should be easy for us to productize the experience that performed best for our members and clean up the code for the 9 other views that did not.

Advantages of React

React stood out in that its defining features not only satisfied the criteria set forth above, but also offered other advantages, including being relatively easy to grasp and the ability to opt out where needed, for example to handle custom user interactions and rendering code. We were able to leverage the following features to improve our application’s initial load times, runtime performance, and overall scalability: 1) isomorphic JavaScript, 2) virtual DOM rendering, and 3) support for compositional design patterns.

Isomorphic JavaScript

React enabled us to build JavaScript UI code that can be executed in both server (e.g. Node.js) and client contexts. To improve our startup times, we built a hybrid application where the initial markup is rendered server-side and the resulting UI elements are subsequently manipulated as in a single-page application. It was possible to achieve this with React as it can render without a live DOM, e.g. via React.renderToString or React.renderToStaticMarkup. Furthermore, the UI code written using the React library that is responsible for generating the markup could be shared with the client to handle cases where re-rendering was necessary.
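To make that hybrid flow concrete, here is a minimal sketch of sharing one component between server and client, assuming the React 0.12-era API named above (React.renderToString on the server, React.render in the browser) and a hypothetical Express route; the component, route, and markup are illustrative only, and in later React versions these functions moved to ReactDOMServer and ReactDOM.

// Shared component code (TypeScript), usable on both server and client.
import * as React from "react";
import * as express from "express";

const Billboard = React.createClass({
  render() {
    return React.createElement("h1", null, "Welcome back, " + this.props.name);
  },
});

const app = express();
app.get("/", (req, res) => {
  // Server side: emit above-the-fold markup so the first paint needs no client JS.
  const html = React.renderToString(React.createElement(Billboard, { name: "member" }));
  res.send('<!doctype html><div id="app">' + html + "</div>");
});

// Client side, using the same component code: attach behavior to the
// server-rendered markup instead of rebuilding the DOM from scratch.
// React.render(React.createElement(Billboard, { name: "member" }),
//              document.getElementById("app"));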

Virtual DOM

To reduce the penalties incurred by live DOM manipulation, React applies updates to a virtual DOM in pure JavaScript and then determines the minimal set of DOM operations necessary via a diff algorithm. The diffing of virtual DOM trees is fast relative to actual DOM modifications, especially using today’s increasingly efficient JavaScript engines such as WebKit’s Nitro with JIT compilation. Furthermore, we can eliminate the need for traditional data binding, which has its own performance implications and scalability challenges.
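As a small illustration of the point above, consider a component that re-renders whenever its state changes; this is a hedged sketch using the React 0.12-era createClass API, not production Netflix code.

// A counter whose re-render produces a new virtual tree; React diffs it
// against the previous tree and patches only the text node that changed,
// avoiding reflow-heavy manipulation of the surrounding DOM.
import * as React from "react";

const PlayCounter = React.createClass({
  getInitialState() {
    return { plays: 0 };
  },
  render() {
    return React.createElement(
      "div",
      null,
      React.createElement("span", null, "Plays: "),
      React.createElement("span", null, String(this.state.plays))
    );
  },
});

// Calling this.setState({ plays: ... }) elsewhere triggers the diff/patch
// cycle described above rather than a wholesale rebuild of the live DOM.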

React Components and Mixins

React provides powerful Component and Mixin APIs that we relied on heavily to create reusable views, share common functionality, and establish patterns that facilitate feature extension. When A/B testing different designs, we can implement the views as separate React subcomponents that get rendered by a parent component depending on the user’s allocation in the test. Similarly, differences in behavioral logic can be abstracted into React mixins. Although it is possible to achieve modularity with a classical inheritance pattern, frequent changes in superclass interfaces to support new features affect existing subclasses and increase code fragility. React’s compositional pattern is ideal for the overall maintenance and scalability of our front-end codebase as it isolates much of the A/B test code.
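A minimal sketch of that parent/variant pattern, again assuming the React 0.12-era createClass API; the cell names and the getTestAllocation() helper are hypothetical stand-ins for however a real allocation service would be consulted.

import * as React from "react";

// Hypothetical lookup of the member's allocation for a given test.
declare function getTestAllocation(testId: string): number;

const ControlRow = React.createClass({
  render() { return React.createElement("div", null, "control experience"); },
});
const Variant1Row = React.createClass({
  render() { return React.createElement("div", null, "test cell 1"); },
});

const BillboardRow = React.createClass({
  render() {
    // The parent picks which subcomponent to render based on allocation;
    // cleaning up a losing cell later is an isolated, local change.
    const cell = getTestAllocation("billboard-redesign");
    const View = cell === 1 ? Variant1Row : ControlRow;
    return React.createElement(View, null);
  },
});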

React has exceeded our requirements and enabled us to build a tremendous foundation on which to innovate the Netflix experience. Stay tuned in the coming months, as we will dive more deeply into how we are using React to transform traditional UI development!

By Jordanna Kwok


Tuesday, January 27, 2015

Netflix's Viewing Data: How We Know Where You Are in House of Cards

Over the past 7 years, Netflix streaming has expanded from thousands of members watching occasionally to millions of members watching over two billion hours every month.  Each time a member starts to watch a movie or TV episode, a “view” is created in our data systems and a collection of events describing that view is gathered.  Given that viewing is what members spend most of their time doing on Netflix, having a robust and scalable architecture to manage and process this data is critical to the success of our business.  In this post we’ll describe what works and what breaks in an architecture that processes billions of viewing-related events per day.

Use Cases

By focusing on the minimum viable set of use cases, rather than building a generic all-encompassing solution, we have been able to build a simple architecture that scales.  Netflix’s viewing data architecture is designed for a variety of use cases, ranging from user experiences to data analytics.  The following are three key use cases, all of which affect the user experience:

What titles have I watched?

Our system needs to know each member’s entire viewing history for as long as they are subscribed.  This data feeds the recommendation algorithms so that a member can find a title for whatever mood they’re in.  It also feeds the “recent titles you’ve watched” row in the UI.  What gets watched provides key metrics for the business to measure member engagement and make informed product and content decisions.

Where did I leave off in a given title?

For each movie or TV episode that a member views, Netflix records how much was watched and where the viewer left off.   This enables members to continue watching any movie or TV show on the same or another device.

What else is being watched on my account right now?

Sharing an account with other family members usually means everyone gets to enjoy what they like when they’d like.  It also means a member may have to have that hard conversation about who has to stop watching if they’ve hit their account’s concurrent screens limit.  To support this use case, Netflix’s viewing data system gathers periodic signals throughout each view to determine whether a member is or isn’t still watching.

Current Architecture

Our current architecture evolved from an earlier monolithic database-backed application (see this QCon talk or slideshare for the detailed history).  When it was designed, the primary requirements were that it must serve the member-facing use cases with low latency and it should be able to handle a rapidly expanding set of data coming from millions of Netflix streaming devices.  Through incremental improvements over 3+ years, we’ve been able to scale this to handle low billions of events per day.

Current Architecture Diagram

The current architecture’s primary interface is the viewing service, which is segmented into a stateful and stateless tier.  The stateful tier has the latest data for all active views stored in memory.  Data is partitioned into N stateful nodes by a simple mod N of the member’s account id.  When stateful nodes come online they go through a slot selection process to determine which data partition will belong to them.  Cassandra is the primary data store for all persistent data.  Memcached is layered on top of Cassandra as a guaranteed low latency read path for materialized, but possibly stale, views of the data.
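As a rough sketch of the sharding rule described above (a simple mod N over the account id); the function and node names are illustrative, not the actual viewing service code.

// Every event for an account routes to the same stateful node, which keeps
// the latest data for all of that member's active views in memory.
function shardFor(accountId: number, statefulNodes: string[]): string {
  return statefulNodes[accountId % statefulNodes.length];
}

// e.g. shardFor(12345678, ["view-node-0", "view-node-1", "view-node-2"])
//      -> "view-node-0"; losing that one node affects 1/Nth of members.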


We started with a stateful architecture design that favored consistency over availability in the face of network partitions (for background, see the CAP theorem).  At that time, we thought that accurate data was better than stale or no data.  Also, we were pioneering running Cassandra and memcached in the cloud so starting with a stateful solution allowed us to mitigate risk of failure for those components.  The biggest downside of this approach was that failure of a single stateful node would prevent 1/nth of the member base from writing to or reading from their viewing history.


After experiencing outages due to this design, we reworked parts of the system to gracefully degrade and provide limited availability when failures happened.  The stateless tier was added later as a pass-through to external data stores. This improved system availability by providing stale data as a fallback mechanism when a stateful node was unreachable.

Breaking Points

Our stateful tier uses a simple sharding technique (account id mod N) that is subject to hot spots, as Netflix viewing usage is not evenly distributed across all current members.  Our Cassandra layer is not subject to these hot spots, as it uses consistent hashing with virtual nodes to partition the data.  Additionally, when we moved from a single AWS region to running in multiple AWS regions, we had to build a custom mechanism to communicate the state between stateful tiers in different regions.  This added significant, undesirable complexity to our overall system.


We created the viewing service to encapsulate the domain of viewing data: its collection, processing, and provision.  As that system evolved to include more functionality and various read/write/update use cases, we identified multiple distinct components that had been combined into this single unified service.  These components would be easier to develop, test, debug, deploy, and operate if they were extracted into their own services.


Memcached offers superb throughput and latency characteristics, but isn’t well suited for our use case.  To update the data in memcached, we read the latest data, append a new view entry (if none exists for that movie) or modify an existing entry (moving it to the front of the time-ordered list), and then write the updated data back to memcached.  We use an eventually consistent approach to handling multiple writers, accepting that an inconsistent write may happen but will get corrected soon after due to a short cache entry TTL and a periodic cache refresh.  For the caching layer, using a technology that natively supports first class data types and operations like append would better meet our needs.
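The read-modify-write cycle described above looks roughly like the following sketch; the cache client interface and ViewEntry shape are hypothetical simplifications, not the real system's data model.

interface ViewEntry { titleId: number; positionSec: number; updatedAt: number; }

// Hypothetical minimal cache client (memcached-style get/set with a TTL).
interface Cache {
  get(key: string): Promise<ViewEntry[] | null>;
  set(key: string, value: ViewEntry[], ttlSec: number): Promise<void>;
}

async function recordView(cache: Cache, accountId: number, entry: ViewEntry): Promise<void> {
  const key = "views:" + accountId;
  // 1. Read the latest time-ordered list (possibly stale, possibly missing).
  const views = (await cache.get(key)) || [];
  // 2. Append a new entry, or move the existing entry for this title to the front.
  const updated = [entry, ...views.filter(v => v.titleId !== entry.titleId)];
  // 3. Write the whole list back. Concurrent writers can clobber each other;
  //    a short TTL plus a periodic refresh from the source of truth corrects the drift.
  await cache.set(key, updated, 300 /* seconds, illustrative */);
}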


We created the stateful tier because we wanted the benefit of memory speed for our highest volume read/write use cases.  Cassandra was in its pre-1.0 versions and wasn’t running on SSDs in AWS.  We thought we could design a simple but robust distributed stateful system exactly suited to our needs, but ended up with a complex solution that was less robust than mature open source technologies.  Rather than solve the hard distributed systems problems ourselves, we’d rather build on top of proven solutions like Cassandra, allowing us to focus our attention on solving the problems in our viewing data domain.


Next Generation Architecture

In order to scale to the next order of magnitude, we’re rethinking the fundamentals of our architecture.  The principles guiding this redesign are:
  • Availability over consistency - our primary use cases can tolerate eventually consistent data, so design from the start favoring availability rather than strong consistency in the face of failures.
  • Microservices - Components that were combined together in the stateful architecture should be separated out into services (components as services).
    • Components are defined according to their primary purpose - either collection, processing, or data providing.
    • Delegate responsibility for state management to the persistence tiers, keeping the application tiers stateless.
    • Decouple communication between components by using signals sent through an event queue.
  • Polyglot persistence - Use multiple persistence technologies to leverage the strengths of each solution.
    • Achieve flexibility + performance at the cost of increased complexity.
    • Use Cassandra for very high volume, low latency writes.  A tailored data model and tuned configuration enables low latency for medium volume reads.
    • Use Redis for very high volume, low latency reads.  Redis’ first-class data types should handle these writes better than the read-modify-write cycle we used with memcached (see the sketch after this list).
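A minimal sketch of what that looks like with a native sorted-set type, assuming the ioredis client for illustration; the key names and fields are hypothetical, not the actual next-generation schema.

import Redis from "ioredis";

const redis = new Redis();

async function recordView(accountId: number, titleId: number): Promise<void> {
  // ZADD updates the title's score (last-watched time) in place on the
  // server, so no client-side read-modify-write cycle is needed.
  await redis.zadd("views:" + accountId, Date.now(), String(titleId));
}

async function recentTitles(accountId: number, count: number): Promise<string[]> {
  // Most recently watched titles first -- a very high volume, low latency read.
  return redis.zrevrange("views:" + accountId, 0, count - 1);
}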


Our next generation architecture will be made up of these building blocks:


Re-architecting a critical system to scale to the next order of magnitude is a hard problem, requiring many months of development, testing, proving out at scale, and migrating off of the previous architecture.  Guided by these architectural principles, we’re confident that the next generation that we are building will give Netflix a strong foundation to meet the needs of our massive and growing scale, enabling us to delight our global audience.  We are in the early stages of this effort, so if you are interested in helping, we are actively hiring for this work.   In the meantime, we’ll follow up this post with a future one focused on the new architecture.



Tuesday, January 20, 2015

Introducing Surus and ScorePMML

Today we’re announcing a new Netflix-OSS project called Surus. Over the next year we plan to release a handful of our internal user-defined functions (UDFs) that have broad adoption across Netflix. The use cases for these functions are varied in nature (e.g. scoring predictive models, outlier detection, pattern matching, etc.) and together they extend our analytical capabilities on big data.

The first function we’re releasing allows for efficient scoring of predictive models in Apache Pig using the Predictive Model Markup Language (PMML). PMML is an open standard that supports a concise XML representation of predictive models, hence the name of the new function: ScorePMML.

ScorePMML


At Netflix, we use predictive models everywhere. Although the applications for each model are different, the process by which each of these predictive models is built and deployed is consistent. The process usually looks like this:

  1. Someone proposes an idea and builds a model on “small” data
  2. We decide to “scale-up” the prototype to see how well the model generalizes to a larger dataset
  3. We may eventually put the model into “production”

At Netflix, we have different tools for each step above. When scoring data in our Hadoop environment, we noticed a proliferation of custom scoring approaches in steps two and three. These custom approaches added overhead as individual developers migrated models through the process. Our solution was to adopt PMML as a standard way to represent model output and to write ScorePMML as a UDF for scoring PMML files at scale.

ScorePMML aligns Netflix predictive modeling capabilities around the open-source PMML standard. By leveraging the open-source standard, we enable a flexible and consistent representation of predictive models for each of the steps mentioned above. By using the same PMML representation of the predictive model at each step in the modeling process, we save time/money by reducing both the risk and cost of custom code. PMML provides an effective foundation to iterate quickly for the modeling methods it supports. Our data scientists have started adopting ScorePMML where it allows them to iterate and deploy models more effectively than the legacy approach.

An Example


Now for the practical part. Let’s imagine that you’re building a model in R. You might do something like this:

# Required Dependencies
require(randomForest)
require(gbm)
require(pmml)
require(XML)
data(iris)

# Column Names must NOT contain periods
names(iris) <- gsub("\\.","_",tolower(names(iris)))

# Build Models (the response column is lowercase "species" after the rename above)
iris.rf  <- randomForest(species ~ ., data=iris, ntree=5)
iris.gbm <- gbm(species ~ ., data=iris, n.trees=5)

# Convert to pmml
# Output to File
saveXML(pmml(iris.rf) ,file="~/iris.rf.xml")
saveXML(pmml(iris.gbm, n.trees=5),file="~/iris.gbm.xml")

And, now let’s say that you want to score 100 billion rows…

REGISTER '~/scoring.jar';

DEFINE pmmlRF  com.netflix.pmml.ScorePMML('~/iris.rf.xml');
DEFINE pmmlGBM com.netflix.pmml.ScorePMML('~/iris.gbm.xml');

-- LOAD Data
iris = load '~/iris.csv' using PigStorage(',') as
      (sepal_length:double, sepal_width:double, petal_length:double, petal_width:double, species:chararray);

-- Score two models in one pass over the data
scored = foreach iris generate pmmlRF(*) as RF, pmmlGBM(*) as GBM;
dump scored;

That’s how easy it should be.

There are a couple of things you should think about though before trying to score 100 billion records in Pig.  

  • We throw a Pig FrontendException when the Pig/Hive data types and column names don’t match the data types and column names in PMML. This means that you don’t need to wait for the Hadoop MR job to start before getting the feedback that something is wrong.
  • The ScorePMML constructor accepts local or remote file locations. This means that you can reference an HDFS or S3 path, or you can reference a local path (see the example above).
  • We’ve made scoring multiple models in parallel trivial. Furthermore, models are only read into memory once, so there isn’t a penalty when processing multiple models at the same time.
  • When scoring big (and usually uncontrolled) datasets it’s important to handle errors gracefully. You don’t want to rescore 100 records because you fail on the 101st record. Rather than throwing an exception (and failing the job) we’ve added an indicator to the output tuple that can be used for alerting.
  • Although this is currently written to be run in Pig we may migrate in the future to different platforms.

Obviously, more can be done. We welcome ideas on how to make the code better.  Feel free to make a pull request!

Conclusion


We’re excited to introduce Surus and, over the upcoming months, to share the various UDFs we find helpful while analyzing data at Netflix. ScorePMML was a big win for Netflix as we sought to streamline our processing and minimize the time to production for our models. We hope that with this function (and others soon to be released) you’ll be able to spend more time making cool stuff and less time struggling with the mundane.

Known Issues/Limitations


  • ScorePMML is built on jPMML 1.0.19, which doesn’t fully support the 4.2 PMML specification (as defined by the Data Mining Group). At the time of this writing not all enumerated missing value strategies are supported. This caused problems when we wanted to implement GBMs in PMML, so we had to add extra nodes in each tree to properly handle missing values.
  • Hive 0.12.0 (and thus Pig) has strict naming conventions for columns/relations which are relaxed in PMML. Non-alphanumeric characters in column names are not supported in ScorePMML. Please see the Hive documentation for more details on column naming in the Hive metastore.

Additional Resources


  • The Data Mining Group PMML Spec: The 4.1.2 specification is currently supported. The 4.2 version of the PMML spec is not currently supported. The DMG page will give you a sense of which model types are supported and how they are described in PMML.
  • jPMML: A collection of GitHub projects that contain tools for using PMML. Including an alternative Pig implementation, jpmml-pig, written by Villu Ruusmann.
  • RPMML: An R-package for creating PMML files from common predictive modeling objects.

Friday, December 12, 2014

Introducing Atlas: Netflix's Primary Telemetry Platform

Various previous Tech Blog posts have referred to our centralized monitoring system, and we've presented at least one talk about it previously. Today, we want to discuss the platform and ecosystem we built for time-series telemetry, describe its capabilities, and announce the open-sourcing of its underlying foundation.


How We Got Here

While we were still operating in our own datacenter, telemetry was split between an IT-provisioned commercial product and a tool a Netflix engineer wrote that allowed engineers to send in arbitrary time-series data and then query that data. This tool's flexibility was very attractive to engineers, so it became the primary system of record for time-series data. Sadly, even in the datacenter we had significant problems scaling it to about two million distinct time series. Our global expansion, the increase in platforms and customers, and our desire to improve visibility into our production systems required us to scale much higher, by an order of magnitude (to 20M metrics) or more. In 2012, we started building Atlas, our next-generation monitoring platform. In late 2012, it started being phased into production, with production deployment completed in early 2013.

Our Goals


Common API

Our previous home-grown tool, internally known as Epic, did a number of things really well that we didn't want to lose when transitioning. In particular:
  • Normalization and consolidation 
  • Flexible legends that scale independently of the chart 
  • Math, especially handling of NaN values representing no data 
  • Holt-Winters used for alerting 
  • Visualization options 
  • Deep linking 
Many of these capabilities are provided by the RRDTool library Epic was using, but most alternatives we looked at fell short in these categories. In addition, we use other third-party services like CloudWatch, and it is desirable to have a common query capability across that data.

Scale 

As noted above, metrics volume was growing and we needed a system that could keep up. For a long time our biggest concern was write volume; however, we also wanted to scale in terms of the amount of data we could read or aggregate as part of a graph request. Since then, we've scaled up Atlas significantly: during failover exercises, when we send all North American traffic to one AWS region, we can easily see Atlas sustain greater than 1.2 billion time series (corresponding to publishing billions of data points per minute).

Dimensionality

Most time-series systems rely on metric identity which is, essentially, a string. As users of the system want the metric name to be more meaningful, they make it longer, more complicated, and include more data. For example, one metric name we had in the old system is:
com.netflix.eds.nccp.successful.requests.uiversion.nccprt-authorization.devtypid-101.clver-PHL_0AB.uiver-UI_169_mid.geo-US
This metric name encodes in itself some information:
Key                         Value
Name                        com.netflix.eds.nccp.successful.requests.uiversion
nccprt (request type)       authorization
devtypid (device type ID)   101
clver (client version)      PHL_0AB
uiver (UI version)          UI_169_mid
geo (geographic location)   US

This created two types of problems:
  • Since it was all mangled into a name with different conventions by team, users would have to resort to complex regular expressions to try and slice/dice the data based on the dimensions.
  • It provided us with still-insufficient dimensionality. As we were running into metric length limits (255 characters in the old system), we could not, for example, differentiate metrics coming from one AWS availability zone vs another.
We wanted the ability to support many, many more dimensions, in a way that was predictable, easy for users to discover, and possible for us, the engineering team responsible for the care and feeding of this system, to scale.

In Atlas, a metric's identity is an arbitrary unique set of key-value pairs; a few of the keys are set automatically by the telemetry client library (e.g. server name, AWS zone, ASG, cluster, application, region, etc), with significant flexibility for the user to specify the keys that make sense for their use case, and essentially-unlimited unique values supported for any key.

What We Built


The Query Layer

In order to get a common API, have flexibility for backend implementations, and provide merged views across backends we built a query layer that can be hierarchically composed. The diagram below shows the main Netflix setup: We have isolated regional deployments in each operational region as well as a global deployment that can combine the results from multiple regions. The query and aggregation operations can be performed on the fan-out so most of the big summarization operations will distribute the computation across the tree and typically to an optimized storage layer at some point.

Allowing the query and rendering layer to work on multiple backends also makes it easier for us to consider transitioning to other backends in the future such as OpenTSDB or InfluxDB. Compatibility was one of the biggest hurdles we had in switching to Atlas, and this approach allows us to abstract parts of the system and avoid further transitioning pain (as experienced by our users).

The Stack Language

One of the key requirements for us was to be able to have deep links to a particular chart and be able to reliably pass around or embed these images via email, wikis, html pages, etc. In addition, the user who receives one should be able to tweak the result. Atlas uses a simple stack language that has minimal punctuation and allows arbitrarily complex graph expressions to be encoded in a URL-friendly way. This means that all images can be accessed using a GET request. It is also simple to parse and interpret, so it can be easily consumed from a variety of tools. The core features (a small sketch of composing such a URL follows the list):
  • Embedding and linking using GET request
  • URL friendly stack language
    • Few special symbols (comma, colon, parentheses)
    • Easy to extend
  • Basic operations
    • Query: and, or, equal, regex, has key, not
    • Aggregation: sum, count, min, max, group by
    • Consolidation: aggregate across time
    • Math: add, subtract, multiply, etc
    • Boolean: and, or, lt, gt, etc
    • Graph settings: legends, area, transparency
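A rough sketch of composing a deep-linkable graph URL from stack-language tokens; the specific tokens, host, and endpoint here are illustrative of the style described above rather than a definitive reference for the query API.

// Reverse-Polish style tokens: select series by name, narrow by cluster,
// then aggregate with :sum. Everything fits in a GET query string.
const tokens = [
  "name", "requestsPerSecond", ":eq",
  "nf.cluster", "api-prod", ":eq", ":and",
  ":sum",
];
const graphUrl =
  "http://atlas.example.com/api/v1/graph?q=" + encodeURIComponent(tokens.join(","));
// The resulting URL can be emailed, embedded in a wiki, or tweaked by the
// recipient simply by editing the expression in the query string.
console.log(graphUrl);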

In-Memory Storage

Finding the right storage solution for Atlas -- and attempting to identify the best balance of cost with the necessary capabilities for speed and scale -- has in some respects been our biggest challenge. We tried many backends and ended up moving more and more to a model where pretty much all data available for querying is stored in memory either in or off the JVM heap.

Engineering for Performance

The primary goal for Atlas is to support queries over dimensional time-series data so we can slice and dice to drill down into problems. This means we frequently need to perform large aggregations that involve many data points, even though the final result set might be small.

As an example, consider a simple graph showing the number of requests per second hitting a service for the last 3 hours. Assuming minute resolution, that is 180 datapoints for the final output. On a typical service we would get one time series per node showing the number of requests, so if we have 100 nodes the intermediate result set is around 18k datapoints. For one service, users went hog-wild with dimensions, breaking down requests by device (~1000s) and country (~50), leading to about 50k time series per node. If we still assume 100 nodes, that is about 900M datapoints for the same 3h line.
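Spelling out that arithmetic (same numbers as the paragraph above):

const datapointsPerLine = 3 * 60;  // 3 hours at minute resolution = 180
const nodes = 100;

// Simple case: one requests-per-second series per node.
const simpleCase = datapointsPerLine * 1 * nodes;            // ~18,000 datapoints

// Heavy case: ~1000 devices x ~50 countries ~= 50k series per node.
const seriesPerNode = 1000 * 50;
const heavyCase = datapointsPerLine * seriesPerNode * nodes; // ~900,000,000 datapoints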

Though obviously we have to be mindful about the explosion of dimensions, we also want that, where possible, to be a decision based on cost and business value rather than a technical limitation.

We routinely see Atlas fetch and graph many billions of datapoints per second.

Engineering for Resilience

What has to be working in order for the monitoring system to work? If it falls over, what is involved in getting it back up? Our focus is primarily operational insight, so the top priority is to be able to determine what is going on right now. This leads to the following rules of thumb:
  • Data becomes exponentially less important as it gets older
  • Restoring service is more important than preventing data loss
  • Try to degrade gracefully
As a result, the internal Atlas deployment is broken into multiple clusters based on the window of data they contain (e.g. the last 6 hours, the last 4 days, the last 16 days). With this setup we can show the last 6h of data as long as clients can successfully publish. The data is all in memory, sharded across machines in the 6h clusters. Because the data and index are all in memory on the local node, each instance is self-contained and doesn't need any external service to function. We typically run multiple mirrors of the 6h cluster so data is replicated and we can handle the loss of an instance. In AWS we run each mirror in a different zone so that a zone failure will only impact a single mirror.

The publish cluster needs to know all instances in the mirror cluster and takes care of splitting the traffic up so it goes to the correct shard. The set of mirror instances and shards are assigned based on slots from the Edda autoScalingGroups API. Since the set of instances for the mirrors changes rarely, the publish instances can cache the Edda response and still successfully publish most data if Edda fails. If an instance is replaced while we can't update the slot data, we would see partial loss for a single shard, and only if the same shard was also missing in another mirror.

The historical clusters can also fail, in which case graphs would not be able to show data for some older windows. The data doesn't have to be fully contiguous; for example, a common use case for us is to look at week-over-week (WoW) charts even though the span of the chart might only be a few hours. If the < 4d cluster fails but the < 16d cluster is functioning, we could still serve that graph even though we couldn't show a continuous graph for the full week. A graph would still be shown but would be missing data in the middle.

After data is written to the mirrors, they will flush to a persistence layer that is responsible for writing the data to the long term storage in S3. The data at full resolution is kept in S3 and we use Hadoop (Elastic MapReduce) for processing the data to perform corrective merging of data from the mirrors, generate reports, and perform rollups into a form that can be loaded into the historical clusters.

As always, we're aided by the fact that engineers at Netflix are responsible for the operational care and feeding of their systems, eliminating potential impedance mismatch between engineering concerns and operational concerns.

Engineering for Cost

Keeping all data in memory is expensive -- in particular with the large growth rate of data. The combination of dimensionality and the time-based partitioning used for resilience gives us a way to help manage costs.
The first way is in controlling the number of replicas. In most cases we are using replicas for redundancy rather than for additional query capacity. For historical data that can be reloaded from stable storage, we typically run only one replica, as the duration of partial downtime was not deemed to be worth the cost of an additional replica.

The second way is as part of the Hadoop processing we can compute rollups so that we have a much smaller data volume to load in historical clusters. At Netflix the typical policy is roughly:
Cluster      Policy
< 6 hours    Keep all data received
< 4 days     Keep most data; we do early rollup by dropping the node dimension on some business metrics
< 16 days    Rollup by dropping the node dimension on all metrics
older        Explicit whitelist; we generally recommend BI systems for longer-term use-cases

Using these policies we get greatly reduced index sizes for the number of distinct time series despite a significant amount of churn. With auto-scaling and red/black deployment models the set of instances changes frequently, so typically the intersection of distinct time series from one day to the next is less than 50%. Rollups target the dimensions which lead to that churn, giving us much smaller index sizes. Also, in many cases dimensions like node name that lead to this increase become less relevant after the node goes away. Deep-dive or investigative use-cases can still access the data using Hadoop if needed.

The Ecosystem

We've built an extensive ecosystem of products around Atlas; virtually every operational visibility system or dashboard used within Netflix utilizes data from it. In addition to the components described above, we've also built:
  • User interfaces
    • Main UI for browsing data and constructing queries.
    • Dashboards
    • Alerts
  • Platform
    • Inline aggregation of reported data before storage layer
    • Storage options using off-heap memory and Lucene
    • Percentile backend
    • Publish and persistence applications
    • EMR processing for computing rollups and analysis
    • Poller for SNMP, healthchecks, etc
  • Client
    • Supports integrating Servo with Atlas
    • Local rollups and alerting
  • Real-Time Analytics
    • Metrics volume report
    • Automated Canary Analysis
    • Outlier and anomaly detection
    • Automated server culling based on outlier characteristics
Today, we are open-sourcing the query layer and some of the in-heap memory structure capabilities. Our intention is to open-source more of the ecosystem as soon as feasible, focusing first on the components most likely to be relevant to people outside Netflix.

You can now find Atlas on GitHub; you can also find a more detailed overview there.

Brian Harrington and Roy Rapoport

Monday, December 8, 2014

Version 7: The Evolution of JavaScript

By Jafar Husain


We held our last Netflix JavaScript Talks event of 2014 on Dec. 2nd, where we discussed some of the interesting features already available for use in JavaScript 6, and where things are headed with JavaScript 7. As a member of TC39, the ECMAScript standards committee, Netflix has been working with other committee members to explore powerful new features like Object.observe, async functions, and async generators. Many of these features have already started appearing in modern browsers, and it’s not too early to start playing with them.

The video from the event is now available to watch below or on our Netflix UI Engineering channel on YouTube along with other talks from past events.


It’s been a lot of fun hosting our Netflix JavaScript Talks series this past year and we have several exciting talks planned for 2015. If you’re interested in being notified of our future events, just sign up on our notification list. We’ll also be sharing some interesting work we’re doing with React at the React.js Conf at the end of January.