Monday, December 9, 2013

Announcing Suro: Backbone of Netflix's Data Pipeline




To make the best business and technical decisions, it is critical for Netflix to reliably collect application-specific data in a timely fashion. At Netflix we deploy a fairly large number of AWS EC2 instances that host our web services and applications. They collectively emit more than 1.5 million events per second during peak hours, or around 80 billion events per day. The events could be log messages, user activity records, system operational data, or any arbitrary data that our systems need to collect for business, product, and operational analysis.

Given that data is critical to our operations and yet we allow applications to generate arbitrary events, our data pipeline infrastructure needs to be highly scalable, always available, and able to deliver events with minimal latency. We measure latency as the elapsed time between the moment an event is emitted and the moment it becomes available to its consumers. And yes, the data pipeline needs to be resilient to our own Simian Army, particularly the Chaos Monkeys.

While various web services and applications send events to Suro, many kinds of consumers may process such data differently. For example, our Hadoop clusters run MapReduce jobs on the collected events to generate offline business reports. Our event stream clusters generate operational reports to reflect real-time trends. Since we may dispatch events to different consumers based on changing needs, our data pipeline also needs to be dynamically configurable.

Suro, which we are proud to announce as our latest offering as part of the NetflixOSS family, serves as the backbone of our data pipeline. It consists of a producer client, a collector server, and a plugin framework that allows events to be dynamically filtered and dispatched to multiple consumers.

History of Suro

Suro has its roots in Apache Chukwa, which was initially adopted by Netflix. The current incarnation grew out of what we learned from meeting the operational requirements of running in production over the past few years. The following are notable modifications compared to Apache Chukwa:
  • Suro supports arbitrary data formats. Users can plug in their own serialization and deserialization code.
  • Suro instruments many tagged monitoring metrics to make itself operations-friendly.
  • Suro integrates with NetflixOSS to be cloud-friendly.
  • Suro supports dispatching events to multiple destinations with dynamic configuration.
  • Suro supports configurable store-and-forward on both client and collector.
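
To make the first point concrete, here is a minimal sketch of what a pluggable serialization layer can look like. It is written in Python for brevity, and the names `SerDe`, `JsonSerDe`, `register_serde`, and `get_serde` are hypothetical; they illustrate the idea and are not Suro's actual API.

```python
import json
from abc import ABC, abstractmethod


class SerDe(ABC):
    """Hypothetical plugin interface: converts events to and from bytes."""

    @abstractmethod
    def serialize(self, event: dict) -> bytes: ...

    @abstractmethod
    def deserialize(self, payload: bytes) -> dict: ...


class JsonSerDe(SerDe):
    """One possible plugin; any other format could implement the same interface."""

    def serialize(self, event):
        return json.dumps(event, sort_keys=True).encode("utf-8")

    def deserialize(self, payload):
        return json.loads(payload.decode("utf-8"))


# Registry keyed by format name, so the pipeline itself stays format-agnostic.
_SERDES = {}


def register_serde(name, serde):
    _SERDES[name] = serde


def get_serde(name):
    return _SERDES[name]


register_serde("json", JsonSerDe())

event = {"app": "api", "level": "ERROR", "msg": "timeout"}
payload = get_serde("json").serialize(event)
round_trip = get_serde("json").deserialize(payload)
```

Because the pipeline only ever sees bytes plus a format name, a new format can be added by registering another implementation without touching the transport code.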

Overall Architecture

The figure below illustrates the overall architecture of Suro. It is the single data pipeline that collects events generated by Netflix applications running in either the AWS cloud or Netflix data centers. Suro also dispatches events to multiple destinations for further processing.



This arrangement supports two typical use cases: batch processing and real-time computation.


Batch Processing


Many analytical reports are generated by Hadoop jobs. In fact, Suro was initially deployed just to collect data for our Big Data Platform team’s Hadoop clusters. For this, Suro aggregates data into Hadoop sequence files, and uploads them into designated S3 buckets. A distributed demuxing cluster demuxes the events in the S3 buckets to prepare them for further processing by Hadoop jobs. We hope to open source the demuxer in the next few months. Our Big Data Platform team has already open sourced pieces of our data infrastructure, such as Lipstick and Genie. Others will be coming soon. A previous post titled Hadoop Platform as a Service in the Cloud covers this in detail.

Real-Time Computation


While offline batch processing of events still forms the bulk of our consumer use cases, the more recent trend has been toward real-time stream processing. Stream consumers are typically employed to generate instant feedback, exploratory analysis, and operational insights. Summarizing application-generated log data is one example that falls into this bucket. The following diagram summarizes how log events flow from applications to two different classes of consumers.



  1. Applications emit events to Suro. The events include log lines.
  2. Suro dispatches all the events to S3 by default. Hadoop jobs will process these events.
  3. Based on a dynamically configurable routing rule, Suro also dispatches these log events to a designated Kafka cluster under a mapped topic. 
  4. A Druid cluster indexes the log lines on the fly, making them immediately available for querying. For example, our service automatically detects error surges for each application within a 10-minute window and sends out alerts to application owners.
    Application owners can then go to Druid’s UI to explore such errors.
  5. A customized ElasticSearch cluster also ingests the same sets of log lines, de-duplicates them, and makes them immediately available for querying. Users are able to jump from an aggregated view on the Druid UI to individual records on ElasticSearch’s Kibana UI to see exactly what went wrong.
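
The error-surge alerting mentioned in step 4 can be pictured as a per-application sliding-window counter. The production setup relies on Druid for this; the Python sketch below only illustrates the idea. The class name `ErrorSurgeDetector` and the threshold value are assumptions made for the example, and it assumes errors arrive in timestamp order.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 10 * 60  # the 10-minute window mentioned in step 4
SURGE_THRESHOLD = 3       # assumed value, chosen only for this example


class ErrorSurgeDetector:
    """Hypothetical detector: counts recent errors per application."""

    def __init__(self, window=WINDOW_SECONDS, threshold=SURGE_THRESHOLD):
        self.window = window
        self.threshold = threshold
        self._errors = defaultdict(deque)  # app name -> timestamps of recent errors

    def record_error(self, app, timestamp):
        """Record one error; return True if the app is now in a surge."""
        timestamps = self._errors[app]
        timestamps.append(timestamp)
        # Evict errors that have fallen out of the window.
        while timestamps and timestamps[0] <= timestamp - self.window:
            timestamps.popleft()
        return len(timestamps) >= self.threshold


detector = ErrorSurgeDetector()
alerts = [detector.record_error("api", t) for t in (0, 100, 200)]
# A much later error starts a fresh window, so it does not alert by itself.
late = detector.record_error("api", 5000)
```

Here the third error within the window triggers an alert, while an isolated error long afterwards does not.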


Of course, this is just one example of making use of real-time analysis. We are also actively looking into various technologies such as Storm and Apache Samza to apply iterative machine learning algorithms on application events.


Suro Collector In Detail

The figure below zooms into the design of the Suro Collector service. The design is similar to SEDA (staged event-driven architecture): events are processed asynchronously in stages. In each stage, events are first offered to a queue; a pool of threads then consumes the events from the queue, processes them, and sends them to the next stage.
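
As a rough illustration of this staged design, the following Python sketch models one stage as a bounded queue drained by a small pool of worker threads. The `Stage` class and its parameters are hypothetical and are not taken from the Suro code base.

```python
import queue
import threading


class Stage:
    """One SEDA-style stage: a bounded queue drained by a pool of worker threads."""

    def __init__(self, handler, next_stage=None, workers=2, capacity=1000):
        self.handler = handler
        self.next_stage = next_stage
        self.events = queue.Queue(maxsize=capacity)
        for _ in range(workers):
            threading.Thread(target=self._run, daemon=True).start()

    def offer(self, event):
        # Blocks when the queue is full, which gives simple back-pressure.
        self.events.put(event)

    def _run(self):
        while True:
            event = self.events.get()
            try:
                result = self.handler(event)
                if self.next_stage is not None:
                    self.next_stage.offer(result)
            finally:
                self.events.task_done()

    def join(self):
        """Wait until every event offered so far has been processed."""
        self.events.join()


results = []
results_lock = threading.Lock()


def collect(event):
    with results_lock:
        results.append(event)
    return event


sink_stage = Stage(collect)
parse_stage = Stage(str.upper, next_stage=sink_stage)

for line in ("a", "b", "c"):
    parse_stage.offer(line)
parse_stage.join()  # all events have been handed to the next stage
sink_stage.join()   # all events have been collected
```

Decoupling stages with queues means a slow stage only backs up its own queue rather than blocking the threads that accept incoming traffic. Ordering across worker threads is not guaranteed in this sketch.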

The main processing flow is as follows:
  1. The client buffers events in a buffer called a message set, and sends the buffered messages to the Suro Collector.
  2. The Suro Collector takes each incoming message set, deflates it if possible, and returns immediately after handing the message set to the Message Set Processor.
  3. The Message Set Processor puts the message set into a queue, and a pool of Message Router threads routes the messages asynchronously.
  4. A Message Router determines which sink a message should go to. If there’s a filter configured for a message, the message payload will be deserialized so that the filter can be applied.
  5. Each sink maintains its own queue, and sends messages to its designated destination asynchronously.
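
Step 4 can be sketched as follows. This is a simplified Python illustration under stated assumptions: the routing table, the `Route` type, the sink names, and the use of JSON payloads are all hypothetical and do not reflect Suro's real configuration format. The point it shows is that the payload is deserialized only when a filter actually needs to inspect it.

```python
import json
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Route:
    sink: str                                            # name of the destination sink
    predicate: Optional[Callable[[dict], bool]] = None   # optional filter on the decoded payload


# Hypothetical routing table: routing key -> routes.
ROUTES = {
    "request_log": [
        Route("s3"),
        Route("kafka", predicate=lambda e: e.get("level") == "ERROR"),
    ],
}
DEFAULT_ROUTES = [Route("s3")]


def route(routing_key: str, payload: bytes) -> list:
    """Return the names of the sinks that should receive this message."""
    decoded = None
    sinks = []
    for r in ROUTES.get(routing_key, DEFAULT_ROUTES):
        if r.predicate is None:
            sinks.append(r.sink)  # no filter: the payload stays opaque bytes
            continue
        if decoded is None:
            decoded = json.loads(payload)  # deserialize lazily, at most once
        if r.predicate(decoded):
            sinks.append(r.sink)
    return sinks


error_sinks = route("request_log", b'{"level": "ERROR"}')
info_sinks = route("request_log", b'{"level": "INFO"}')
opaque_sinks = route("unknown_key", b"not even json")
```

Messages without a configured filter are never decoded, which keeps the common path cheap; in this sketch an unknown routing key simply falls back to the default sink.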

Performance Measurement

Here are some results from simple stress tests.

Test setup


  • Collector Cluster Size: 3 m1.xlarge instances (OS: Linux; JDK: 1.7.0_25)
  • Client Cluster Size: 6 m1.xlarge instances (OS: Linux; JDK: 1.7.0_25)
  • Message Batch Size: 1000
  • Message Size: 300 bytes
  • Sink Config: S3 Sink (Batch: 1000; Format: sequence file; Compression: LZO; Write to disk first: Yes; Disk Type: EBS; Notice type: SQS)


Test Results


The following table summarizes the test results after warm-up:
  • Total Message Count: 705 million
  • Total Message Size: 211 GB
  • Peak Throughput Per Instance: 66 K msg/sec
  • Average Throughput: 60 K msg/sec
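
As a quick sanity check, the reported totals are consistent with the 300-byte message size from the test setup. The short Python calculation below uses only the figures above and assumes decimal units (1 GB = 10^9 bytes).

```python
message_count = 705_000_000  # total messages, from the results above
message_size_bytes = 300     # message size, from the test setup
avg_msgs_per_sec = 60_000    # average throughput, from the results above

total_gb = message_count * message_size_bytes / 1e9            # decimal gigabytes
avg_mb_per_sec = avg_msgs_per_sec * message_size_bytes / 1e6   # decimal megabytes

print(f"total payload: {total_gb:.1f} GB")
print(f"average payload rate: {avg_mb_per_sec:.0f} MB/sec")
```

The computed 211.5 GB agrees with the reported 211 GB, and 60 K msg/sec at 300 bytes per message corresponds to roughly 18 MB/sec of payload.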


Suro Roadmap

The version open sourced today has the following components.
  • Suro Client
  • Suro Server
  • Kafka Sink and S3 Sink
  • Three Types of Message Filters


In the coming months, we will describe and open source other parts of the pipeline. We would love to collaborate with others in the community who are working in this domain, and we hope that Suro, Genie, Lipstick, etc. provide some of the answers in this highly evolving and popular technology space.

Other Event Pipelines  

Suro evolved over the past few years alongside many other powerful data pipeline solutions such as Apache Flume and Facebook Scribe, and it has overlapping features with these systems. The strength of Suro is that it is well integrated with AWS and especially with the NetflixOSS ecosystem, so that it supports Amazon Auto Scaling, Netflix Chaos Monkey, and dynamic dispatching of events based on user-defined rules. In particular,
  • The Suro client has a built-in load balancer that is aware of Netflix Eureka, and the Suro server also integrates with Netflix Eureka. Therefore, both Suro servers and applications that use the Suro client can be auto scaled.
  • The Suro server uses EBS and file-backed queues to minimize message loss during unexpected EC2 termination.
  • The Suro server is able to push messages to arbitrary consumers at runtime with the help of Netflix Archaius. Users can declaratively configure the Suro server at runtime to dispatch events to multiple destinations, such as Apache Kafka, SQS, S3, and any HTTP endpoint. The dispatching can be done in either batches or real time.
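
To show why a file-backed queue helps with unexpected termination, here is a minimal Python sketch of the store-and-forward idea. It is not Suro's implementation: the `FileBackedQueue` class, the file names, and the newline-delimited format are assumptions made for the example, and it assumes messages contain no newline characters.

```python
import os
import tempfile


class FileBackedQueue:
    """Append-only log plus a committed read offset, both kept on disk."""

    def __init__(self, directory):
        self.log_path = os.path.join(directory, "queue.log")
        self.offset_path = os.path.join(directory, "queue.offset")
        open(self.log_path, "ab").close()  # create the log if it is missing

    def _read_offset(self):
        try:
            with open(self.offset_path) as f:
                return int(f.read() or 0)
        except FileNotFoundError:
            return 0

    def put(self, message: bytes):
        # One message per line, flushed to disk before returning.
        with open(self.log_path, "ab") as f:
            f.write(message + b"\n")
            f.flush()
            os.fsync(f.fileno())

    def peek(self):
        """Return the oldest unacknowledged message, or None if there is none."""
        with open(self.log_path, "rb") as f:
            f.seek(self._read_offset())
            line = f.readline()
        return line.rstrip(b"\n") if line else None

    def ack(self):
        """Mark the oldest message as delivered by advancing the offset."""
        message = self.peek()
        if message is not None:
            new_offset = self._read_offset() + len(message) + 1
            with open(self.offset_path, "w") as f:
                f.write(str(new_offset))


directory = tempfile.mkdtemp()
q = FileBackedQueue(directory)
q.put(b"event-1")
q.put(b"event-2")
q.ack()  # event-1 was delivered downstream

# Simulate a process restart: a new object sees only what is on disk.
recovered = FileBackedQueue(directory).peek()
```

After the simulated restart, the undelivered `event-2` is still available, which is the property that matters when an instance disappears without warning. A message is acknowledged only after delivery, so a crash between delivery and `ack` would cause a redelivery rather than a loss.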


Summary

Suro has been the backbone of the data pipeline at Netflix for a few years and has evolved to handle many of the typical use cases that any Big Data infrastructure aims to solve.
In this article, we have described the top-level architecture, the use cases, and some of the components that form the overall data pipeline infrastructure at Netflix. We are happy to open source Suro and welcome input, comments, and involvement from the open source community.


If building critical big data infrastructure is your interest and passion, please take a look at http://jobs.netflix.com.