Monday, February 1, 2016

Astyanax - Retiring an old friend

In the summer of 2011, Astyanax, an Apache Cassandra (C*) Java client library, was created to make it easy to consume Cassandra, which at the time was in its infancy. Astyanax became so popular that, for a good while, it was the de facto Java client library for the Apache Cassandra community. Astyanax provides the following features:
  • High level, simple, object oriented interface to Cassandra.
  • Resilient behavior on the client side.
  • Connection pool abstraction. Implementation of a round robin and token-aware connection pool.
  • Monitoring abstraction to get event notification from the connection pool.
  • Complete encapsulation of the underlying Thrift API and structs.
  • Automatic retry of downed hosts.
  • Automatic discovery of additional hosts in the cluster.
  • Suspension of hosts for a short period of time after several timeouts.
  • Annotations to simplify use of composite columns.
DataStax, the enterprise company behind Apache Cassandra, took many of the lessons contained within Astyanax and included them within their official Java Cassandra driver.
When Astyanax was written, the protocol used to communicate with Cassandra was Thrift and the API was very low level. Today, Cassandra is mostly consumed via a query language very similar to SQL. This new language is called CQL (Cassandra Query Language). The Cassandra community has also moved beyond the Thrift protocol to the CQL Binary protocol.
Thrift will be deprecated in Apache Cassandra in version 4.0. Aside from that deprecation there are also the following reasons to move away from Thrift:
  • CQL Binary protocol performs better
  • Community development efforts have completely moved to the CQL Binary protocol. The Thrift implementation is only in maintenance mode.
  • CQL is easier to consume since the API resembles SQL.
Today we are moving Astyanax from an active project in the NetflixOSS ecosystem into an archived state. This means the project will still be available for public consumption; however, we will not be making any feature enhancements or performance improvements. There are still tens of thousands (if not more) lines of code within Netflix that use Astyanax. Moving forward, we will only be fixing Netflix-critical bugs as we begin our efforts to refactor our internal systems to use the CQL Binary protocol.
If there are members of the community that would like to have a more hands-on role and maintain the project by becoming a committer, please reach out to me directly.

Wednesday, January 20, 2016

Automated Failure Testing

AKA Training Smarter Monkeys

At Netflix, we have found that proactive failure testing is a great way to ensure that we have a reliable product for our members by helping us prepare our systems, and our teams, for the problems that arise in our production environment. Our various efforts in this space, some of which are manual, have helped us make it through the holiday season without incident (which is great if you’re on-call for New Year’s Eve!). But who likes manual processes? Additionally, we are only testing for the failures we anticipate, and often only for an individual service or component per exercise. We can do better!

Imagine a monkey that crawls through your code and infrastructure, injecting small failures and discovering whether they result in member pain.

While looking for a way to build such a monkey, we discovered a failure testing approach developed by Peter Alvaro called Molly. Given that we already had a failure injection service called FIT, we believed we could build a prototype implementation in short order. And we thought it would be great to see how well the concepts outlined in the Molly paper translated into a large-scale production environment. So, we got in touch with Peter to see if he was interested in working together to build a prototype. He was, and the results of our collaboration are detailed below.

Algorithm


“A lineage-driven fault injector reasons backwards from correct system outcomes to determine whether failures in the execution could have prevented the outcome.” [1]

Molly begins by looking at everything that went into a successful request and asking “What could have prevented this outcome?” Take this simplified request as an example:
simpleRequestGraph.dot.png

(A or R or P or B)

At the start, everything is necessary - as far as we know. Symbolically we say that member pain could result from failing (A or R or P or B) where A stands for API, etc. We start by choosing randomly from the potential failure points and rerunning the request, injecting failure at the chosen point.

There are three potential outcomes:
  1. The request fails - we’ve found a member-facing failure
    1. From this we can prune future experiments containing this failure
  2. The request succeeds - the service/failure point is not critical
  3. The request succeeds, and there is an alternative interaction that takes the place of the failure (i.e. a failover or a fallback).  

In this example, we fail Ratings and the request succeeds, producing this graph:

simpleRequestGraph2.dot.png
(A or P or B) and (A or P or B or R)

We know more about this request’s behavior and update our failure equation. As Playlist is a potential failure point in this equation, we’ll fail it next, producing this graph:

simpleRequestGraph3.dot.png

(A or PF or B) and (A or P or B) and (A or P or B or R)

This illustrates #3 above. The request was still successful, but due to an alternate execution. Now we have a new failure point to explore. We update our equation to include this new information. Now we rinse, lather, and repeat until there are no more failures to explore.

Molly isn’t prescriptive on how to explore this search space. For our implementation we decided to compute all solutions which satisfy the failure equation, and then choose randomly from the smallest solution sets. For example, the solutions to our last representation would be: [{A}, {B}, {P,PF}, {R,A}, {R,B} …]. We would begin by exploring all the single points of failure: A, B; then proceed to all sets of size 2 (such as {P,PF}, which fails Playlist together with its fallback), and so forth.
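The search described above can be sketched in a few lines of Python. This is a minimal illustration rather than our production implementation: it reads the failure equation strictly (a solution must contain at least one failure point from every clause), and the helper names solutions_by_size and minimal_only are made up for this example.

```python
from itertools import combinations

# Each clause lists the failure points supporting one observed successful
# execution; the names mirror the example (PF = the Playlist fallback).
clauses = [
    {"A", "PF", "B"},
    {"A", "P", "B"},
    {"A", "P", "B", "R"},
]

def solutions_by_size(clauses, max_size):
    """Yield every failure set of up to max_size points that hits all clauses."""
    points = sorted(set().union(*clauses))
    for size in range(1, max_size + 1):
        for candidate in combinations(points, size):
            chosen = set(candidate)
            if all(chosen & clause for clause in clauses):
                yield chosen

def minimal_only(solutions):
    """Drop any solution that contains a smaller solution already kept."""
    kept = []
    for solution in solutions:  # solutions arrive in order of increasing size
        if not any(smaller <= solution for smaller in kept):
            kept.append(solution)
    return kept

all_solutions = list(solutions_by_size(clauses, max_size=2))
minimal = minimal_only(all_solutions)
```

Under this strict reading, the single points of failure are {A} and {B}, and {P, PF} is the smallest set that involves the fallback. Picking the next experiment then amounts to a random choice among the smallest sets not yet tried.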

Implementation


Lineage


What is the lineage of a Netflix request? We are able to leverage our tracing system to build a tree of the request execution across our microservices. Thanks to FIT, we have additional information in the form of “Injection Points”. These are key inflection points in our system where failures may occur. Injection Points include things like Hystrix command executions, cache lookups, DB queries, HTTP calls, etc. The data provided by FIT allows us to build a more complete request tree, which is what we feed into the algorithm for analysis.

In the examples above, we see simple service request trees. Here is the same request tree extended with FIT data:

exampleRequestGraph.dot.png

Success

What do we mean by ‘success’? What is most important is our members’ experience, so we want a measurement that reflects this. To accomplish this, we tap into our device reported metrics stream. By analyzing these metrics we can determine if the request resulted in a member-facing error.  

An alternate, more simplistic approach could be to rely on the HTTP status codes for determining successful outcomes. But status codes can be misleading, as some frameworks return a ‘200’ on partial success, with a member-impacting error embedded within the payload.

Currently only a subset of Netflix requests have corresponding device reported metrics. Adding device reported metrics for more request types presents us with the opportunity to expand our automated failure testing to cover a broader set of device traffic.

Idempotence


Being able to replay requests made things nice and clean for Molly. We don’t have that luxury. We don’t know at the time we receive a request whether or not it is idempotent and safe to replay. To offset this, we have grouped requests into equivalence classes, such that requests within each class ‘behave’ the same - i.e. they execute the same dependent calls and fail in the same way.

To define request classes, we focused on the information we had available when we received the request: the path (netflix.com/foo/bar), the parameters (?baz=boo), and the device making the request. Our first pass was to see if a direct mapping existed between these request features and the set of dependencies executed. This didn’t pan out. Next we explored using machine learning to find and create these mappings. This seemed promising, but would require a fair amount of work to get right.

Instead, we narrowed our scope to only examine requests generated by the Falcor framework. These requests provide, through the query parameters, a set of JSON paths to load for the request, e.g. ‘videos’, ‘profiles’, ‘images’. We found that these Falcor path elements matched consistently with the internal services required to load those elements.
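As a rough sketch of the idea, a request class key could be derived from the device and the top-level Falcor path elements. Everything here is an assumption made for illustration: the function name request_class, the ‘paths’ query parameter carrying a JSON array of path arrays, and the choice to key on only the first element of each path.

```python
import json
from urllib.parse import urlencode, urlsplit, parse_qs

def request_class(url, device):
    """Build a hypothetical request-class key from device and top-level paths."""
    query = parse_qs(urlsplit(url).query)
    # Assumption: paths arrive as a JSON array of path arrays in 'paths'.
    raw_paths = json.loads(query.get("paths", ["[]"])[0])
    roots = sorted({str(path[0]) for path in raw_paths if path})
    return (device, tuple(roots))

# Example request asking for two video titles and a profile name.
example_paths = [["videos", 1, "title"], ["profiles", "name"], ["videos", 2, "title"]]
example_url = "https://netflix.com/foo/bar?" + urlencode(
    {"paths": json.dumps(example_paths), "method": "get"}
)
example_class = request_class(example_url, "tv")
```

Two requests that load the same kinds of elements from the same kind of device land in the same class, regardless of which specific videos or profiles they ask for.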

Future work involves finding a more generic way to create these request class mappings so that we can expand our testing beyond Falcor requests.

These request classes change as code is written and deployed by Netflix engineers. To offset this drift, we run an analysis of potential request classes daily through a sampling of the device reported metrics stream. We expire old classes that no longer receive traffic, and we create new classes for new code paths.

Member Pain


Remember that the goal of this exploration is to find and fix errors before they impact a large number of members. It’s not acceptable to cause a lot of member pain while running our tests. In order to mitigate this risk, we structure our exploration so that we are only running a small number of experiments over any given period.

Each experiment is scoped to a request class and runs for a short period (twenty to thirty seconds) for a minuscule percentage of members. We want at least ten good example requests from each experiment. In order to filter out false positives, we look at the overall success rate for an experiment, only marking a failure as found if greater than 75% of requests failed. Since our request class mapping isn’t perfect, we also filter out requests which, for any reason, didn’t execute the failure we intended to test.
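The filtering rules above can be expressed as a small decision function. This is a sketch under assumptions: the RequestResult fields and the ‘inconclusive’ verdict for experiments with too few good requests are invented for this example, while the thresholds come from the description above.

```python
from dataclasses import dataclass

@dataclass
class RequestResult:
    executed_injection: bool  # did the request hit the failure we intended to test?
    member_error: bool        # did device reported metrics show a member-facing error?

MIN_SAMPLES = 10           # at least ten good example requests
FAILURE_THRESHOLD = 0.75   # a failure is found only above this failure rate

def evaluate(results):
    """Return 'failure', 'success', or 'inconclusive' for one experiment."""
    # Drop requests that never executed the injected failure.
    good = [r for r in results if r.executed_injection]
    if len(good) < MIN_SAMPLES:
        return "inconclusive"  # hypothetical verdict, not from the original text
    failure_rate = sum(r.member_error for r in good) / len(good)
    return "failure" if failure_rate > FAILURE_THRESHOLD else "success"
```

For example, twelve good requests with ten member-facing errors would be marked as a failure, while twelve with nine errors (exactly 75%) would not.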

Let’s say we are able to run 500 experiments in a day. If we are potentially impacting 10 members each run, then the worst case impact is 5,000 members each day. But not every experiment results in a failure - in fact the majority of them result in success. If we only find a failure in one in ten experiments (a high estimate), then we’re actually impacting 500 member requests in a day, some of which are further mitigated by retries. When you’re serving billions of requests each day, the impact of these experiments is very small.
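The back-of-the-envelope numbers above work out as follows. The variable names are ours, and the inputs are the illustrative figures from the paragraph, not measurements.

```python
experiments_per_day = 500
members_per_experiment = 10

# Worst case: every experiment turns out to be a member-facing failure.
worst_case_members = experiments_per_day * members_per_experiment

# High estimate: one in ten experiments finds a failure.
failing_experiments = experiments_per_day // 10
impacted_requests = failing_experiments * members_per_experiment
```

That gives a worst case of 5,000 and an estimate of 500 before retries are taken into account.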

Results


We were lucky that one of the most important Netflix requests met our criteria for exploration - the ‘App Boot’ request. This request loads the metadata needed to run the Netflix application and load the initial list of videos for a member. This is a moment of truth that, as a company, we want to win by providing a reliable experience from the very start.

This is also a very complex request, touching dozens of internal services and hundreds of potential failure points. Brute force exploration of this space would take 2^100 iterations (roughly 1 with 30 zeros following), whereas our approach was able to explore it in ~200 experiments. We found five potential failures, one of which was a combination of failure points.

What do we do once we’ve found a failure?  Well, that part is still admittedly manual. We aren’t to the point of automatically fixing the failure yet. In this case, we have a list of known failure points, along with a ‘scenario’ which allows someone to use FIT to reproduce the failure. From this we can verify the failure and decide on a fix.

We’re very excited that we were able to build this proof of concept implementation and find real failures using it. We hope to be able to extend it to search a larger portion of the Netflix request space and find more member-facing failures before they result in outages, all in an automated way.

And if you’re interested in failure testing and building resilient systems, get in touch with us - we’re hiring!

Kolton Andrus (@KoltonAndrus), Ben Schmaus (@schmaus)

Thursday, January 14, 2016

Dynomite with Redis on AWS - Benchmarks

About a year ago the Cloud Database Engineering (CDE) team published a post Introducing Dynomite. Dynomite is a proxy layer that provides sharding and replication and can turn existing non-distributed datastores into a fully distributed system with multi-region replication. One of its core features is the ability to scale a data store linearly to meet rapidly increasing traffic demands. Dynomite also provides high availability, and was designed and built to support Active-Active Multi-Regional Resiliency.
Dynomite with Redis is now used as a production system within Netflix. This post is the first part of a series that pragmatically examines Dynomite's use cases and features. In this post, we will show performance results using Amazon Web Services (AWS) with and without the recently added consistency feature.

Dynomite Consistency

Dynomite extends eventual consistency to tunable consistency in the local region. The consistency level specifies how many replicas must respond to a write or read request before returning data to the client application. Read and write consistency can be configured to manage availability versus data accuracy. Consistency can be configured for read or write operations separately (cluster-wide). There are two configurations:
  • DC_ONE: Reads and writes are propagated synchronously only to the token owner in the local Availability Zone (AZ) and asynchronously replicated to other AZs and regions.
  • DC_QUORUM: Reads and writes are propagated synchronously to a quorum of nodes in the local region and asynchronously to the rest. The quorum size is calculated by the formula ceiling((n+1)/2), where n is the number of nodes in a region. The operation succeeds if the read/write succeeded on a quorum of nodes.
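The quorum formula and the success rule for the two configurations can be sketched as follows. This is an illustration, not Dynomite code: the function names are made up, and the DC_ONE branch simplifies by assuming the single acknowledgement comes from the local token owner.

```python
import math

def quorum_size(n):
    """Quorum per the formula above: ceiling((n + 1) / 2)."""
    return math.ceil((n + 1) / 2)

def operation_succeeds(acks, n, consistency):
    """Decide whether a read/write succeeds given the number of successful nodes."""
    if consistency == "DC_ONE":
        # Simplification: the one ack is assumed to be the local token owner.
        return acks >= 1
    if consistency == "DC_QUORUM":
        return acks >= quorum_size(n)
    raise ValueError(f"unknown consistency level: {consistency}")
```

With n = 3 (for example, one copy of the data in each of three availability zones), the quorum is 2, so a DC_QUORUM operation tolerates one unresponsive node.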

Test Setup

Client (workload generator) Cluster

For the workload generator, we used an internal Netflix tool called Pappy. Pappy is well integrated with other Netflix OSS services, such as Archaius for fast properties, Servo for metrics, and Eureka for discovery. However, any other distributed load generator with a Redis client plugin can be used to replicate the results. Pappy has support for modules, and one of them is the Dyno Java client.
The Dyno client uses topology-aware load balancing (Token Aware) to directly connect to a Dynomite coordinator node that is the owner of the specified data. Dyno also uses zone awareness to send traffic to Dynomite nodes in the local ASG. To get the full benefit of a Dynomite cluster, a) the Dyno client cluster should be deployed across all ASGs, so all nodes can receive client traffic, and b) the number of client application nodes per ASG must be larger than the corresponding number of Dynomite nodes in the respective ASG, so that the cumulative network capacity of the client cluster is at least equal to the corresponding one at the Dynomite layer.
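The routing described above can be sketched as follows. This is a minimal illustration and not Dyno's implementation: the Node type, the CRC32 stand-in hash, the 32-bit token space, and the rule that a node owns the keys hashing at or below its token are all assumptions chosen to keep the example short.

```python
import bisect
import zlib
from dataclasses import dataclass

TOKEN_SPACE = 2**32  # assumed size of the token ring

@dataclass(frozen=True)
class Node:
    name: str
    zone: str
    token: int

def key_token(key):
    # Stand-in hash for illustration only.
    return zlib.crc32(key.encode("utf-8")) % TOKEN_SPACE

def pick_node(key, nodes, local_zone):
    """Zone aware: consider only local nodes. Token aware: pick the key's owner."""
    local = sorted((n for n in nodes if n.zone == local_zone), key=lambda n: n.token)
    if not local:
        raise LookupError(f"no nodes in zone {local_zone}")
    tokens = [n.token for n in local]
    # First node whose token is >= the key's token, wrapping around the ring.
    index = bisect.bisect_left(tokens, key_token(key)) % len(local)
    return local[index]

nodes = [
    Node(f"{zone[-1]}{i + 1}", zone, token)
    for zone in ("us-west-2a", "us-west-2b", "us-west-2c")
    for i, token in enumerate((TOKEN_SPACE // 2, TOKEN_SPACE - 1))
]
owner = pick_node("user:42", nodes, "us-west-2a")
```

Because each zone holds a full copy of the token ring in this sketch, a client in us-west-2a never has to leave its zone to reach the owner of a key.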
Dyno also uses connection pooling for persistent connections to reduce the connection churn to the Dynomite nodes. However, in performance benchmarks tuning Dyno can be tricky, as the workload generator may become the bottleneck due to thread contention. In our benchmark, we monitored the metric that Dyno exposes for the delay in picking up a connection from the connection pool.
  • Client: Dyno Java client, using default configuration (token aware + zone aware)
  • Number of nodes: Equal to the number of Dynomite nodes in each experiment.
  • Region: us-west-2 (us-west-2a, us-west-2b and us-west-2c)
  • EC2 instance type: m3.2xlarge (30GB RAM, 8 CPU cores, Network throughput: high)
  • Platform: EC2-Classic
  • Data size: 1024 Bytes
  • Number of Keys: 1M random keys
  • Demo application used a simple workload of just key-value pairs for reads and writes, i.e. the Redis GET and SET API.
  • Read/Write ratio: 80:20 (the OPS was variable per test, but the ratio was kept 80:20)
  • Number of readers/writers: 80:20 ratio of reader to writer threads. 32 readers/8 writers per Dynomite node. We performed some experiments varying the number of readers and writers and found that, in the context of our experiments, 32 readers and 8 writers per Dynomite node gave the best throughput-latency tradeoff.

Dynomite Cluster

  • Dynomite: Dynomite 0.5.6
  • Data store: Redis 2.8.9
  • Operating system: Ubuntu 14.04.2 LTS
  • Number of nodes: 3-48 (doubling every time)
  • Region: us-west-2 (us-west-2a, us-west-2b and us-west-2c)
  • EC2 instance type: r3.2xlarge (61GB RAM, 8 CPU cores, Network throughput: high)
  • Platform: EC2-Classic
The test was performed in a single region with 3 availability zones. Note that replicating to two other availability zones is not a requirement for Dynomite, but rather a deployment choice for high availability at Netflix. A Dynomite cluster of 3 nodes means that there was 1 node per availability zone. However, all three nodes take client traffic as well as replication traffic from peer nodes. Our results were captured using Atlas. Each experiment was run 3 times, and the reported results are the average over these runs. Each run lasted 3 hours.
In our benchmarks, the Dynomite node refers to the node that contains both the Dynomite layer and Redis. Hence, we do not distinguish whether the Dynomite layer or Redis contributes to the average latency.

Linear Scale Test with DC_ONE

The graphs indicate that Dynomite can scale horizontally in terms of throughput. Therefore, it can handle more traffic by increasing the number of nodes per region. On a per node basis with r3.2xlarge nodes, Dynomite can fully process the traffic generated by the client workload generator (i.e. 32K read OPS and 8K write OPS on a per node basis). For a 1KB payload, as the above graphs show, the main bottleneck is the network (1Gbps EC2 instances). Therefore, Dynomite can potentially provide even higher throughput if r3.4xlarge (2Gbps) or r3.8xlarge (10Gbps) EC2 instances are used. However, we need to note that 10Gbps optimizations will only be effective when the instances are launched in Amazon's VPC with instance types that can support Enhanced Networking using single root I/O virtualization (SR-IOV).
The average and median latency values show that Dynomite can provide sub-millisecond average latency to the client application. More specifically, Dynomite does not add extra latency as it scales to higher number of nodes, and therefore higher throughput. Overall, the Dynomite node contributes around 20% of the average latency, and the rest of it is a result of the network latency and client processing latency.
At the 95th percentile, Dynomite's latency is 0.4ms and does not increase as we scale the cluster up/down. More specifically, the network and the client are the major contributors to the 95th percentile latency, as the Dynomite node's share is <10%.
It is evident from the 99th percentile graph that the latency for Dynomite pretty much remains the same while the client side latency increases, indicating the variable nature of the network between the clusters.

Linear Scale Test With DC_QUORUM

The test setup was similar to what we used for DC_ONE tests above. Consistency was set to DC_QUORUM for both reads and writes on all Dynomite nodes. In DC_QUORUM, our expectations are that throughput will reduce and latency will increase because Dynomite waits for quorum number of responses.
Looking at the above graph, it is clear that Dynomite still scales well as the number of cluster nodes increases. Moreover, each Dynomite node achieves 18K OPS in our setup, when the cluster spans a single region. In comparison, Dynomite can achieve 40K OPS per node in DC_ONE.
The average and median latency remain <2.5ms even when DC_QUORUM consistency is enabled in Dynomite nodes. The average and median latency are slightly higher than in the corresponding experiments with DC_ONE. In DC_ONE, the Dynomite coordinator only waits for the local zone node to respond. In DC_QUORUM, the coordinator waits for a quorum of nodes to respond. Hence, in the overall read and write latency formula, the latency of the network hop to the other ASG, and the latency of performing the corresponding operation on those nodes in other ASGs, must be included.
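The difference between the two latency formulas can be illustrated with a toy model. It assumes the coordinator contacts all replicas in parallel and returns as soon as enough of them have answered; the function name and the sample timings are invented for the example and are not measurements.

```python
import math

def coordinator_latency(local_ms, remote_ms, consistency):
    """Time until the coordinator can answer the client, in milliseconds.

    local_ms:  time for the local-zone node to perform the operation
    remote_ms: per remote replica, network hop plus operation time
    """
    if consistency == "DC_ONE":
        return local_ms  # only the local zone node is awaited
    if consistency == "DC_QUORUM":
        responses = sorted([local_ms] + list(remote_ms))
        quorum = math.ceil((len(responses) + 1) / 2)
        return responses[quorum - 1]  # wait for the quorum-th fastest response
    raise ValueError(f"unknown consistency level: {consistency}")

dc_one = coordinator_latency(0.2, [0.9, 1.4], "DC_ONE")
dc_quorum = coordinator_latency(0.2, [0.9, 1.4], "DC_QUORUM")
```

With three replicas the quorum is two, so in this model the DC_QUORUM latency is set by the faster of the two remote zones (0.9 ms here) instead of by the local node alone (0.2 ms).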
The 95th percentile at the Dynomite level is less than 2ms even after increasing the traffic on each Dyno client node (linear scale). At the client side it remains below 3ms.
At the 99th percentile with DC_QUORUM enabled, Dynomite produces less than 3ms of latency. When considering the network from the cluster to the client, the latency remains well below 5ms, opening the door for a number of applications that require consistency with low latency. Dynomite can report 90th percentile and 99.9th percentile through the statistics port. For brevity, we have decided to present the 99th percentile only.

Pipelining

Redis Pipelining is client side batching that is also supported by Dynomite; the client application sends requests without waiting for a response from a previous request and later reads a single response for the whole batch. Pipelining makes it possible to increase the overall throughput at the expense of additional latency for individual operations. In the following experiments, the Dyno client randomly selected between 3 and 10 operations in one pipeline request. We believe that this configuration might be close to how a client application would use Redis Pipelining. The experiments were performed for both DC_ONE and DC_QUORUM.
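The throughput versus latency tradeoff can be seen in a toy cost model of a single connection. The model and its numbers are assumptions for illustration only: each round trip costs a fixed time, each operation costs a fixed service time, and a pipelined operation is only complete when the whole batch response is read. It does not predict the benchmark results, which involve many concurrent connections.

```python
import random

def sequential(k, rtt_us, service_us):
    """k operations sent one at a time: (total time, per-operation latency)."""
    per_op = rtt_us + service_us
    return k * per_op, per_op

def pipelined(k, rtt_us, service_us):
    """k operations in one pipeline: one round trip, responses read together."""
    total = rtt_us + k * service_us
    return total, total  # every operation waits for the whole batch

# Made-up timings in microseconds, for a batch of 5 operations.
seq_total, seq_latency = sequential(5, rtt_us=500, service_us=100)
pipe_total, pipe_latency = pipelined(5, rtt_us=500, service_us=100)

# A batch size drawn the way the experiments describe: 3 to 10 operations.
batch_size = random.randint(3, 10)
```

In this model the pipelined batch finishes sooner overall (1,000 µs versus 3,000 µs), while each individual operation takes longer to complete (1,000 µs versus 600 µs), and the per-operation latency grows with the batch size.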
For comparison, we showcase both the non-pipelining and pipelining results. In our tests, pipelining increased the throughput by up to 50%. For a small Dynomite cluster the improvement is larger, but as Dynomite scales horizontally the benefit of pipelining decreases.
Latency is a function of how many requests are combined into one pipeline request, so it will vary and will be higher than for non-pipelined requests.

Conclusion

We performed the tests to get more insight into Dynomite using Redis at the data store layer, and into how to size our clusters. We could have achieved better results with better instance types at both the client and the Dynomite server cluster. For example, adding Dynomite nodes with better network capacity (especially the ones supporting Enhanced Networking on Linux instances in a VPC) could further increase the performance of our clusters.
Another way to improve the performance is by using fewer availability zones. In that case, Dynomite would replicate the data to one more availability zone instead of two more, hence more bandwidth would be available to client connections. In our experiment we used 3 availability zones in us-west-2, which is a common deployment in most production clusters at Netflix.
In summary, our benchmarks were based on instance types and deployments that are common for Dynomite at Netflix. We presented results that indicate that DC_QUORUM provides better read and write guarantees to the client but with higher latencies and lower throughput. We also showcased how a client can configure Redis Pipelining and benefit from request batching.
We briefly mentioned the availability of higher consistency in this article. In the next article we'll dive deeper into how we implemented higher consistency and how we handle anti-entropy.
by: Shailesh Birari, Jason Cacciatore, Minh Do, Ioannis Papapanagiotou, Christos Kalantzis