Thursday, February 19, 2015

RAD - Outlier Detection on Big Data

Outlier detection can be a pain point for all data driven companies, especially as data volumes grow. At Netflix we have multiple datasets growing by 10B+ record/day and so there’s a need for automated anomaly detection tools ensuring data quality and identifying suspicious anomalies. Today we are open-sourcing our outlier detection function, called Robust Anomaly Detection (RAD), as part of our Surus project.

As we built RAD we identified four generic challenges that are ubiquitous in outlier detection on “big data.”

  • High cardinality dimensions: High cardinality data sets - especially those with large combinatorial permutations of column groupings - makes human inspection impractical.
  • Minimizing False Positives: A successful anomaly detection tool must minimize false positives. In our experience there are many alerting platforms that “sound an alarm” that goes ultimately unresolved. The goal is to create alerting mechanisms that can be tuned to appropriately balance noise and information.
  • Seasonality: Hourly/Weekly/Bi-weekly/Monthly seasonal effects are common and can be mis-identified as outliers deserving attention if not handled properly. Seasonal variability needs to be ignored.
  • Data is not always normally distributed: This has been a particular challenge since Netflix has been growing over the last 24 months. Generally though, an outlier tool must be robust so that it works on data that is not normally distributed.

In addition to addressing the challenges above, we wanted a solution with a generic interface (supporting application development). We met these objectives with a novel algorithm encased in a wrapper for easy deployment in our ETL environment.


We initially tested techniques like moving averages with standard deviations and time series/regression models (ARIMAX) but found that these simpler methods were not robust enough in high cardinality data.

The algorithm we finally settled on uses Robust Principal Component Analysis (RPCA) to detect anomalies. PCA uses the Singular Value Decomposition (SVD) to find low rank representations of the data. The robust version of PCA (RPCA) identifies a low rank representation, random noise, and a set of outliers by repeatedly calculating the SVD and applying “thresholds” to the singular values and error for each iteration. For more information please refer to the original paper by Candes et al. (2009).

Below is an interactive visualization of the algorithm at work on a simple/random dataset and on public climate data.

Pig Wrapper

Since Apache Pig is the primary ETL language at Netflix, we wrapped this algorithm in a Pig function enabling engineers to easily use it with just a few additional lines of code. We’ve open-sourced both the Java function that implements the algorithm and the Pig wrapper. The details and a sample application (with code) can be found here.

Business Application

The following are two popular applications where we initially implemented this anomaly detection system at Netflix with great success.

Netflix processes millions of transactions every day across tens of thousands of banking institutions/infrastructures in both real-time and batch environments. We’ve used the above solution to detect anomalies in failures in the payment network at a bank level. With the above system, business managers were able to follow up with their counterparts in the payment industry and thereby reducing the impact on Netflix customers

Our signup flow was another important point of application. Today Netflix customers sign up across the world on hundreds of different types of browsers or devices. Identifying anomalies across unique combinations of country, browser/device and language helps our engineers understand and react to customer sign up problems in a timely manner.


A robust algorithm is paramount to the success of any anomaly detection system and RPCA has worked very well for detecting anomalies. Along with the algorithm, our focus on simplifying the implementation with a Pig wrapper made the tool a great success. The applications listed above have helped the Netflix data teams understand and react to anomalies faster--which reduces the impact to Netflix customers and our overall business.

Wednesday, February 18, 2015

A Microscope on Microservices

by Coburn Watson, Scott Emmons, and Brendan Gregg

At Netflix we pioneer new cloud architectures and technologies to operate at massive scale - a scale which breaks most monitoring and analysis tools. The challenge is not just handling a massive instance count but to also provide quick, actionable insight for a large-scale, microservice-based architecture. Out of necessity we've developed our own tools for performance and reliability analysis, which we've also been open-sourcing (e.g., Atlas for cloud-wide monitoring). In this post we’ll discuss tools that the Cloud Performance and Reliability team have been developing, which are used together like a microscope switching between different magnifications as needed.

Request Flow (10X Magnification)

We'll start at a wide scale, visualizing the relationship between microservices to see which are called and how much time is spent in each:

Using an in-house dapper-like framework, we are able to layer the request demand through the aggregate infrastructure onto a simple visualization. This internal utility, Slalom, allows a given service to understand upstream and downstream dependencies, their contribution on service demand, and the general health of said requests. Data is initially represented through d3-based Sankey diagrams, with a detailed breakdown on absolute service demand and response status codes.
This high-level overview gives a general picture of all the distributed services that are composed to satisfy a request. The height of each service node shows the amount of demand on that service, with the outgoing links showing demand on a downstream service relative to its siblings.

Double-clicking a single service exposes the bi-directional demand over the time window:

The macro visualization afforded by Slalom is limited based on the data available in the underlying metrics that are sampled. To bring into focus additional metric dimensions beyond simple IPC interactions we built another tool, Mogul.

Show me my bottleneck! (100X)

The ability to decompose where time is spent both within and across the fleet of microservices can be a challenge given the number of dependencies.  Such information can be leveraged to identify the root cause of performance degradation or identify areas ripe for optimization within a given microservice.  Our Mogul utility consumes data from Netflix’s recently open-sourced Atlas monitoring framework, applies correlation between metrics, and selects those most likely to be responsible for changes in demand on a given microservice. The different resources evaluated include:
  • System resource demand (CPU, network, disk)
  • JVM pressure (thread contention, garbage collection)
  • Service IPC calls
  • Persistency-related calls (EVCache, Cassandra)
  • Errors and timeouts
It is not uncommon for a mogul query to pull thousands of metrics, subsequently reducing to tens of metrics through correlation with system demand. In the following example, we were able to quickly identify which downstream service was causing performance issues for the service under study. This particular microservice has over 40,000 metrics. Mogul reduced this internally to just over 2000 metrics via pattern matching, then correlated the top 4-6 interesting metrics grouped into classifications.

The following diagram displays a perturbation in the microservice response time (blue line) as it moves from ~125 to over 300 milliseconds.  The underlying graphs identifies those downstream calls that have a time-correlated increase in system demand.

Like Slalom, Mogul uses Little’s Law - the product of response time and throughput - to compute service demand.

My instance is bad ... or is it? (1000X)

Those running on the cloud or virtualized environments are not unfamiliar with the phrase “my instance is bad.” To evaluate if a given host is unstable or under pressure it is important to have the right metrics available on-demand and at a high resolution (5 seconds or less).  Enter Vector, our on-instance performance monitoring framework which exposes hand-picked, high-resolution system metrics to every engineer’s browser.  Leveraging the battle tested system monitoring framework Performance Co-Pilot (pcp) we are able to layer on a UI that polls instance-level metrics between every 1 and 5 seconds.  

This resolution of system data exposes possible multi-modal performance behavior not visible with higher-level aggregations.  Many times a runaway thread has been identified as a root cause of performance issues while overall CPU utilization remains low.  Vector abstracts away the complexity typically required with logging onto a system and running a large number of commands from the shell.

A key feature of Vector and pcp is extensibility.  We have created multiple custom pcp agents to expose additional key performance views.  One example is a flamegraph generated by sampling the on-host Java process using jstack.  This view allows an engineer to quickly drill on where the Java process is spending CPU time.

Next Steps..To Infinity and Beyond

The above tools have proved invaluable in the domain of performance and reliability analysis at Netflix, and  we are looking to open source Vector in the coming months. In the meantime we continue to extend our toolset by improving instrumentation capabilities at a base level. One example is a patch on OpenJDK which allows the generation of extended stack trace data that can be used to visualize system-through-user space time in the process stack.


It quickly became apparent at Netflix’s scale that viewing the performance of the aggregate system through a single lens would be insufficient.  Many commercial tools promise a one-stop shop but have rarely scaled to meet our needs.  Working from a macro-to-micro view, our team developed tools based upon the use cases we most frequently analyze and triage.  The result is much like a microscope which lets engineering teams select the focal length that most directly targets their dimension of interest.

As one engineer on our team puts it, “Most current performance tool methodologies are so 1990’s.” Finding and dealing with future observability challenges is key to our charter, and we have the team and drive to accomplish it.  If you would like to join us in tackling this kind of work, we are hiring!

