Monday, January 28, 2013

Announcing Ribbon: Tying the Netflix Mid-Tier Services Together

by Allen Wang and Sudhir Tonse

Netflix embraces a fine-grained Service Oriented Architecture as the underpinning of its Cloud based deployment model. Currently, we run hundreds of such fine grained services that are collectively responsible in handling the customer facing requests via a few "Edge Services" such as the Netflix API Service. A lightweight REST based protocol is the choice for inter process communication amongst these services.

The Netflix Internal Web Service Framework (aka NIWS) forms the bedrock of this architecture in the realm of communication. Together with the previously announced Eureka, which aids in service discovery, NIWS provides all the components pertinent for making REST calls.

NIWS is comprised of a REST client and server framework, based on JSR-311 which is a RESTful API specification for Java. Our services use various payload data serialization formats such as Avro, XML, JSON, Thrift and Google Protocol Buffers. NIWS provides the mechanism for serializing and deserializing the data.

Today, we are happy to announce Ribbon as the latest offering of our hugely popular and growing Open Source libraries hosted on GitHub.

Ribbon, as a first cut, mainly offers the client side software load balancing algorithms which have been battle tested at Netflix along with a few of the other components that form our Inter Process Communication stack (aka NIWS). We plan to continue open sourcing the rest the of the NIWS stack in the coming months. Please note that the loadbalancers mentioned are the internal client-side loadbalancers used alongside Eureka that are primarily used for load balancing requests to our mid-tier services. For our public facing Edge Services we continue to use Amazon's ELB Service

Deployment Topology

DiagramTypical (representative) deployment architecture at Netflix.

A typical deployment architecture at Netflix is a multi-region, multi-zone deployment which aids in better availability and resiliency.
The Amazon ELB provides load balancing for customer/device facing requests while internal mid-tier requests are handled via the Ribbon framework.

Eureka provides the service registry for all Netflix services. Ribbon clients are typically created and configured for each of the target services. Ribbon's Client component offers a good set of configuration options such as connection timeouts, retries, retry algorithm (exponential, bounded backoff)  etc.
Ribbon comes built in with a pluggable and customizable LoadBalancing component. Some of the load balancing strategies offered are listed below, with more to follow.

  • Simple Round Robin LB
  • Weighted Response Time LB
  • Zone Aware Round Robin LB
  • Random LB

Battle Tested Features

The main benefit of Ribbon is that it offers a simple inter process communication mechanism with features built based on our operational learnings and experience in the Amazon Cloud. Ribbon's Zone Aware Load Balancer for example, is built with circuit tripping logic and can be configured to favor the target service instance based on Zone Affinity (i.e it favors the zone in which the calling service itself is hosted, thus benefiting from reduced latency and savings in cost). It monitors the operational behavior of the instances running in each zone and has the capability to quickly (at real time) drop an entire Zone out of rotation. This helps us be resilient in the face of Zone outages as described in a prior blog post.

Zone Aware Load Balancer

The picture above shows the Zone Aware LoadBalancer in action.  The LoadBalancer will do the following when picking a server:
  1. The LoadBalancer will calculate and examine zone stats of all available zones. If the active requests per server for any zone has reached a configured threshold, this zone will be dropped from the active server list. In case more than one zone has reached the threshold, the zone with the most active requests per server will be dropped.
  2. Once the the worst zone is dropped, a zone will be chosen among the rest with the probability proportional to its number of instances.
  3. A server will be returned from the chosen zone with a given Rule (A Rule is a loadbalacing strategy, for example a simple Round Robin Rule)
For each request, the steps above will be repeated. That is to say, each zone related load balancing decisions are made at real time with the up-to-date statistics aiding the choice.

Some of the features of Ribbon are listed below with more to follow.

  • Easy integration with a Service Discovery component such as Netflix's Eureka
  • Runtime configuration using Archaius
  • Operational Metrics exposed via JMX and published via Servo
  • Multiple and pluggable serialization choices (via JSR-311, Jersey)
  • Asynchronous and Batch operations (coming up)
  • Automatic SLA framework (coming up)
  • Administration/Metrics console (coming up)
Please visit the wiki pages at GitHub for detailed documentation on Ribbon.

At Netflix, we typically wrap our REST calls made via Ribbon using Hystrix which provides Latency and Fault Tolerance in Distributed Systems.

If you would like to contribute to our highly scalable libraries and frameworks for ephemeral distributed environments, please take a look at You can follow us on twitter at @NetflixOSS.
We will be hosting a NetflixOSS Open House on the 6th of February, 2013 (Limited seats. RSVP needed).

We are constantly looking for great talent to join us and we welcome you to take a look at our Jobs page or contact @stonse for positions in the Cloud Platform Infrastructure team.


  1. Netflix Open Source Dashboard
  2. Ribbon
  3. Eureka (Service Discovery and Metadata)
  4. Archaius (Dynamic configurations)
  5. Hystrix (Latency and Fault Tolerance)
  6. Simian Army

Friday, January 18, 2013

NetflixGraph Metadata Library: An Optimization Case Study

by Drew Koszewnik

