Thursday, June 28, 2012

Scalable Logging and Tracking

Scalable Logging

by Kedar Sadekar


At Netflix we work hard to improve personalized recommendations. We use a lot of data to make recommendations better. What may seem an arbitrary action -- scrolling up, down, left or right and how much -- actually provides us with valuable information. We work to get all the necessary data points and feedback to provide the best user experience.

It is obvious that to capture the large amount of data generated, we need a dedicated, fast, scalable and highly available and asynchronous collection system that does not slow the user experience.
In this post we discuss the decisions and considerations that went into building a service that accepts a few billion requests a day, processing and storing these requests for later use and analysis by various systems within Netflix.

Considerations

We did not want this service to disrupt the user experience, hence, the main objective was as low a latency as possible. It also needed to scale to handle billions of requests a day. The data sent to and processed by this service is noncritical data. That was an important factor in our design where we made a conscious choice of being ok with dropping data (user events) as opposed to providing a sub-optimal client experience. From the client side, the call is fire-and-forget. That essentially means that the client should not care what the end result of the call was (success/failure).

Data Size

The average size of the request and the logged data is around 16 KB (range: 800 bytes ~ 130 KB) whereas the response average is pretty consistent at around 512 bytes. Here is an example of the data (fields) that we capture: video, device, page, timestamp.

Latency

The service needs to handle a billion plus requests a day, and peak traffic could be 3 - 6 times the average when measured in terms of requests per second (RPS). To achieve our goal of having a low millisecond latency for this service. Here are some of the practises we adopted:

Holding on to the request is expensive

This service is developed using Java, deployed on a standard Tomcat container. To achieve a high throughput, we want to free up Tomcat threads as soon as we can. To do that we do not hold on to the request for any longer than required. The methodology we used was simple, we grab whatever data we need from the HTTP request object, push it onto a thread pool for processing later and flush the response to the client immediately. Holding on to the request for any longer translates to a smaller throughput per node in the cluster. A lower throughput per node in the cluster means having to scale more horizontally and scaling horizontally beyond a point is inefficient and cost ineffective.

Fail fast / first

Return as quickly as you can, which means you try to identify your failure cases first, before doing any unnecessary processing. Return as soon as you know there is no point moving forward.
An example: If your data must have some data from the cookie, try to crack the cookie first, before dealing with any other request parameters. If the cookie does not have the required data, return, don’t bother looking at any other data the request body contains.  

HTTP codes  

We captured all the 4xx / 5xx / 2xx responses that we serve. Some services don’t care about a failure, in those cases, we just returned a HTTP 202 (accepted) response. Having these metrics in place helps you tune your code, and if the calling service does not care, why bother returning a 4xx response. We have alert triggering mechanisms based on the percentage of the HTTP response codes.

Dependencies can and will slow down sometimes

We did an exercise to identify all dependencies (other Netflix services / jars) that this service depended on which were going to make across the wire calls. We have learned that however reliable and robust the dependencies are, there will be network glitches and service latency issues at some point or another. We do not want the logging service to be bogged down by such issues,
For any such service calls, we guard them by wrapping them using Java Futures with appropriate timeouts. Aggressive timeouts were specially reserved for those calls that were in the hot path (before the response is flushed). Adding a lot of metrics helped in understanding if a service was timing out too often or was the slowest.

Process Later

Once we had all the data we needed, we put into a queue for asynchronous execution by an executor pool.
The following diagram illustrates what has been described above.

 

Garbage Collection


For a service written entirely in Java, an important factor when deploying is pause times during Garbage Collections. The nature of this service is an extremely large volume of really short-lived objects. We played around with GC tuning variables to achieve the  best throughput. As part of these experiments, we tried various combinations of the parallel generational collector and the CMS (Concurrent Mark Sweep) collector too. We setup canaries taking peak production traffic for at least a couple of days with different combinations for young gen to heap ratios.
Each time we had a winner, we pitted the CMS canary against the best canary with the parallel collector. We did this 2-3 times until we were sure we had a winner.
The winner was analyzed by capturing the GC logs and mining them for timings and counts of new gen (par-new), Full GC’s and CMS failures (if any) etc.  We learned that having canaries is the only way of knowing for sure. Don’t be in a hurry to pick a winner.

Auto Scaling

Measure, Measure

Since traffic (rps) is unpredictable, at Netflix heavily leverage auto-scaling policies. There are different metrics that one could use to auto-scale a cluster, the most common ones being CPU load and RPS. We chose to primarily use RPS. CPU load is used to trigger alerts both at instance and cluster levels. A lot of the metrics gathered are powered by our own Servo code (available on github). 
We have collected the metrics over a few days, including peak traffic at weekends and then applied the policies that enable us to effectively scale in the cloud. [See reference on auto-scaling]

Have Knobs

All these throughput measurements were done in steps. We had knobs in place that allowed us to slowly ramp-up traffic, observe the system behavior and make necessary changes, gaining confidence in what the system could handle.
Here is a graph showing the RPS followed by a graph showing the average latency metrics (in milliseconds) over the same period.





Persistence

The real magic of such voluminous data collection and aggregation is actually done by our internal log collectors. Individual machines have agents that send the logs to collectors and finally to the data sink (Hive for example).



Common Infrastructure / Multiple end-points
As different teams within Netflix churn out different features and algorithms, the need to measure the efficacy and success of those never diminishes. However, those teams would love to focus on their core competencies instead of having to setup up a logging / tracking infrastructure that caters to their individual needs.
It made perfect sense for those teams to direct their traffic to the logging service. Since the data required by each team is disparate, each of these teams’ needs is considered as a new end-point on the logging service.
Supporting a new client is simple, with the main decision being whether the traffic warrants an independent cluster or can be co-deployed with a cluster that supports other end-points.

When a single service exposes multiple end-points with hundreds of millions of requests a day per end point, we needed to decide between just scaling horizontally forever or break it down into multiple clusters by functionality. There are pros / cons of doing it either way. Here are a few:

Pros of single cluster
-       Single deployment
-       One place to manage / track
Pros of multiple deployment
-       Failure in one end-point does not affect another, especially in internal dependencies
-       Ability to independently scale up/down volume
-       Easier to debug issues

Conclusion

As the traffic was ramped up, we have been able to scale up very comfortably so far learning, lessons as we went along.
Data is being analyzed multiple ways by our algorithmic teams. For example - which row types (Top 10, most recently watched etc.) did most plays emanate from. How did that vary by country and device. How far did users scroll left / right across devices - and do users ever go beyond a certain point. These and many other data points are being examined to improve our algorithms to provide users with a better viewing experience.  

References

Servo : 

Auto-scaling: 

Join Us

Like what you see and want to work on bleeding edge performance and scale?
by Kedar Sadekar, Senior Software Engineer, Product Infrastructure Team