Tuesday, February 10, 2015

What's trending on Netflix?

Every day, millions of members across the globe, from thousands of devices, visit Netflix and generate millions of viewing hours. The majority of these viewing hours are generated through the videos that are recommended by our recommender systems. We continue to invest in improving our recommender systems that aid our members to discover and watch the specific content they love. We are constantly trying to improve the quality of the recommendations using the sound foundation of AB testing.

On that front, we recently AB tested introducing a new row of videos on the home screen called “Trending Now”, which shows the videos that are trending in Netflix infused with some personalization for our members. This post explains how we built the backend infrastructure that powers the Trending Now row.

Traditionally, we pre-compute many of the recommendations for our members based on a combination of explicit signals (viewing history, ratings, My List, etc.) and other implicit signals (scroll activity, navigation, etc.) within Netflix, in near-line fashion. However, the Trending Now row is computed as events happen in real time. This allows us to not only personalize this row based on the context like time of day and day of week, but also react to sudden changes in collective interests of members, due to a real-world events such as Oscars or Halloween.

Data Collection

There are primarily two data streams that are used to determine the trending videos:
  • Play events: Videos that are played by our member
  • Impression events: Videos seen by our members in their view port

Netflix embraces Service Oriented Architecture (SOA) composed of many small fine grained services that do one thing and one thing well. In that vein, Viewing History Service captures all the videos that are played by our members. Beacon is another service that captures all impression events and user activities within Netflix. The requirement of computing recommendations in real time, presents us with an exciting challenge to make our data collection/processing pipeline a low latency, highly scalable and resilient system. We chose Kafka, a distributed messaging system, for our data pipeline as it has proven to handle millions of events per second. All the data collected by the Viewing History and Beacon services are sent to Kafka.

Data Processing

We built a custom stream processor that consumes the play and impressions events from Kafka and computes the following aggregated data:
  • Play popularity: How many times is a video played
  • Take rate: Fraction of play events over impression events for a given video

The first step in the data processing layer is to join the play and impression streams. We join them by request id, which is a unique identifier used to tie the front end calls to the backend service calls. With this join, all the plays and impressions events are grouped together for a given request id as illustrated in the figure below.

This joined stream is then partitioned by video id, for all the plays and impression events of a given video to be processed at the same consumer instance. This way, each consumer will be able to atomically calculate the total number of plays and impressions data for every video. The aggregated play popularity and take rate data are persisted into Cassandra, as shown in the figure below.

Real Time Data Monitoring

Given the importance of the data quality to the recommendation system and the user experience, we continuously do canary analysis for the event streams. This involves simple validations such as the presence of mandatory attributes within an event to more complex validations such as finding the absence of an event within a time window. With appropriate alerting in place, within minutes of every UI push, we are able to catch any data regressions with this real time stream monitoring.

It is imperative that the Kafka consumers are able to keep up with the incoming load into Kafka. Processing an event that was minutes old will neither provide a real trending effect nor help find data regression issues soon.

Bringing it all together

On a live user request, the aggregated play popularity and take rate data along with other explicit signals such as members’ viewing history and past ratings are used to compute a personalized Trending now row. The following figure shows the end to end infrastructure for building Trending Now row.

Netflix has a data-driven culture that is key to our success.  With billions of member viewing events and tens of millions of categorical preferences, we have endless opportunities to improve our recommendations even further.

We are in the midst of replacing our custom stream processor with Spark Streaming. Stay tuned for an upcoming tech blog on our resiliency testing on Spark Streaming.

If you would like to join us in tackling these kinds of challenges, we are hiring!

Nicobar: Dynamic Scripting Library for Java

By James Kojo, Vasanth Asokan, George Campbell, Aaron Tull

The Netflix API is the front door to the streaming service, handling billions of requests per day from more than 1000 different device types around the world. To provide the best experience to our subscribers, it is critical that our UI teams have the ability to innovate at a rapid pace. As described in our blog post a year ago, we developed a Dynamic Scripting Platform that enables this rapid innovation.