Here at Netflix, we serve more than 30 million subscribers across over 40 countries. These users collectively generate billions of requests per day, most of which require metadata about our videos. Each call we receive can potentially cull through thousands of video metadata attributes. In order to minimize the latency at which this data can be accessed, we generally store most of it directly in RAM on the servers responsible for servicing live traffic.

We have two main applications that package and deliver this data to the servers which enable all of our user experiences -- from playing movies on the thousands of devices we support to just checking out our selection with their cell phones:
  • VMS, our Video Metadata Platform, and
  • NetflixGraph, which contains data representable as a directed graph
VMS is responsible for packaging data about videos such as synopses, titles, as well as data about video artwork and streams. NetflixGraph contains data about relationships between entities like videos, characters, and tags (e.g. gritty, quirky, funny). This data enables the highly personalized service our users enjoy. Remember when you let Netflix know that you enjoyed Exciting movies? Fantasy movies? Remember when you enjoyed watching Spider-Man? That’s why we decided to recommend Captain America to you. In the directed graph NetflixGraph represents, Captain America is connected to many of the things we’ve discovered explicitly and implicitly about you.

This article specifically details how we achieved a 90% reduction in the memory footprint of NetflixGraph. The results of this work will be open-sourced in the coming months.


We constantly need to be aware of the memory footprints on our servers at Netflix. NetflixGraph presented a great opportunity for experimentation with reduction of memory footprints. The lessons and techniques we learned from this exercise have had a positive impact towards other applications within Netflix and, we hope, can have applications outside of Netflix as well.


The first step in the optimization of any specific resource is to become familiar with the biggest consumers of that resource. After all, it wouldn't make much sense to shrink a data structure that consumes a negligible amount of memory; it wouldn’t be an optimal use of engineering time.

We started by creating a small test application which loaded only sample NetflixGraph data, then we took a heap dump of that running process. A histogram from this dump (shown below in Eclipse Memory Analyzer) shows us the types of objects which are the largest memory consumers:

From this histogram, we can clearly see that HashMapEntry objects and arrays of HashMapEntry objects are our largest consumers by far. In fact, these structural elements themselves consumed about 83% of our total memory footprint. Upon inspection of the code, the reason for this finding was not surprising. The relationships between objects in our directed graph were generally represented with HashMaps, where HashSets of “to” objects were keyed by “from” objects. For example, the set of genres to which a video belongs would have been represented with a HashMap<Video, HashSet<Genre>>. In this map, the Video object representing “Captain America” might have been the key for a Set containing the Genres “Action”, “Adventure”, “Comic Books & Superheroes”, and maybe, in typical Netflix fashion, the very specific “Action & Adventure Comic Book Movies Depicting Scenes from World War II”.

Solution: Compact Encoded Data Representation

We knew that we could hold the same data in a more memory-efficient way. We created a library to represent directed-graph data, which we could then overlay with the specific schema we needed.

We start by numbering each of our unique objects from 1 to n. The value that each object gets assigned we refer to as an "ordinal". Once each object is numbered, we need a data structure which will maintain the relationships between ordinals. Let’s take an example: the figure below represents a few nodes in our graph which have been assigned ordinals and which are connected to each other by some property.

Internal to the graph data structure, we refer to each object only by its assigned ordinal. This way, we avoid using expensive 64-bit object references. Instead, the objects to which another object is connected can be represented by just a list of integers (ordinals). In the above diagram, we can see that the node which was assigned the ordinal “2” is connected to nodes 3, 5, and 7. These connections are of course fully representable by just the list of integers [ 3, 5, 7 ].

Our data structure maintains two arrays. One is an integer array, and the other is a byte array. Each object's connections are encoded into the byte array as delta-encoded variable-byte integers (more on this in the next paragraph). The integer array contains offsets into the byte array, such that the connections for the object represented by some ordinal are encoded starting at the byte indicated by offsetArray[ordinal].

Variable-byte encoding is a way to represent integers in a variable number of bytes, whereby smaller values are represented in fewer bytes. An excellent explanation is available here on Wikipedia. Because smaller values can be represented in fewer bytes, we benefit significantly if we can represent our connected ordinals with smaller values. If we sort the ordinals for some connection set in ascending order, we might represent each connection not by it's actual value, but by the difference between it's value and the previous value in the set. For example, if we had some ordinals [1, 2, 3, 5, 7, 11, 13], we would represent this with the values [1, 1, 1, 2, 2, 4, 2].

Of course, there’s more to our data than just nodes and connections. Each node is typed (e.g. Video, Genre, Character), and each type has a different set of properties. For example, a video may belong to several genres, which is one type of connection. But it also may depict several characters, which is a different type of connection.

The character Captain America, above, is not the same node type as the movie Captain America: The First Avenger

In order to represent these different types of nodes, and different properties for each node type, we define a schema. The schema tells us about each of our node types. For each node type, it also enumerates which properties are available to get a set of connections for.

When all connections for a node are encoded, the connection grouping for each of its properties are appended to the byte array in the order which they appear in the schema. Each group of integers representing these connections is preceded by a single integer, indicating the total number of bytes used to encode that property. In our earlier example, since each of the values [1, 1, 1, 2, 2, 4, 2] are representable with a single byte, this grouping would be preceded by the encoded value “7”, indicating that seven bytes are used to represent the connections for this property. This allows us to iteratively read how many bytes are in a given property, then skip that many bytes if we are not interested in that property.

