Tuesday, August 23, 2016

Engineering Trade-Offs and The Netflix API Re-Architecture

Netflix’s engineering culture is predicated on Freedom & Responsibility, the idea that everyone (and every team) at Netflix is entrusted with a core responsibility. Within that framework, they are free to decide how best to satisfy their mission. Accordingly, teams are generally responsible for all aspects of their systems, ranging from design and architecture to development, deployment, and operations. At the same time, it is inefficient to have all teams build everything that they need from scratch, given that there are often commonalities in the infrastructure needs of teams. We (like everyone else) value code reuse and consolidation where appropriate.

Given these two ideas (Freedom & Responsibility and code reuse), how can an individual and/or team figure out what they should optimize for themselves and what they should inherit from a centralized team? These kinds of trade-offs are pervasive in making engineering decisions, and Netflix is no exception.

The Netflix API is the service that handles the sign-up, discovery and playback traffic from all devices and all users. Over the last few years, the service has grown in a number of different dimensions: it’s grown in complexity, its request volume has increased, and Netflix’s subscriber base has grown as we expanded to most countries in the world. As the demands on the Netflix API continue to rise, the architecture that supports this massive responsibility is starting to approach its limits. As a result, we are working on a new architecture to position us well for the future (see a recent presentation at QCon for more details). This post explores the challenge of how, in the course of our re-architecture, we work to reconcile seemingly conflicting engineering principles: velocity and full ownership vs. maximum code reuse and consolidation.

Microservices Orchestration in the Netflix API
The Netflix API is the “front door” to the Netflix ecosystem of microservices. As requests come from devices, the API provides the logic of composing calls to all services that are required to construct a response. It gathers whatever information it needs from the backend services, in whatever order needed, formats and filters the data as necessary, and returns the response.

So, at its core, the Netflix API is an orchestration service that exposes coarse-grained APIs by composing fine-grained functionality provided by the microservices.
To make this happen, the API has at least four primary requirements: provide a flexible request protocol; map requests to one or more fine-grained APIs of backend microservices; provide a common resiliency abstraction to protect backend microservices; and create a context boundary (“buffer”) between device and backend teams.

Today, the API service exposes three categories of coarse-grained APIs: non-member (sign-up, billing, free trial, etc.), discovery (recommended shows and movies, search, etc.) and playback (decisions regarding the streaming experience, licensing to ensure users can view specific content, viewing history, heartbeats for user bookmarking, etc.).

Consider an example from the playback category of APIs. Suppose a user clicks the “play” button for Stranger Things Episode 1 on their mobile phone. In order for playback to begin, the mobile phone sends a “play” request to the API. The API in turn calls several microservices under the hood. Some of these calls can be made in parallel, because they don’t depend on each other. Others have to be sequenced in a specific order. The API contains all the logic to sequence and parallelize the calls as necessary. The device, in turn, doesn’t need to know anything about the orchestration that goes on under the hood when the customer clicks “play”.
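As a rough illustration of this pattern, the sketch below runs independent calls in parallel and sequences a dependent call afterwards. The service names (`fetch_license`, `fetch_bookmark`, `select_streams`) and their payloads are hypothetical and are not taken from the actual Netflix API; they only stand in for backend microservice calls.

```python
import asyncio


# Hypothetical backend calls; names and payloads are illustrative only.
async def fetch_license(user_id: str, video_id: str) -> dict:
    await asyncio.sleep(0)  # stands in for a network call
    return {"license": f"{user_id}:{video_id}"}


async def fetch_bookmark(user_id: str, video_id: str) -> dict:
    await asyncio.sleep(0)  # stands in for a network call
    return {"position_seconds": 0}


async def select_streams(video_id: str, license_info: dict) -> dict:
    # Needs the license result, so it must run after fetch_license.
    await asyncio.sleep(0)
    return {"video_id": video_id, "licensed": "license" in license_info}


async def handle_play(user_id: str, video_id: str) -> dict:
    # Independent calls are issued in parallel.
    license_info, bookmark = await asyncio.gather(
        fetch_license(user_id, video_id),
        fetch_bookmark(user_id, video_id),
    )
    # The dependent call is sequenced once its input is available.
    streams = await select_streams(video_id, license_info)
    # One coarse-grained response is composed for the device.
    return {"streams": streams, "bookmark": bookmark}
```

The device would only see the single `handle_play` response, for example via `asyncio.run(handle_play("user-1", "video-1"))`, and never the individual backend calls.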



 Figure 1: Devices send requests to API, which orchestrates the ecosystem of microservices.


Playback requests, with some exceptions, map only to playback backend services. There are many more discovery and non-member dependent services than playback services, but the separation is relatively clean, with only a few services needed both for playback and non-playback requests.

This is not a new insight for us, and our organizational structure reflects this. Today, two teams, the API team and the Playback team, contribute to the orchestration layer, with the Playback team focusing on Playback APIs. However, only the API team is responsible for the full operations of the API, including releases, 24/7 support, rollbacks, etc. While this is great for code reuse, it goes against our principle of teams owning and operating in production what they build.

With this in mind, the goals to address in the new architecture are:
  • We want each team to own and operate in production what they build. This will allow more targeted alerting and a shorter MTTR (mean time to recovery).
  • Similarly, we want each team to own their own release schedule and wherever possible not have releases held up by unrelated changes.

Two competing approaches
As we look into the future, we are considering two options. In option 1 (see figure 2), the orchestration layer in the API will, for all playback requests, be a pass-through and simply send the requests on to the playback-specific orchestration layer. The playback orchestration layer would then play the role of orchestrating between all playback services. The one exception to a full pass-through model is the small set of shared services, where the orchestration layer in the API would enrich the request with whatever information the playback orchestration layer needs in order to service the request.



Figure 2: OPTION 1: Pass-through orchestration layer with playback-specific orchestration layer


Alternatively, we could simply split into two separate APIs (see figure 3).



Figure 3: OPTION 2: Separate playback and discovery/non-member APIs


Both of the approaches actually solve the challenges we set out to solve: for each option, each team will own the release cycle as well as the production operations of their own orchestration layer - a step forward in our minds. This means that the choice between the two options comes down to other factors. Below we discuss some of our considerations.

Developer Experience
The developers who use our API (i.e., Netflix’s device teams) are top priority when designing, building and supporting the new API. They will program against our API daily, and it is important for our business that their developer experience and productivity are excellent. Two of the top concerns in this area are discovery and documentation: our partner teams will need to know how to interact with the API, what parameters to pass in and what they can expect back. Another goal is flexibility: due to the complex needs we have for 1000+ device types, our API must be extremely flexible. For instance, a device may want to request a different number of videos, and different properties about them, than another device would. All of this work will be important to both playback and non-playback APIs, so how is this related to the one vs. two APIs discussion? One API facilitates more uniformity in those areas: how requests are made and composed, how the API is documented, where and how teams find out about changes or additions to the API, API versioning, tools to optimize the developer experience, etc. If we go the route of two APIs, this is all still possible, but we will have to work harder across the two teams to achieve this.

Organizational implications and shared components
The two teams are very close and collaborate effectively on the API today. However, we are keenly aware that a decision to create two APIs, owned by two separate teams, can have profound implications. Our goal would, and should, be minimal divergence between the two APIs. Developer experience, as noted above, is one of the reasons. More broadly, we want to maximize the reuse of any components that are relevant to both APIs. This also includes any orchestration mechanisms, and any tools, mechanisms, and libraries related to scalability, reliability, and resiliency. The risk is that the two APIs could drift apart over time. What would that mean? For one, it could have organizational consequences (e.g., need for more staff). We could end up in a situation where we have valued ownership of components to a degree that we have abandoned component reuse. This is not a desirable outcome for us, and we would have to be very thoughtful about any divergence between the two APIs.

Even in a world where we have a significant amount of code reuse, we recognize that the operational overhead will be higher. As noted above, the API is critical to the Netflix service functioning properly for customers. Up until now, only one of the teams has been tasked with making the system highly scalable and highly resilient, and carrying the operational burden. The team has spent years building up expertise and experience in system scale and resiliency. By creating two APIs, we would be distributing these tasks and responsibilities to both teams.

Simplicity
If one puts the organizational considerations aside, two separate APIs is simply the cleaner architecture. In option 1, if the API acts largely as a pass-through, is it worth incurring the extra hop? Every playback request that would come into the API would simply be passed along to the playback orchestration layer without providing much functional value (besides the small set of functionality needed from the shared services). If the components that we build for discovery, insights, resiliency, orchestration, etc. can be reused in both APIs, the simplicity of having a clean separation between the two APIs is appealing. Moreover, as mentioned briefly above, option 1 also requires two teams to be involved for Playback API pushes that change the interaction model, while option 2 truly separates out the deployments.

Where does all of this leave us? We realize that this decision will have long-lasting consequences. But in taking all of the above into consideration, we have also come to understand that there is no perfect solution. There is no right or wrong, only trade-offs. Our path forward is to make informed assumptions and then experiment and build based on them. In particular, we are experimenting with how much we can generalize the building blocks we have already built and are planning to build, so that they could be used in both APIs. If this proves fruitful, we will then build two APIs. Despite the challenges, we are optimistic about this path and excited about the future of our services. If you are interested in helping us tackle this and other equally interesting challenges, come join us! We are hiring for several different roles.

By Katharina Probst, Justin Becker

Tuesday, August 16, 2016

Distributed delay queues based on Dynomite

Netflix’s Content Platform Engineering runs a number of business processes which are driven by asynchronous orchestration of micro-services based tasks, and queues form an integral part of the orchestration layer amongst these services.   

A few examples of these processes are:
  • IMF based content ingest from our partners
  • Process of setting up new titles within Netflix
  • Content Ingest, encode and deployment to CDN

Traditionally, we have been using a Cassandra based queue recipe along with Zookeeper for distributed locks, since Cassandra is the de facto storage engine at Netflix. However, using Cassandra for queue-like data structures is a known anti-pattern. In addition, taking a global lock on the queue while polling limits concurrency on the consumer side, because the lock ensures that only one consumer can poll from the queue at a time. This can be mitigated somewhat by sharding the queue, but concurrency is still limited within each shard. As we started to build out a new orchestration engine, we looked at Dynomite for handling the task queues.

We wanted the following in the queue recipe:
  1. Distributed
  2. No external locks (e.g. Zookeeper locks)
  3. Highly concurrent
  4. At-least-once delivery semantics
  5. No strict FIFO
  6. Delayed queue (message is not taken out of the queue until some time in the future)
  7. Priorities within the shard
The queue recipe described here is used to build a message broker server that exposes various operations (push, poll, ack etc.) via REST endpoints and can potentially be exposed by other transports (e.g. gRPC).  Today, we are open sourcing the queue recipe.

Using Dynomite & Redis for building queues

Dynomite is a generic dynamo implementation that can be used with many different key-value pair storage engines. Currently, it provides support for the Redis Serialization Protocol (RESP) and Memcached write protocol. We chose Dynomite for its performance, multi-datacenter replication and high availability. Moreover, Dynomite provides sharding, and pluggable data storage engines, allowing us to scale vertically or horizontally as our data needs increase.

Why Redis?

We chose to build the queues using Redis as a storage engine for Dynomite.
  1. Redis’s architecture lends itself nicely to a queuing design by providing the data structures required for building queues. Moreover, Redis’s in-memory design provides superior performance (low latency).
  2. Dynomite, on top of Redis, provides high availability, peer-to-peer replication and the required consistency semantics (DC_SAFE_QUORUM) for building queues in a distributed cluster.

Queue Recipe

A queue is stored as a sorted set (ZADD, ZRANGE etc. operations) within Redis.  Redis sorts the members in a sorted set using the provided score.  When storing an element in the queue, the score is computed as a function of the message priority and timeout (for timed queues).  
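One possible scoring function is sketched below. The exact formula is an assumption for illustration (the post only says the score is a function of priority and timeout): the integer part is the time at which the message becomes visible, and the priority is added as a fraction so that it only breaks ties between messages due in the same millisecond. The name `compute_score` and the priority range are hypothetical.

```python
def compute_score(now_ms: int, delay_ms: int, priority: int) -> float:
    """Return a sorted-set score for a message (assumed formula, for illustration).

    The integer part is the epoch millisecond at which the message becomes
    visible; the fractional part orders messages due in the same millisecond,
    with a lower priority number sorting first.
    """
    if not 0 <= priority < 100:
        raise ValueError("priority must be in the range [0, 100)")
    return float(now_ms + delay_ms) + priority / 100.0
```

With this scheme, a message delayed by 500 ms sorts after every message that is due now, regardless of priority, which matches the "priorities within the shard, no strict FIFO" requirements listed earlier.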

Push & Pop Using Redis Primitives

The following sequence describes the high-level operations used to push messages into, and poll messages from, the system. For each queue, three sets of Redis data structures are maintained:


  1. A sorted set containing the queued elements, ordered by score.
  2. A hash that contains the message payloads, keyed by message ID.
  3. A sorted set (the un-ack set) containing messages consumed by a client but not yet acknowledged.
Push
  • Calculate the score as a function of message timeout (delayed queue) and priority
  • Add the message ID to the sorted set for the queue
  • Add the message payload to the Redis hash, with the message ID as key
Poll
  • Calculate max score as current time
  • Get messages with score between 0 and max
  • Add the message ID to the un-ack set and remove it from the sorted set for the queue
  • If the previous step succeeds, retrieve the message payload from the Redis hash based on ID
Ack
  • Remove from the un-ack set by ID
  • Remove from the message payload hash

Messages that are not acknowledged by the client are pushed back to the queue (at-least-once semantics).
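The push, poll and ack steps above can be modeled with a small in-memory sketch. This is not the dyno-queues implementation: plain Python dicts stand in for the Redis structures, the class and parameter names are hypothetical, and the score formula and un-ack deadline are assumptions. The comments name the Redis commands (ZADD, ZRANGEBYSCORE, ZREM, HSET, HGET, HDEL) that would play the corresponding role.

```python
from typing import Dict, List, Tuple


class InMemoryDelayQueue:
    """Toy model of the recipe; plain dicts stand in for the Redis structures."""

    def __init__(self) -> None:
        self.queue: Dict[str, float] = {}    # sorted set: message ID -> score
        self.payloads: Dict[str, str] = {}   # hash: message ID -> payload
        self.unacked: Dict[str, float] = {}  # sorted set: message ID -> un-ack deadline

    def push(self, msg_id: str, payload: str, now_ms: int,
             delay_ms: int = 0, priority: int = 0) -> None:
        # Score = visibility time plus a priority tiebreaker (assumed formula).
        score = now_ms + delay_ms + priority / 100.0
        self.queue[msg_id] = score       # in Redis: ZADD
        self.payloads[msg_id] = payload  # in Redis: HSET

    def poll(self, now_ms: int, count: int = 1,
             unack_timeout_ms: int = 60_000) -> List[Tuple[str, str]]:
        # In Redis: ZRANGEBYSCORE with min 0 and max = current time.
        due = sorted(
            (score, msg_id)
            for msg_id, score in self.queue.items()
            if 0 <= score <= now_ms
        )[:count]
        delivered = []
        for _, msg_id in due:
            # Record the message as un-acked, then remove it from the queue.
            self.unacked[msg_id] = now_ms + unack_timeout_ms    # in Redis: ZADD
            del self.queue[msg_id]                              # in Redis: ZREM
            delivered.append((msg_id, self.payloads[msg_id]))   # in Redis: HGET
        return delivered

    def ack(self, msg_id: str) -> bool:
        if msg_id not in self.unacked:
            return False
        del self.unacked[msg_id]         # in Redis: ZREM on the un-ack set
        self.payloads.pop(msg_id, None)  # in Redis: HDEL
        return True
```

In this model, a message pushed with `delay_ms=500` at time 1000 is not returned by `poll` until `now_ms` reaches 1500, and its payload stays in the hash until `ack` is called.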

Availability Zone / Rack Awareness

Our queue recipe was built on top of Dynomite’s Java client, Dyno. Dyno provides connection pooling for persistent connections, and can be configured to be topology aware (token aware). Moreover, Dyno routes requests to Dynomite nodes with application-specific local rack affinity (in AWS, a rack is a zone, e.g. us-east-1a, us-east-1b etc.). A client in us-east-1a will connect to a Dynomite/Redis node in the same AZ (unless the node is not available, in which case the client will fail over). This property is exploited for sharding the queues by availability zone.

Sharding

Queues are sharded based on the availability zone. When pushing an element to the queue, the shard is chosen in round-robin fashion. This ensures that all shards are eventually balanced. Each shard is represented by a sorted set in Redis whose key is a combination of queueName and AVAILABILITY_ZONE.
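A minimal sketch of round-robin shard selection follows. The class name `ShardSelector` and the `queueName.zone` key format are assumptions for illustration; the post only states that the key combines the queue name and the availability zone.

```python
import itertools
from typing import Sequence


class ShardSelector:
    """Picks the shard key for each push in round-robin order."""

    def __init__(self, queue_name: str, zones: Sequence[str]) -> None:
        if not zones:
            raise ValueError("at least one availability zone is required")
        self._queue_name = queue_name
        self._zones = itertools.cycle(zones)  # endless round-robin iterator

    def next_shard_key(self) -> str:
        # The "." separator is a hypothetical choice of key format.
        return f"{self._queue_name}.{next(self._zones)}"
```

Each call to `next_shard_key()` returns the key of the sorted set that the next pushed message should go to, so consecutive pushes are spread evenly across zones.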

Dynomite consistency

The message broker uses a Dynomite cluster with consistency level set to DC_SAFE_QUORUM. Reads and writes are propagated synchronously to a quorum of nodes in the local data center and asynchronously to the rest. The quorum size is calculated and then rounded down to a whole number. This consistency level ensures that all writes are acknowledged by a majority quorum.
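The post does not spell out the quorum formula. Assuming the usual majority rule of half the replicas plus one, rounded down, the arithmetic looks like this (the function name is hypothetical):

```python
def quorum_size(replicas: int) -> int:
    """Majority quorum under the assumed rule floor(replicas / 2) + 1."""
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    return replicas // 2 + 1
```

Under this assumption, a data center with three replicas needs two acknowledgements for a write to succeed.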

Avoiding Global Locks





  • Each node (N1...Nn in the above diagram) has affinity to the availability zone and talks to the Redis servers in that zone.
  • A Dynomite/Redis node serves only one request at a time. Dynomite can hold thousands of concurrent connections; however, requests are processed by a single thread inside Redis. This ensures that when two concurrent calls are issued to poll an element from the queue, they are served sequentially by the Redis server, avoiding any local or distributed locks on the message broker side.
  • In the event of a failover, the DC_SAFE_QUORUM write ensures that no two client connections are given the same message out of a queue, as the write to the UNACK collection will only succeed for a single node for a given element. If the same element is picked up by two broker nodes (in the event of a failover connection to Dynomite), only one will be able to add the message to the UNACK collection and the other will receive a failure. The failed node then moves on to peek at another message from the queue to process.

Queue Maintenance Considerations

Queue Rebalancing

Rebalancing is useful when queues are not balanced, or when a new availability zone is added or an existing one is permanently removed.

Handling Un-Ack’ed messages

A background process monitors for the messages in the UNACK collections that are not acknowledged by a client in a given time (configurable per queue). These messages are moved back into the queue.
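A single pass of such a background process could look like the sketch below. It assumes that each entry in the UNACK collection is stored with a deadline (time of delivery plus the per-queue timeout) and that a requeued message becomes visible immediately; both are assumptions, and plain dicts again stand in for the Redis sorted sets.

```python
from typing import Dict, List


def requeue_expired(queue: Dict[str, float], unacked: Dict[str, float],
                    now_ms: int) -> List[str]:
    """Move messages whose un-ack deadline has passed back into the queue.

    `queue` maps message ID -> score, `unacked` maps message ID -> deadline.
    Returns the IDs that were requeued.
    """
    expired = [msg_id for msg_id, deadline in unacked.items() if deadline <= now_ms]
    for msg_id in expired:
        queue[msg_id] = float(now_ms)  # visible again right away (assumption)
        del unacked[msg_id]
    return expired
```

Running this periodically is what turns a missed acknowledgement into a redelivery, which is where the at-least-once guarantee comes from.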

Further extensions

Multiple consumers

A modified version can be implemented, where the consumer can “subscribe” for a message type (message type being metadata associated with a message) and a message is delivered to all the interested consumers.

Ephemeral Queues

Ephemeral queues have messages with a specified TTL that are only available to consumers until the TTL expires. Once expired, the messages are removed from the queue and are no longer visible to consumers. The recipe can be modified to add a TTL to messages, thereby creating an ephemeral queue. When adding elements to the Redis collections, they can be TTLed, and will be removed from the collection by Redis upon expiry.

Other messaging solutions considered

  1. Kafka
Kafka provides a robust messaging solution with at-least-once delivery semantics. Kafka lends itself well to message streaming use cases. However, Kafka makes it harder to implement the semantics around priority queues and time-based queues (both are required for our primary use case). A case can be made for creating a large number of partitions in a queue to handle client usage, but then again adding a message broker in the middle would complicate things further.
  2. SQS
Amazon SQS is a viable alternative and, depending upon the use case, might be a good fit. However, SQS does not support priority queues, or time-based queues with a delay beyond 15 minutes.
  3. Disque
Disque is a project that aims to provide distributed queues with Redis-like semantics. At the time we started working on this project, Disque was in beta (an RC has since been released).
  4. Zookeeper (or comparable) distributed locks / coordinator based solutions
A distributed queue can be built with Cassandra or a similar backend, with Zookeeper as the global locking solution. However, Zookeeper quickly becomes the bottleneck as the number of clients grows, adding to the latencies. Cassandra itself is known to have queues as an anti-pattern use case.

Performance Tests

Below are some of the performance numbers for the queues implemented using the above recipe. The numbers here measure server-side latencies and do not include the network time between client and server. The Dynomite cluster, as noted above, runs with the DC_SAFE_QUORUM consistency level.

Cluster Setup

Dynomite: 3 x r3.2xlarge, us-east-1, us-west-2, eu-west-1
Message Broker: 3 x m3.xlarge, us-east-1
Publisher / Consumer: m3.large, us-east-1


The Dynomite cluster is deployed across 3 regions, providing higher availability in case of region outages. The broker talks to the Dynomite cluster in the same region (unless the entire region fails over), as the test focuses on measuring latencies within the region. For very high availability use cases, the message broker could be deployed in multiple regions along with the Dynomite cluster.

Results



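Events Per Second | Poll Latency Avg (ms) | Poll Latency 95th (ms) | Poll Latency 99th (ms) | Push Latency Avg (ms) | Push Latency 95th (ms) | Push Latency 99th (ms)
90 | 5.6 | 7.8 | 88 | 1.3 | 1.3 | 2.2
180 | 2.9 | 2.4 | 12.3 | 1.3 | 1.3 | 2.1
450 | 4.5 | 2.6 | 104 | 1.2 | 1.5 | 2.1
1000 | 10 | 15 | 230 | 1.8 | 3.3 | 6.3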


Conclusion

We built the queue recipe based on the need for micro-services orchestration. Building the recipe on top of Dynomite gives us the flexibility to port the solution to other storage engines depending upon workload needs. We think the recipe is hackable enough to support further use cases. We are releasing the recipe to open source: https://github.com/Netflix/dyno-queues.

If you like the challenges of building distributed systems and are interested in building the Netflix studio ecosystem and the content pipeline at scale, check out our job openings.