Today we are happy to announce that Aegisthus, our map/reduce program for reading Cassandra SSTables, is open source. Aegisthus is the tool we use daily to make the data stored in Cassandra available for analysis on our big data platform. We originally wrote about Aegisthus nearly two years ago, so today we want to talk a little about what has changed since then.
The initial iteration of Aegisthus was designed in a manner very similar to other typical applications in the Netflix infrastructure. Thus Aegisthus was not just a map/reduce job, but also a Hadoop cluster that could be launched via Asgard, as well as all the tools used to configure and schedule the data conversion. Aegisthus was used as a pipeline for the dimensional data we hold in Cassandra, with the results being data that we would consume in our big data environment. Conceptually, we thought about Aegisthus as similar to Suro, our recently open sourced event data pipeline, when thinking about making data available for batch processing.
Since our initial blog post describing Aegisthus, we have invested heavily in building and refining a data platform that facilitates easy and efficient access to and analysis of our data. Genie and Franklin are two of the core services of this platform. Genie, also part of NetflixOSS, is a building block for many of our higher level applications that need to launch Hadoop jobs in our architecture. Franklin is the internal name for our metadata service, and the tools we build to provide access to various datasets rely on it heavily. It provides a standard set of metadata for data across a variety of sources (e.g. MySQL, S3, Hive, Cassandra), which includes schema, location, and serialization format. It also allows adding arbitrary additional information as needed. Using these two building blocks we constructed a platform of tools that generalize the different needs of our users.
Earlier this year when we revisited Aegisthus, we realized that the process had much more in common with the tools that we were writing on top of our core data platform services than with a stand-alone application. First, we refactored Aegisthus to leverage Genie to launch as a job on our EMR based Hadoop clusters rather than run on a dedicated Hadoop cluster. This gives us much better flexibility in preparing for and scaling to additional load. Additionally, it brings Aegisthus in line with our other batch processing jobs, giving a central place to find information about the jobs that have run.
Next, we refactored the configuration of Aegisthus jobs to become additional metadata stored in Franklin. This gives us two benefits. First, the information about where a dataset originates is stored with the metadata on how to consume the dataset, which gives us a clear data lineage whenever there are questions that need to be directed to the owners of the source systems. Second, since Franklin provides the metadata that explains how to consume the datasets that Aegisthus produces, we can use the same names both for scheduling the Aegisthus map/reduce job and for the downstream consumers of the datasets in Pig.
The flow of Cassandra data
To get an idea of how this all fits together, it is best to consider how information moves from Cassandra into our big data environment. The figure below shows how data flows from Cassandra SSTables to json formatted data available in S3 for downstream batch analytics.
Priam is responsible for backing up Cassandra SSTables. It does that both incrementally, as SSTables are flushed to disk, and nightly as a full snapshot of all nodes.
Aegisthus reads the SSTables required to create the dataset. For a full backup we choose the SSTables for a single day. For an incremental backup we use the most recently flushed files and apply them to our latest json dataset.
Aegisthus compacts all the records into a single view of the data by removing all duplicate records. To handle eventual consistency, where data differs, Aegisthus keeps the column values with the most recent timestamp. Finally, it serializes the data into a format that can be consumed by our batch processing systems. That format is currently a line of json per record.
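The compaction step above can be sketched in a few lines. This is only an illustration of the "most recent timestamp wins" rule, not the Aegisthus implementation: the record shape, the function names compact and to_json_lines, and the JSON layout are all assumptions made for the example, and tombstones are ignored.

```python
import json


def compact(records):
    """Merge duplicate rows, keeping the newest value for each column.

    records: iterable of (row_key, {column_name: (value, timestamp)}).
    The shape is hypothetical; real SSTable rows carry more metadata.
    """
    merged = {}
    for row_key, columns in records:
        row = merged.setdefault(row_key, {})
        for name, (value, timestamp) in columns.items():
            current = row.get(name)
            # Keep the column value with the most recent timestamp.
            if current is None or timestamp > current[1]:
                row[name] = (value, timestamp)
    return merged


def to_json_lines(merged):
    """Serialize each compacted row as one line of JSON (assumed layout)."""
    for row_key in sorted(merged):
        columns = {
            name: {"value": value, "timestamp": timestamp}
            for name, (value, timestamp) in merged[row_key].items()
        }
        yield json.dumps({"key": row_key, "columns": columns}, sort_keys=True)


# Three replicas of the same row, one of them holding a newer "plan" value.
replicas = [
    ("user1", {"plan": ("basic", 100)}),
    ("user1", {"plan": ("premium", 200), "country": ("US", 150)}),
    ("user1", {"plan": ("basic", 100)}),
]
compacted = compact(replicas)
lines = list(to_json_lines(compacted))
```

Here the three replica records collapse into a single row whose "plan" column holds the value written at timestamp 200.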
The workflow for running Aegisthus map/reduce jobs is fairly simple using Franklin and Genie:
The scheduler queries Franklin for information about the job.
The scheduler submits the Aegisthus job to Genie.
Genie chooses a cluster to run the job on and the data gets transformed.
Together these tools make it simple for the person who needs to use the data to convert it into a format more suitable for additional processing.
How Aegisthus handles Cassandra’s data
At the heart of Aegisthus is the SSTableReader. We wanted a tool that acted like the sstable2json utility that ships with Cassandra, but would work on streams of data. Additionally, we needed to compact the data to eliminate the duplication caused by Cassandra keeping multiple replicas of each row. Rather than using Cassandra’s own internal file reader, we instead wrote our own process to read the files. While this does mean that we have to maintain some additional code as the file formats change, we gain a speed increase (which in practice seems to be about 2-3 times) as we don’t have the overhead of the work Cassandra does to optimize for its own access patterns.
In our map/reduce job we create an input format that wraps the SSTableReader. The main purpose of the input format is to split large SSTables, which significantly speeds processing.
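To make the idea of splitting concrete, here is a minimal sketch of how a large file could be divided into byte ranges that separate map tasks read in parallel. The Split type, the compute_splits function, and the fixed maximum split size are assumptions for illustration. A real reader would also need to align each range to a row boundary, which this sketch does not attempt.

```python
from typing import List, NamedTuple


class Split(NamedTuple):
    """A byte range of one file, to be handled by a single map task."""
    path: str
    start: int
    length: int


def compute_splits(path: str, file_size: int, max_split_size: int) -> List[Split]:
    """Cut a file of file_size bytes into ranges of at most max_split_size."""
    if max_split_size <= 0:
        raise ValueError("max_split_size must be positive")
    splits = []
    offset = 0
    while offset < file_size:
        # The last split may be shorter than max_split_size.
        length = min(max_split_size, file_size - offset)
        splits.append(Split(path, offset, length))
        offset += length
    return splits


# A hypothetical 250-byte file cut into ranges of at most 100 bytes.
example_splits = compute_splits("example-Data.db", 250, 100)
```

With more splits than files, a single very large SSTable no longer limits the whole job to the speed of one task.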
Most of the time we don’t want to process the full results of a Priam backup, since most of the data doesn’t change from day to day. Instead, we process the incremental files that Priam archives, much as Priam would itself if recovering a Cassandra node from backup. To achieve this, the input format for Aegisthus is also capable of reading the json formatted dataset produced as a result of the previous Aegisthus job. In our experience, by compacting together the current json dataset with the incremental data, we process about 10-20 times less data each day.
There is a downside to processing incrementally. Currently, to be as efficient as possible, we try to avoid inspecting the data when not needed. For example, when there is no new incremental data for a given row, there typically would be no need to check any of the columns. However, this does not account for expiring tombstones or data that has a TTL (time to live). To deal with this, our solution has been to periodically resync the data back to a full snapshot produced by Priam. While this isn’t ideal, it is a relatively painless solution and something we would need to do anyway as a verification of the data.
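The trade-off can be seen in a small sketch. It assumes a simplified cell of (value, timestamp, expires_at), with expires_at set to None when there is no TTL; the function names apply_incremental and drop_expired are hypothetical. The incremental merge copies untouched rows forward without looking at them, so an expired column survives until something inspects it. The drop_expired function only shows the inspection that the incremental path skips; as described above, the actual remedy is a periodic resync to a full snapshot.

```python
def apply_incremental(previous, incremental):
    """Merge incremental rows into the previous dataset.

    Both arguments map row_key -> {column: (value, timestamp, expires_at)}.
    Rows absent from the incremental data are carried over uninspected.
    """
    result = dict(previous)
    for row_key, new_columns in incremental.items():
        row = dict(result.get(row_key, {}))
        for name, cell in new_columns.items():
            # Newest timestamp wins, as in the compaction step.
            if name not in row or cell[1] > row[name][1]:
                row[name] = cell
        result[row_key] = row
    return result


def drop_expired(dataset, now):
    """Remove cells whose TTL has passed, and rows left with no cells."""
    cleaned = {}
    for row_key, columns in dataset.items():
        live = {
            name: cell
            for name, cell in columns.items()
            if cell[2] is None or cell[2] > now
        }
        if live:
            cleaned[row_key] = live
    return cleaned


previous = {
    "a": {"token": ("x", 10, 50)},     # expires at time 50
    "b": {"name": ("old", 10, None)},  # no TTL
}
incremental = {"b": {"name": ("new", 20, None)}}

merged = apply_incremental(previous, incremental)
inspected = drop_expired(merged, now=100)
```

After the merge, row "a" is still present even though its only column expired at time 50, because no new data caused it to be examined. Only the inspecting pass removes it.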
Aegisthus has been a fun project. It has become considerably smaller than its original implementation, but it is even more important today than when we first built it during our migration from Oracle to Cassandra.
Aegisthus is now processing more than 100 datasets daily, representing more than 20TB of incremental SSTables. If you would like to get involved with or contribute to the project, please check it out on GitHub. Or, if you have a passion for movies and big data infrastructure, check out http://jobs.netflix.com.