At runtime, when we need to find the set of connections over some property for a given object, we go through the following steps:

  1. find the object's ordinal.
  2. look up the pointer into our byte array for this object.
  3. find the first property for the node type of this object in our schema.
  4. while the current property is not the property where interested in:
    4a. read how many bytes are used to represent this property.
    4b. increment our pointer by the value discovered in (4a).
  5. move to the next property in the schema.
  6. iteratively decode values from the byte array, each time adding the current value to the previous value.
  7. look up the connected objects by the returned ordinals.


When we dropped this new data structure in the existing NetflixGraph library, our memory footprint was reduced by 90%. A histogram of our test application from above, loading the exact same set of data, now looks like the following:

When to consider this solution

There is a potential disadvantage to this approach. In addition to memory footprint on our edge servers, another thing we constantly need to be cognizant of is CPU utilization. When we represented connections as HashSets, determining whether an object is connected to another object was an O(1) operation. To ask this question in the new way, our data structure requires an iteration over all values in the set, which is an O(n) operation.

Luckily, the vast majority of our access patterns for this data are full iterations over the sets, which are no slower now than they were in the previous implementation. In addition, the engineers for each of the teams responsible for maintaining our edge services are extremely vigilant, and our resource utilization is heavily scrutinized with sophisticated tools on each new deployment.


This article has discussed one of the approaches we took for compacting directed graph data in the context of one of our more recognizable data sets – deeply personalized genres. Any application with data sets which lend themselves to representation as directed graphs can benefit from this specific optimization. We will be open-sourcing the memory optimized graph component of this library in the coming months. Stay tuned!

By the way, if you’re interested in working with the amazing group of engineers who solve the scalability problems Netflix faces on a daily basis, we are looking for both a software and automation engineer on the Product Infrastructure team. At Netflix, you’ll be working with some of the most talented engineering teammates in the world. Visit to get started!

Wednesday, January 16, 2013

Reactive Programming at Netflix

Hi, my name is Jafar Husain. I'm a Senior Developer on the TV User Interface team.
Over the last year, Netflix has reinvented our client-server interaction model. One of the key building blocks of our platform is Microsoft's open-source Reactive Extensions library (Rx). Netflix is a big believer in the Rx model, because Rx has made it much easier for us to build complex asynchronous programs.

Asynchronous Programming is Hard

Events and AJAX requests are sequences of values that are pushed from the producer to the consumer asynchronously. The consumer reacts to the data as it comes in, which is why asynchronous programming is also called Reactive Programming. Every web application is a reactive program, because code reacts to events like mouse clicks, key presses, and the asynchronous arrival of data from the server.
Asynchronous programming is hard, because logical units of code have to be split across many different callbacks so that they can be resumed after async operations complete. To make matters worse, most programming languages have no facilities for propagating asynchronous errors. Asynchronous errors aren't thrown on the stack, which means that try/catch blocks are useless.

Events are Collections

The Reactive Extensions library models each event as a collection of data rather than a series of callbacks. This is a revolutionary idea, because once you model an event as a collection you can transform events in much the same way you might transform in-memory collections. Rx provides developers with a SQL-like query language that can be used to sequence, filter, and transform events. Rx also makes it possible to propagate and handle asynchronous errors in a manner similar to synchronous error handling.
Rx is currently available for JavaScript and Microsoft's .NET platform, and we're using both flavors to power our PS3 and Windows 8 clients respectively. As Ben Christensen mentioned in his post "Optimizing the Netflix API", we've also ported Rx to the Java platform so that we can use it on the server. Today, Reactive Extensions is required learning for many developers at Netflix. We've developed an online, interactive tutorial for teaching our developers Rx, and we're opening it up to the public today.

Reactive Extensions at Netflix

On a very hot day in Brisbane, I gave an interview to Channel 9 during which I discussed Rx use at Netflix in-depth. I also discussed Falkor, a new protocol we've designed for client-server communication at Netflix. Falkor provides developers with a unified model for interacting with both local and remote data, and it's built on top of Rx.
In the coming weeks we'll be blogging more about Reactive Programming, our next generation data platform, and the Falkor protocol. Today Netflix is one of very few technology companies using reactive programming on both the server and the client. If you think this is as exciting as we do, join the team!

Tuesday, January 15, 2013

Optimizing the Netflix API

by Ben Christensen

About a year ago the Netflix API team began redesigning the API to improve performance and enable UI engineering teams within Netflix to optimize client applications for specific devices. Philosophies of the redesign were introduced in a previous post about embracing the differences between the different clients and devices.

This post is part one of a series on the architecture of our redesigned API.


We had multiple goals in creating this system, as follows:

Reduce Chattiness

One of the key drivers in pursuing the redesign in the first place was to reduce the chatty nature of our client/server communication, which could be hindering the overall performance of our device implementations.

Due to the generic and granular nature of the original REST-based Netflix API, each call returns only a portion of functionality for a given user experience, requiring client applications to make multiple calls that need to be assembled in order to render a single user experience. This interaction model is illustrated in the following diagram:

