Monday, February 27, 2012
Aegisthus - A Bulk Data Pipeline out of Cassandra
By Charles Smith and Jeff Magnusson
Our job in Data Science and Engineering is to consume all the data that Netflix produces and provide an offline batch-processing platform for analyzing and enriching it. As has been mentioned in previous posts, Netflix has recently been engaged in making the transition to serving a significant amount of data from Cassandra. As with any new data storage technology that is not easily married to our current analytics and reporting platforms, we needed a way to provide a robust set of tools to process and access the data.
With respect to the requirement of bulk processing, there are a couple of very basic problems that we need to avoid when acquiring data. First, we don’t want to impact production systems. Second, Netflix is creating an internal infrastructure of decoupled applications, several of which are backed by their own Cassandra cluster. Data Science and Engineering needs to be able to obtain a consistent view of the data across the various clusters.
With these needs in mind and many of our key data sources rapidly migrating from traditional relational database systems into Cassandra, we set out to design a process to extract data from Cassandra and make it available in a generic form that is easily consumable by our bulk analytics platform. Since our desire is to retrieve the data in bulk, we rejected any attempts to query the production clusters directly. While Cassandra is very efficient at serving point queries and we have a lot of great APIs for accessing data here at Netflix, trying to ask a system for all of its data is generally not good for its long or short-term health.
Instead, we wanted to build an offline batch process for extracting the data. A big advantage to hosting our infrastructure on AWS is that we have access to effectively infinite, shared storage on S3. Our production Cassandra clusters are continuously backed up into an S3 bucket using our backup and recovery tool, Priam. Initially we intended to simply bring up a copy of each production cluster from Priam’s backups and extract the data via a Hadoop map-reduce job running against the restored Cassandra cluster. Working forward from that approach, we soon discovered that while it may be feasible for one or two clusters, the number of moving parts required to deploy this solution to all of our production clusters was going to quickly become unmaintainable. It just didn’t scale.
So, is there a better way to do it?
Taking a step back, it became evident to us that the problem of achieving scale in this architecture was two-fold. First, the overhead of spinning up a new cluster in AWS and restoring it from a backup did not scale well with the number of clusters being pushed to production. Second, we were operating under the constraint that backups have to be restored into a cluster equal in size to production. As data sizes grow, there is not necessarily any motivation for production data sources to increase the number of nodes in their clusters (remember, they are not bulk querying the data, so their workloads don’t scale linearly with respect to data size).
Thus, we were unable to leverage a key benefit of processing data on Hadoop – the ability to easily scale computing resources horizontally with respect to the size of your data.
We realized that Hadoop was an excellent fit for processing the data. The problem was that the Cassandra data was stored in a format that was not natively readable (sstables). Standing up Cassandra clusters from backups was simply a way to circumvent that problem. Rather than try to avoid the real issue, we decided to attack it head-on.
Aegisthus is Born
The end result was an application consisting of a constantly running Hadoop cluster capable of processing sstables as they are created by any Cassandra data source in our architecture. We call it Aegisthus, named in honor of Aegisthus’s relationship with Cassandra in Greek mythology.
Running on a single Hadoop cluster gives us the advantage of being able to easily and elastically scale a single computing resource. We were able to reduce the number of moving parts in our architecture while vastly increasing the speed at which we could process Cassandra’s data.
How it Works
A single map-reduce job is responsible for the bulk of the processing Aegisthus performs. The inputs to this map-reduce job are sstables from Cassandra: either a full snapshot of a cluster (backup), or batches of sstables as they are incrementally archived by Priam from the Cassandra cluster. We process the sstables, reduce them into a single consistent view of the data, and convert it to JSON-formatted rows that we stage out to S3 to be picked up by the rest of our analytics pipeline.
A full snapshot of a Cassandra cluster consists of all the sstables required to reconstitute the data into a new cluster that is consistent to the point at which the snapshot was taken. We developed an input format that is able to split the sstables across the entire Hadoop cluster, allowing us to control the amount of compute power we want to throw at the processing (horizontal scale). This was a welcome relief after trying to deal with timeout exceptions when directly using Cassandra as a data source for Hadoop input.
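To make the idea of splitting concrete, here is a minimal sketch of how an input format might carve a set of files into fixed-size byte ranges that can be handed to separate map tasks. It is an illustration only, not the Aegisthus input format: the `Split` type, the `compute_splits` function, and the file names are hypothetical, and a real sstable input format would also have to align each split to row boundaries, which this sketch ignores.

```python
from typing import Iterable, Iterator, NamedTuple, Tuple


class Split(NamedTuple):
    """A byte range of one file, to be processed by a single map task."""
    path: str
    offset: int
    length: int


def compute_splits(files: Iterable[Tuple[str, int]], split_size: int) -> Iterator[Split]:
    """Yield byte-range splits covering every (path, size_in_bytes) pair.

    Assumption: any byte offset is a valid split point. Real sstables
    would need split boundaries adjusted to the start of a row.
    """
    if split_size <= 0:
        raise ValueError("split_size must be positive")
    for path, size in files:
        offset = 0
        while offset < size:
            # The final split of a file may be shorter than split_size.
            length = min(split_size, size - offset)
            yield Split(path, offset, length)
            offset += length
```

A smaller `split_size` produces more splits and therefore more map tasks, which is the knob that lets the amount of compute applied to a snapshot be tuned independently of the size of the source Cassandra cluster.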
Each row-column value written to Cassandra is replicated and stored in sstables with a corresponding timestamp. The map phase of our job reads the sstables and converts them into JSON format. The reduce phase replicates the internal logic Cassandra uses to return data when it is queried with a consistency level of ALL (i.e. it reduces each row-column value based on the max timestamp).
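The reduce step described above can be sketched in a few lines. The following is a simplified illustration, not the actual Aegisthus code: the record layout, the function names `reduce_row` and `reduce_sstables`, and the JSON field names are all assumptions made for the example. It also deliberately ignores deletes (tombstones), expiring columns, and how ties between equal timestamps are broken.

```python
import json
from collections import defaultdict


def reduce_row(versions):
    """Merge several copies of one row, keeping the newest value per column.

    Each version maps column name -> (value, timestamp).
    """
    merged = {}
    for columns in versions:
        for name, (value, timestamp) in columns.items():
            # Keep a column value only if it is newer than what we have.
            if name not in merged or timestamp > merged[name][1]:
                merged[name] = (value, timestamp)
    return merged


def reduce_sstables(records):
    """Group (row_key, columns) records by key and emit one JSON line per row.

    `records` stands in for the map output: the same row key may appear
    many times because the row is replicated across sstables.
    """
    grouped = defaultdict(list)
    for row_key, columns in records:
        grouped[row_key].append(columns)
    for row_key in sorted(grouped):
        merged = reduce_row(grouped[row_key])
        yield json.dumps(
            {
                "key": row_key,
                "columns": {
                    name: {"value": value, "timestamp": timestamp}
                    for name, (value, timestamp) in merged.items()
                },
            },
            sort_keys=True,
        )
```

In a real Hadoop job the grouping by row key is done by the shuffle between the map and reduce phases; the in-memory `defaultdict` here only stands in for that step.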
Aegisthus is currently running on a Hadoop cluster consisting of 24 m2.4xlarge EC2 instances. The table below shows benchmarks for a subset of the Cassandra clusters from which we are pulling data: the number of nodes in the Cassandra cluster that houses the data, the average size of data per node, the total number of rows in the dataset, the time our map/reduce job takes to run, and the number of rows/sec we are able to process.
The number of rows/sec is highly variable across data sources. This is due to a number of reasons, notably the average size of the rows and the average number of times a row is replicated across the sstables. Further, as can be seen in the last entry in the table, smaller datasets incur a noticeable penalty due to overhead in the map/reduce framework.
We’re constantly optimizing our process and have tons of other interesting and challenging problems to solve. Like what you see? We’re hiring!