Thursday, October 23, 2014

FIT: Failure Injection Testing

by Kolton Andrus, Naresh Gopalani, Ben Schmaus

It's no secret that at Netflix we enjoy deliberately breaking things to test our production systems. Doing so lets us validate our assumptions and prove that our mechanisms for handling failure will work when called upon. Netflix has a tradition of implementing a range of tools that create failure, and it is our pleasure to introduce you to the latest of these solutions, FIT, or Failure Injection Testing.

FIT is a platform that simplifies the creation of failure within our ecosystem, with a greater degree of precision over what we fail and whom we impact. FIT also allows us to propagate our failures across the entirety of Netflix in a consistent and controlled manner.

Why We Built FIT
While breaking things is fun, we do not enjoy causing our customers pain. Some of our Monkeys, by design, can go a little too wild when let out of their cages. Latency Monkey in particular has bitten our developers, leaving them wary about unlocking the cage door.

Latency Monkey adds a delay and/or failure on the server side of a request for a given service. This gives us good insight into how calling applications behave when their dependency slows down: threads pile up, the network becomes congested, etc. Latency Monkey also impacts all calling applications, whether they want to participate or not, and can cause customer pain if proper fallback handling, timeouts, and bulkheads don’t work as expected. Given the complexity of our system, it is virtually impossible for us to anticipate where failures will happen when turning Latency Monkey loose. Validating these behaviors frequently is risky, but critical to remaining resilient.

What we need is a way to limit the impact of failure testing while still breaking things in realistic ways. We need to control the outcome until we have confidence that the system degrades gracefully, and then increase the scope to exercise the failure at scale. This is where FIT comes in.

How FIT works
Simulating failure starts when the FIT service pushes failure simulation metadata to Zuul. Requests matching the failure scope at Zuul are decorated with failure. This may be an added delay to a service call, or a failure in reaching the persistence layer. Each injection point the request touches checks the request context to determine if there is a failure for that specific component. If found, the injection point simulates that failure appropriately. Below is an outline of a simulated failure, demonstrating some of the inflection points at which failure can be injected.

FIT Architecture Example

Failure Scope
We only want to break what we intend to break, so limiting the potential blast radius is critical. To achieve this we use Zuul, which provides many powerful capabilities for inspecting and managing traffic. Before forwarding a request, Zuul checks a local store of FIT metadata to determine if this request should be impacted. If so, Zuul decorates the request with a failure context, which is then propagated to all dependent services.
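Here is a minimal sketch of the Zuul-side decision described above. It is written in Python for brevity, although Zuul filters run on the JVM. The sketch assumes the failure context travels between services as a serialized request header. The header name `x-fit-context`, the `FitRule` shape, and the account-based matching are all hypothetical, because the post does not describe FIT's wire format.

```python
import json
from dataclasses import dataclass

FIT_HEADER = "x-fit-context"  # hypothetical header name, for illustration only


@dataclass
class FitRule:
    """Hypothetical FIT rule: which accounts to match and what to fail."""
    account_ids: set
    failures: dict  # injection point name -> failure behavior


def decorate(headers: dict, account_id: str, rules: list) -> dict:
    """Edge step: attach a failure context if any locally stored rule matches."""
    out = dict(headers)
    failures = {}
    for rule in rules:
        if account_id in rule.account_ids:
            failures.update(rule.failures)
    if failures:
        out[FIT_HEADER] = json.dumps(failures, sort_keys=True)
    return out


def propagate(incoming: dict, outgoing: dict) -> dict:
    """Every downstream service copies the context onto its outbound calls."""
    out = dict(outgoing)
    if FIT_HEADER in incoming:
        out[FIT_HEADER] = incoming[FIT_HEADER]
    return out
```

Carrying the context with the request, rather than having each service consult FIT on its own, keeps every hop in agreement about which failures apply to this particular request.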

For most failure tests, we use Zuul to isolate impacted requests to only a specific test account or a specific device. Once validated at that level, we expand the scope to a small percentage of production requests. If the failure tests still look good, we gradually dial up the chaos to 100%.
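The gradual widening of scope can be sketched as a matching function. This is an illustration under assumptions, not FIT's implementation: `FailureScope`, `bucket`, and `in_scope` are hypothetical names. Hashing the account ID keeps a given customer consistently in or out of the test as the percentage is dialed up.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FailureScope:
    """Hypothetical scope: explicit accounts/devices plus a share of other traffic."""
    account_ids: set = field(default_factory=set)
    device_ids: set = field(default_factory=set)
    percent: float = 0.0  # 0.0 .. 100.0


def bucket(key: str) -> float:
    """Map a key to a stable value in [0, 100) using a deterministic hash."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % 10_000 / 100.0


def in_scope(scope: FailureScope, account_id: str, device_id: Optional[str] = None) -> bool:
    # Explicit test accounts and devices are always impacted.
    if account_id in scope.account_ids or device_id in scope.device_ids:
        return True
    # Everyone else is impacted only if their stable bucket falls under the percentage.
    return bucket(account_id) < scope.percent
```

Under this sketch, ramping from a single test account to 1%, then 10%, then 100% of traffic only changes `percent` in the metadata pushed to the edge.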

Injection Points
We have several key “building block” components that are used within Netflix. They help us to isolate failure and define fallbacks (Hystrix), communicate with dependencies (Ribbon), cache data (EVCache), or persist data (Astyanax). Each of these layers makes a perfect inflection point for injecting failure. These layers interface with the FIT context to determine if the request should be impacted. The failure behavior is provided to that layer, which determines how to emulate that failure in a realistic fashion: sleep for a delay period, return a 500, throw an exception, etc.
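Here is a sketch of how one such layer might consult the context and choose an emulation. The `inject` hook, the failure-type names, and the `get_ratings` client are hypothetical. They are not the Hystrix, Ribbon, EVCache, or Astyanax APIs. The point of the sketch is that the fallback path is exercised exactly as it would be during a real outage.

```python
import time
from typing import Optional


class SimulatedFailure(Exception):
    """Raised by an injection point to emulate a failing dependency."""


def inject(point: str, context: dict) -> Optional[int]:
    """Hypothetical injection-point hook. Looks up this component in the request's
    failure context and emulates it; returns an HTTP status to surface, if any."""
    failure = context.get(point)
    if failure is None:
        return None
    kind = failure["type"]
    if kind == "latency":
        time.sleep(failure["ms"] / 1000.0)  # emulate a slow dependency
        return None
    if kind == "exception":
        raise SimulatedFailure(f"injected failure at {point}")
    if kind == "status":
        return failure["code"]  # e.g. 500, for an HTTP client layer to return
    raise ValueError(f"unknown failure type: {kind!r}")


def get_ratings(context: dict) -> list:
    """A client call wrapped with a fallback, in the spirit of a Hystrix command."""
    try:
        status = inject("ratings-service", context)
    except SimulatedFailure:
        return []  # fallback: degrade gracefully without ratings
    if status is not None and status >= 500:
        return []
    return [5, 4, 3]  # stand-in for the real remote response
```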

Failure Scenarios
Whether we are recreating a past outage, or proactively testing the loss of a dependency, we need to know what could fail in order to build a simulation. We use an internal system that traces requests through the entirety of the Netflix ecosystem to find all of the injection points along the path. We then use these to create failure scenarios, which are sets of injection points which should or should not fail. One such example is our critical services scenario, the minimum set of our services required to stream. Another may be the loss of an individual service, including its persistence and caching layers.
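A scenario is just a set of injection points, so it can be derived from a traced request path with set operations. The sketch below assumes injection points are named `<service>` or `<service>.<layer>` (e.g. `ratings.evcache`). That naming scheme and both helper functions are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scenario:
    """A named set of injection points that should fail."""
    name: str
    fail: frozenset


def critical_services_scenario(traced_points: set, critical: set) -> Scenario:
    """Fail everything on the traced path except the minimum set needed to stream."""
    return Scenario("critical-services", frozenset(traced_points - critical))


def lose_service_scenario(traced_points: set, service: str) -> Scenario:
    """Fail one service together with its persistence and caching layers."""
    hit = {p for p in traced_points if p == service or p.startswith(service + ".")}
    return Scenario(f"lose-{service}", frozenset(hit))
```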

Automated Testing
Failure testing tools are only as valuable as their usage. Our device testing teams have developed automation which: enables failure, launches Netflix on a device, browses through several lists, selects a video, and begins streaming. We began by validating that this process works when only our critical services are available. Currently we are extending this to identify every dependency touched during this process and to systematically fail each one individually. Because this runs continuously, it helps us identify vulnerabilities as they are introduced.
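The continuous, one-dependency-at-a-time loop might be structured as follows. `run_playback_test` is a hypothetical hook that stands in for the device automation. It receives a failure context and reports whether launch, browse, select, and stream still succeeded.

```python
from typing import Callable, Dict, Iterable


def fail_each_dependency(
    dependencies: Iterable[str],
    run_playback_test: Callable[[Dict[str, dict]], bool],
) -> Dict[str, bool]:
    """Fail every dependency touched during playback, one at a time, and record
    whether the automated device test still passed."""
    results = {}
    for dep in sorted(set(dependencies)):
        context = {dep: {"type": "exception"}}  # fail only this dependency
        results[dep] = run_playback_test(context)
    return results
```

Any dependency that maps to `False` is a vulnerability: losing it alone is enough to break streaming.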

Resiliency Strategies
FIT has proven useful for bridging the gap between isolated testing and large-scale chaos exercises, and for making such testing self-service. It is one of many tools we have to help us build more resilient systems. The scope of the problem extends beyond failure testing. We need a range of techniques and tools: designing for failure, better detection and faster diagnosis, regular automated testing, bulkheading, etc. If this sounds interesting to you, we’re looking for great engineers to join our reliability, cloud architecture, and API teams!

Tuesday, October 7, 2014

Using Presto in our Big Data Platform on AWS

by: Eva Tse, Zhenxiao Luo, Nezih Yigitbasi @ Big Data Platform team

At Netflix, the Big Data Platform team is responsible for building a reliable data analytics platform shared across the whole company. In general, Netflix product decisions are very data driven. So we play a big role in helping different teams to gain product and consumer insights from a multi-petabyte scale data warehouse (DW). Their use cases range from analyzing A/B test results to analyzing user streaming experience to training data models for our recommendation algorithms.

We shared our overall architecture in a previous blog post. The underpinning of our big data platform is that we leverage AWS S3 for our DW. This architecture allows us to separate compute and storage layers. It allows multiple clusters to share the same data on S3 and clusters can be long-running and yet transient (for flexibility). Our users typically write Pig or Hive jobs for ETL and data analytics.

A small subset of the ETL output and some aggregated data is transferred to Teradata for interactive querying and reporting. On the other hand, we also have the need to do low latency interactive data exploration on our broader data set on S3. These are the use cases that Presto serves exceptionally well. Seven months ago, we first deployed Presto into production and it is now an integral part of our data ecosystem. In this blog post, we would like to share our experience with Presto and how we made it work for us!

Why Presto?

We had been in search of an interactive querying engine that could work well for us. Ideally, we wanted an open source project that could handle our scale of data & processing needs, had great momentum, was well integrated with the Hive metastore, and was easy for us to integrate with our DW on S3. We were delighted when Facebook open sourced Presto.

In terms of scale, we have a 10 petabyte data warehouse on S3. Our users from different organizations query diverse data sets across expansive date ranges. For this use case, caching a specific dataset in memory would not work, because the cache hit rate would be extremely low unless we had an unreasonably large cache. The streaming DAG execution architecture of Presto is well-suited for this sporadic data exploration usage pattern.

In terms of integrating with our big data platform, Presto has a connector architecture that is Hadoop friendly. It allows us to easily plug in an S3 file system. We were up and running in test mode after only a month of work on the S3 file system connector in collaboration with Facebook.

In terms of usability, Presto supports ANSI SQL, which has made it very easy for our analysts and developers to get rolling with it. As for limitations and drawbacks, user-defined functions in Presto are more involved to develop, build, and deploy than in Hive and Pig. Also, users who want to productionize their queries need to rewrite them in HiveQL or Pig Latin, as we don’t currently use Presto in our critical production pipelines. While there are some minor inconveniences, being able to interactively analyze large amounts of data is a huge win for us.

Finally, Presto was already running in production at Facebook. We did some performance benchmarking and stress testing and we were impressed. We also looked under the hood and saw well designed and documented Java code. We were convinced!

Our production environment and use cases

Currently, we are running with ~250 m2.4xlarge EC2 worker instances and our coordinator is on r3.4xlarge. Our users run ~2500 queries/workday. Our Presto cluster is completely isolated from our Hadoop clusters, though they all access the same data on our S3 DW.

Almost all of our jobs are CPU bound. We set our task memory to a rather high value (i.e., 7GB, with a slight chance of oversubscribing memory) to run some of our memory intensive queries, like big joins or aggregation queries.

We do not use disk in the cluster (as we don’t use HDFS). Hence, we will be looking to upgrade to the current-generation AWS instance type (e.g., r3), which has more memory and better isolation and performance than the previous generation of EC2 instances.

We are running the latest Presto 0.76 release with some outstanding pull requests that are not committed yet. Ideally, we would like to contribute everything back to open source and not carry custom patches in our deployment. We are actively working with Facebook and looking forward to committing all of our pull requests.

Presto addresses our ad hoc interactive use cases. Our users always go to Presto first for quick answers and for data exploration. If Presto does not support what they need (like big join / aggregation queries that exceed our memory limit or some specific user-defined functions that are not available), then they would go back to Hive or Pig.

We are working on a Presto user interface for our internal big data portal. Our algorithm team also built an interactive data clustering application by integrating R with Presto via an open source Python Presto client.

Performance benchmark

At a high level, we compare Presto and Hive query execution time using our own datasets and users’ queries instead of running standard benchmarks like TPC-H or TPC-DS. This way, we can translate the results back to what we can expect for our use cases. The graph below shows the results of three queries: a group-by query, a join plus a group-by query, and a needle-in-a-haystack (table scan) query. We compared the performance of Presto vs. Hive 0.11 on Hadoop 2 using Parquet input files on S3, all of which we currently use in production. Each query processed the same data set, with data sizes varying from ~140GB to ~210GB depending on the file format.

Cluster setup:
40 nodes m2.4xlarge

Settings we tuned:

We understand performance test environments and numbers are hard to reproduce. What is worth noting is the relative performance of these tests. The key takeaway is that queries that take one or two map-reduce (MR) phases in Hadoop run 10 to 100 times faster in Presto. The speedup in Presto scales linearly with the number of MR jobs involved. For jobs that only do a table scan (i.e., I/O bound instead of CPU bound), the speedup is highly dependent on the read performance of the file format used. We did some work on Presto / Parquet integration, which we will cover in the next section.

Our Presto contributions

The primary and initial piece of work that made Presto work for us was S3 FileSystem integration. In addition, we also worked on optimizing S3 multipart upload. We also made a few enhancements and bug fixes based on our use cases along the way: disabling recursive directory listing, json tuple generation, foreground metastore refresh, mbean for S3 filesystem monitoring, and handling S3 client socket timeout.

In general, we are committed to making Presto work better for our users and to covering more of their needs. Here are a few big enhancements that we are currently working on:

Parquet file format support

We recently upgraded our DW to use the Parquet file format (FF) for its performance on S3 and for its flexibility to integrate with different data processing engines. Hence, we are committed to making Presto work better with Parquet FF. (For details on why we chose Parquet and what we contributed to make it work in our environment, stay tuned for an upcoming blog post.)

Building on Facebook’s initial Parquet integration, we added support for predicate pushdown, column position based access (instead of name based access) to Parquet columns, and data type coercion. For context, we use the Hive metastore as our source of truth for metadata, and we do schema evolution on the Hive metastore. Hence, we need column position based access to work with our Hive metastore instead of using the schema information stored in Parquet files.
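Position-based access matters because a column may be renamed or have its type widened in the Hive metastore, while older Parquet files still carry the old name and type. Here is a minimal sketch of that resolution. It assumes columns are only ever renamed, widened, or appended (never reordered or dropped). It also assumes a small, hypothetical table of safe coercions covers every type change. An unsupported pair raises `KeyError`.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical coercions from the type stored in an older file to the type
# currently declared in the Hive metastore.
COERCIONS: Dict[tuple, Callable[[Any], Any]] = {
    ("int", "bigint"): int,
    ("int", "double"): float,
    ("float", "double"): float,
    ("int", "string"): str,
}


def read_row(
    metastore_columns: List[tuple],  # [(name, type)] from the Hive metastore
    file_columns: List[tuple],       # [(name, type)] recorded in the Parquet file
    values: List[Any],               # one row of values, in file column order
) -> Dict[str, Optional[Any]]:
    """Resolve columns by position, not name, so metastore renames still apply."""
    row = {}
    for i, (name, hive_type) in enumerate(metastore_columns):
        if i >= len(file_columns):
            row[name] = None  # column appended after this file was written
            continue
        file_type = file_columns[i][1]
        value = values[i]
        if value is not None and file_type != hive_type:
            value = COERCIONS[(file_type, hive_type)](value)
        row[name] = value
    return row
```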

Here is a comparison of Presto job execution times among different FFs. We compare read performance of sequence file (a FF we have historically used), ORCFile (we benchmarked the latest integration with predicate pushdown, vectorization and lazy materialization on read) and Parquet. We also compare the performance on S3 vs. HDFS. In this test, we use the same data sets and environment as the above benchmark test. The query is a needle-in-a-haystack query that does a select and filter on a condition that returns zero rows.


As a next step, we will look into improving Parquet performance further by doing predicate pushdown to eliminate whole row groups, vectorization, and lazy materialization on read. We believe this will bring Parquet performance on par with ORC files.
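Eliminating whole row groups relies on the per-column min/max statistics that Parquet can store in each row group's metadata. Here is a sketch of the pruning decision for a range predicate. `RowGroupStats` is a simplification for illustration, not a Parquet or Presto API. When the searched value falls outside every group's range, as can happen with a needle-in-a-haystack query that returns zero rows, no data pages need to be read at all.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroupStats:
    """Min/max statistics for one column in one row group (simplified)."""
    min_value: int
    max_value: int
    num_rows: int


def row_groups_to_read(stats: List[RowGroupStats], lo: int, hi: int) -> List[int]:
    """Indexes of row groups that may contain values in [lo, hi]. A group whose
    [min, max] range does not overlap the predicate is skipped entirely."""
    return [
        i for i, s in enumerate(stats)
        if not (s.max_value < lo or s.min_value > hi)
    ]
```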

ODBC / JDBC support

This is one of the biggest asks from our users. Users like to connect to our Hive DW directly to do exploratory / ad hoc reporting because it has the full dataset. Given Presto is interactive and integrated with Hive metastore, it is a natural fit.

Presto has a native ODBC driver that was recently open sourced. We made a few bug fixes and we are working on more enhancements. Overall, it is working well now for our Tableau users in extract (non-live exploration) mode. For our users who prefer to use Microstrategy, we plan to explore different options to integrate with it next.

Map data type support

All the event data generated from our Netflix services and Netflix-enabled devices comes through our Suro pipeline before landing in our DW. For flexibility, this event data is structured as key/value pairs, which get automatically stored in map columns in our DW. Users may pull out keys as top-level columns in the Hive metastore by adjusting some configurations in our data pipeline. Still, a large number of key/value pairs remain in the map because there are a large number of keys and the key space is very sparse.

It is very common for users to look up a specific key from the map. With our current Parquet integration, looking up a key from the map column means first converting the column to a JSON string and then parsing it. Facebook recently added native support for array and map data types. We plan to further enhance it to support array element or map key specific column pruning and predicate pushdown for Parquet FF to improve performance.

Our wishlist

There are still a couple of items that are high on our wishlist and we would love to contribute on these when we have the bandwidth.

Big table join. It is very common for our queries to join tables as we have a lot of normalized data in our DW. We are excited to see that distributed hash join is now supported and plan to check it out. Sort-merge join would likely be useful to solve some of the even bigger join use cases that we have.

Graceful shrink.  Given Presto is used for our ad hoc use cases, and given we run it in the cloud, it would be most efficient if we could scale up the cluster during peak hours (mostly work hours) and scale down during trough hours (night time or weekends). If Presto nodes can be blacklisted and gracefully drained before shutdown, we could combine that with available JMX metrics to do heuristic-based auto expand/shrink of the cluster.
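A heuristic of the kind described could look like the sketch below. The metric names, thresholds, and growth factors are hypothetical and chosen only for illustration. A real policy would also need the blacklist-and-drain capability this wishlist item asks for before any node is removed.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ClusterMetrics:
    """Hypothetical snapshot assembled from metrics exposed over JMX."""
    workers: int
    running_queries: int
    queued_queries: int
    cpu_utilization: float  # 0.0 .. 1.0, averaged over workers


def target_workers(m: ClusterMetrics, min_workers: int, max_workers: int) -> int:
    """Grow when work is queuing or CPU is saturated; shrink when mostly idle."""
    if m.queued_queries > 0 or m.cpu_utilization > 0.80:
        target = int(m.workers * 1.25) + 1
    elif m.running_queries == 0 or m.cpu_utilization < 0.20:
        target = int(m.workers * 0.75)
    else:
        target = m.workers
    return max(min_workers, min(max_workers, target))


def nodes_to_drain(worker_ids: List[str], current: int, target: int) -> List[str]:
    """Nodes to blacklist; each would finish its running tasks before shutdown."""
    return worker_ids[target:current] if target < current else []
```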

Key takeaway

Presto makes the lives of our users a lot easier. It tremendously improves their productivity.

We have learned from our experience that getting involved and contributing back to open source technologies is the best way to make sure it works for our use cases in a fast paced and evolving environment. We have been working closely with the Facebook team to discuss our use cases and align priorities. They have been open about their roadmap, quick in adding new features, and helpful in providing feedback to our contributions. We look forward to continuing to work with them and the community to make Presto even better and more comprehensive. Let us know if you are interested in sharing your experiences using Presto.

Last but not least, the Big Data Platform team at Netflix has been heads-down innovating on our platform to meet our growing business needs. We will share more of our experiences with our Parquet FF migration and Genie 2 upgrade in upcoming blog posts.

If you are interested in solving big data problems like ours, we would like to hear from you!

Thursday, October 2, 2014

A State of Xen - Chaos Monkey & Cassandra

On Sept 25th, 2014 AWS notified users about an EC2 maintenance event in which “a timely security and operational update” needed to be performed, requiring the reboot of a large number of instances (around 10%). On Oct 1st, 2014 AWS sent an update about the status of the reboot and XSA-108.

While we’d love to claim that we weren’t concerned at all given our resilience strategy, the reality was that we were on high alert given the potential impact to our services. We discussed different options, weighed the risks, and monitored our services closely. We observed that our systems handled the reboots extremely well with the resilience measures we had in place. Unforeseen events like this reinforce the need for regular, controlled chaos and continued investment in chaos engineering. In fact, Chaos Monkey was mentioned as a best practice in the latest EC2 Maintenance update.

Our commitment to induced chaos testing helps drive resilience, but it definitely isn’t trivial or easy, especially in the case of stateful systems like Cassandra. The Cloud Database Engineering team at Netflix rose to the challenge to embrace chaos and started running Chaos Monkey live in production last year. The number of nodes rebooted served as true battle testing for the resilience design measures created to operate Cassandra.

Monkeying with the Database

Databases have long been the pampered and spoiled princes of the application world. They received the best hardware, copious amounts of personalized attention and no one would ever dream of purposely mucking around with them. In the world of democratized Public Clouds, this is no longer possible. Node failures are not just probable, they are expected. This requires database technology that can withstand failure and continue to perform.
Cassandra, Netflix’s database of choice, straddles the AP (Availability, Partition Tolerance) side of the CAP theorem. By trading away C (Consistency), we’ve made a conscious decision to design our applications with eventual consistency in mind. Our expectation is that Cassandra would live up to its side of the bargain and provide strong availability and partition tolerance. Over the years, it had demonstrated fairly good resilience to failure. However, it required lots of human intervention.
Last year we decided to invest in automating the recovery of failed Cassandra nodes. We were able to detect and determine a failed node. With the cloud APIs afforded to us by AWS, we can identify the location of the failed node and programmatically initiate the replacement and bootstrap of a new Cassandra node. This gave us the confidence to have Cassandra participate in our Chaos Monkey exercises.
It wasn’t perfect at first, but then again, what is? In true Netflix fashion, we failed fast and fixed forward. Over the next few months, our automation got better. There were fewer false positives, and our remediation scripts required almost no human intervention.
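The post does not describe how failed nodes are detected, so the sketch below assumes a simple heartbeat timeout. `replace_node` is a hypothetical hook for the AWS-driven replacement and bootstrap step. A production version would also need to confirm a failure before acting, since false positives were an early problem.

```python
import time
from typing import Callable, Dict, List, Optional


def find_failed_nodes(last_seen: Dict[str, float], now: float, timeout_s: float) -> List[str]:
    """Treat a node as failed once it has been silent for longer than `timeout_s`."""
    return sorted(node for node, seen in last_seen.items() if now - seen > timeout_s)


def remediate(
    last_seen: Dict[str, float],
    replace_node: Callable[[str], None],
    timeout_s: float = 300.0,
    now: Optional[float] = None,
) -> List[str]:
    """Replace each failed node through a hypothetical `replace_node` hook, which in
    practice would launch an instance in the same zone and bootstrap Cassandra."""
    now = time.time() if now is None else now
    failed = find_failed_nodes(last_seen, now, timeout_s)
    for node in failed:
        replace_node(node)
    return failed
```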


“When we got the news about the emergency EC2 reboots, our jaws dropped. When we got the list of how many Cassandra nodes would be affected, I felt ill. Then I remembered all the Chaos Monkey exercises we’ve gone through. My reaction was, ‘Bring it on!’” - Christos Kalantzis - Engineering Manager, Cloud Database Engineering
That weekend our on-call staff was exceptionally vigilant. The whole Cloud Database Engineering team was on high alert. We have confidence in our automation but a prudent team prepares for the worst and hopes for the best.
Out of our 2700+ production Cassandra nodes, 218 were rebooted. 22 Cassandra nodes were on hardware that did not reboot successfully. This led to those Cassandra nodes not coming back online. Our automation detected the failed nodes and replaced them all, with minimal human intervention. Netflix experienced 0 downtime that weekend.

Repeatedly and regularly exercising failure, even in the persistence layer, should be part of every company’s resilience planning. If it wasn’t for Cassandra’s participation in Chaos Monkey, this story would have ended much differently.
by Bruce Wong, Engineering Manager - Chaos Engineering and Christos Kalantzis, Engineering Manager - Cloud Database Engineering

Thursday, September 25, 2014

Inviso: Visualizing Hadoop Performance

by: Daniel C. Weeks


In a post last year we discussed our big data architecture and the advantages of working with big data in the cloud. One of the key points from the article is that Netflix leverages Amazon’s Simple Storage Service (S3) as the “source of truth” for all data warehousing. This differentiates us from the more traditional configuration where Hadoop’s distributed file system is the storage medium with data and compute residing in the same cluster. Decentralizing the data warehouse frees us to explore new ways to manage big data infrastructure but also introduces a new set of challenges.

From a platform management perspective, being able to run multiple clusters isolated by concerns is both convenient and effective. We experiment with new software and perform live upgrades by simply diverting jobs from one cluster to another, and we adjust the size and number of clusters based on need as opposed to capacity. Genie, our execution service, abstracts the configuration and resource management for job submissions by providing a centralized service to query across all big data resources. This cohesive infrastructure abstracts all of the orchestration from the execution and allows the platform team to be flexible and adapt to dynamic environments without impacting users of the system.

However, as a user of the system, understanding where and how a particular job executes can be confusing. We have hundreds of platform users, ranging from casual query users to ETL developers and data scientists running tens to hundreds of queries every day. Navigating the maze of tools, logs, and data to gather information about a specific run can be difficult and time consuming. Some of the most common questions we hear are:

Why did my job run slower today than yesterday?
Can we expand the cluster to speed up my job?
What cluster did my job run on?
How do I get access to task logs?

These questions can be hard to answer in our environment because clusters are not persistent.  By the time someone notices a problem, the cluster that ran the query, along with detailed information, may already be gone or archived.

To help answer these questions and empower our platform users to explore and improve their job performance, we created a tool: Inviso (Latin: to go to see, visit, inspect, look at). Inviso is a job search and visualization tool intended to help big data users understand execution performance. Netflix is pleased to add Inviso to our open source portfolio; it is available on GitHub under the Apache License v2.0.

Inviso provides an easy interface to find jobs across all clusters, access other related tools, visualize performance, make detailed information accessible, and understand the environment in which jobs run.  

Searching for Jobs
Finding a specific job run should be easy, but with each Hive or Pig script abstracting multiple Hadoop jobs, finding and pulling together the full execution workflow can be painful. To simplify this process, Inviso indexes every job configuration across all clusters into ElasticSearch and provides a simple search interface to query. Indexing job configurations into ElasticSearch is trivial because their structure is simple and flat. With the ability to use the full Lucene query syntax, finding jobs is straightforward and powerful.
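Each job configuration is a flat map of Hadoop properties, so a field query reduces to matching one property per clause. The toy matcher below only illustrates that idea. It supports `field:pattern` clauses joined by ` AND ` with shell-style wildcards, and it is not Lucene's query parser. The property names used with it, `mapreduce.job.name` and `mapreduce.job.user.name`, are standard Hadoop 2 job properties.

```python
from fnmatch import fnmatchcase
from typing import Dict, List


def search(docs: List[Dict[str, str]], query: str) -> List[Dict[str, str]]:
    """Return the flat job configurations matching every 'field:pattern' clause.
    A toy stand-in for a Lucene-style field query; each clause must contain ':'."""
    clauses = [clause.split(":", 1) for clause in query.split(" AND ")]
    return [
        d for d in docs
        if all(field in d and fnmatchcase(d[field], pattern) for field, pattern in clauses)
    ]
```

For example, `search(confs, "mapreduce.job.name:*agg*")` would return every indexed job whose name contains `agg`.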

The search results are displayed in a concise table reverse ordered by time with continuous scrollback and links to various tools like the job history page, Genie, or Lipstick.  Clicking the links will take you directly to the specific page for that job.  Being able to look back over months of different runs of the same job allows for detailed analysis of how the job evolves over time.  

In addition to the interface provided by Inviso, the ElasticSearch index is quite effective for other use cases. Since the index contains the full text of each Hive or Pig script, searching for table or UDF usage is possible as well. Internally, we use the index to search for dependencies and scripts when modifying/deprecating/upgrading datasources, UDFs, etc. For example, when we last upgraded Hive, the new version had keyword conflicts with some existing scripts and we were able to identify the scripts and owners to upgrade prior to rolling out the new version of Hive. Others use it to identify who is using a specific table in case they want to change the structure or retire the table.
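The keyword-conflict check can be approximated as a word search over the indexed script text. The function below is a hypothetical sketch. It strips `--` comments, tokenizes case-insensitively, and reports which newly reserved words each script uses, so that the owners can be contacted. It over-reports (a keyword inside a string literal still matches), which is acceptable for a review list.

```python
import re
from typing import Dict, List, Set


def scripts_with_keyword_conflicts(scripts: Dict[str, str], new_keywords: Set[str]) -> Dict[str, List[str]]:
    """Map script name -> sorted new reserved words it uses as bare words."""
    keywords = {k.lower() for k in new_keywords}
    found = {}
    for name, text in scripts.items():
        code = re.sub(r"--[^\n]*", "", text)  # drop line comments
        words = {w.lower() for w in re.findall(r"[A-Za-z_][A-Za-z0-9_]*", code)}
        hits = sorted(words & keywords)
        if hits:
            found[name] = hits
    return found
```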

Visualizing Job Performance
Simply finding a job and the corresponding Hadoop resources doesn’t make it any easier to understand its performance. Stages of a Hive or Pig script might execute serially or in parallel, impacting the total runtime. Inviso correlates the various stages and lays them out in a swimlane diagram to show the parallelism. Hovering over a job provides detailed information, including the full set of counters. The stages taking the longest time, and therefore where to focus effort to improve performance, are readily apparent.
Overview Diagram Showing Stages of a Pig Job
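Laying stages out as swimlanes is an interval-partitioning problem. Sort stages by start time and place each one in the lane that became free earliest, opening a new lane only when none is free. Here is a sketch of that greedy layout; Inviso's actual rendering code may differ. The number of lanes it produces equals the peak number of stages running at once.

```python
import heapq
from typing import Dict, List, Tuple


def assign_lanes(stages: List[Tuple[str, float, float]]) -> Dict[str, int]:
    """Place (name, start, end) stages into the fewest non-overlapping lanes."""
    lanes: Dict[str, int] = {}
    free_at: List[Tuple[float, int]] = []  # min-heap of (lane end time, lane index)
    next_lane = 0
    for name, start, end in sorted(stages, key=lambda s: (s[1], s[2])):
        if free_at and free_at[0][0] <= start:
            _, lane = heapq.heappop(free_at)  # reuse the lane that freed up earliest
        else:
            lane = next_lane  # every lane is busy: open a new one
            next_lane += 1
        lanes[name] = lane
        heapq.heappush(free_at, (end, lane))
    return lanes
```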
Below the workflow diagram is a detailed task diagram for each job showing the individual execution of every task attempt.  Laying these out in time order shows how tasks were allocated and executed.  This visual presentation can quickly convey obvious issues with jobs including data skew, slow attempts, inconsistent resource allocation, speculative execution, and locality.  Visualizing job performance in this compact format allows users to quickly scan the behavior of many jobs for problems.  Hovering over an individual task will bring up task specific details including counters making it trivial to compare task details and performance.

Diagram Showing Task Details
Diagram of Execution Locality
Tasks are ordered by scheduler allocation providing insight into how many resources were available at the time and how long it took for the attempt to start.  The color indicates the task type or status.  Failed or killed tasks even present the failure reason and stack trace, so delving into the logs isn’t necessary.  If you do want to look at a specific task log, simply select the task and click the provided link to go directly to the log for that task.

The detailed information used to populate this view comes directly from the job history file produced for every MapReduce job. Inviso has a single REST endpoint to parse the detailed information for a job and represent it as a JSON structure. While this capability is similar to what the MapReduce History Server REST API provides, the difference is that Inviso provides the complete structure in a single response. Gathering this information from the History Server would require thousands of requests with the current API and could impact the performance of other tools that rely on the History Server, such as Pig and Hive clients. We also use this REST API to collect metrics to aggregate job statistics and identify performance issues and failure causes.

Cluster Performance
With job performance we tend to think of how a job will run in isolation, but that’s rarely the case in any production environment. At Netflix, we have clusters isolated by concern: multiple clusters for production jobs, ad-hoc queries, reporting, and some dedicated to smaller efforts (e.g. benchmarking, regression testing, test data pipeline). The performance of any specific run is a function of the cluster capacity and the allocation assigned by the scheduler. If someone is running a job at the same time as our ETL pipeline, which has higher weight, they might get squeezed out due to the priority we assign ETL.

Just as it indexes job configurations, Inviso polls REST endpoints on the Resource Manager to get the current metrics for all clusters and indexes the results into ElasticSearch. With this information we can query and reconstitute the state of the cluster for any timespan going back days or months. So even though a cluster may be gone or the job details purged from the system, you can look back at how busy the cluster was when a job ran to determine if the performance was due to congestion.

Application Stream: Running applications with ETL and Data Pipeline Activity Highlighted
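Reconstituting cluster state then amounts to a time-range query over stored samples. Here is a minimal in-memory sketch. The `Snapshot` fields are loosely modeled on YARN's cluster metrics (`appsRunning`, `containersPending`). In Inviso the filtering would be an ElasticSearch range query rather than a Python list scan.

```python
from bisect import bisect_left, bisect_right
from dataclasses import dataclass
from typing import List


@dataclass
class Snapshot:
    """One polled sample of a cluster's Resource Manager metrics (hypothetical shape)."""
    ts: int  # epoch seconds when polled
    cluster: str
    running_apps: int
    pending_containers: int


def cluster_state(snapshots: List[Snapshot], cluster: str, start: int, end: int) -> List[Snapshot]:
    """All samples for `cluster` polled within [start, end], in time order."""
    samples = sorted((s for s in snapshots if s.cluster == cluster), key=lambda s: s.ts)
    times = [s.ts for s in samples]
    return samples[bisect_left(times, start):bisect_right(times, end)]


def peak_pending(samples: List[Snapshot]) -> int:
    """Highest backlog seen in the window; 0 if there are no samples."""
    return max((s.pending_containers for s in samples), default=0)
```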
In a second graph on the same page, Inviso displays the capacity and backlog on the cluster using the running, reserved, and pending task metrics available from the Resource Manager’s REST API.  This view has a range selector to adjust the timespan in the first graph and looks back over a longer period.

This view provides a way to gauge the load and backlog of the cluster.  When large jobs are submitted the pending tasks will spike and slowly drain as the cluster works them off.  If the cluster is unable to work off the backlog, the cluster might need to be expanded.  Another insight this view provides is periods when clusters have underutilized capacity.  For example, ad-hoc clusters are used less frequently at night, which is an opportune time to run a large backfill job.  Inviso makes these types of usage patterns clear so we can shift resources or adjust usage patterns to take full advantage of the cluster.

Resource Stream: Showing Active Containers and Backlog of Tasks
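The "spike, then drain" reading of the backlog graph can be turned into a simple rule. The window size and floor below are hypothetical values for illustration. Under this rule, a cluster is flagged for expansion when its pending tasks stay above the floor and are not shrinking across the window.

```python
from typing import List


def backlog_is_draining(pending: List[int], window: int = 6) -> bool:
    """True if the backlog at the end of the window is below where it started."""
    recent = pending[-window:]
    return len(recent) < 2 or recent[-1] < recent[0] or recent[-1] == 0


def needs_expansion(pending: List[int], window: int = 6, floor: int = 100) -> bool:
    """Hypothetical rule: backlog stays above `floor` and is not being worked off."""
    recent = pending[-window:]
    return len(recent) >= 2 and min(recent) > floor and not backlog_is_draining(pending, window)
```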

Putting it all Together
With the increasing size and complexity of Hadoop deployments, being able to locate and understand performance is key to running an efficient platform.  Inviso provides a convenient view of the inner workings of jobs and platform.  By simply overlaying a new view on existing infrastructure, Inviso can operate inside any Hadoop environment with a small footprint and provide easy access and insight.  

Given an existing cluster (in the datacenter or cloud), setting up Inviso should only take a few minutes, so give it a shot.  If you like it and want to make it better, send some pull requests our way.