To reduce the chattiness inherent in the REST API, the discrete requests in the diagram above should be collapsed into a single request optimized for a given client. The benefit is that the device then pays the price of WAN latency once and leverages the low latency and more powerful hardware server-side. As a side effect, this also eliminates redundancies that occur for every incoming request.

A single optimized request such as this must embrace server-side parallelism to at least the same level as previously achieved through multiple network requests from the client. Because the server-side parallelized requests are running in the same network, each one should be more performant than if it was executed from the device. This must be achieved without each engineer implementing an endpoint needing to become an expert in low-level threading, synchronization, thread-safety, concurrent data structures, non-blocking IO and other such concerns.

Distribute API Development

A single team should not become a bottleneck nor need to have expertise on every client application to create optimized endpoints. Rapid innovation through fast, decoupled development cycles across a wide variety of device types and distributed ownership and expertise across teams should be enabled. Each client application team should be capable of implementing and operating their own endpoints and the corresponding requests/responses.

Mitigate Deployment Risks

The Netflix API is a Java application running on hundreds of servers processing 2+ billion incoming requests a day for millions of customers around the world. The system must mitigate risks inherent in enabling rapid and frequent deployment by multiple teams with minimal coordination.

Support Multiple Languages

Engineers implementing endpoints come from a wide variety of backgrounds with expertise including Javascript, Objective-C, Java, C, C#, Ruby, Python and others. The system should be able to support multiple languages at the same time.

Distribute Operations

Each client team will now manage the deployment lifecycle of their own web service endpoints. Operational tools for monitoring, debugging, testing, canarying and rolling out code must be exposed to a distributed set of teams so teams can operate independently.


To achieve the goals above our architecture distilled into a few key points:
  • dynamic polyglot runtime
  • fully asynchronous service layer
  • Reactive programming model
The following diagram and subsequent annotations explain the architecture:

[1] Dynamic Endpoints

All new web service endpoints are now dynamically defined at runtime. New endpoints can be developed, tested, canaried and deployed by each client team without coordination (unless they depend on new functionality from the underlying API Service Layer shown at item 5 in which case they would need to wait until after those changes are deployed before pushing their endpoint).

[2] Endpoint Code Repository and Management

Endpoint code is published to a Cassandra multi-region cluster (globally replicated) via a RESTful Endpoint Management API used by client teams to manage their endpoints.

[3] Dynamic Polyglot JVM Language Runtime

Any JVM language can be supported so each team can use the language best suited to them.
The Groovy JVM language was chosen as our first supported language. The existence of first-class functions (closures), list/dictionary syntax, performance and debuggability were all aspects of our decision. Moreover, Groovy provides syntax comfortable to a wide range of developers, which helps to reduce the learning curve for the first language on the platform.

[4 & 5] Asynchronous Java API + Reactive Programming Model

Embracing concurrency was a key requirement to achieve performance gains but abstracting away thread-safety and parallel execution implementation details from the client developers was equally important in reducing complexity and speeding up their rate of innovation. Making the Java API fully asynchronous was the first step as it allows the underlying method implementations to control whether something is executed concurrently or not without the client code changing. We chose a reactive programming model with a functional programming style for handling composition and conditional flows of asynchronous callbacks. Our implementation is modeled after Rx Observables.

[6] Hystrix Fault Tolerance

As we have described in a previous post, all service calls to backend systems are made via the Hystrix fault tolerance layer (which was recently open sourced, along with its dashboard) that isolates the dynamic endpoints and the API Service Layer from the inevitable failures that occur while executing billions of network calls each day from the API to backend systems.
The Hystrix layer is inherently mutlti-threaded due to its use of threads for isolating dependencies and thus is leveraged for concurrent execution of blocking calls to backend systems. These asynchronous requests are then composed together via the reactive framework.

[7] Backend Services and Dependencies

The API Service Layer abstracts away all backend services and dependencies behind facades. As a result, endpoint code accesses “functionality” rather than a “system”. This allows us to change underlying implementations and architecture with no or limited impact on the code that depends on the API. For example, if a backend system is split into 2 different services, or 3 are combined into one, or a remote network call is optimized into an in-memory cache, none of these changes should affect endpoint code and thus the API Service Layer ensures that object models and other such tight-couplings are abstracted and not allowed to “leak” into the endpoint code.


The new Netflix API architecture is a significant departure from our previous generic RESTful API.
Dynamic JVM languages combined with an asynchronous Java API and the reactive programming model have proven to be a powerful combination to enable safe and efficient development of highly concurrent code.

The end result is a fault-tolerant, performant platform that puts control in the hands of those who know their target applications the best.

Following posts will provide further implementation and operational details about this new architecture.
If this type of work interests you we are always looking for talented engineers.

September 2014 Update

  • This blog post originally used the term "functional reactive programming" or FRP. This term was used in error. RxJava does not implement "continuous time" which is a requirement for FRP from previous literature.

Monday, January 14, 2013

NetflixOSS Open House

By Adrian Cockcroft, Ruslan Meshenberg.

