Wednesday, March 11, 2015

Can Spark Streaming survive Chaos Monkey?

Netflix is a data-driven organization that places emphasis on the quality of data collected and processed. In our previous blog post, we highlighted our use cases for real-time stream processing in the context of online recommendations and data monitoring. With Spark Streaming as our choice of stream processor, we set out to evaluate and share the resiliency story for Spark Streaming in the AWS cloud environment. A Chaos Monkey-based approach, which randomly terminated instances or processes, was employed to simulate failures.

Spark on Amazon Web Services (AWS) is relevant to us as Netflix delivers its service primarily out of the AWS cloud. Stream processing systems need to be operational 24/7 and tolerant of failures. Instances on AWS are ephemeral, which makes it imperative to ensure Spark’s resiliency.

Spark Components

Apache Spark is a fast and general-purpose cluster computing system. Spark can be deployed on top of Mesos, YARN, or Spark's own standalone cluster manager, which allocates worker node resources to an application. The Spark Driver connects to the cluster manager and is responsible for converting an application into a directed acyclic graph (DAG) of individual tasks that are executed within an Executor process on the worker nodes.

Creating Chaos

Netflix streaming devices periodically send events that capture member activities, which play a significant role in personalization. These events flow to our server-side applications and are routed to Kafka. Our Spark Streaming application consumes these events from Kafka and computes metrics. The deployment architecture is shown below:

Fig 2: Deployment Architecture

Our goal is to validate that there is no interruption in computing metrics when the different Spark components fail. To simulate such failures, we employed a whack-a-mole approach and killed the various Spark components.

We ran our Spark Streaming application on Spark Standalone. The resiliency exercise was run with Spark v1.2.0, Kafka v0.8.0 and ZooKeeper v3.4.5.

Spark Streaming Resiliency

Driver Resiliency: Spark Standalone supports two modes for launching the driver application. In client mode, the driver is launched in the same process as the client that submits the application. When this process dies, the application is aborted. In cluster mode, the driver is launched from one of the Worker processes in the cluster. Additionally, standalone cluster mode supports a supervise option that automatically restarts the application on non-zero exit codes.
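As a minimal sketch, a cluster-mode submission with automatic driver restarts uses the `--deploy-mode cluster` and `--supervise` flags of `spark-submit`. The helper below only assembles that command line; the master host, main class and jar path are hypothetical placeholders.

```python
import shlex


def spark_submit_cluster_supervised(master_url, main_class, app_jar, app_args=()):
    """Build a spark-submit command that launches the driver on a Worker
    (cluster mode) and asks the standalone Master to restart it on failure."""
    return [
        "spark-submit",
        "--master", master_url,
        "--deploy-mode", "cluster",  # driver runs inside the cluster, not on the client
        "--supervise",               # standalone cluster mode: restart on non-zero exit
        "--class", main_class,
        app_jar,                     # in cluster mode the jar must be reachable from the Workers
        *app_args,
    ]


cmd = spark_submit_cluster_supervised(
    "spark://spark-master:7077",        # hypothetical master host
    "com.example.StreamingMetrics",     # hypothetical main class
    "/opt/jobs/streaming-metrics.jar",  # hypothetical jar path
)
print(shlex.join(cmd))
```

The list form can be passed directly to `subprocess.run` without going through a shell.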

Master Resiliency: The Spark scheduler uses the Master to make scheduling decisions. To avoid a single point of failure, it is best to set up a multi-master standalone cluster. Spark uses ZooKeeper for leader election. One of the master nodes becomes the ACTIVE node, and all Worker nodes register with it. When this master node dies, one of the STANDBY master nodes becomes the ACTIVE node, and all the Worker nodes automatically register with it. Applications running on the cluster during the master failover continue to run without interruption.
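A hedged sketch of the two pieces of configuration a multi-master setup needs: the ZooKeeper recovery properties for the Masters (commonly exported as `SPARK_DAEMON_JAVA_OPTS` in `conf/spark-env.sh` on each master node) and a master URL that lists every master so Workers and applications can find the ACTIVE one. The host names are hypothetical; `/spark` is Spark's default recovery directory.

```python
def zookeeper_recovery_opts(zk_hosts, zk_dir="/spark"):
    """Java system properties that enable ZooKeeper-based leader election
    and recovery for standalone Masters."""
    props = {
        "spark.deploy.recoveryMode": "ZOOKEEPER",          # turn on ZooKeeper recovery
        "spark.deploy.zookeeper.url": ",".join(zk_hosts),  # the ZooKeeper ensemble
        "spark.deploy.zookeeper.dir": zk_dir,              # znode holding recovery state
    }
    return " ".join(f"-D{key}={value}" for key, value in props.items())


def multi_master_url(masters, port=7077):
    """Comma-separated master list; clients register with whichever is ACTIVE."""
    return "spark://" + ",".join(f"{host}:{port}" for host in masters)


# Hypothetical hosts.
print(zookeeper_recovery_opts(["zk1:2181", "zk2:2181"]))
print(multi_master_url(["master1", "master2"]))
```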

Worker Process Resiliency: The Worker process launches and monitors the Executor and Driver as child processes. When the Worker process is killed, all its child processes are also killed. The Worker process is automatically relaunched, which in turn restarts the Driver and/or the Executor process.

Executor Resiliency: When an Executor process is killed, it is automatically relaunched by the Worker process, and any tasks that were in flight are rescheduled.

Receiver Resiliency: A Receiver runs as a long-running task within an Executor and has the same resiliency characteristics as the Executor.

The effect on the computed metrics due to the termination of various Spark components is shown below.

Fig 3: Behavior on Receiver/Driver/Master failure

Driver Failure: The main impact is back-pressure that builds up due to the node failure, which results in a sudden drop in the message processing rate, followed by a catch-up spike before the graph settles back into a steady state.
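To make the drop-then-spike shape concrete, here is a toy queueing model in Python. It is not Spark code: the arrival rate, processing capacity and outage length are hypothetical numbers chosen only to show how a backlog that accumulates while nothing is processed is drained at full capacity afterwards.

```python
def simulate_outage(arrival_rate, capacity, outage_steps, total_steps):
    """Toy model: events arrive at a constant rate, nothing is processed
    during the outage, and afterwards the backlog drains at up to
    `capacity` events per step."""
    backlog = 0
    processed = []
    for step in range(total_steps):
        backlog += arrival_rate
        done = 0 if step in outage_steps else min(backlog, capacity)
        backlog -= done
        processed.append(done)
    return processed


rates = simulate_outage(arrival_rate=100, capacity=150, outage_steps={3, 4}, total_steps=10)
print(rates)  # [100, 100, 100, 0, 0, 150, 150, 150, 150, 100]
```

The two zero steps are the drop, the run at 150 is the catch-up spike, and the rate returns to 100 once the backlog is gone. No events are lost in this model, so the total processed equals the total that arrived.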

Receiver Failure: The dip in computed metrics was due to the fact that the default Kafka receiver is an unreliable receiver, so events it had already consumed from Kafka but not yet safely stored could be lost when it was killed. Spark Streaming 1.2 introduced an experimental feature called write-ahead logs (WAL) that makes the Kafka receiver reliable. When this is enabled, applications incur a hit to Kafka receiver throughput. However, this can be addressed by increasing the number of receivers.
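A minimal sketch of the trade-off, written in Python for illustration only. The configuration key is the Spark 1.2 switch for the receiver write-ahead log. The sizing helper and its throughput figures are hypothetical, since the actual per-receiver cost of the WAL depends on the workload. The resulting receivers would be separate Kafka input streams combined with `StreamingContext.union`.

```python
import math

# Spark 1.2 configuration key that turns on the receiver write-ahead log.
# The log lives under the StreamingContext checkpoint directory, so the
# application must also enable checkpointing (ssc.checkpoint(<dir>)).
WAL_CONF = {"spark.streaming.receiver.writeAheadLog.enable": "true"}


def receivers_needed(target_events_per_sec, per_receiver_events_per_sec):
    """Smallest number of parallel receivers whose combined throughput
    meets the target ingest rate (hypothetical sizing helper)."""
    return math.ceil(target_events_per_sec / per_receiver_events_per_sec)


# Hypothetical figures: 50,000 events/s to ingest, 12,000 events/s per WAL-enabled receiver.
print(receivers_needed(50_000, 12_000))  # 5
```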


The following table summarizes the resiliency characteristics of different Spark components:

Component | Behaviour on Component Failure
Driver | Client Mode: the entire application is killed. Cluster Mode with supervise: the Driver is restarted on a different Worker node.
Master | Single Master: the entire application is killed. Multi Master: a STANDBY master is elected ACTIVE.
Worker Process | All child processes (Executor or Driver) are also terminated, and a new Worker process is launched.
Executor | A new Executor is launched by the Worker process.
Receiver | Same as Executor, since Receivers are long-running tasks inside the Executor.
Worker Node | Worker, Executor and Driver processes run on Worker nodes, and the behavior is the same as killing them individually.

We uncovered a few issues (SPARK-5967, SPARK-3495, SPARK-3496, etc.) during this exercise, but the Spark Streaming team was helpful in fixing them in a timely fashion. We are also in the midst of performance testing Spark and will follow up with a blog post.

Overall, we are happy with the resiliency of Spark Standalone for our use cases and excited to take it to the next level, where we are working towards building a unified Lambda Architecture that combines batch and real-time stream processing. We are in the early stages of this effort, so if you are interested in contributing in this area, please reach out to us.

Monday, March 9, 2015

Netflix Hack Day - Winter 2015

Last week, we hosted the latest Netflix Hack Day. Hack Day is a way for our product development teams to get away from everyday work. It's a fun, experimental, collaborative, and creative outlet.

This time, we had about 70 hacks that were produced by more than 150 engineers and designers. We shared a few examples below to give you a taste, and you can see some of our past hacks in our posts for Feb. 2014 & Aug. 2014. Note that while we think these hacks are very cool and fun, they may never become part of the Netflix product, internal infrastructure, or otherwise be used beyond Hack Day. We are surfacing them here publicly to share the spirit of the event.

Thanks to all of the hackers for putting together some incredible work in just 24 hours. If you’re interested in being a part of our next Hack Day, let us know by checking out our jobs site!

Netflix Earth
Netflix Earth is an animated 3D globe showing real-time global playback activity.

BEEP (Binge Encouragement and Enforcement Platform)
BEEP actively and abrasively encourages users to continue watching Netflix when their attention starts to stray.

darNES
In a world… where devices proliferate… darNES digs back in time to provide Netflix access to the original Nintendo Entertainment System.

Say Whaaat!!!
Say Whaaat!!! provides a more convenient way to catch missed dialogue as you watch Netflix by displaying subtitles when you pause playback. It also provides the ability to navigate a film's timeline, caption by caption.

Net the Netflix Cheats
Don’t let your partners watch when you aren't around. Net the Netflix Cheats requires dual PIN access to watch titles that you and your partner have agreed to watch together.

And here are some pictures taken during the event.

Thursday, February 19, 2015

RAD - Outlier Detection on Big Data

Outlier detection can be a pain point for all data-driven companies, especially as data volumes grow. At Netflix we have multiple datasets growing by 10B+ records/day, so there’s a need for automated anomaly detection tools that ensure data quality and identify suspicious anomalies. Today we are open-sourcing our outlier detection function, called Robust Anomaly Detection (RAD), as part of our Surus project.

As we built RAD we identified four generic challenges that are ubiquitous in outlier detection on “big data.”

  • High-cardinality dimensions: High-cardinality data sets, especially those with large combinatorial permutations of column groupings, make human inspection impractical.
  • Minimizing false positives: A successful anomaly detection tool must minimize false positives. In our experience, many alerting platforms “sound an alarm” that ultimately goes unresolved. The goal is to create alerting mechanisms that can be tuned to appropriately balance noise and information.
  • Seasonality: Hourly/weekly/bi-weekly/monthly seasonal effects are common and can be misidentified as outliers deserving attention if not handled properly. Seasonal variability needs to be ignored.
  • Data is not always normally distributed: This has been a particular challenge because Netflix has been growing over the last 24 months. In general, an outlier tool must be robust enough to work on data that is not normally distributed.

In addition to addressing the challenges above, we wanted a solution with a generic interface (supporting application development). We met these objectives with a novel algorithm encased in a wrapper for easy deployment in our ETL environment.


We initially tested techniques like moving averages with standard deviations and time series/regression models (ARIMAX) but found that these simpler methods were not robust enough in high cardinality data.
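For comparison, the simplest of those baselines can be sketched as a rolling mean plus or minus k standard deviations. This is an illustrative Python version with a hypothetical window, threshold and data, not the code Netflix evaluated. It examines one series at a time, so every combination of dimensions needs its own series, and a single large spike inflates the standard deviation of the windows that follow it.

```python
from statistics import mean, stdev


def rolling_zscore_outliers(series, window, k=3.0):
    """Flag index i when series[i] lies more than k standard deviations
    from the mean of the preceding `window` points."""
    outliers = []
    for i in range(window, len(series)):
        history = series[i - window:i]
        mu, sd = mean(history), stdev(history)
        if sd > 0 and abs(series[i] - mu) > k * sd:
            outliers.append(i)
    return outliers


series = [10, 11, 9, 10, 11, 9, 10, 50, 10, 11]  # hypothetical daily counts
print(rolling_zscore_outliers(series, window=5))  # [7]
```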

The algorithm we finally settled on uses Robust Principal Component Analysis (RPCA) to detect anomalies. PCA uses the Singular Value Decomposition (SVD) to find low rank representations of the data. The robust version of PCA (RPCA) identifies a low rank representation, random noise, and a set of outliers by repeatedly calculating the SVD and applying “thresholds” to the singular values and error for each iteration. For more information please refer to the original paper by Candes et al. (2009).
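The decomposition can be sketched with the Principal Component Pursuit formulation from Candes et al.: split a matrix M into a low-rank part L and a sparse part S by minimizing ||L||_* + λ||S||_1 subject to L + S = M, alternating singular value thresholding (for L) with elementwise soft thresholding (for S). This is a minimal NumPy sketch of that textbook algorithm, not the open-sourced RAD code. It omits RAD's explicit noise term, and the defaults λ = 1/sqrt(max(m, n)) and μ = mn/(4||M||_1) are the choices suggested in the paper rather than RAD's parameters. To apply it to a single seasonal metric, one common approach (assumed here) is to fold the time series into a matrix with one seasonal period per row, so the repeating pattern becomes low rank and outliers land in S.

```python
import numpy as np


def soft_threshold(X, tau):
    """Elementwise shrinkage: move every entry toward zero by tau."""
    return np.sign(X) * np.maximum(np.abs(X) - tau, 0.0)


def svd_threshold(X, tau):
    """Singular value thresholding: shrink the singular values of X by tau."""
    U, s, Vt = np.linalg.svd(X, full_matrices=False)
    return (U * soft_threshold(s, tau)) @ Vt


def robust_pca(M, lam=None, mu=None, tol=1e-7, max_iter=1000):
    """Principal Component Pursuit via ADMM. Returns (L, S) with
    M ~ L + S, L low rank and S sparse."""
    M = np.asarray(M, dtype=float)
    m, n = M.shape
    lam = 1.0 / np.sqrt(max(m, n)) if lam is None else lam
    mu = m * n / (4.0 * np.abs(M).sum()) if mu is None else mu
    S = np.zeros_like(M)
    Y = np.zeros_like(M)  # Lagrange multiplier for the constraint M = L + S
    norm_M = np.linalg.norm(M, "fro")
    for _ in range(max_iter):
        L = svd_threshold(M - S + Y / mu, 1.0 / mu)
        S = soft_threshold(M - L + Y / mu, lam / mu)
        residual = M - L - S
        Y += mu * residual
        if np.linalg.norm(residual, "fro") <= tol * norm_M:
            break
    return L, S


# Synthetic example: rank-1 "normal" behavior plus three injected outliers.
rng = np.random.default_rng(0)
L0 = np.outer(rng.normal(size=30), rng.normal(size=30))
S0 = np.zeros((30, 30))
S0[3, 5], S0[10, 20], S0[25, 2] = 10.0, -8.0, 12.0
L, S = robust_pca(L0 + S0)
print(np.argwhere(np.abs(S) > 1.0))
```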

Below is an interactive visualization of the algorithm at work on a simple/random dataset and on public climate data.

Pig Wrapper

Since Apache Pig is the primary ETL language at Netflix, we wrapped this algorithm in a Pig function enabling engineers to easily use it with just a few additional lines of code. We’ve open-sourced both the Java function that implements the algorithm and the Pig wrapper. The details and a sample application (with code) can be found here.

Business Application

The following are two popular applications where we initially implemented this anomaly detection system at Netflix with great success.

Netflix processes millions of transactions every day across tens of thousands of banking institutions/infrastructures in both real-time and batch environments. We’ve used the above solution to detect anomalies in payment failures at the bank level. With the above system, business managers were able to follow up with their counterparts in the payment industry and thereby reduce the impact on Netflix customers.

Our signup flow was another important point of application. Today Netflix customers sign up across the world on hundreds of different types of browsers or devices. Identifying anomalies across unique combinations of country, browser/device and language helps our engineers understand and react to customer sign-up problems in a timely manner.


A robust algorithm is paramount to the success of any anomaly detection system, and RPCA has worked very well for detecting anomalies. Along with the algorithm, our focus on simplifying the implementation with a Pig wrapper made the tool a great success. The applications listed above have helped the Netflix data teams understand and react to anomalies faster, which reduces the impact on Netflix customers and our overall business.

Wednesday, February 18, 2015

A Microscope on Microservices

by Coburn Watson, Scott Emmons, and Brendan Gregg

At Netflix we pioneer new cloud architectures and technologies to operate at massive scale, a scale which breaks most monitoring and analysis tools. The challenge is not just handling a massive instance count but also providing quick, actionable insight for a large-scale, microservice-based architecture. Out of necessity we've developed our own tools for performance and reliability analysis, which we've also been open-sourcing (e.g., Atlas for cloud-wide monitoring). In this post we’ll discuss tools that the Cloud Performance and Reliability team has been developing, which are used together like a microscope switching between different magnifications as needed.

Request Flow (10X Magnification)

We'll start at a wide scale, visualizing the relationship between microservices to see which are called and how much time is spent in each:

Using an in-house Dapper-like framework, we are able to layer the request demand through the aggregate infrastructure onto a simple visualization. This internal utility, Slalom, allows a given service to understand its upstream and downstream dependencies, their contribution to service demand, and the general health of those requests. Data is initially represented through d3-based Sankey diagrams, with a detailed breakdown of absolute service demand and response status codes.
This high-level overview gives a general picture of all the distributed services that are composed to satisfy a request. The height of each service node shows the amount of demand on that service, with the outgoing links showing demand on a downstream service relative to its siblings.
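Slalom's internals are not described in this post, so the following is only a hypothetical sketch of the aggregation a Sankey view like this needs. Given one (caller, callee) record per sampled IPC request, node height is the total demand arriving at a service, and each outgoing link is expressed relative to its siblings from the same caller. Service names are made up.

```python
from collections import Counter, defaultdict


def build_sankey(calls):
    """calls: iterable of (caller, callee) pairs, one per sampled request.

    Returns (node_demand, links): node_demand maps each service to the
    requests it received; links maps each edge to its request count and
    its share of the caller's outgoing requests."""
    edge_counts = Counter(calls)
    outgoing = defaultdict(int)
    node_demand = Counter()
    for (caller, callee), n in edge_counts.items():
        outgoing[caller] += n
        node_demand[callee] += n
    links = {
        edge: {"requests": n, "share_of_siblings": n / outgoing[edge[0]]}
        for edge, n in edge_counts.items()
    }
    return node_demand, links


# Hypothetical sampled traffic.
calls = [("edge", "api")] * 4 + [("api", "ratings")] * 3 + [("api", "search")]
node_demand, links = build_sankey(calls)
print(node_demand["api"], links[("api", "ratings")])
```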

Double-clicking a single service exposes the bi-directional demand over the time window:

The macro visualization afforded by Slalom is limited by the data available in the underlying sampled metrics. To bring additional metric dimensions beyond simple IPC interactions into focus, we built another tool, Mogul.

Show me my bottleneck! (100X)

The ability to decompose where time is spent both within and across the fleet of microservices can be a challenge given the number of dependencies.  Such information can be leveraged to identify the root cause of performance degradation or identify areas ripe for optimization within a given microservice.  Our Mogul utility consumes data from Netflix’s recently open-sourced Atlas monitoring framework, applies correlation between metrics, and selects those most likely to be responsible for changes in demand on a given microservice. The different resources evaluated include:
  • System resource demand (CPU, network, disk)
  • JVM pressure (thread contention, garbage collection)
  • Service IPC calls
  • Persistency-related calls (EVCache, Cassandra)
  • Errors and timeouts
It is not uncommon for a Mogul query to pull thousands of metrics and then reduce them to tens of metrics through correlation with system demand. In the following example, we were able to quickly identify which downstream service was causing performance issues for the service under study. This particular microservice has over 40,000 metrics. Mogul reduced these internally to just over 2,000 metrics via pattern matching, then correlated them with demand and surfaced the top 4-6 interesting metrics, grouped into classifications.
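Mogul's implementation is not shown in this post. As a hedged illustration of the two steps described above (pattern matching to narrow the candidate metrics, then ranking them by correlation with demand), here is a Python sketch. The metric names, the glob pattern and the use of Pearson correlation are all assumptions.

```python
import fnmatch
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length series (0.0 if either is constant)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    vx = sum((x - mx) ** 2 for x in xs)
    vy = sum((y - my) ** 2 for y in ys)
    if vx == 0 or vy == 0:
        return 0.0
    return cov / math.sqrt(vx * vy)


def top_correlated(demand, metrics, pattern="*", k=5):
    """Keep metrics whose names match `pattern`, then rank them by the
    absolute correlation of their time series with `demand`."""
    candidates = fnmatch.filter(metrics, pattern)
    scored = [(name, pearson(metrics[name], demand)) for name in candidates]
    scored.sort(key=lambda item: abs(item[1]), reverse=True)
    return scored[:k]


# Hypothetical metrics sampled at the same timestamps as `demand`.
demand = [1, 2, 3, 4, 5]
metrics = {
    "ipc.ratings.latency": [2, 4, 6, 8, 10],
    "ipc.search.latency": [1, 3, 2, 3, 1],
    "system.cpu": [3, 3, 3, 3, 3],
}
print(top_correlated(demand, metrics, pattern="ipc.*", k=1))
```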

The following diagram displays a perturbation in the microservice response time (blue line) as it moves from ~125 to over 300 milliseconds. The graphs below it identify the downstream calls that have a time-correlated increase in system demand.

Like Slalom, Mogul uses Little’s Law to compute service demand as the product of response time and throughput.
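As a worked example with a hypothetical throughput of 1,000 requests per second, the response-time change described above (from about 125 to about 300 milliseconds) means the mean number of requests in flight, which is the service demand in Little's Law terms (L = λW), rises from 125 to 300:

```python
def service_demand(throughput_rps, response_time_ms):
    """Little's Law: mean requests in flight = arrival rate x mean time in system."""
    return throughput_rps * response_time_ms / 1000.0


print(service_demand(1000, 125))  # 125.0
print(service_demand(1000, 300))  # 300.0
```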

My instance is bad ... or is it? (1000X)

Those running in the cloud or in virtualized environments are not unfamiliar with the phrase “my instance is bad.” To evaluate whether a given host is unstable or under pressure, it is important to have the right metrics available on demand and at a high resolution (5 seconds or less). Enter Vector, our on-instance performance monitoring framework, which exposes hand-picked, high-resolution system metrics to every engineer’s browser. Leveraging the battle-tested system monitoring framework Performance Co-Pilot (pcp), we are able to layer on a UI that polls instance-level metrics every 1 to 5 seconds.

This resolution of system data exposes possible multi-modal performance behavior not visible with higher-level aggregations. Many times a runaway thread has been identified as the root cause of performance issues while overall CPU utilization remains low. Vector abstracts away the complexity typically involved in logging on to a system and running a large number of commands from the shell.
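A small illustration of why aggregation hides a runaway thread: on a hypothetical 16-vCPU instance where one thread pins a single CPU, the averaged utilization looks healthy while the per-CPU view shows the saturated core. The numbers are invented for the example.

```python
def mean_utilization(per_cpu):
    """Instance-wide CPU utilization as an average over all CPUs (percent)."""
    return sum(per_cpu) / len(per_cpu)


def saturated_cpus(per_cpu, threshold=95.0):
    """Indices of CPUs at or above the saturation threshold (percent)."""
    return [cpu for cpu, util in enumerate(per_cpu) if util >= threshold]


per_cpu = [100.0] + [2.0] * 15  # one pinned core, fifteen nearly idle ones
print(mean_utilization(per_cpu))  # 8.125
print(saturated_cpus(per_cpu))    # [0]
```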

A key feature of Vector and pcp is extensibility. We have created multiple custom pcp agents to expose additional key performance views. One example is a flamegraph generated by sampling the on-host Java process using jstack. This view allows an engineer to quickly drill into where the Java process is spending CPU time.
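Flame graph tools such as Brendan Gregg's flamegraph.pl consume “folded” stacks: one line per unique stack, frames joined root-first with semicolons, followed by the number of samples. Below is a minimal Python sketch of that folding step. It assumes the jstack output has already been parsed into one frame list per sampled thread; jstack prints the innermost frame first, so each list is reversed. The frame names are hypothetical, and the custom pcp agent itself is not shown.

```python
from collections import Counter


def fold_stacks(samples):
    """Collapse sampled stacks into 'root;...;leaf count' lines.

    Each sample is a list of frame names in jstack order (innermost frame first)."""
    counts = Counter(";".join(reversed(stack)) for stack in samples)
    return [f"{stack} {count}" for stack, count in sorted(counts.items())]


samples = [
    ["Foo.compute", "Main.run", "java.lang.Thread.run"],
    ["Foo.compute", "Main.run", "java.lang.Thread.run"],
    ["Bar.read", "Main.run", "java.lang.Thread.run"],
]
for line in fold_stacks(samples):
    print(line)
```

Wider frames in the resulting flame graph correspond to stacks with larger sample counts, which is what lets an engineer see at a glance where CPU time goes.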

Next Steps... To Infinity and Beyond

The above tools have proved invaluable in the domain of performance and reliability analysis at Netflix, and we are looking to open source Vector in the coming months. In the meantime, we continue to extend our toolset by improving instrumentation capabilities at a base level. One example is a patch to OpenJDK that allows the generation of extended stack trace data, which can be used to visualize system-through-user-space time in the process stack.


It quickly became apparent at Netflix’s scale that viewing the performance of the aggregate system through a single lens would be insufficient.  Many commercial tools promise a one-stop shop but have rarely scaled to meet our needs.  Working from a macro-to-micro view, our team developed tools based upon the use cases we most frequently analyze and triage.  The result is much like a microscope which lets engineering teams select the focal length that most directly targets their dimension of interest.

As one engineer on our team puts it, “Most current performance tool methodologies are so 1990’s.” Finding and dealing with future observability challenges is key to our charter, and we have the team and drive to accomplish it.  If you would like to join us in tackling this kind of work, we are hiring!