Wednesday, March 11, 2015

Can Spark Streaming survive Chaos Monkey?

Netflix is a data-driven organization that places emphasis on the quality of data collected and processed. In our previous blog post, we highlighted our use cases for real-time stream processing in the context of online recommendations and data monitoring. With Spark Streaming as our choice of stream processor, we set out to evaluate and share the resiliency story for Spark Streaming in the AWS cloud environment. We used a Chaos Monkey-based approach, randomly terminating instances or processes, to simulate failures.

Spark on Amazon Web Services (AWS) is relevant to us as Netflix delivers its service primarily out of the AWS cloud. Stream processing systems need to be operational 24/7 and tolerant of failures. Instances on AWS are ephemeral, which makes it imperative to ensure Spark’s resiliency.

Spark Components

Apache Spark is a fast and general-purpose cluster computing system. Spark can be deployed on top of Mesos, YARN or Spark's own cluster manager, which allocates worker node resources to an application. The Spark Driver connects to the cluster manager and is responsible for converting an application into a directed acyclic graph (DAG) of individual tasks that are executed within an executor process on the worker nodes.

Creating Chaos

Netflix streaming devices periodically send events that capture member activities, which play a significant role in personalization. These events flow to our server-side applications and are routed to Kafka. Our Spark Streaming application consumes these events from Kafka and computes metrics. The deployment architecture is shown below:

Fig 2: Deployment Architecture

Our goal is to validate that there is no interruption in computing metrics when the different Spark components fail. To simulate such failures, we employed a whack-a-mole approach and killed the various Spark components.

We ran our Spark Streaming application on Spark Standalone. The resiliency exercise was run with Spark v1.2.0, Kafka v0.8.0 and ZooKeeper v3.4.5.

Spark Streaming Resiliency

Driver Resiliency: Spark Standalone supports two modes for launching the driver application. In client mode, the driver is launched in the same process as the one where the client submits the application. When this process dies, the application is aborted. In cluster mode, the driver is launched from one of the Worker processes in the cluster. Additionally, standalone cluster mode supports a supervise option that restarts the application automatically when it exits with a non-zero exit code.
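As a rough illustration of the cluster-mode launch described above, the following Python sketch assembles a spark-submit command line that enables the supervise option. The helper name, master URL, class name and jar path are hypothetical placeholders chosen for this example; only the `--deploy-mode cluster` and `--supervise` options come from spark-submit itself.

```python
def build_submit_command(master_url, main_class, app_jar, supervise=True):
    """Assemble a spark-submit argument list for standalone cluster mode.

    All argument values are supplied by the caller; nothing here is
    specific to the deployment described in this post.
    """
    cmd = [
        "spark-submit",
        "--class", main_class,
        "--master", master_url,
        "--deploy-mode", "cluster",  # driver runs inside a Worker process
    ]
    if supervise:
        # Ask the standalone cluster to restart the driver on non-zero exit.
        cmd.append("--supervise")
    cmd.append(app_jar)
    return cmd


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    print(" ".join(build_submit_command(
        "spark://master1:7077",
        "com.example.MetricsApp",
        "metrics-app.jar",
    )))
```

Leaving out `--supervise` (or submitting in client mode) gives the non-resilient behavior described above, where the application ends when the driver process dies.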

Master Resiliency: The Spark scheduler uses the Master to make scheduling decisions. To avoid a single point of failure, it is best to set up a multi-master standalone cluster. Spark uses ZooKeeper for leader election. One of the master nodes becomes the ACTIVE node and all Worker nodes register with it. When this master node dies, one of the STANDBY master nodes becomes the ACTIVE node and all the Worker nodes automatically register with it. Any applications running on the cluster during the master failover continue to run without a glitch.
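To make the multi-master setup more concrete, here is a small Python sketch that builds the two pieces of configuration such a cluster typically needs: the master URL that lists every master, and the JVM options that point the master daemons at ZooKeeper. The helper names and host names are hypothetical; the `spark.deploy.recoveryMode` and `spark.deploy.zookeeper.url` properties are the ones documented for standalone high availability, and the exact values for a real cluster are an assumption to be checked against the Spark version in use.

```python
def master_url(masters):
    """Join (host, port) pairs into a multi-master URL.

    Applications and Workers are given the full list so they can find
    whichever master is currently ACTIVE.
    """
    return "spark://" + ",".join(f"{host}:{port}" for host, port in masters)


def daemon_java_opts(zookeeper_hosts):
    """Build a SPARK_DAEMON_JAVA_OPTS value enabling ZooKeeper recovery."""
    return " ".join([
        "-Dspark.deploy.recoveryMode=ZOOKEEPER",
        "-Dspark.deploy.zookeeper.url=" + ",".join(zookeeper_hosts),
    ])


if __name__ == "__main__":
    # Hypothetical hosts, for illustration only.
    print(master_url([("master1", 7077), ("master2", 7077)]))
    print(daemon_java_opts(["zk1:2181", "zk2:2181", "zk3:2181"]))
```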

Worker Process Resiliency: The Worker process launches and monitors the Executor and Driver as child processes. When the Worker process is killed, all its child processes are also killed. The Worker process is automatically relaunched, which in turn restarts the Driver and/or the Executor process.

Executor Resiliency: When an Executor process is killed, it is automatically relaunched by the Worker process and any tasks that were in flight are rescheduled.

Receiver Resiliency: The Receiver runs as a long-running task within an Executor and has the same resiliency characteristics as the Executor.

The effect on the computed metrics due to the termination of various Spark components is shown below.

Fig 3: Behavior on Receiver/Driver/Master failure

Driver Failure: The main impact is the back-pressure that builds up due to a node failure, which results in a sudden drop in the message processing rate, followed by a catch-up spike, before the graph settles into steady state.
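The drop-then-spike shape described above can be reproduced with a very small queueing model. The Python sketch below is a toy simulation, not a model of Spark's scheduler: the function name and all rates are assumptions picked for illustration. Messages arrive at a constant rate, nothing is processed during the outage, and afterwards the backlog is drained at full capacity until throughput returns to the arrival rate.

```python
def simulate_throughput(arrival_rate, capacity, outage, ticks):
    """Return the number of messages processed in each tick.

    outage is a (start, end) pair of tick indexes, end exclusive,
    during which no messages are processed.
    """
    backlog = 0
    processed = []
    for t in range(ticks):
        backlog += arrival_rate          # new messages keep arriving
        if outage[0] <= t < outage[1]:
            done = 0                     # component is down: sudden drop
        else:
            done = min(capacity, backlog)  # drain backlog: catch-up spike
        backlog -= done
        processed.append(done)
    return processed


if __name__ == "__main__":
    # 100 msgs/tick arriving, 150 msgs/tick capacity, outage at ticks 3 and 4.
    print(simulate_throughput(100, 150, (3, 5), 10))
    # prints [100, 100, 100, 0, 0, 150, 150, 150, 150, 100]
```

The spike lasts longer when the gap between capacity and arrival rate is small, which is one reason spare processing capacity matters for recovery time.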

Receiver Failure: The dip in computed metrics occurred because the default Kafka receiver is an unreliable receiver. Spark Streaming 1.2 introduced an experimental feature called write ahead logs that makes the Kafka receiver reliable. Enabling this feature reduces Kafka receiver throughput, but this can be addressed by increasing the number of receivers.
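The difference between an unreliable receiver and one backed by a write ahead log can be sketched with a toy model. The Python code below is not Spark's implementation; the class, its methods and the in-memory "durable log" are hypothetical stand-ins that only illustrate the idea: records held purely in memory are lost when the receiver dies, while records written to durable storage first can be replayed after recovery, at the cost of an extra write per record.

```python
class ToyReceiver:
    """Toy receiver that optionally logs each record before buffering it."""

    def __init__(self, use_write_ahead_log):
        self.use_write_ahead_log = use_write_ahead_log
        self.memory_buffer = []  # lost when the receiver crashes
        self.durable_log = []    # stands in for fault-tolerant storage

    def receive(self, record):
        if self.use_write_ahead_log:
            self.durable_log.append(record)  # extra write: the throughput cost
        self.memory_buffer.append(record)

    def crash_and_recover(self):
        # After a crash, only records in the durable log can be replayed.
        self.memory_buffer = list(self.durable_log)

    def drain(self):
        """Hand all buffered records to processing and clear the log."""
        records, self.memory_buffer = self.memory_buffer, []
        self.durable_log = []
        return records


def records_processed_after_crash(use_write_ahead_log):
    """Receive 5 records, crash, receive 2 more, then process the buffer."""
    receiver = ToyReceiver(use_write_ahead_log)
    for i in range(5):
        receiver.receive(i)
    receiver.crash_and_recover()
    for i in range(5, 7):
        receiver.receive(i)
    return len(receiver.drain())


if __name__ == "__main__":
    print(records_processed_after_crash(False))  # prints 2
    print(records_processed_after_crash(True))   # prints 7
```

In this model the five records lost without the log correspond to the dip in computed metrics, and running several receivers in parallel is the way to win back the throughput spent on the extra writes.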


The following table summarizes the resiliency characteristics of different Spark components:

Component: Behaviour on Component Failure
Driver, Client Mode: The entire application is killed
Driver, Cluster Mode with supervise: The Driver is restarted on a different Worker node
Master, Single Master: The entire application is killed
Master, Multi Master: A STANDBY master is elected ACTIVE
Worker Process: All child processes (executor or driver) are also terminated and a new worker process is launched
Executor: A new executor is launched by the Worker process
Receiver: Same as Executor, as receivers are long-running tasks inside the Executor
Worker Node: Worker, Executor and Driver processes run on Worker nodes and the behavior is the same as killing them individually

We uncovered a few issues (SPARK-5967, SPARK-3495, SPARK-3496, etc.) during this exercise, but the Spark Streaming team was helpful in fixing them in a timely fashion. We are also in the midst of performance testing Spark and will follow up with a blog post.

Overall, we are happy with the resiliency of Spark Standalone for our use cases and excited to take it to the next level, where we are working towards building a unified Lambda Architecture that combines batch and real-time stream processing. We are in the early stages of this effort, so if you are interested in contributing in this area, please reach out to us.