As many of you know, we have open sourced many components of the Netflix infrastructure over the last year - ranging from our deployment console Asgard and Simian Army Monkeys, to the Cassandra and Zookeeper clients that we use internally. We hope that these tools are helping you run your business on Amazon Web Services, just like they help us. You can find all of our open source projects at our Github site.

The components we open sourced can be used separately, but we use them as an integrated platform to support the Netflix Web site and services. We have several missing pieces that we plan to release over the coming months that will fill out the platform functionality. So, ultimately, you will also have access to what we call the Netflix Open Source Platform.

So far we’ve had great interest and involvement from the open source community, and would like to extend an invitation to our NetflixOSS Open House.  At this event you can meet the authors of the components and deep-dive into technology and architectural choices.  We’ll show how all the components fit together, and share some details on what’s coming next on our Open Source Platform roadmap.

You can also follow @NetflixOSS on Twitter for announcements, and join our Meetup Group or our LinkedIn Community to discuss NetflixOSS with us.

Event schedule: Wednesday, February 6th

6:30pm - 7:00pm - Refreshments/snacks 
7:00pm - 7:15pm - Kick-off by Ruslan and Adrian
  • introduction to NetflixOSS platform
  • 2013 roadmap and sneak peek into what’s to come
7:15pm - 7:45pm - Lightning Talks about individual NetflixOSS projects
8:00pm - 9:00pm - Breakout Sessions in Rooms and Lab/Demo Stations 

Thursday, January 10, 2013

Hadoop Platform as a Service in the Cloud

by Sriram Krishnan and Eva Tse, Cloud Big Data Platform

Hadoop has become the de facto standard for managing and processing hundreds of terabytes to petabytes of data. At Netflix, our Hadoop-based data warehouse is petabyte-scale, and growing rapidly. However, with the big data explosion in recent times, even this is not very novel anymore. Our architecture, however, is unique as it enables us to build a data warehouse of practically infinite scale in the cloud (both in terms of data and computational power). In this article, we discuss our cloud-based data warehouse, how it is different from a traditional data center-based Hadoop infrastructure, and how we leverage the elasticity of the cloud to build a system that is dynamically scalable. We also introduce Genie, which is our in-house Hadoop Platform as a Service (PaaS) that provides REST-ful APIs for job execution and resource management.

Architectural Overview

In a traditional data center-based Hadoop data warehouse, the data is hosted on the Hadoop Distributed File System (HDFS). HDFS can be run on commodity hardware, and provides fault-tolerance and high throughput access to large datasets. The most typical way to build a Hadoop data warehouse in the cloud would be to follow this model, and store your data on HDFS on your cloud-based Hadoop clusters. However, as we describe in the next section, we have chosen to store all of our data on Amazon’s Storage Service (S3), which is the core principle on which our architecture is based. A high-level overview of our architecture is shown below, followed by the details.

S3 as the Cloud Data Warehouse

We use S3 as the “source of truth” for our cloud-based data warehouse. Any dataset that is worth retaining is stored on S3. This includes data from billions of streaming events from (Netflix-enabled) televisions, laptops, and mobile devices every hour captured by our log data pipeline (called Ursula), plus dimension data from Cassandra supplied by our Aegisthus pipeline. So why do we use S3, and not HDFS as the source of truth? Firstly, S3 is designed for 99.999999999% durability and 99.99% availability of objects over a given year, and can sustain concurrent loss of data in two facilities. Secondly, S3 provides bucket versioning, which we use to protect against inadvertent data loss (e.g. if a developer errantly deletes some data, we can easily recover it). Thirdly, S3 is elastic, and provides practically “unlimited” size. We grew our data warehouse organically from a few hundred terabytes to petabytes without having to provision any storage resources in advance. Finally, our use of S3 as the data warehouse enables us to run multiple, highly dynamic clusters that are adaptable to failures and load, as we will show in the following sections. On the flip side, reading and writing from S3 can be slower than writing to HDFS. However, most queries and processes tend to be multi-stage MapReduce jobs, where mappers in the first stage read input data in parallel from S3, and reducers in the last stage write output data back to S3. HDFS and local storage are used for all intermediate and transient data, which reduces the performance overhead.

Multiple Hadoop Clusters for Different Workloads

We currently use Amazon’s Elastic MapReduce (EMR) distribution of Hadoop. Our use of S3 as the data warehouse enables us to spin up multiple Hadoop clusters for different workloads, all accessing the exact same data. A large (500+ node) "query" cluster is used by engineers, data scientists and analysts to perform ad hoc queries. Our "production" (or “SLA”) cluster, which is around the same size as the query cluster, runs SLA-driven ETL (extract, transform, load) jobs. We also have several other “dev” clusters that are spun up as needed. If we had used HDFS as our source of truth, then we would need a process to replicate data across all the clusters. With our use of S3, this is non-issue because all clusters have instant access to the entire dataset. We dynamically resize both our query and production clusters daily. Our query cluster can be smaller at night when there are fewer developers logged in, whereas the production cluster must be larger at night, when most of our ETL is run. We do not have to worry about data redistribution or loss during expand/shrink because the data is on S3. And finally, although our production and query clusters are long-running clusters in the cloud, we can treat them as completely transient. If a cluster goes down, we can simply spin up another identically sized cluster (potentially in another Availability Zone, if needed) in tens of minutes with no concerns about data loss.

