Thursday, January 16, 2014

Improving Netflix’s Operational Visibility with Real-Time Insight Tools

By Ranjit Mavinkurve, Justin Becker and Ben Christensen



For Netflix to be successful, we have to be vigilant in supporting the tens of millions of connected devices that are used by our 40+ million members throughout 40+ countries. These members consume more than one billion hours of content every month and account for nearly a third of the downstream Internet traffic in North America during peak hours.
From an operational perspective, our system environments at Netflix are large, complex, and highly distributed. And at our scale, humans cannot continuously monitor the status of all of our systems. To maintain high availability across such a complicated system, and to help us continuously improve the experience for our customers, it is critical for us to have exceptional tools coupled with intelligent analysis to proactively detect and communicate system faults and identify areas of improvement.


In this post, we will talk about our plans to build a new set of insight tools and systems that create greater visibility into our increasingly complicated and evolving world.

Extending Our Current Insight Capabilities

Our existing insight tools include dashboards that display the status of our systems in near real time, and alerting mechanisms that notify us of major problems. While these tools are highly valuable, we have the opportunity to do even better.

Perspective
Many of our current insight tools are systems-oriented. They are built from the perspective of the system providing the metrics. This results in a proliferation of custom tools and views that require specialized knowledge to use and interpret. Also, some of these tools tend to focus more on system health and not as much on our customers’ streaming experience. 

Instead, what we need is a cohesive set of tools that provide relevant insight, effective visualization of that insight, and smooth navigability from the perspective of the tools’ consumers. The consumers of these tools are internal staff members such as engineers who want to view the health of a particular part of our system or look at some aspect of our customers’ streaming experience. To reduce the time needed to detect, diagnose, and resolve problems, it is important for these tools to be highly effective.

Context

When attempting to deeply understand a particular part of the system, or when troubleshooting a problem, it is invaluable to have access to accurate, up-to-date information and relevant context. Rich and relevant context is a highly desirable feature to have in our insight tools.


Consider the following example. Our current insight tools provide ways to visualize and analyze a metric and trigger an alert based on the metric. We use this to track stream-starts-per-second (SPS), a metric used to gauge user engagement. If SPS rises above or drops below its "normal" range, our monitoring and alerting system triggers an alert. This helps us detect when our customers are unable to stream. However, since the system does not provide context around the alert, we have to use a variety of other means to diagnose the problem, which delays its resolution. Instead, suppose we had a tool tracking all changes to our environment. Now, if an alert for SPS were triggered, we could correlate the alert with recent system changes, providing context to help troubleshoot the problem.
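As a rough illustration of this idea, the Python sketch below flags an SPS sample that falls outside a normal range and attaches any environment changes recorded shortly before it. Everything here is hypothetical (the `Change` and `Alert` types, `check_sps`, the 15-minute lookback, and the range bounds); it is not taken from any Netflix tool.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Change:
    timestamp: float   # seconds since some epoch
    description: str


@dataclass
class Alert:
    timestamp: float
    sps: float
    context: List[Change]   # changes that happened shortly before the alert


def check_sps(timestamp, sps, low, high, changes, lookback=900.0) -> Optional[Alert]:
    """Return an Alert with nearby changes if sps is outside [low, high]."""
    if low <= sps <= high:
        return None
    # Keep only changes inside the lookback window that ends at the alert time.
    recent = [c for c in changes
              if timestamp - lookback <= c.timestamp <= timestamp]
    return Alert(timestamp, sps, recent)


# Hypothetical change log and SPS sample.
changes = [Change(1000.0, "deploy api v2"), Change(5000.0, "config push")]
alert = check_sps(5300.0, 120.0, low=500.0, high=2000.0, changes=changes)
```

Here only the change at time 5000 falls inside the lookback window, so it is the only context attached to the alert. A real system would also need to rank or filter candidate changes rather than simply list them.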

Alerts are by no means the only place where context is valuable. In fact, the insight provided by virtually any view or event is significantly more effective when relevant context is provided. 

Automation
While we need to be able to analyze and identify interesting or unusual behavior and surface any anomalies, we also need this to happen automatically and continuously. Further, not only should it be possible to add new anomaly detection or analysis algorithms easily, but the system itself should also be able to create and apply new rules and actions for these algorithms dynamically via machine learning.

Ad Hoc Queries
In the streaming world, we have multiple facets like customer, device, movie, region, ISP, client version, membership type, etc., and we often need quick insight based on a combination of these facets. For example, we may want to quickly view the current time-weighted average bitrate for content delivered to Xbox 360s in Mexico on Megacable ISP. An insight system should be able to provide quick answers to such questions in order to enable quick analysis and troubleshooting.
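To make the example query concrete, here is a minimal Python sketch of a faceted, time-weighted average over a batch of events. The event fields (`device`, `country`, `isp`, `bitrate_kbps`, `duration_s`) and the function name are assumptions made for illustration. A production system would run this over a live stream rather than an in-memory list.

```python
def time_weighted_avg_bitrate(events, **facets):
    """Average bitrate weighted by playback duration, restricted to the
    events whose fields match every requested facet value."""
    total_time = 0.0
    weighted_sum = 0.0
    for e in events:
        if all(e.get(name) == value for name, value in facets.items()):
            weighted_sum += e["bitrate_kbps"] * e["duration_s"]
            total_time += e["duration_s"]
    # None signals that no event matched the facets.
    return weighted_sum / total_time if total_time else None


# Hypothetical events with made-up values.
events = [
    {"device": "xbox360", "country": "MX", "isp": "Megacable",
     "bitrate_kbps": 3000, "duration_s": 60},
    {"device": "xbox360", "country": "MX", "isp": "Megacable",
     "bitrate_kbps": 1500, "duration_s": 30},
    {"device": "ps3", "country": "MX", "isp": "Megacable",
     "bitrate_kbps": 5000, "duration_s": 60},
]

result = time_weighted_avg_bitrate(
    events, device="xbox360", country="MX", isp="Megacable")
```

With these values the two matching events give (3000 × 60 + 1500 × 30) / 90 = 2500 kbps.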

Dynamic Visualizations
Our existing insight tools have dashboards with time-series graphs that are very useful and effective. With our new insight tools, we want to take our tools to a whole new level, with rich, dynamic data visualizations that visually communicate relevant, up-to-date details of the state of our environments for any operational facets of interest. For example, we want to surface interesting patterns, system events, and anomalies via visual cues within a dynamic timeline representation that is updated in near real-time.

Cohesive Insight
We have many disparate insight tools that are owned and used by separate teams. What we need is a set of tools that allow for shared communication, insights, and context in a cohesive manner. This is vital for the smooth operation of our complex, dynamic operational environments. We need a single, convenient place to communicate anomalous and informational changes as well as user-provided context.

The New Way

With our next generation of insight tools, we have the opportunity to create new and transformative ways to effectively deliver insights and extend our existing insight capabilities. We plan to build a new set of tools and systems for operational visibility that provide the insight features and capabilities that we need.

Timeliness Matters

Operational insight is much more valuable when delivered immediately. For example, when a streaming outage occurs, we want to take remedial action as soon as possible, to minimize downtime. We want anomaly detection to be automatic, but we also want it quickly, in near real-time. For ad hoc analysis as well, quick insight is the key to troubleshooting a problem and getting to a fast resolution.

Event Stream Processing

Given the importance of timeliness, a key piece of technology for the backend system for our new set of insight tools for operational visibility will be dynamic, real-time Event Stream Processing (aka Complex Event Processing) with the ability to run ad hoc dynamic queries on live event streams. Event processing enables one to identify meaningful events, such as opportunities or threats, and respond to them as quickly as possible. This fits the goals of our insight tools very well.

Scaling Challenge
Our massive scale poses an interesting challenge with regard to data scaling. How can we track many millions of metrics with fine-grained context and also support fast ad hoc queries on live data while remaining cost-effective? If we persisted every permutation of each metric along with all of its associated dimensions for all the events flowing through our system, we could end up with trillions of values, taking up several TBs of disk space every day. 

Since the value of fine-grained insight data for operational visibility is high when the data is fresh but diminishes over time, we need a system that exploits these characteristics of insight data and has the ability to make recent data available quickly, without having to persist all of the data. 
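One simple way to picture "recent data available quickly, without persisting everything" is a bounded in-memory window that evicts events once they age out. The sketch below is only an assumption about what such a building block might look like (the `RecentWindow` class is hypothetical), and it assumes events arrive in timestamp order.

```python
from collections import deque


class RecentWindow:
    """Keeps only events newer than max_age_s; older ones are dropped."""

    def __init__(self, max_age_s):
        self.max_age_s = max_age_s
        self._events = deque()   # (timestamp, value) pairs, oldest first

    def add(self, timestamp, value):
        self._events.append((timestamp, value))
        self._evict(timestamp)

    def _evict(self, now):
        # Drop events from the front while they are older than the window.
        while self._events and self._events[0][0] < now - self.max_age_s:
            self._events.popleft()

    def values(self, now):
        self._evict(now)
        return [v for _, v in self._events]


window = RecentWindow(max_age_s=60)
window.add(0, "a")
window.add(30, "b")
window.add(90, "c")   # "a" is now more than 60s old and is evicted
```

Memory use is bounded by the event rate times the window length, which is the property the paragraph above relies on.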

We have the opportunity to build an innovative and powerful stream processing system that meets our insight requirements, such as support for ad hoc faceted queries on live streams of big data, and has the ability to scale dynamically to handle multiple, simultaneous queries.

A Picture is Worth a Thousand Words

Good visualization helps to communicate and deliver insights effectively. As we develop our new insight tools for operational visibility, it is vital that the front-end interface to this system provide dynamic data visualizations that can communicate the insights in a very effective manner. As mentioned earlier, we want to tailor the insights and views to meet the needs of the tool’s consumers.


We envision the front-end to our new operational insight tool to be an interactive, single-page web application with rich and dynamic data visualizations and dashboards updated in real-time. The design below is a mockup (with fake data) for one of the views.


There are several components within the design: a top-level navigation bar to switch between different views in the system, a breadcrumbs component highlighting the selected facets, a main view module (a map in this instance), a key metrics component, a timeline, and an incident view on the right side of the screen. The main view communicates data based on the selected facets.


The mockup shown above (with fake data) represents another view in the system and displays routes in our edge tier with request rates, error rates and other key metrics for each route updated in near real-time.


All views in the system are dynamic and reflect the current operational state based on selected facets. A user can modify the facets and immediately see changes to the user interface.

Summary

Operational visibility with real-time insight enables us to deeply understand our operational systems, make product and service improvements, and find and fix problems quickly so that we can continue to innovate rapidly and delight our customers at every interaction. We are building a new set of tools and systems for operational visibility at Netflix with powerful insight capabilities.

Join Us!

Do you want to help design and build the next generation of innovative insight tools for operational visibility at Netflix? This is a greenfield project where you can have a large impact working in a small team. Are you interested in real-time stream processing or data visualization? We are looking for talented engineers: www.netflix.com/jobs.

Thursday, January 9, 2014

S3mper: Consistency in the Cloud

by: Daniel C. Weeks

In previous posts, we discussed how the Hadoop platform at Netflix leverages AWS’s S3 offering (read more here). In short, Netflix considers S3 the “source of truth” for all data warehousing.  There are many attractive features that draw us to this service including: 99.999999999% durability, 99.99% availability, effectively infinite storage, versioning (data recovery), and ubiquitous access. In combination with AWS’s EMR, we can dynamically expand/shrink clusters, provision/decommission clusters based on need or availability of reserved capacity, perform live cluster swapping without interrupting processing, and explore new technologies all utilizing the same data warehouse.  In order to provide the capabilities listed above, S3 makes one particular concession which is the focus of this discussion: consistency.

The consistency guarantees for S3 vary by region and operation (details here), but in general, any list or read operation is susceptible to inconsistent information depending on preceding operations. For basic data archival, consistency is not a concern.  However, in a data-centric computing environment where information flows through a complex workflow of computations and transformations, an eventually consistent model can cause problems ranging from insidious data loss to catastrophic job failure.

Over the past few years, sporadic inaccuracies surfaced that, only after extensive investigation, pointed to consistency as the culprit.  With the looming concern of data inaccuracy and no way to identify the scope or impact, we invested some time exploring how to diagnose issues resulting from eventual consistency and methods to mitigate the impact.  The result of this endeavor is a library that continues to evolve, but is currently in production here at Netflix: s3mper (Latin: Always).

Netflix is pleased to announce that s3mper is now released as open source under the Apache License v2.0.  We hope that the availability of this library will inspire constructive discussion focusing on how to better manage consistency at scale with the Hadoop stack across the many cloud offerings currently available.

How Inconsistency Impacts Processing



The Netflix ETL Process is predominantly Pig and Hive jobs scheduled through enterprise workflow software that resolves dependencies and manages task execution.  To understand how eventual consistency affects processing, we can distill the process down to a simple example of two jobs where the results of one feed into another.  If we take a look at Pig-1 from the diagram, it consists of two MapReduce jobs in a pipeline.  The initial dataset is loaded from S3 due to the source location referencing an S3 path. All intermediate data is stored in HDFS since that is the default file system.  Consistency is not a concern for these intermediate stages.  However, the results from Pig-1 are stored directly back to S3 so the information is immediately available for any other job across all clusters to consume.

Pig-2 is activated based on the completion of Pig-1 and immediately lists the output directories of the previous task.  If the S3 listing is incomplete when the second job starts, it will proceed with incomplete data.  This is particularly problematic, as we stated earlier, because there is no indication that a problem occurred.  The integrity of resulting data is entirely at the mercy of how consistent the S3 listing was when the second job started.

A variety of other scenarios may result in consistency issues, but inconsistent listing is our primary concern.  If the input data is incomplete, there is no indication anything is wrong with the result.  Obviously it is noticeable when the results vary significantly from long-standing patterns or when a job emits no data at all, but if only a small portion of input is missing the results will appear convincing.  Data loss occurring at the beginning of a pipeline will have a cascading effect where the end product is wildly inaccurate.  Due to the potential impact, it is essential to understand the risks and methods to mitigate loss of data integrity.

Approaches to Managing Consistency

The Impractical


When faced with eventual consistency, the most obvious (and naive) approach is to simply wait a set amount of time before a job starts with the expectation that data will show up.  The problem is knowing how long “eventual” will last.  Injecting an artificial delay is detrimental because it defers processing even if requisite data is available and still misses data if it fails to materialize in time.  The result is a net loss for both processing time and confidence in the resulting data.

Staging Data


A more common approach to processing in the cloud is to load all necessary data into HDFS, complete all processing, and store the final results to S3 before terminating the cluster.  This approach works well if processing is isolated to a single cluster and performed in batches.  As we discussed earlier, having the ability to decouple the data from the computing resources provides flexibility that cannot be achieved within a single cluster.  Persistent clusters also make this approach difficult.  Data in S3 may far exceed the capacity of the HDFS cluster and tracking what data needs to be staged and when it expires is a particularly complex problem to solve.

Consistency through Convention

Conventions can be used to eliminate some cases of inconsistency.  Read and list inconsistency resulting from overwriting the same location can corrupt data: a listing may include old versions of data alongside new ones, producing an amalgam of two incomplete datasets.  Eliminating update inconsistency is achievable by imposing a convention where the same location is never overwritten.  Here at Netflix, we encourage the use of a batching pattern, where results are written into partitioned batches and the Hive metastore only references the valid batches.  This approach removes the possibility of inconsistency due to update or delete.  For AWS regions other than US Standard, which provide “read-after-write” consistency, this approach may be sufficient, but it relies on strict adherence.
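The batching convention can be sketched in a few lines of Python. This is an illustration only: the `Metastore` class is a stand-in for a consistent catalog such as the Hive metastore, and the path layout, `batchid=` naming, and bucket name are assumptions, not the layout we actually use.

```python
import uuid


class Metastore:
    """Stand-in for a consistent catalog that records which batch is valid."""

    def __init__(self):
        self._valid = {}   # partition -> location of the current valid batch

    def publish(self, partition, location):
        self._valid[partition] = location

    def location(self, partition):
        return self._valid.get(partition)


def new_batch_location(table_root, partition):
    """Every run writes to a fresh location, so nothing is overwritten."""
    return f"{table_root}/{partition}/batchid={uuid.uuid4().hex}"


store = Metastore()
root = "s3://example-bucket/warehouse/events"   # hypothetical bucket
partition = "dateint=20140109"

first = new_batch_location(root, partition)
store.publish(partition, first)     # the first run becomes visible to readers

second = new_batch_location(root, partition)
store.publish(partition, second)    # a rerun replaces the pointer, not the data
```

Readers resolve the partition through the catalog, so they never list a location that is being rewritten. Old batches can be cleaned up later.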

Secondary Index


S3 is designed with an eventually consistent index, which is understandable in the context of the scale and the guarantees it provides.  At smaller scale, it is possible to achieve consistency through use of a consistent, secondary index to catalog file metadata while backing the raw data on S3.  This approach becomes more difficult to achieve as the scale increases, but as long as the secondary index can handle the request rate and still provide guaranteed consistency, it will suffice.  There are costs to this approach.  The probability of data loss and the complexity increase, while performance degrades due to relying on two separate systems.

S3mper: A Hybrid Approach

S3mper is an experimental approach to tracking file metadata through use of a secondary index that provides consistent reads and writes.  The intent is to identify when an S3 list operation returns inconsistent results and provide options to respond.  We implemented s3mper using aspects to advise methods on the Hadoop FileSystem interface and track file metadata with DynamoDB as the secondary index.  The reason we chose DynamoDB is that it provides capabilities similar to S3 (e.g. high availability, durability through replication), but also adds consistent operations and high performance.


What makes s3mper a hybrid approach is its use of the S3 listing for comparison and only maintaining a window of consistency.  The “source of truth” is still S3, but with an additional layer of checking added.  The window of consistency allows for falling back to the S3 listing without concern that the secondary index will fail and lose important information or risk consistency issues that arise from using tools outside the Hadoop stack to modify data in S3.

The key features s3mper provides include (see here for more detailed design and options):
  • Recovery: When an inconsistent listing is identified, s3mper will optionally delay the listing and retry until consistency is achieved.  This will delay a job only long enough for data to become available without unnecessarily impacting job performance.
  • Notification: If listing cannot be achieved, a notification is sent immediately and a determination can be made as to whether to kill the job or let it proceed with incomplete data.
  • Reporting: A variety of events are sent to track the number of recoveries, files missed, what jobs were affected, etc.
  • Configurability:  Options are provided to control how long a job should wait, how frequently to recheck a listing, and whether to fail a job if the listing is inconsistent.
  • Modularity: The implementations for the metastore and notifications can be overridden based on the environment and services at your disposal.
  • Administration: Utilities are provided for inspecting the metastore and resolving conflicts between the secondary index in DynamoDB and the S3 index.
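The recovery and notification behavior described in the list above can be pictured with the following Python sketch. It is not s3mper's code or API (s3mper advises the Hadoop FileSystem interface and uses DynamoDB). The function name, parameters, and defaults are hypothetical, and the two listing functions are passed in so that the logic can be exercised without any cloud services.

```python
import time


def consistent_listing(list_s3, list_index, max_wait_s=60.0, recheck_s=5.0,
                       fail_on_timeout=False, notify=print, sleep=time.sleep):
    """Retry the S3 listing until it contains every file the secondary
    index knows about, or until max_wait_s has been spent waiting."""
    waited = 0.0
    while True:
        listed = set(list_s3())
        missing = set(list_index()) - listed
        if not missing:
            return sorted(listed)            # listing is consistent
        if waited >= max_wait_s:
            notify(f"listing still missing {len(missing)} file(s) "
                   f"after {waited:.0f}s")
            if fail_on_timeout:
                raise RuntimeError("inconsistent listing")
            return sorted(listed)            # proceed with incomplete data
        sleep(recheck_s)
        waited += recheck_s


# Simulated S3 listing that becomes complete on the third call.
calls = {"n": 0}

def fake_s3():
    calls["n"] += 1
    return ["a", "b"] if calls["n"] < 3 else ["a", "b", "c"]

def fake_index():
    return ["a", "b", "c"]

result = consistent_listing(fake_s3, fake_index, sleep=lambda s: None)

# A listing that never recovers: the caller is notified and gets partial data.
messages = []
partial = consistent_listing(lambda: ["a"], lambda: ["a", "b"],
                             max_wait_s=10, recheck_s=5,
                             notify=messages.append, sleep=lambda s: None)
```

The job is delayed only for as long as the listing stays incomplete, which is the point of the recovery feature. Whether to fail or proceed after the timeout is left to configuration.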

S3mper is not intended to solve every possible case where inconsistency can occur.  Deleting data from S3 outside of the Hadoop stack will result in divergence of the secondary index and jobs being delayed unnecessarily.  Directory support is also limited such that recursive listings are still prone to inconsistency, but since we currently derive all our data locations from a Hive metastore, this does not impact us.  While this library is still in its infancy and does not support every case, using it in combination with the conventions discussed earlier will alleviate the concern for our workflow and allow for further investigation and development of new capabilities.


Performance in production

S3mper has been running in production at Netflix for a few months and the result is an interesting dataset with respect to consistency.  For context, Netflix operates out of the US Standard region where we run tens of thousands of Pig, Hive, and Hadoop jobs across multiple clusters of varying size and process several hundreds of terabytes of data every day.  The number of listings is hard to estimate because any given job will perform several listings depending on the number of partitions processed, but s3mper is tracking every interaction Hadoop has with S3 across all clusters and datasets.  At any given time, DynamoDB contains metadata on millions of files within our configured 24-hour sliding window of consistency.  We keep track of metrics on how frequently s3mper recovers a listing (i.e. postpones a job until it receives a complete listing) and how often the delay is exceeded, resulting in a job executing with data acquired through an inconsistent listing.


It is clear from these numbers that inconsistent listings make up a tiny fraction of all S3 operations.  In many cases all files are available within a few minutes and s3mper can recover the listing.  In cases where listings are not recovered, notification goes out to the job owner and they can determine if a rerun is necessary.  We can only speculate at the variation seen over time because S3 is a shared resource and we have little knowledge of the underlying implementation.

After investigating a sample of affected jobs, patterns do emerge that appear to result in increased probability of inconsistent listing.  For example, a stage within a single job that produces tens of thousands of files and reads them immediately in the next stage appears to have a higher likelihood of consistency issues.  We also make use of versioned buckets, which track history through use of delete markers.  Jobs that experience slower consistency often overwrite the same location repeatedly, which may have some correlation to how quickly an updated listing is available.  These observations are based purely on the types of queries and access patterns that have resulted in inconsistent listings as reported by s3mper.

Conclusion

With the petabytes of data we store in S3 and several million operations we perform each day, our experience with eventual consistency demonstrates that only a very small percentage of jobs are impacted, but the severity of inaccurate results warrants attention.  Being able to identify when a consistency issue occurs is beneficial not only because it builds confidence in the resulting data, but also because it helps to rule out consistency when diagnosing a problem that exists elsewhere in the system.  There is still more to be learned and we will continue to investigate avenues to better identify and resolve consistency issues, but s3mper is a solution we use in production and will continue to provide insight into these areas.

Thursday, January 2, 2014

Introducing PigPen: Map-Reduce for Clojure

by: Matt Bossenbroek

It is our pleasure to release PigPen to the world today. PigPen is map-reduce for Clojure. It compiles to Apache Pig, but you don’t need to know much about Pig to use it.

What is PigPen?

  • A map-reduce language that looks and behaves like clojure.core
  • The ability to write map-reduce queries as programs, not scripts
  • Strong support for unit tests and iterative development

Note: If you are not familiar at all with Clojure, we strongly recommend that you try a tutorial here, here, or here to understand some of the basics.

Really, yet another map-reduce language?

If you know Clojure, you already know PigPen

The primary goal of PigPen is to take language out of the equation. PigPen operators are designed to be as close as possible to the Clojure equivalents. There are no special user defined functions (UDFs). Define Clojure functions, anonymously or named, and use them like you would in any Clojure program.

Here’s the proverbial word count:

(require '[pigpen.core :as pig])

(defn word-count [lines]
  (->> lines
    (pig/mapcat #(-> % first
                   (clojure.string/lower-case)
                   (clojure.string/replace #"[^\w\s]" "")
                   (clojure.string/split #"\s+")))
    (pig/group-by identity)
    (pig/map (fn [[word occurrences]] [word (count occurrences)]))))

This defines a function that returns a PigPen query expression. The query takes a sequence of lines and returns the frequency with which each word appears. As you can see, this is just the word count logic. We don’t have to conflate external concerns, like where our data is coming from or going to.

Will it compose?

Yep - PigPen queries are written as function compositions - data in, data out. Write it once and avoid the copy & paste routine.

Here we use our word-count function (defined above), along with a load and store command, to make a PigPen query:

(defn word-count-query [input output]
  (->>
    (pig/load-tsv input)
    (word-count)
    (pig/store-tsv output)))

This function returns the PigPen representation of the query. By itself, it won’t do anything - we have to execute it locally or generate a script (more on that later).

You like unit tests? Yeah, we do that

With PigPen, you can mock input data and write a unit test for your query. No more crossing your fingers & wondering what will happen when you submit to the cluster. No more separate files for test input & output.

Mocking data is really easy. With pig/return and pig/constantly, you can inject arbitrary data as a starting point for your script.

A common pattern is to use pig/take to sample a few rows of the actual source data. Wrap the result with pig/return and you’ve got mock data.

(use 'clojure.test)

(deftest test-word-count
  (let [data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]])]
    (is (= (pig/dump (word-count data))
           [["moon" 1]
            ["jumped" 2]
            ["dog" 1]
            ["over" 2]
            ["cow" 1]
            ["fox" 1]
            ["the" 4]]))))

The pig/dump operator runs the query locally.

Closures (yes, the kind with an S)

Parameterizing your query is trivial. Any in-scope function parameters or let bindings are available to use in functions.

(defn reusable-fn [lower-bound data]
  (let [upper-bound (+ lower-bound 10)]
    (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))

Note that lower-bound and upper-bound are present when we generate the script, and are made available when the function is executed within the cluster.

So how do I use it?

Just tell PigPen where to write the query as a Pig script:

(pig/write-script "word-count.pig"
                  (word-count-query "input.tsv" "output.tsv"))

And then you have a Pig script which you can submit to your cluster. The script uses pigpen.jar, an uberjar with all of the dependencies, so make sure that is submitted with it. Another option is to build an uberjar for your project and submit that instead. Just rename it prior to submission. Check out the tutorial for how to build an uberjar.

As you saw before, we can also use pig/dump to run the query locally and return Clojure data:

=> (def data (pig/return [["The fox jumped over the dog."]
                          ["The cow jumped over the moon."]]))
#'pigpen-demo/data

=> (pig/dump (word-count data))
[["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]

If you want to get started now, check out getting started & tutorials.

Why do I need Map-Reduce?

Map-Reduce is useful for processing data that won’t fit on a single machine. With PigPen, you can process massive amounts of data in a manner that mimics working with data locally. Map-Reduce accomplishes this by distributing the data across potentially thousands of nodes in a cluster. Each of those nodes can process a small amount of the data, all in parallel, and accomplish the task much faster than a single node alone. Operations such as join and group, which require coordination across the dataset, are computed by partitioning the data with a common join key. Each value of the join key is then sent to a specific machine. Once that machine has all of the potential values, it can then compute the join or do other interesting work with them.
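The key-based partitioning described above can be sketched in Python as follows. This is a simplified, single-process illustration, not how Hadoop or Pig implement their shuffle. The function name and the use of CRC32 as the hash are assumptions.

```python
import zlib
from collections import defaultdict


def partition_by_key(records, key_fn, num_nodes):
    """Assign each record to a node based on a stable hash of its key, so
    that all records sharing a key end up on the same node."""
    nodes = defaultdict(list)
    for record in records:
        key = key_fn(record)
        node = zlib.crc32(repr(key).encode("utf-8")) % num_nodes
        nodes[node].append(record)
    return dict(nodes)


records = [("a", 1), ("b", 2), ("a", 3)]
parts = partition_by_key(records, key_fn=lambda r: r[0], num_nodes=4)

# Both records with key "a" are on one node, whichever node that is.
nodes_holding_a = {node for node, recs in parts.items()
                   for r in recs if r[0] == "a"}
```

Because the node is a pure function of the key, a node that receives one record for a key is guaranteed to receive all of them, which is what makes the join or group computable locally.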

To see how PigPen does joins, let’s take a look at pig/cogroup. Cogroup takes an arbitrary number of data sets and groups them all by a common key. Say we have data that looks like this:

foo:

  {:id 1, :a "abc"}
  {:id 1, :a "def"}
  {:id 2, :a "abc"}

bar:

  [1 42]
  [2 37]
  [2 3.14]

baz:

  {:my_id "1", :c [1 2 3]}

If we want to group all of these by id, it looks like this:

(pig/cogroup (foo by :id)
             (bar by first)
             (baz by #(-> % :my_id Long/valueOf))
             (fn [id foos bars bazs] ...))

The first three arguments are the datasets to join. Each one specifies a function that is applied to the source data to select the key. The last argument is a function that combines the resulting groups. In our example, it would be called twice, with these arguments:

[1 ({:id 1, :a "abc"}, {:id 1, :a "def"})
   ([1 42])
   ({:my_id "1", :c [1 2 3]})]
[2 ({:id 2, :a "abc"})
   ([2 37] [2 3.14])
   ()]

As you can see, this consolidates all of the values with an id of 1, all of the values with 2, etc. Each different key value can then be processed independently on different machines. By default, keys are not required to be present in all sources, but there are options that can make them required.
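For readers who want to trace the grouping by hand, here is a plain Python sketch of the same semantics on the same sample data. It is not PigPen code and runs in a single process. The `cogroup` function below is a hypothetical stand-in that mirrors the default behavior, where keys need not be present in every source.

```python
from collections import defaultdict


def cogroup(sources, combine):
    """sources is a list of (records, key_fn) pairs. combine is called once
    per distinct key with the key and one list of records per source."""
    groups = defaultdict(lambda: [[] for _ in sources])
    for i, (records, key_fn) in enumerate(sources):
        for record in records:
            groups[key_fn(record)][i].append(record)
    return [combine(key, *groups[key]) for key in sorted(groups)]


foo = [{"id": 1, "a": "abc"}, {"id": 1, "a": "def"}, {"id": 2, "a": "abc"}]
bar = [[1, 42], [2, 37], [2, 3.14]]
baz = [{"my_id": "1", "c": [1, 2, 3]}]

result = cogroup(
    [(foo, lambda r: r["id"]),
     (bar, lambda r: r[0]),
     (baz, lambda r: int(r["my_id"]))],   # normalize the string key to an int
    lambda key, foos, bars, bazs: (key, foos, bars, bazs),
)
```

The combine function is called twice, once for key 1 and once for key 2, and key 2 receives an empty list for baz.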

Hadoop provides a very low-level interface for doing map-reduce jobs, but it’s also very limited. It can only run a single map-reduce pair at a time as it has no concept of data flow or a complex query. Pig adds a layer of abstraction on top of Hadoop, but at the end of the day, it’s still a scripting language. You are still required to use UDFs (user defined functions) to do interesting tasks with your data. PigPen furthers that abstraction by making map-reduce available as a first class language.

If you are new to map-reduce, we encourage you to learn more here.

Motivations for creating PigPen

  • Code reuse. We want to be able to define a piece of logic once, parameterize it, and reuse it for many different jobs.
  • Consolidate our code. We don’t like switching between a script and a UDF written in different languages. We don’t want to think about mapping between differing data types in different languages.
  • Organize our code. We want our code in multiple files, organized how we want - not dictated by the job it belongs to.
  • Unit testing. We want our sample data inline with our unit tests. We want our unit tests to test the business logic of our jobs without complications of loading or storing data.
  • Fast iteration. We want to be able to trivially inject mock data at any point in our jobs. We want to be able to quickly test a query without waiting for a JVM to start.
  • Name what you want to. Most map-reduce languages force too many names and schemas on intermediate products. This can make it really difficult to mock data and test isolated portions of jobs. We want to organize and name our business logic as we see fit - not as dictated by the language.
  • We’re done writing scripts; we’re ready to start programming!

Note: PigPen is not a Clojure wrapper for writing Pig scripts you can hand edit. While it’s entirely possible, the resulting scripts are not intended for human consumption.

Design & Features

PigPen was designed to match Clojure as closely as possible. Map-reduce is functional programming, so why not use an awesome functional programming language that already exists? Not only is there a lower learning curve, but most of the concepts translate very easily to big data.

In PigPen, queries are manipulated as expression trees. Each operation is represented as a map of information about what behavior is desired. These maps can be nested together to build a tree representation of a complex query. Each command also contains references to its ancestor commands. When executed, that query tree is converted into a directed acyclic query graph. This allows for easy merging of duplicate commands, optimizing sequences of related commands, and instrumenting the query with debug information.

Optimization

De-duping

When we represent our query as a graph of operations, de-duping them is trivial. Clojure provides value-equality, meaning that if two objects have the same content, they are equal. If any two operations have the same representation, then they are in fact identical. No care has to be taken to avoid duplicating commands when writing the query - they’re all optimized before executing it.

For example, say we have the following query:

(let [even-squares (->>
                     (pig/load-clj "input.clj")
                     (pig/map (fn [x] (* x x)))
                     (pig/filter even?)
                     (pig/store-clj "even-squares.clj"))
      odd-squares (->>
                    (pig/load-clj "input.clj")
                    (pig/map (fn [x] (* x x)))
                    (pig/filter odd?)
                    (pig/store-clj "odd-squares.clj"))]
  (pig/script even-squares odd-squares))

In this query, we load data from a file, compute the square of each number, and then split it into even and odd numbers. Here’s what a graph of this operation would look like:

This matches our query, but it’s doing some extra work. It’s loading the same input.clj file twice and computing the squares of all of the numbers twice. This might not seem like a lot of work, but when you do it on a lot of data, simple operations really add up. To optimize this query, we look for operations that are identical. At first glance it looks like our operation to compute squares might be a good candidate, but they actually have different parents so we can’t merge them yet. We can, however, merge the load functions because they don’t have any parents and they load the same file.

Now our graph looks like this:

Now we’re loading the data once, which will save some time, but we’re still doing the squares computation twice. Since we now have a single load command, our map operations are now identical and can be merged:

Now we have an optimized query where each operation is unique. Because we always merge commands one at a time, we know that we’re not going to change the logic of the query. You can easily generate queries within loops without worrying about duplicated execution - PigPen will only execute the unique parts of the query.
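The effect of value equality can be sketched in plain Clojure. This is a simplified illustration, assuming commands are nested maps with hypothetical keys; it collapses duplicates in one pass with distinct rather than merging them one at a time as described above.

```clojure
(defn branch
  "Build a hypothetical load -> square -> filter -> store chain as nested maps."
  [pred out]
  (let [load   {:type :load, :location "input.clj", :ancestors []}
        square {:type :map, :fn '(fn [x] (* x x)), :ancestors [load]}
        filt   {:type :filter, :fn pred, :ancestors [square]}]
    {:type :store, :location out, :ancestors [filt]}))

(defn commands
  "Return every command in a tree, ancestors first."
  [cmd]
  (concat (mapcat commands (:ancestors cmd)) [cmd]))

(def all-commands
  (mapcat commands [(branch 'even? "even-squares.clj")
                    (branch 'odd? "odd-squares.clj")]))

(count all-commands)             ;; => 8
(count (distinct all-commands))  ;; => 6

(frequencies (map :type (distinct all-commands)))
;; => {:load 1, :map 1, :filter 2, :store 2}
```

The two load commands and the two map commands are equal by value, so only one of each survives, while the filters and stores stay separate because their contents differ.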

Serialization

After we’re done processing data in Clojure, our data must be serialized into a binary blob so Pig can move it around between machines in a cluster. This is an expensive but essential step for PigPen. Luckily, many consecutive operations in a script can often be packed together into a single operation. This saves a lot of time by not serializing and deserializing the data when we don’t need to. For example, any consecutive map, filter, and mapcat operations can be rewritten as a single mapcat operation.

Let’s look at some examples to illustrate this:

In this example, we start with a serialized (blue) value, 4, deserialize it (orange), apply our map function, and re-serialize the value.

Now let’s try a slightly more complex (and realistic) example. In this example, we apply a map, mapcat, and filter operation.

If you haven’t used it before, mapcat is an operation where we apply a function to a value and that function returns a sequence of values. That sequence is then ‘flattened’ and each single value is fed into the next step. In Clojure, it’s the result of combining map and concat. In Scala, this is called flatMap and in C# it’s called SelectMany.

In the diagram below, the flow on the left is our query before the optimization; the right is after the optimization. We start with the same value, 4, and calculate the square of the value; same as the first example. Then we take our value and apply a function that decrements the value, returns the value, and increments the value. Pig then takes this set of values and flattens them, making each one an input to the next step. Note that we had to serialize and deserialize the data when interacting with Pig. The third and final step is to filter the data; in this example we’re retaining only odd values. As you can see, we end up serializing and deserializing the data in between each step.

The right-hand side shows the result of the optimization. Put simply, each operation now returns a sequence of elements. Our map operation returns a sequence of one element, 16, our mapcat remains the same, and our filter returns a sequence of zero or one elements. By making these commands more uniform, we can merge them more easily. We end up flattening more sequences of values within the set of commands, but there is no serialization cost between steps. While it looks more complex, this optimization results in much faster execution of each of these steps.
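The rewrite can be sketched in plain Clojure. This is a minimal illustration of the technique, not PigPen's implementation; the helper names map->mapcat, filter->mapcat, and fuse are hypothetical.

```clojure
(defn map->mapcat
  "Wrap a map function so it returns a sequence of exactly one element."
  [f]
  (fn [x] [(f x)]))

(defn filter->mapcat
  "Wrap a predicate so it returns a sequence of zero or one elements."
  [pred]
  (fn [x] (if (pred x) [x] [])))

(defn fuse
  "Combine sequence-returning functions, applied left to right, into one function."
  [& fs]
  (fn [x]
    (reduce (fn [xs f] (mapcat f xs)) [x] fs)))

(def fused
  (fuse (map->mapcat (fn [x] (* x x)))      ; map: square the value
        (fn [x] [(dec x) x (inc x)])        ; mapcat: value - 1, value, value + 1
        (filter->mapcat odd?)))             ; filter: keep odd values

(fused 4)
;; => (15 17)
```

Starting from 4, the fused function produces 16, then 15, 16, and 17, and finally keeps 15 and 17, all without leaving Clojure between steps.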

Testing, Local Execution, and Debugging

Iterative development, testing, and debuggability are key tenets of PigPen. When you have jobs that can run for days at a time, the last thing you need is an unexpected bug to show up in the eleventh hour. PigPen has a local execution mode that’s powered by rx. This allows us to write unit tests for our queries. We can then know with very high confidence that something will not crash when run and will actually return the expected results. Even better, this feature allows for iterative development of queries.

Typically, we start with just a few records of the source data and use that to populate a unit test. Because PigPen returns data in the REPL, we don’t have to go elsewhere to build our test data. Then, using the REPL, we add commands to map, filter, join, and reduce the mock data as required; each step of the way verifying that the result is what we expect. This approach produces more reliable results than building a giant monolithic script and crossing your fingers. Another useful pattern is to break up large queries into smaller functional units. Map-reduce queries tend to explode and contract the source data by orders of magnitude. When you try to test the script as a whole, you often have to start with a very large amount of data to end up with just a few rows. By breaking the query into smaller parts, you can test the first part, which may take 100 rows to produce two; and then test the second part by using those two rows as a template to simulate 100 more fake ones.
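The pattern of testing small functional units with inline data might look like the sketch below. It uses only clojure.test and plain functions; the two stages (parse-line and total-minutes) and the 'user,minutes' record format are hypothetical and stand in for the business logic of a real query.

```clojure
(require '[clojure.string :as str]
         '[clojure.test :refer [deftest is]])

(defn parse-line
  "Hypothetical first stage: split a 'user,minutes' line into a record."
  [line]
  (let [[user minutes] (str/split line #",")]
    {:user user, :minutes (Long/parseLong minutes)}))

(defn total-minutes
  "Hypothetical second stage: sum minutes per user."
  [records]
  (reduce (fn [acc {:keys [user minutes]}]
            (update acc user (fnil + 0) minutes))
          {}
          records))

(deftest parse-line-test
  (is (= {:user "a", :minutes 30} (parse-line "a,30"))))

(deftest total-minutes-test
  ;; Mock records are inline, so the second stage is tested on its own.
  (is (= {"a" 40, "b" 5}
         (total-minutes [{:user "a", :minutes 30}
                         {:user "b", :minutes 5}
                         {:user "a", :minutes 10}]))))
```

Each stage is tested against a handful of records, so neither test depends on the volume of data the other stage would produce.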

Debug mode has proven to be really useful for fixing the unexpected. When enabled, it will write to disk the result of every operation in the script, in addition to the normal outputs. This is very useful in an environment such as Hadoop, where you can’t step through code and hours may pass in between operations. Debug mode can also be coupled with a graph-viz visualization of the execution graph. You can then visually associate what it plans to do with the actual output of each operation.

To enable debug mode, see the options for pig/write-script and pig/generate-script. It will write the extra debug output to the folder specified.

Example of debug mode enabled:

(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)

To enable visualization, take a look at pig/show and pig/dump&show

Example of visualization:

(pig/show my-pigpen-query)        ;; Shows a graph of the query
(pig/dump&show my-pigpen-query)   ;; Shows a graph and runs it locally

Extending PigPen

One nice feature of PigPen is that it’s easy to build your own operators. For example, we built set and multi-set operators such as difference and intersection. These are just variants of other operations like co-group, but it’s really nice to define them once, test them thoroughly, and not have to think about the logic behind a multiset intersection of n sets ever again.

This is useful for more complex operators as well. We have a reusable statistics operator that computes the sum, avg, min, max, sd, and quantiles for a set of data. We also have a pivot operator that groups dimensional fields in the data and counts each group.

While each of these is a simple operation by itself, when you abstract them out of your query, your query starts to become a lot smaller and simpler. When your query is smaller and simpler, you can spend more time focusing on the actual logic of the problem you’re trying to solve instead of re-writing basic statistics each time. What you’re doing becomes clearer, every step of the way.
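To give a sense of the logic that is worth writing once, here is a minimal sketch of a multiset intersection of n collections. It operates on plain Clojure sequences rather than PigPen relations and does not use co-group; the function name is an assumption for this example.

```clojure
(defn multiset-intersection
  "Return each value as many times as its smallest count across all collections."
  [& colls]
  (let [freqs (map frequencies colls)]
    (mapcat (fn [[v _]]
              ;; A value missing from any collection has a count of 0 there.
              (repeat (apply min (map #(get % v 0) freqs)) v))
            (first freqs))))

(sort (multiset-intersection [1 1 2 3] [1 1 1 3 4] [1 3 3]))
;; => (1 3)
```

Once an operator like this is defined and tested, a query can call it by name instead of repeating the counting logic.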

Why Pig?

We chose Pig because we didn’t want to re-implement all of the optimization work that has gone into Pig already. If you take away the language, Pig does an excellent job of moving big data around. Our strategy was to use Pig’s DataByteArray binary format to move around serialized Clojure data. In most cases, we found that Pig didn’t need to be aware of the underlying types present in the data. Byte arrays can be compared trivially and quickly, so for joins and groupings, Pig simply needs to compare the serialized blob. We get Clojure’s great value equality for free as equivalent data structures produce the same serialized output. Unfortunately, this doesn’t hold true for sorting data. The sorted order of a binary blob is far less than useful, and doesn’t match the sorted order of the native data. To sort data, we must fall back to the host language, and as such, we can only sort on simple types. This is one of very few places where Pig has imposed a limitation on PigPen.
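Both properties can be seen in a small sketch. Here pr-str stands in for the binary serialization, an assumption made so the output is readable; the real format is a byte array, but the same mismatch between blob order and native order applies.

```clojure
(require '[clojure.edn :as edn])

;; pr-str is a stand-in for the real binary serialization.
(def serialize pr-str)
(def deserialize edn/read-string)

;; Equal values serialize to equal blobs, so comparing blobs is enough
;; for joins and groupings.
(= (serialize [1 "a" :k]) (serialize (vector 1 "a" :k)))
;; => true

;; Sorting the blobs does not match sorting the native values.
(map deserialize (sort (map serialize [2 10 9])))
;; => (10 2 9)

(sort [2 10 9])
;; => (2 9 10)
```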

We did evaluate other languages before deciding to build PigPen. The first requirement was that it was an actual programming language, not just a scripting language with UDFs. We briefly evaluated Scalding, which looks very promising, but our team primarily uses Clojure. It could be said that PigPen is to Clojure what Scalding is to Scala. Cascalog is usually the go-to language for map-reduce in Clojure, but in our experience, Datalog has proven less than useful for everyday tasks. There is a complicated new syntax to learn, along with new concepts; aligning variable names to do implicit joins is not always ideal; misplaced ordering of operations can often cause big performance problems; Datalog will flatten data structures (which can be wasteful); and composition can be a mind bender.

We also evaluated a few options to use as a host language for PigPen. It would be possible to build a similar abstraction on top of Hive, but schematizing every intermediate product doesn’t fit well with the Clojure ideology. Also, Hive is similar to SQL, making translation from a functional language more difficult. There’s an impedance mismatch between relational models like SQL and Hive and functional models like Clojure or Pig. In the end, the most straightforward solution was to write an abstraction over Pig.

Future Work

Currently you can reference in-scope local variables within code that is executed remotely, as shown above. One limitation to this feature is that the value must be serializable. This has the downside of not being able to utilize compiled functions - you can’t get back the source code that created them in the first place. This means that the following won’t work:

(defn foo [x] ...)

(pig/map foo)

In this situation, the compiler will inform you that it doesn’t recognize foo. We’re playing around with different methods for requiring code remotely, but there are some nuances to this problem. Blindly loading the code that was present when the script was generated is an easy option, but it might not be ideal if that code accidentally runs something that was only intended to run locally. Another option would be for the user to explicitly specify what to load remotely, but this poses challenges as well, such as an elegant syntax to express what should be loaded. Everything we’ve tried so far is a little clunky and jar hell with Hadoop doesn’t make it any easier. That said, any code that’s available can be loaded from within any user function. If you upload your uberjar, you can then use a require statement to load other arbitrary code.

So far, performance in PigPen doesn’t seem to be an issue. Long term, if performance issues crop up, it will be relatively easy to migrate to running PigPen directly on Hadoop (or similar) without changing the abstraction. One of the key performance features we have yet to build is incremental aggregation. Pig refers to this as algebraic operators (also referenced by Rich Hickey here as combining functions). These are operations that can compute partial intermediate products over aggregations. For example, say we want to take the average of a LOT of numbers - so many that we need map-reduce. The naive approach would be to move all of the numbers to one machine and compute the average. A better approach would be to partition the numbers, compute the sum and count of each of these smaller sets, and then use those intermediate products to compute the final average. The challenge for PigPen will be to consume many of these operations within a single function. For example, say we have a set of numbers and we want to compute the count, sum, and average. Ideally, we would want to define each of these computations independently as algebraic operations and then use them together over the same set of data, having PigPen do the work of maintaining a set of intermediate products. Effectively, we need to be able to compose and combine these operations while retaining their efficiency.
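The averaging example can be sketched in plain Clojure. This is only an illustration of the partial-aggregation idea on a single machine, with partition-all standing in for the data being split across a cluster; the function names are hypothetical and this is not a PigPen feature.

```clojure
(defn partial-avg
  "Reduce one partition to an intermediate product: its sum and element count."
  [xs]
  {:sum (reduce + 0 xs), :n (count xs)})

(defn combine
  "Merge two intermediate products."
  [a b]
  (merge-with + a b))

(defn finalize
  "Turn the combined intermediate product into the final average."
  [{:keys [sum n]}]
  (/ sum n))

(def numbers (range 1 101))

(->> (partition-all 10 numbers)   ; stand-in for partitions on separate machines
     (map partial-avg)
     (reduce combine)
     finalize)
;; => 101/2
```

Only the small sum-and-count maps need to be brought together, and the result matches the naive average over the whole collection.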

We use a number of other Pig & Hadoop tools at Netflix that will pair nicely with PigPen. We have some prototypes for integration with Genie, which adds a pig/submit operator. There’s also a loader for Aegisthus data in the works. And PigPen works with Lipstick as the resulting scripts are Pig scripts.

Conclusion

PigPen has been a lot of fun to build and we hope it’s even more fun to use. To get started with PigPen, check out the tutorial; to contribute, take a look at our GitHub page: https://github.com/Netflix/PigPen

There are three distinct audiences for PigPen, so we wrote three different tutorials:

If you know both Clojure and Pig, you’ll probably find all of the tutorials interesting.

The full API documentation is located here

And if you love big data, check out our jobs.