Today, we are happy to announce Nicobar, the open source script execution library that allows our UI teams to inject UI-specific adapter code dynamically into our JVM without the API team’s involvement. Named after a remote archipelago in the eastern Indian Ocean, Nicobar allows each UI team to have its own island of code to optimize the client/server interaction for each device, evolved at its own pace.


As of this post’s writing, a single Netflix API instance hosts hundreds of UI scripts, developed by a dozen teams. Together, they deploy anywhere between a handful to a hundred UI scripts per day. A strong, core scripting library is what allows the API JVM to handle this rate of deployment reliably and efficiently.

Our success with the scripting approach in the API platform led us to identify other applications that could benefit also from the ability to alter their behavior without a full scale deployment. Nicobar is a library that provides this functionality in a compact and reusable manner, with pluggable support for JVM languages.

Architecture Overview

Early implementations of dynamic scripting at Netflix used basic java classloader technology to host and sandbox scripts from one another. While this was a good start, it was not nearly enough. Standard Java classloaders can have only one parent, and thus allow only simple, flattened hierarchies. If one wants to share classloaders, this is a big limitation and an inefficient use of memory. Also, code loaded within standard classloaders is fully visible to downstream classloaders. Finer-grained visibility controls are helpful in restricting what packages are exported and imported into classloaders.

Given these experiences, we designed into Nicobar a script module loader that holds a graph of inter-dependent script modules. Under the hood, we use JBoss Modules (which is open source) to create java modules. JBoss modules represent powerful extensions to basic Java classloaders, allowing for arbitrarily complex classloader dependency graphs, including multiple parents. They also support sophisticated package filters that can be applied to incoming and outgoing dependency edges.

A script module provides an interface to retrieve the list of java classes held inside it. These classes can be instantiated and methods exercised on the instances, thereby “executing” the script module.

Script source and resource bundles are represented by script archives. Metadata for the archives is defined in the form of a script module specification, where script authors can describe the content language, inter-module dependencies, import and export filters for packages, as well as user specific metadata.

Script archive contents can be in source form and/or in precompiled form (.class files). At runtime, script archives are converted into script modules by running the archive through compilers and loaders that translate any source found into classes, and then loading up all classes into a module. Script compilers and loaders are pluggable, and out of the box, Nicobar comes with compilation support for Groovy 2, as well as a simple loader for compiled java classes.

Archives can be stored into and queried from archive repositories on demand, or via a continuous repository poller. Out of the box, Nicobar comes with a choice of file-system based or Cassandra based archive repositories. 

As the usage of a scripting system grows in scale, there is often the need for an administrative interface that supports publishing and modifying script archives, as well as viewing published archives. Towards this end, Nicobar comes with a manager and explorer subproject, based on Karyon and Pytheas.

Putting it all together

The diagram below illustrates how all the pieces work together.

Usage Example - Hello Nicobar!

Here is an example of initializing the Nicobar script module loader to support Groovy scripts.

Create a script archive

Create a simple groovy script archive, with the following groovy file:

Add a module specification file moduleSpec.json, along with the source:

Jar the source and module specification together as a jar file. This is your script archive.

Create a script module loader

Create an archive repository

If you have more than a handful of scripts, you will likely need a repository representing the collection. Let’s create a JarArchiveRepository, which is a repository of script archive jars at some file system path. Copy helloworld.jar into /tmp/archiveRepo to match the code below.

Hooking up the repository poller provides dynamic updates of discovered modules into the script module loader. You can wire up multiple repositories to a poller, which would poll them iteratively.

Execute script
Script modules can be retrieved out of the module loader by name (and an optional version). Classes can be retrieved from script modules by name, or by type. Nicobar itself is agnostic to the type of the classes held in the module, and leaves it to the application’s business logic to decide what to extract out and how to execute classes.

Here is an example of extracting a class implementing Callable and executing it:

At this point, any changes to the script archive jar will result in an update of the script module inside the module loader and new classes reflecting the update will be vended seamlessly!

More about the Module Loader