Tools and Gateways

Our developers use a variety of tools in the Hadoop ecosystem. In particular, they use Hive for ad hoc queries and analytics, and use Pig for ETL and algorithms. Vanilla java-based MapReduce is also occasionally used for some complex algorithms. Python is the common language of choice for scripting various ETL processes and Pig User Defined Functions (UDF). Our Hadoop clusters are accessible via a number of “gateways”, which are just cloud instances that our developers log into and run jobs using the command-line interfaces (CLIs) of Hadoop, Hive and Pig. Often our gateways become single points of contention, when there are many developers logged in and running a large number of jobs. In this case, we encourage the heavy users to spin up new pre-baked instances of our “personal” gateway AMIs (Amazon Machine Images) in the cloud. Using a personal gateway also allows developers to install other client-side packages (such as R) as needed.

Introducing Genie - the Hadoop Platform as a Service

Amazon provides Hadoop Infrastructure as a Service, via their Elastic MapReduce (EMR) offering. EMR provides an API to provision and run Hadoop clusters (i.e. infrastructure), on which you can run one or more Hadoop jobs. We have implemented Hadoop Platform as a Service (called “Genie”), which provides a higher level of abstraction, where one can submit individual Hadoop, Hive and Pig jobs via a REST-ful API without having to provision new Hadoop clusters, or installing any Hadoop, Hive or Pig clients. Furthermore, it enables administrators to manage and abstract out configurations of various back-end Hadoop resources in the cloud.

Why did we build Genie?

Our ETL processes are loosely-coupled, using a combination of Hadoop and non-Hadoop tools, spanning the cloud and our data center. For instance, we run daily summaries using Pig and Hive on our cloud-based Hadoop data warehouse, and load the results into our (order of magnitude smaller) relational data warehouse in the data center. This is a fairly common big data architecture, where a much smaller relational data warehouse is used to augment a Hadoop-based system. The former provides more real-time interactive querying and reporting, plus better integration with traditional BI (business intelligence) tools. Currently, we are using Teradata as our relational data warehouse. However, we are also investigating Amazon’s newRedshift offering. We use an enterprise scheduler (UC4) in our data center to define dependencies between various jobs between our data center and the cloud, and run them as “process flows”. Hence, we need a mechanism to kick off Hadoop, Hive and Pig jobs from any client, without having to install the entire Hadoop software stack on them. Furthermore, since we now run hundreds of Hadoop jobs per hour, we need this system to be horizontally scalable, especially since our workload will increase as we migrate more of our ETL and processing to Hadoop in the cloud. Finally, since our clusters in the cloud are potentially transient, and there is more than one cluster that can run Hadoop jobs, we need to abstract away physical details of the backend clusters from our clients.

Why build something new?

Why did we build Genie, as opposed to using something else that is already available? The simple answer is that there was nothing that was already out there in the open source community that handled our requirements - an API to run jobs, abstraction of backend clusters, an ability to submit jobs to multiple clusters, and scalable enough (horizontally or otherwise) to support our usage. The closest alternative that we considered was Oozie, which is a workflow scheduler similar to UC4. It is not a job submission API like Genie (hence not an apples-to-apples comparison). We ruled out the use of Oozie as our scheduler, since it only supports jobs in the Hadoop ecosystem, whereas our process flows span Hadoop and non-Hadoop jobs. Also, when we started our work on Genie, Oozie didn’t support Hive, which was a key requirement for us. A closer alternative to Genie is Templeton, which is now part of HCatalog. However, Templeton doesn’t support concurrent job submissions to more than one cluster, is still evolving, and doesn’t appear quite ready for production.

What is Genie?

Genie is a set of REST-ful services for job and resource management in the Hadoop ecosystem. Two key services are the Execution Service, which provides a REST-ful API to submit and manage Hadoop, Hive and Pig jobs, and the Configuration Service, which is a repository of available Hadoop resources, along with the metadata required to connect to and run jobs on these resources.

Execution Service 

Clients interact with Genie via the Execution Service API. They launch a job by sending a JSON or XML message to this API to specify a set of parameters, which include:
  • a job type, which can be Hadoop, Hive or Pig,
  • command-line arguments for the job,
  • file dependencies such as scripts and jar files (e.g. for UDFs) on S3,
  • a schedule type, such as “ad hoc” or “SLA”, which Genie uses to map the job to an appropriate cluster, and
  • a name for the Hive metastore to connect to (e.g. prod, test, or one of the dev ones).
If a job submission is successful, Genie returns a job id for the job, which can be used to get the job status, and the output URL. The output URL is an HTTP URL pointing to the working directory of the job, which contains the standard output and error logs (see screenshot below). Each job id can translate to multiple MapReduce jobs depending on the number of intermediate stages in the Hive or Pig query being run.

Configuration Service 

