Tuesday, October 7, 2014

Using Presto in our Big Data Platform on AWS

by: Eva Tse, Zhenxiao Luo, Nezih Yigitbasi @ Big Data Platform team


At Netflix, the Big Data Platform team is responsible for building a reliable data analytics platform shared across the whole company. Netflix product decisions are very data driven, so we play a big role in helping different teams gain product and consumer insights from a multi-petabyte-scale data warehouse (DW). Their use cases range from analyzing A/B test results to analyzing user streaming experience to training data models for our recommendation algorithms.

We shared our overall architecture in a previous blog post. The underpinning of our big data platform is that we leverage AWS S3 for our DW. This architecture allows us to separate the compute and storage layers: multiple clusters can share the same data on S3, and because no state lives on a cluster, even long-running clusters can be treated as transient, which gives us flexibility. Our users typically write Pig or Hive jobs for ETL and data analytics.

A small subset of the ETL output and some aggregated data is transferred to Teradata for interactive querying and reporting. On the other hand, we also need to do low-latency, interactive data exploration on our broader data set on S3. These are the use cases that Presto serves exceptionally well. Seven months ago, we first deployed Presto into production, and it is now an integral part of our data ecosystem. In this blog post, we would like to share our experience with Presto and how we made it work for us!

Why Presto?

We had been in search of an interactive querying engine that could work well for us. Ideally, we wanted an open source project that could handle our scale of data & processing needs, had great momentum, was well integrated with the Hive metastore, and was easy for us to integrate with our DW on S3. We were delighted when Facebook open sourced Presto.

In terms of scale, we have a 10 petabyte data warehouse on S3. Our users from different organizations query diverse data sets across expansive date ranges. For this use case, caching a specific dataset in memory would not work because the cache hit rate would be extremely low unless we had an unreasonably large cache. The streaming DAG execution architecture of Presto is well suited for this sporadic data exploration usage pattern.

In terms of integrating with our big data platform, Presto has a connector architecture that is Hadoop friendly. It allows us to easily plug in an S3 file system. We were up and running in test mode after only a month of work on the S3 file system connector in collaboration with Facebook.

In terms of usability, Presto supports ANSI SQL, which has made it very easy for our analysts and developers to get rolling with it. As far as limitations and drawbacks, user-defined functions in Presto are more involved to develop, build, and deploy than in Hive and Pig. Also, users who want to productionize their queries need to rewrite them in HiveQL or Pig Latin, as we don't currently use Presto in our critical production pipelines. While there are some minor inconveniences, the ability to interactively analyze large amounts of data is a huge win for us.
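
To give a sense of how easy it is to get rolling, here is a minimal sketch of running an ad hoc ANSI SQL query from Python through a DB-API-style Presto client (we mention an open source Python Presto client later in this post; the specific client shown below, the coordinator host, and the table and column names are illustrative assumptions, not our actual setup):

    # A minimal sketch of an ad hoc ANSI SQL query against Presto from Python.
    # The client (PyHive's presto module), coordinator host, and table/column
    # names are illustrative assumptions rather than our actual setup.
    from pyhive import presto

    cursor = presto.connect(host="presto-coordinator.example.com", port=8080).cursor()

    # A typical exploratory aggregation over a date-partitioned table in the Hive metastore.
    cursor.execute("""
        SELECT country_iso_code, count(*) AS plays
        FROM playback_events
        WHERE dateint = 20141001
        GROUP BY country_iso_code
        ORDER BY plays DESC
        LIMIT 10
    """)

    for country_iso_code, plays in cursor.fetchall():
        print(country_iso_code, plays)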

Finally, Presto was already running in production at Facebook. We did some performance benchmarking and stress testing and we were impressed. We also looked under the hood and saw well designed and documented Java code. We were convinced!

Our production environment and use cases

Currently, we are running with ~250 m2.4xlarge EC2 worker instances, and our coordinator runs on an r3.4xlarge instance. Our users run ~2,500 queries per workday. Our Presto cluster is completely isolated from our Hadoop clusters, though they all access the same data on our S3 DW.

Almost all of our jobs are CPU bound. We set our task memory to a rather high value (7GB, with a slight chance of oversubscribing memory) to run some of our memory-intensive queries, such as big joins or aggregation queries.
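
As a rough back-of-envelope on why 7GB leaves only a slight chance of oversubscription (the worker JVM heap size below is an assumption for illustration; an m2.4xlarge has 68.4GB of RAM):

    # Back-of-envelope check on task memory headroom per worker.
    # The worker JVM heap size is an assumed figure for illustration only.
    NODE_RAM_GB = 68.4          # m2.4xlarge
    WORKER_HEAP_GB = 50.0       # assumed Presto worker JVM heap
    TASK_MAX_MEMORY_GB = 7.0    # task.max-memory setting

    concurrent_memory_hungry_queries = int(WORKER_HEAP_GB // TASK_MAX_MEMORY_GB)
    print("Memory-hungry queries per worker before oversubscription: %d"
          % concurrent_memory_hungry_queries)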

We do not use disk in the cluster (as we don't use HDFS). Hence, we will be looking to upgrade to current-generation AWS instance types (e.g., r3), which have more memory and offer better isolation and performance than the previous generation of EC2 instances.

We are running the latest Presto 0.76 release with some outstanding pull requests that have not yet been merged. Ideally, we would like to contribute everything back to open source and not carry custom patches in our deployment. We are actively working with Facebook and looking forward to getting all of our pull requests merged.

Presto addresses our ad hoc interactive use cases. Our users always go to Presto first for quick answers and for data exploration. If Presto does not support what they need (like big join or aggregation queries that exceed our memory limit, or specific user-defined functions that are not yet available), they fall back to Hive or Pig.

We are working on a Presto user interface for our internal big data portal. Our algorithm team also built an interactive data clustering application by integrating R with Presto via an open source Python Presto client.

Performance benchmark

At a high level, we compare Presto and Hive query execution times using our own datasets and users' queries instead of running standard benchmarks like TPC-H or TPC-DS. This way, we can translate the results back to what we can expect for our use cases. The graph below shows the results of three queries: a group-by query, a join plus group-by query, and a needle-in-a-haystack (table scan) query. We compared the performance of Presto vs. Hive 0.11 on Hadoop 2 using Parquet input files on S3, all of which we currently use in production. Each query processed the same data set, with data sizes varying between ~140GB and ~210GB depending on the file format.
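
For readers unfamiliar with these query shapes, the snippets below illustrate roughly what each looks like; the table and column names are hypothetical, and these are not the actual benchmark queries:

    # Illustrative shapes of the three benchmark queries (hypothetical table/column names).
    BENCHMARK_QUERY_SHAPES = {
        # Aggregation over one large fact table.
        "group_by": """
            SELECT device_type, count(*) AS plays
            FROM playback_events
            WHERE dateint = 20140901
            GROUP BY device_type
        """,
        # Join a large fact table to a dimension table, then aggregate.
        "join_plus_group_by": """
            SELECT t.title_name, count(*) AS plays
            FROM playback_events e
            JOIN title_dim t ON e.title_id = t.title_id
            WHERE e.dateint = 20140901
            GROUP BY t.title_name
        """,
        # Needle-in-a-haystack: scan the table for a tiny number of matching rows.
        "needle_in_a_haystack": """
            SELECT *
            FROM playback_events
            WHERE dateint = 20140901 AND account_id = 1234567
        """,
    }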





Cluster setup:
40 nodes m2.4xlarge

Settings we tuned:
task.shard.max-threads=32
task.max-memory=7GB
sink.max-buffer-size=1GB
hive.s3.max-client-retries=50
hive.s3.max-error-retries=50
hive.s3.max-connections=500
hive.s3.connect-timeout=5m
hive.s3.socket-timeout=5m




We understand that performance test environments and numbers are hard to reproduce. What is worth noting is the relative performance of these tests. The key takeaway is that queries that take one or two map-reduce (MR) phases in Hadoop run 10 to 100 times faster in Presto, and the speedup in Presto grows roughly linearly with the number of MR jobs involved. For jobs that only do a table scan (i.e., that are I/O bound instead of CPU bound), the speedup is highly dependent on the read performance of the file format used. We did some work on Presto / Parquet integration, which we will cover in the next section.

Our Presto contributions

The primary and initial piece of work that made Presto work for us was the S3 FileSystem integration. In addition, we also worked on optimizing S3 multipart upload. We also made a few enhancements and bug fixes based on our use cases along the way: disabling recursive directory listing, JSON tuple generation, foreground metastore refresh, an MBean for S3 filesystem monitoring, and handling S3 client socket timeouts.

In general, we are committed to making Presto work better for our users and to covering more of their needs. Here are a few big enhancements that we are currently working on:

Parquet file format support

We recently upgraded our DW to use the Parquet file format (FF) for its performance on S3 and for its flexibility to integrate with different data processing engines. Hence, we are committed to making Presto work better with the Parquet FF. (For details on why we chose Parquet and what we contributed to make it work in our environment, stay tuned for an upcoming blog post.)

Building on Facebook's initial Parquet integration, we added support for predicate pushdown, column position-based access (instead of name-based access) to Parquet columns, and data type coercion. For context, we use the Hive metastore as our source of truth for metadata, and we do schema evolution in the Hive metastore. Hence, we need column position-based access to work with our Hive metastore schema instead of using the schema information stored in Parquet files.
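
The real work lives in Presto's Java Parquet reader, but the short sketch below illustrates the idea behind position-based access under our assumptions (metastore and file columns listed in declaration order):

    # Illustration only: resolve a column requested by its Hive metastore name
    # to the column stored in a Parquet file, by ordinal position rather than by name.
    def resolve_parquet_column(metastore_columns, parquet_file_columns, requested_name):
        position = metastore_columns.index(requested_name)
        if position >= len(parquet_file_columns):
            # Column was added to the table after this file was written.
            return None
        # Even if the column was renamed in the metastore, position-based access
        # still reads the right physical column from the older file.
        return parquet_file_columns[position]

    # Example: 'device_type' was renamed from 'device' after some files were written.
    print(resolve_parquet_column(
        ["account_id", "device_type", "dateint"],   # current metastore schema
        ["account_id", "device", "dateint"],        # schema recorded in an older file
        "device_type"))                             # -> 'device'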

Here is a comparison of Presto job execution times among different FFs. We compare the read performance of sequence files (a FF we have historically used), ORCFile (we benchmarked the latest integration, with predicate pushdown, vectorization, and lazy materialization on read), and Parquet. We also compare performance on S3 vs. HDFS. In this test, we use the same data sets and environment as the benchmark test above. The query is a needle-in-a-haystack query that does a select and filter on a condition that returns zero rows.

[Chart: needle-in-a-haystack query execution time for sequence file, ORCFile, and Parquet on S3 and HDFS]

As a next step, we will look into improving Parquet performance further by using predicate pushdown to eliminate whole row groups, along with vectorization and lazy materialization on read. We believe this will put Parquet performance on par with ORCFile.

ODBC / JDBC support

This is one of the biggest asks from our users. Users like to connect to our Hive DW directly for exploratory / ad hoc reporting because it has the full dataset. Given that Presto is interactive and integrated with the Hive metastore, it is a natural fit.

Presto has a native ODBC driver that was recently open sourced. We made a few bug fixes, and we are working on more enhancements. Overall, it is now working well for our Tableau users in extract (non-live exploration) mode. For our users who prefer MicroStrategy, we plan to explore different integration options next.

Map data type support

All the event data generated from our Netflix services and Netflix-enabled devices comes through our Suro pipeline before landing in our DW. For flexibility, this event data is structured as key/value pairs, which get automatically stored in map columns in our DW. Users may pull out keys as top-level columns in the Hive metastore by adjusting some configurations in our data pipeline. Still, a large number of key/value pairs remain in the map because there are a large number of keys and the key space is very sparse.

It is very common for users to look up a specific key from the map. With our current Parquet integration, looking up a key from the map column means converting the column to a JSON string first and then parsing it. Facebook recently added native support for the array and map data types. We plan to further enhance this to support array-element and map-key specific column pruning and predicate pushdown for the Parquet FF to improve performance.
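
To make the current workaround concrete, here is an illustrative query (hypothetical table, column, and key names) in which a map-key lookup goes through Presto's JSON functions; with native map support, the same lookup can read the map value directly and eventually benefit from key-specific pruning and pushdown:

    # Illustration of today's workaround (hypothetical table/column/key names):
    # the map column is surfaced as a JSON string, so a key lookup is a JSON parse.
    # This string can be run through any Presto client, e.g. the sketch shown earlier.
    KEY_LOOKUP_SQL = """
        SELECT json_extract_scalar(event_properties, '$.device_type') AS device_type,
               count(*) AS events
        FROM suro_events
        WHERE dateint = 20141001
        GROUP BY json_extract_scalar(event_properties, '$.device_type')
    """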

Our wishlist

There are still a couple of items that are high on our wishlist, and we would love to contribute to these when we have the bandwidth.

Big table join. It is very common for our queries to join tables, as we have a lot of normalized data in our DW. We are excited to see that distributed hash join is now supported and plan to check it out. Sort-merge join would likely be useful for some of the even bigger join use cases that we have.

Graceful shrink.  Given that Presto serves our ad hoc use cases, and given that we run it in the cloud, it would be most efficient if we could scale up the cluster during peak hours (mostly work hours) and scale down during trough hours (nighttime or weekends). If Presto nodes could be blacklisted and gracefully drained before shutdown, we could combine that with available JMX metrics to do heuristic-based auto expand/shrink of the cluster.
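
Here is a minimal sketch of the kind of heuristic we have in mind; the thresholds and inputs are illustrative assumptions, and the actual drain/terminate steps depend on the graceful-shrink support we are wishing for:

    # Illustrative heuristic for expanding/shrinking the Presto cluster based on
    # cluster-level metrics (e.g., exposed over JMX). Thresholds are assumptions.
    def desired_worker_count(current_workers, queued_queries, running_queries,
                             min_workers=50, max_workers=300):
        if queued_queries > 10:
            # Backlog building up during peak hours: expand.
            return min(max_workers, int(current_workers * 1.2))
        if queued_queries == 0 and running_queries < current_workers * 0.2:
            # Trough hours (nights/weekends): shrink, but only once nodes can be
            # blacklisted and drained gracefully before shutdown.
            return max(min_workers, int(current_workers * 0.8))
        return current_workers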

Key takeaway

Presto makes the lives of our users a lot easier. It tremendously improves their productivity.

We have learned from our experience that getting involved and contributing back to open source technologies is the best way to make sure they work for our use cases in a fast-paced and evolving environment. We have been working closely with the Facebook team to discuss our use cases and align priorities. They have been open about their roadmap, quick in adding new features, and helpful in providing feedback on our contributions. We look forward to continuing to work with them and the community to make Presto even better and more comprehensive. Let us know if you are interested in sharing your experiences using Presto.

Last but not least, the Big Data Platform team at Netflix has been heads-down innovating on our platform to meet our growing business needs. We will share more of our experiences with our Parquet FF migration and Genie 2 upgrade in upcoming blog posts.

If you are interested in solving big data problems like ours, we would like to hear from you!


Thursday, October 2, 2014

A State of Xen - Chaos Monkey & Cassandra

On Sept 25th, 2014, AWS notified users about an EC2 maintenance event in which “a timely security and operational update” needed to be performed, requiring a reboot of a large number of instances (around 10%). On Oct 1st, 2014, AWS sent an update about the status of the reboot and XSA-108.


While we’d love to claim that we weren’t concerned at all given our resilience strategy, the reality was that we were on high alert given the potential impact to our services.  We discussed different options, weighed the risks, and monitored our services closely.  We observed that our systems handled the reboots extremely well with the resilience measures we had in place.  These types of unforeseen events reinforce that regular, controlled chaos and continued investment in chaos engineering are necessary. In fact, Chaos Monkey was mentioned as a best practice in the latest EC2 Maintenance update.


Our commitment to induced chaos testing helps drive resilience, but it definitely isn’t trivial or easy, especially in the case of stateful systems like Cassandra. The Cloud Database Engineering team at Netflix rose to the challenge of embracing chaos and began running Chaos Monkey live in production last year.  The number of nodes rebooted served as a true battle test of the resilience measures we designed into how we operate Cassandra.


Monkeying with the Database

Databases have long been the pampered and spoiled princes of the application world. They received the best hardware and copious amounts of personalized attention, and no one would ever dream of purposely mucking around with them. In the world of democratized public clouds, this is no longer possible. Node failures are not just probable, they are expected. This requires database technology that can withstand failure and continue to perform.
Cassandra, Netflix’s database of choice, straddles the AP (Availability, Partition Tolerance) side of the CAP theorem. By trading away C (Consistency), we’ve made a conscious decision to design our applications with eventual consistency in mind. Our expectation is that Cassandra would live up to its side of the bargain and provide strong availability and partition tolerance. Over the years, it has demonstrated fairly good resilience to failure; however, that resilience required lots of human intervention.
Last year we decided to invest in automating the recovery of failed Cassandra nodes. We were able to detect and identify a failed node. With the cloud APIs afforded to us by AWS, we can determine the location of the failed node and programmatically initiate the replacement and bootstrap of a new Cassandra node. This gave us the confidence to have Cassandra participate in our Chaos Monkey exercises.
It wasn’t perfect at first, but then again, what is? In true Netflix fashion, we failed fast and fixed forward. Over the next few months, our automation got better: there were fewer false positives, and our remediation scripts required almost no human intervention.
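
Conceptually, the remediation automation boils down to a loop like the sketch below; the data shapes and action names are hypothetical placeholders, not our actual tooling:

    # Conceptual sketch of automated Cassandra node remediation; the node/health
    # data structures and action names are hypothetical placeholders.
    def plan_remediation(nodes):
        """nodes: list of dicts like {'id': 'i-0a1b2c', 'zone': 'us-east-1a',
        'token': '12345', 'healthy': False}. Returns the replacement actions to take."""
        actions = []
        for node in nodes:
            if not node['healthy']:
                actions.append({
                    'action': 'replace',
                    'failed_instance': node['id'],   # detected failed node
                    'zone': node['zone'],            # located via cloud APIs
                    'replace_token': node['token'],  # bootstrap a new node into the ring
                })
        return actions

    print(plan_remediation([
        {'id': 'i-0a1b2c', 'zone': 'us-east-1a', 'token': '12345', 'healthy': False},
        {'id': 'i-0d4e5f', 'zone': 'us-east-1d', 'token': '67890', 'healthy': True},
    ]))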

AWS RE:BOOT

“When we got the news about the emergency EC2 reboots, our jaws dropped. When we got the list of how many Cassandra nodes would be affected, I felt ill. Then I remembered all the Chaos Monkey exercises we’ve gone through. My reaction was, ‘Bring it on!’” - Christos Kalantzis - Engineering Manager, Cloud Database Engineering
That weekend our on-call staff was exceptionally vigilant. The whole Cloud Database Engineering team was on high alert. We have confidence in our automation but a prudent team prepares for the worst and hopes for the best.
Out of our 2700+ production Cassandra nodes, 218 were rebooted. 22 Cassandra nodes were on hardware that did not reboot successfully, and as a result those nodes did not come back online. Our automation detected the failed nodes and replaced them all, with minimal human intervention. Netflix experienced zero downtime that weekend.

Repeatedly and regularly exercising failure, even in the persistence layer, should be part of every company’s resilience planning. If it wasn’t for Cassandra’s participation in Chaos Monkey, this story would have ended much differently.
by Bruce Wong, Engineering Manager - Chaos Engineering and Christos Kalantzis, Engineering Manager - Cloud Database Engineering

Thursday, September 25, 2014

Inviso: Visualizing Hadoop Performance

by: Daniel C. Weeks



In a post last year we discussed our big data architecture and the advantages of working with big data in the cloud (read more here).  One of the key points from the article is that Netflix leverages Amazon’s Simple Storage Service (S3) as the “source of truth” for all data warehousing.  This differentiates us from the more traditional configuration where Hadoop’s distributed file system is the storage medium with data and compute residing in the same cluster.  Decentralizing the data warehouse frees us to explore new ways to manage big data infrastructure but also introduces a new set of challenges.


From a platform management perspective, being able to run multiple clusters isolated by concern is both convenient and effective.  We experiment with new software and perform live upgrades by simply diverting jobs from one cluster to another, or adjust the size and number of clusters based on need rather than fixed capacity.  Genie, our execution service, abstracts the configuration and resource management for job submissions by providing a centralized service to query across all big data resources.  This cohesive infrastructure abstracts all of the orchestration from the execution and allows the platform team to be flexible and adapt to dynamic environments without impacting users of the system.


However, as a user of the system, understanding where and how a particular job executes can be confusing.  We have hundreds of platform users, ranging from people running casual queries to ETL developers and data scientists running tens to hundreds of queries every day.  Navigating the maze of tools, logs, and data to gather information about a specific run can be difficult and time consuming.  Some of the most common questions we hear are:


Why did my job run slower today than yesterday?
Can we expand the cluster to speed up my job?
What cluster did my job run on?
How do I get access to task logs?


These questions can be hard to answer in our environment because clusters are not persistent.  By the time someone notices a problem, the cluster that ran the query, along with detailed information, may already be gone or archived.


To help answer these questions and empower our platform users to explore and improve their job performance, we created a tool: Inviso (Latin: to go to see, visit, inspect, look at).  Inviso is a job search and visualization tool intended to help big data users understand execution performance. Netflix is pleased to add Inviso to our open source portfolio under the Apache License v2.0; it is available on GitHub.


Inviso provides an easy interface to find jobs across all clusters, access other related tools, visualize performance, make detailed information accessible, and understand the environment in which jobs run.  


Searching for Jobs
Finding a specific job run should be easy, but with each Hive or Pig script abstracting multiple Hadoop jobs, finding and pulling together the full execution workflow can be painful. To simplify this process, Inviso indexes every job configuration across all clusters into ElasticSearch and provides a simple search interface to query.  Indexing job configurations into ElasticSearch is trivial because the structure is simple and flat.  With the ability to use the full Lucene query syntax, finding jobs is straightforward and powerful.
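
As a rough sketch of what indexing and searching a flat job configuration looks like (using a 1.x-era elasticsearch Python client API; the index name, fields, and query are illustrative assumptions, not Inviso's actual schema):

    # Illustrative only: index a flat job configuration and search it with a
    # Lucene query string. Index name, fields, and query are assumptions.
    from elasticsearch import Elasticsearch

    es = Elasticsearch(["http://elasticsearch.example.com:9200"])

    job_config = {
        "job_id": "job_1411600000000_0042",
        "cluster": "adhoc",
        "user": "dweeks",
        "query_text": "INSERT OVERWRITE TABLE daily_agg SELECT ...",
    }
    es.index(index="inviso", doc_type="config", id=job_config["job_id"], body=job_config)

    # Full Lucene query syntax: find every ad-hoc job that touched a given table.
    hits = es.search(index="inviso", q="daily_agg AND cluster:adhoc", size=50)
    for hit in hits["hits"]["hits"]:
        print(hit["_source"]["job_id"], hit["_source"]["user"])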


The search results are displayed in a concise table reverse ordered by time with continuous scrollback and links to various tools like the job history page, Genie, or Lipstick.  Clicking the links will take you directly to the specific page for that job.  Being able to look back over months of different runs of the same job allows for detailed analysis of how the job evolves over time.  




In addition to the interface provided by Inviso, the ElasticSearch index is quite effective for other use cases.  Since the index contains the full text of each Hive or Pig script, searching for table or UDF usage is possible as well.  Internally, we use the index to search for dependencies and scripts when modifying, deprecating, or upgrading data sources, UDFs, etc.  For example, when we last upgraded Hive, the new version had keyword conflicts with some existing scripts, and we were able to identify the scripts and owners to upgrade prior to rolling out the new version of Hive.  Others use it to identify who is using a specific table in case they want to change the structure or retire the table.


Visualizing Job Performance
Simply finding a job and the corresponding Hadoop resources doesn’t make it any easier to understand the performance.  Stages of a Hive or Pig script might execute serially or in parallel, impacting the total runtime.  Inviso correlates the various stages and lays them out in a swimlane diagram to show the parallelism.  Hovering over a job provides detailed information, including the full set of counters.  It is readily apparent which stages take the longest and where to focus effort to improve performance.
Overview Diagram Showing Stages of a Pig Job
Below the workflow diagram is a detailed task diagram for each job showing the individual execution of every task attempt.  Laying these out in time order shows how tasks were allocated and executed.  This visual presentation can quickly convey obvious issues with jobs, including data skew, slow attempts, inconsistent resource allocation, speculative execution, and locality.  Visualizing job performance in this compact format allows users to quickly scan the behavior of many jobs for problems.  Hovering over an individual task will bring up task-specific details, including counters, making it trivial to compare task details and performance.


Diagram Showing Task Details
Diagram of Execution Locality
Tasks are ordered by scheduler allocation providing insight into how many resources were available at the time and how long it took for the attempt to start.  The color indicates the task type or status.  Failed or killed tasks even present the failure reason and stack trace, so delving into the logs isn’t necessary.  If you do want to look at a specific task log, simply select the task and click the provided link to go directly to the log for that task.


The detailed information used to populate this view comes directly from the job history file produced for every MapReduce job.  Inviso has a single REST endpoint to parse the detailed information for a job and represent it as a JSON structure.  While this capability is similar to what the MapReduce History Server REST API provides, the difference is that Inviso provides the complete structure in a single response.  Gathering this information from the History Server would require thousands of requests with the current API and could impact the performance of other tools that rely on the History Server, such as Pig and Hive clients.  We also use this REST API to collect metrics to aggregate job statistics and identify performance issues and failure causes.
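
To make the request-count difference concrete, here is a rough sketch of gathering per-attempt details through the standard Hadoop 2 History Server REST API, which takes one request per task; the Inviso endpoint path mentioned in the trailing comment is a placeholder, not the actual path:

    # Rough sketch: pulling per-attempt detail from the MapReduce History Server
    # REST API requires one request per task, versus a single call to Inviso.
    import requests

    HISTORY = "http://historyserver.example.com:19888/ws/v1/history/mapreduce"

    def fetch_attempts_via_history_server(job_id):
        attempts = []
        tasks = requests.get("%s/jobs/%s/tasks" % (HISTORY, job_id)).json()
        for task in tasks["tasks"]["task"]:   # one additional request per task
            url = "%s/jobs/%s/tasks/%s/attempts" % (HISTORY, job_id, task["id"])
            attempts.append(requests.get(url).json())
        return attempts

    # By contrast, Inviso parses the job history file itself and returns the whole
    # structure in one response (endpoint path below is a placeholder/assumption):
    # trace = requests.get("http://inviso.example.com/trace/" + job_id).json()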


Cluster Performance
With job performance, we tend to think of how a job will run in isolation, but that’s rarely the case in any production environment.  At Netflix, we have clusters isolated by concern: multiple clusters for production jobs, ad-hoc queries, reporting, and some dedicated to smaller efforts (e.g., benchmarking, regression testing, test data pipeline).  The performance of any specific run is a function of the cluster capacity and the allocation assigned by the scheduler.  If someone is running a job at the same time as our ETL pipeline, which has higher weight, they might get squeezed out due to the priority we assign ETL.


Similar to how it indexes job configurations, Inviso polls REST endpoints on the Resource Manager to get the current metrics for all clusters and indexes the results into ElasticSearch.  With this information, we can query and reconstitute the state of the cluster for any timespan going back days or months.  So even though a cluster may be gone or the job details purged from the system, you can look back at how busy the cluster was when a job ran to determine whether its performance was due to congestion.
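
A minimal sketch of that polling loop (the Resource Manager metrics endpoint and port are the standard Hadoop 2 defaults; the ElasticSearch index name, hosts, and polling interval are illustrative assumptions, again using a 1.x-era client API):

    # Illustrative polling loop: snapshot YARN cluster metrics and index them
    # into ElasticSearch so cluster state can be reconstituted later.
    import time
    import requests
    from elasticsearch import Elasticsearch

    es = Elasticsearch(["http://elasticsearch.example.com:9200"])

    def snapshot_cluster(rm_host, cluster_name):
        metrics = requests.get("http://%s:8088/ws/v1/cluster/metrics" % rm_host).json()
        doc = metrics["clusterMetrics"]
        doc.update({"cluster": cluster_name, "timestamp": int(time.time() * 1000)})
        es.index(index="inviso-cluster-metrics", doc_type="metrics", body=doc)

    while True:
        for host, name in [("rm.prod.example.com", "prod"),
                           ("rm.adhoc.example.com", "adhoc")]:
            snapshot_cluster(host, name)
        time.sleep(60)  # assumed polling interval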

Application Stream: Running applications with ETL and Data Pipeline Activity Highlighted
In a second graph on the same page, Inviso displays the capacity and backlog on the cluster using the running, reserved, and pending task metrics available from the Resource Manager’s REST API.  This view has a range selector to adjust the timespan in the first graph and looks back over a longer period.


This view provides a way to gauge the load and backlog of the cluster.  When large jobs are submitted, the pending tasks will spike and then slowly drain as the cluster works them off.  If the cluster is unable to work off the backlog, the cluster might need to be expanded.  Another insight this view provides is periods when clusters have underutilized capacity.  For example, ad-hoc clusters are used less frequently at night, which is an opportune time to run a large backfill job.  Inviso makes these types of usage patterns clear, so we can shift resources or adjust usage patterns to take full advantage of the cluster.


Resource Stream: Showing Active Containers and Backlog of Tasks

Putting it all Together
With the increasing size and complexity of Hadoop deployments, being able to locate jobs and understand their performance is key to running an efficient platform.  Inviso provides a convenient view of the inner workings of jobs and the platform.  By simply overlaying a new view on existing infrastructure, Inviso can operate inside any Hadoop environment with a small footprint and provide easy access and insight.

Given an existing cluster (in the datacenter or cloud), setting up Inviso should only take a few minutes, so give it a shot.  If you like it and want to make it better, send some pull requests our way.