In addition to the ability to dynamically inject code, Nicobar’s module loading system also allows for multiple variants of a script module to coexist, providing for runtime selection of a variant. As an example, tracing code execution involves adding instrumentation code, which adds overhead. Using Nicobar, the application could vend classes from an instrumented version of the module when tracing is needed, while vending classes from the uninstrumented, faster version of the module otherwise. This paves the way for on demand tracing of code without having to add constant overhead on all executions. 

Module variants can also be leveraged to perform slow rollouts of script modules. When a module deployment is desired, a portion of the control flow can be directed through the new version of the module at runtime. Once confidence is gained in the new version, the update can be “completed”, by flushing out the old version and sending all control flow through the new module.

Static parts of an application may benefit from a modular classloading architecture as well. Large applications, loaded into a monolithic classloader can become unwieldy over time, due to an accumulation of unintended dependencies and tight coupling between various parts of the application. In contrast, loading components using Nicobar modules allows for well defined boundaries and fine-grained isolation between them. This, in turn, facilitates decoupling of components, thereby allowing them to evolve independently


We are excited by the possibilities around creating dynamic applications using Nicobar. As usage of the library grows, we expect to see various feature requests around access controls, additional persistence and query layers, and support for other JVM languages.

Project Jigsaw, the JDK’s native module loading system, is on the horizon too, and we are interested in seeing how Nicobar can leverage native module support from Jigsaw.

If these kinds of opportunities and challenges interest you, we are hiring and would love to hear from you!

Monday, February 2, 2015

SPS : the Pulse of Netflix Streaming

We want to provide an amazing experience to each member, winning the “moments of truth” where they decide what entertainment to enjoy.  To do that, we need to understand the health of our system.  To quickly and easily understand the health of the system, we need a simple metric that a diverse set of people can comprehend.  In this post we will discuss how we discovered and aligned everyone around one operational metric indicating service health, enabling us to streamline production operations and improve availability.  We will detail how we approach signal analysis, deviation detection, and alerting for that signal and for other general use cases.

Creating the Right Signal

In the early days of Netflix streaming, circa 2008, we manually tracked hundreds of metrics, relying on humans to detect problems.  Our approach worked for tens of servers and thousands of devices, but not for the thousands of servers and millions of devices that were in our future.  Complexity and human-reliant approaches don’t scale; simplicity and algorithm-driven approaches do.

We sought out a single indicator that closely approximated our most important activity: viewing.  We discovered that a server-side metric related to playback starts (the act of “clicking play”) had both a predictable pattern and fluctuated significantly when UI/device/server problems were happening.  The Netflix streaming pulse was created.  We named it “SPS” for “starts per second”.

Example of stream starts per second, comparing week over week (red = current week, black = prior week)

The SPS pattern is regular within each geographic region, with variations due to external influences like major regional holidays and events.  The regional SPS pattern is one cycle per day and oscillates in a rising and falling pattern.  The peaks occur in the evening and the troughs occur in the early morning hours.  On regional holidays, when people are off work and kids are home from school, we see increased viewing in the daytime hours, as our members have more free time to enjoy viewing a title on Netflix.

Because there is consistency in the streaming behaviors of our members, with a moderate amount of data we can establish reliable predictions on how many stream starts we should expect at any point of the week.  Deviations from the prediction are a powerful way to tell the entire company when there is a problem with the service.  We use these deviations to trigger alerts and help us understand when service has been fully restored.

Its simplicity allows SPS to be central to our internal vernacular.  Production problems are categorized as “SPS impacting” or “not SPS impacting,” indicating their severity.  Overall service availability is measured using expected versus actual SPS levels.

Deviation Detection Models

To maximize the power of SPS, we need reliable ways to find deviations in our actual stream starts relative to our expected stream starts. The following are a range of techniques that we have explored in detecting such deviations.

Static Thresholds

A common starting place when trying to detect change is to define a fixed boundary that characterizes normal behavior. This boundary can be described as a floor or ceiling which, if crossed, indicates a deviation from normal behavior. The simplicity of static thresholds makes them a popular choice when trying to detect a presence, or increase, in a signal. For example, detecting when there is an increase in CPU usage:

Example where a static threshold could be used to help detect high CPU usage

However, static thresholds are insufficient in accurately capturing deviations in oscillating signals. For example, a static threshold would not be suitable for accurately detecting a drop in SPS due to its oscillating nature.