The Configuration Service is used to keep track of all the clusters that are currently running, and the schedule types that they support. For instance, our query cluster has been configured to support “ad hoc” jobs, whereas our production cluster has been configured to support “SLA” jobs. When a cluster comes up, we publish to the Configuration Service the type of jobs that it supports, and also the set of configuration files for that cluster (viz. the mapred-site.xml, core-site.xml, hdfs-site.xml for Hadoop, and hive-site.xml for Hive). We also mark its status as “Up”. Similarly, when a cluster goes down we mark it as “Terminated”. There is also an “Out of Service” state for clusters, which means it is alive, but not supporting any new job submissions. This is useful during upgrades and the termination process, when a cluster should no longer accept any new jobs, but should let all running jobs finish before it is terminated. This service is complementary to Eureka, which is designed to be a repository of metadata for ephemeral instances (and not clusters) in the cloud. When the Execution Service receives a job submission request, it maps the job to an appropriate cluster using the Configuration Service. If there are multiple clusters that could satisfy the job requirements, it currently picks one of the candidate clusters at random. However, this behavior could be modified by implementing a custom load balancer (e.g. based on available slots). It then fetches all the configuration files for that cluster, and forks off the individual Hadoop, Hive or Pig jobs into separate individual working directories for each job (see screenshot above), thereby providing isolation between Genie and the jobs themselves. A single Genie instance can thus support multiple concurrent job submissions to different clusters, all completely abstracted out from the clients.

How do we use Genie for dynamic resource management?

Various engineering teams in Netflix run their services on reserved instances on ASGs (auto-scaling groups) on AWS, which they expand and shrink based on load. Most of the ETL jobs run after midnight (PST), which conveniently happens to be the time when most of these ASGs have scaled down (due to the natural viewing pattern of Netflix in North America). Hence, we use these surplus reserved instances to spin up additional “bonus” clusters supplementing our other production clusters. We register them with the Configuration Service, and the Genie clients (i.e. ETL jobs) access these new clusters using the Execution Service API. When the engineering teams need their instances back, the bonus clusters are terminated and de-registered, and are no longer available via Genie clients. There is no need to perform any rolling upgrades, which is common in traditional Hadoop clusters. If we need to perform an upgrade of our production (SLA) cluster, one option is to spin up a new production cluster with the upgraded software stack, and stop routing traffic to the old cluster by simply setting its status to “Out of Service” (we call this a “red-black” push). Alternatively, we can perform an upgrade on a running cluster by setting its status to “Out of Service”, and temporarily marking another running cluster as an SLA cluster, while the upgrade is being performed. If we do not want running jobs to fail, we wait for them to finish before terminating the old cluster or upgrading a running cluster. This is similar to the capability provided by Asgard for application deployments and cloud instance management.

What is the current deployment status of Genie?

Although it continues to be a work in progress, we use Genie heavily in our production environment. It is currently deployed in a 6-12 node Auto Scaling Group (ASG), spanning three Availability Zones for load-balancing and fault tolerance purposes. For horizontal scalability, we adjust the number of nodes based on load (i.e. the number of concurrent running jobs). This is configured via CloudWatch alarms, coupled with auto-scaling policies managed by Asgard. Genie instances are registered with Eureka, and clients use the Eureka API to choose an available instance to run their job. To avoid client skew, Genie also forwards job requests to other lightly loaded instances, when an instance gets more requests than it can handle. Genie currently supports hundreds of concurrent job submissions at any time. On a daily basis, it runs hundreds of Hive jobs from our visualization tools and our custom Hive/Pig Web UI, plus thousands of Hive and Pig-based ETL jobs. Scaling to thousands of concurrent jobs is theoretically possible, simply by increasing the number of instances in the ASG.


In this article, we described the cloud-based Hadoop architecture at Netflix, which is significantly different than data-center based ones. We are able to leverage the elasticity of the cloud to scale up and down, and we can spin up multiple clusters dynamically to address different kinds of workloads. We also described Genie, which is a horizontally scalable Hadoop Platform as a Service. Genie exposes APIs to submit and manage jobs, as well as manage and abstract out backend Hadoop resources. We are considering open sourcing Genie in the near future, and would love to hear your feedback on whether this might be useful in your big data environment. If building critical big data components like this sounds exciting to you (for a service that millions of people love worldwide!), take a look at

Friday, January 4, 2013

Janitor Monkey - Keeping the Cloud Tidy and Clean

By Michael Fu and Cory Bennett, Engineering Tools

One of the great advantages of moving from a private datacenter into the cloud is that you have quick and easy access to nearly limitless new resources. Innovation and experimentation friction is greatly reduced: to push out a new application release you can quickly build up a new cluster, to get more storage just attach a new volume, to backup your data just make a snapshot, to test out a new idea just create new instances and get to work. The downside of this flexbility is that it is pretty easy to lose track of the cloud resources that are no longer needed or used. Perhaps you forgot to delete the cluster with the previous version of your application, or forgot to destroy the volume when you no longer needed the extra disk. Taking snapshots is great for backups, but do you really need them from 12 months ago? It's not just forgetfulness that can cause problems. API and network errors can cause your request to delete an unused volume to get lost.

At Netflix, when we analyzed our Amazon Web Services (AWS) usage, we found a lot of unused resources and we needed a solution to rectify this problem. Diligent engineers can manualy delete unused resources via Asgard but we needed a way to automatically detect and clean them up. Our solution was Janitor Monkey.

We have written about our Simian Army in the past and we are now proud to announce that the source code for the new member of our simian army, Janitor Monkey, is now open and available to the public.

What is Janitor Monkey?

Janitor Monkey is a service which runs in the Amazon Web Services (AWS) cloud looking for unused resources to clean up. Similar to Chaos Monkey, the design of Janitor Monkey is flexible enough to allow extending it to work with other cloud providers and cloud resources. The service is configured to run, by default, on non-holiday weekdays at 11 AM. The schedule can be easily re-configured to fit your business' need.

Janitor Monkey determines whether a resource should be a cleanup candidate by applying a set of rules on it. If any of the rules determines that the resource is a cleanup candidate, Janitor Monkey marks the resource and schedules a time to clean it up. We provide a collection of rules in the open sourced version that are currently used at Netflix and believed general enough to be used by most users. The design of Janitor Monkey also makes it simple to customize rules or to add new ones.

Since there can be exceptions when you want to keep an unused resource around, before a resource is deleted by Janitor Monkey, the owner of the resource will receive a notification a configurable number of days ahead of the cleanup time. This is to prevent a resource that is still needed from being deleted by Janitor Monkey. The resource owner can then flag the resources that they want to keep as exceptions and Janitor Monkey will leave them alone.

Over the last year Janitor Monkey has deleted over 5,000 resources running in our production and test environments. It has helped keep our costs down and has freed up engineering time which is no longer needed to manage unused resources.

Resource Types and Rules

Four types of AWS resources are currently managed by Janitor Monkey: Instances, EBS Volumes, EBS Volume Snapshots, and Auto Scaling Groups. Each of these resource types has its own rules to mark unused resources. For example, an EBS volume is marked as a cleanup candidate if it has not been attached to any instance for 30 days. Another example is that an instance will be cleaned by Janitor Monkey if it is not in any auto scaling group for over 3 days since we know these are experimentation instances -- all others must be in auto scaling groups. The number of retention days in these rules is configurable so the rules can be easily customized to fit your business requirements. We plan to make Janitor Monkey support more resource types in the future, such as launch configurations, security groups, and AMIs. The design of Janitor Monkey makes adding new resource types easy.

How Janitor Monkey Cleans

Janitor Monkey works in three stages: "mark, notify, delete". When Janitor Monkey marks a resource as a cleanup candidate, it schedules a time to delete the resource. The delete time is specified in the rule that marks the resource. Every resource is associated with an owner email, which can be specified as a tag on the resource. You can also easily extend Janitor Monkey to obtain this information from your internal system. The simplest way is using a default email address, e.g. your team's email list for all the resources. You can configure a number of days for specifying when to let Janitor Monkey send notification to the resource owner before the scheduled termination. By default the number is 2, which means that the owner will receive a notification 2 business days ahead of the termination date. During the 2-day period the resource owner can decide if the resource can be deleted. In case a resource needs to be retained, the owner can use a simple REST interface to flag the resource to be excluded by Janitor Monkey. The owner can later use another REST interface to remove the flag and Janitor Monkey will then be able to manage the resource again. When Janitor Monkey sees a resource marked as a cleanup candidate and the scheduled termination time has passed, it will delete the resource. The resource owner can also delete the resource manually if he/she wants to release the resource earlier to save cost. When the status of the resource changes, making the resource not a cleanup candidate (e.g. a detached EBS volume is attached to an instance), Janitor Monkey will unmark the resource and no cleanup will occur.

Configuration and Customization

The resource types managed by Janitor Monkey, the rules for each resource type to mark cleanup candidates, and the parameters used to configure each individual rule, are all configurable. You can easily customize Janitor Monkey with the most appropriate set of rules for your resources by setting Janitor Monkey properties in a configuration file. You can also create your own rules or add support for new resource types, and we encourage you to contribute your cleanup rules to the project so that all can benefit.

Auditing, Logging, and Costs

Janitor Monkey events are logged in an Amazon SimpleDB table by default. You can easily check the SimpleDB records to find out what Janitor Monkey has done. The resources managed by Janitor Monkey are also stored in SimpleDB. At Netflix we have a UI for managing the Janitor Monkey resources and we have plans to open source it in the future as well.

There could be associated costs with Amazon SimpleDB, but in most cases the activity of Janitor Monkey should be small enough to fall within Amazon's Free Usage Tier. Ultimately the costs associated with running Janitor Monkey are your responsibility. For your reference, the costs of Amazon SimpleDB can be found at

Coming Up

In the near future we are planning to release some new resource types for Janitor Monkey to manage. As mentioned earlier, the next candidate will likely be launch configuration. Also, we will add support for using Edda to implement existing and new Janitor Monkey rules. Edda allows us to query the history of resources, helping Janitor Monkey find unused resources more accurately and reliably.


Janitor Monkey helps keep our cloud clean and clutter-free. We hope you find Janitor Monkey to be useful for your business. We'd appreciate any feedback on it. We're always looking for new members to join the team. If you are interested in working on great open source software, take a look at for current openings!