Exponential Smoothing

Another technique that can be used to detect deviations is to use an exponential smoothing function, such as exponential moving average, to compute an upper or lower threshold that bounds the original signal. These techniques assign exponentially decreasing weights as the observations get older. The benefit of this approach is that the bound is no longer static and can “move” with the input signal, as shown in the image below:

Example of data smoothing using moving average

Another benefit is that exponential smoothing techniques take into account all past observations. In addition, exponential smoothing requires only the most recent observation to be kept. These aspects make it desirable for real-time alerting.

Double Exponential Smoothing

To detect a change in SPS behavior, we use Double Exponential Smoothing (DES) to define an upper and lower boundary that captures the range of acceptable behavior. This technique includes a parameter that takes into account any trend in the data, which works well for following the oscillating trend in SPS. There are more advanced smoothing techniques, such as triple exponential smoothing, which also take into account seasonal trends. However, we do not use these techniques as we are interested in detecting a deviation in behavior over a short period of time which does not contain a pronounced seasonal trend.

Before creating a DES model one must first select values for the data smoothing factor and the trend smoothing factor. To visualize the effect these parameters have on DES, see this interactive visualization. The estimation of these parameters is crucial as they can greatly affect accuracy. While these parameters are typically determined by an individual's intuition or trial and error, we have experimented with data-driven approaches to automatically initialize them (motivated by Gardner [1]). We are able to apply those identified parameters to signals that share similar daily patterns and trends, for example SPS at the device level.

The image below shows an example where DES has been used to create a lower bound in an attempt to capture a deviation in normal behavior. Shortly after 18:00 there is a drop in SPS which crosses the DES threshold, alerting us to a potential issue with our service. By alerting on this drop, we can respond and take actions to restore service health.

While DES accurately identifies a drop in SPS, it is unable to predict when the system has recovered. In the example below, the sharp recovery of SPS at approximately 20:00 is not accurately modeled by DES causing it to underpredict and generate false alarms for a short period of time:

In spite of these shortcomings, DES has been an effective mechanism for detecting actionable deviations in SPS and other operational signals.

Advanced Techniques

We have begun experimenting with Bayesian techniques in a stream mining setting to improve our ability to detect deviations in SPS. An example of this is Bayesian switchpoint detection and Markov Chain Monte Carlo (MCMC) [2]. See [3] for a concise introduction to using MCMC for anomaly detection and [4] for Bayesian switchpoint detection.

Bayesian techniques offer some advantages over DES in this setting. Those familiar with probabilistic programming techniques know that the underlying models can be fairly complex, but they can be made to be non-parametric by drawing parameters from uniform priors when possible. Using the posteriors from such calculations as priors for the next iteration allows us to create models that evolve as they receive more data.

Unfortunately, our experiments with Bayesian anomaly detection have revealed downsides compared to DES. MCMC is significantly more computationally intensive than DES, so much so that some are exploiting graphics cards in order to reduce the run time [5], a common technique for computationally intensive processes [6]. Furthermore the underlying model is not as easily interpreted by a human due to the complexity of the parameter interactions. These limitations, especially the performance related ones, restrict our ability to apply these techniques to a broad set of metrics in real time.

Bayesian techniques, however, do not solve the entire problem of data stream mining in a non-stationary setting. There exists a rich field of research on the subject of real-time data stream mining [7]. MCMC is by design a batch process, though it can be applied in a mini-batch fashion. Our current research is incorporating learnings from the stream-mining community in stream classification and drift detection. Additionally, our Data Science and Engineering team has been working on an approach based on Robust Principal Component Analysis (PCA) to deal with high cardinality data. We’re excited to see what comes from this research in 2015.


We have streamlined production operations and improved availability by creating a single directional metric that indicates service health: SPS. We have experimented with and used a number of techniques to derive additional insight from this metric including threshold-based alerting, exponential and double exponential smoothing, and bayesian and stream mining approaches. SPS is the pulse of Netflix streaming, focusing the minds at Netflix on ensuring streaming is working when you want it to be.

If you would like to join us in tackling these kinds of challenges, we are hiring!