Wednesday, July 20, 2016

Netflix Billing Migration to AWS - Part II


This is a continuation in the series on Netflix Billing migration to the Cloud. An overview of the migration project was published earlier here. This post details the technical journey for the Billing applications and datastores as they were moved from the Data Center to AWS Cloud.

As you might have read in earlier Netflix Cloud Migration blogs, all of Netflix streaming infrastructure is now completely run in the Cloud. At the rate Netflix was growing, especially with the imminent Netflix Everywhere launch, we knew we had to move Billing to the Cloud sooner rather than later, or else our existing legacy systems would not be able to scale.

There was no doubt that moving highly sensitive applications and critical databases without disrupting the business, while continuing to build new business functionality and features, would be a monumental task.

A few key responsibilities and challenges for Billing:

  • The Billing team is responsible for the financially critical data in the company. The data we generate on a daily basis for subscription charges, gift cards, credits, chargebacks, etc. is rolled up to finance and reported into Netflix accounting. We have stringent SLAs on our daily processing to ensure that the revenue gets booked correctly for each day. We cannot tolerate delays in processing pipelines.
  • Billing has zero tolerance for data loss.
  • For the most part, the existing data was structured with a relational model and necessitated the use of transactions to ensure all-or-nothing behavior. In other words, we need to be ACID for some operations. But we also had use cases where we needed to be highly available across regions with minimal replication latencies.
  • Billing integrates with the DVD business of the company, which has a different architecture than the Streaming component, adding to the integration complexity.
  • The Billing team also provides data to help Netflix Customer Service agents resolve member billing issues and answer questions. This necessitates providing Customer Support with a comprehensive view of the data.

The state of the Billing systems when we started this project is shown below.
[Figure: Canvas 1.png, Billing systems at the start of the project]
  • 2 Oracle databases in the Data Center - one storing the customer subscription information and the other storing the invoice/payment data.
  • Multiple REST-based applications - serving calls from www.netflix.com and the Customer Support applications. These were essentially doing CRUD operations.
  • 3 Batch applications:
      • Subscription Renewal - A daily job that looks through the customer base to determine the customers to be billed that day and the amount to be billed by looking at their subscription plans, discounts, etc.
      • Order & Payment Processor - A series of batch jobs that create an invoice to charge the customer to be renewed and process the invoice through various stages of the invoice lifecycle.
      • Revenue Reporting - A daily job that looks through billing data and generates reports for the Netflix Finance team to consume.
  • One Billing Proxy application (in the Cloud) - used to route calls from the rest of the Netflix applications in the Cloud to the Data Center.
  • Weblogic queues with legacy formats, used for communication between processes.

The goal was to move all of this to the Cloud and not have any billing applications or databases in the Data Center. All this without disrupting the business operations. We had a long way to go!
The Plan

We came up with a 3-step plan to do it:
  • Act I - Launch new countries directly in the Cloud on the billing side while syncing the data back to the Data Center for legacy batch applications to continue to work.
  • Act II - Model the user-facing data, which could live with eventual consistency and does not need to be ACID, to persist to Cassandra (Cassandra gives us the ability to perform writes in one region and make them available in the other regions with very low latency. It also gives us high availability across regions).
  • Act III - Finally move the SQL databases to the Cloud.
At each step and for each country migration, we would learn from it, iterate, and improve the process.
Act I – Redirect new countries to the Cloud and sync data to the Data Center
Netflix was going to launch in 6 new countries soon. We decided to take it as a challenge to launch these countries partly in the Cloud on the billing side. That meant the user-facing data and applications would be in the Cloud, but we would still need to sync data back to the Data Center so that some of our batch applications, which would continue to run in the Data Center for the time being, could work without disruption. The customer data for these new countries would be served out of the Cloud while the batch processing would still run out of the Data Center. That was the first step.
We ported all the APIs from the 2 user-facing applications to a Cloud-based application that we wrote using Spring Boot and Spring Integration. With Spring Boot, we were able to quickly jump-start building a new application, as it provided the infrastructure and plumbing we needed to stand it up out of the box and let us focus on the business logic. With Spring Integration, we were able to write once and reuse a lot of the workflow-style code. Also, with the headers and header-based routing support that it provided, we were able to implement a pub-sub model within the application to put a message in a channel and have all consumers consume it with independent tuning for each consumer. We were now able to handle the API calls for members in the 6 new countries in any AWS region with the data stored in Cassandra. This enabled Billing to be up for these countries even if an entire AWS region went down – the first time we were able to see the power of being on the Cloud!

We deployed our application on EC2 instances in AWS in multiple regions. We added a redirection layer in our existing Cloud proxy application to switch billing calls for users in the new countries to go to the new billing APIs in the Cloud and billing calls for the users in the existing countries to continue to go to the old billing APIs in the Data Center. We opened direct connectivity from one of the AWS regions to the existing Oracle databases in the Data Center and wrote an application to sync the data from Cassandra via SQS in the 3 regions back to this region. We used SQS queues and Dead Letter Queues (DLQs) to move the data between regions and process failures.
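
The redirection layer described above can be pictured as a small routing decision keyed on the member's country. The following is only a minimal sketch of that idea, not the actual proxy code: the country codes, the backend names and the BillingCall type are all hypothetical placeholders.

```python
from dataclasses import dataclass

# Hypothetical country codes; the post does not name the 6 new countries.
CLOUD_COUNTRIES = frozenset({"AA", "BB"})

# Placeholder backend names, not real Netflix endpoints.
CLOUD_BILLING = "cloud-billing-api"
DATA_CENTER_BILLING = "dc-billing-api"


@dataclass(frozen=True)
class BillingCall:
    customer_id: str
    country: str
    operation: str


def route(call: BillingCall, cloud_countries=CLOUD_COUNTRIES) -> str:
    """Return the backend that should serve this billing call."""
    if call.country in cloud_countries:
        # Members in newly launched countries are served by the Cloud APIs.
        return CLOUD_BILLING
    # Everyone else keeps going to the legacy Data Center APIs.
    return DATA_CENTER_BILLING
```

Keeping the set of Cloud countries as data rather than code means that migrating another country later only requires adding it to the set.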
New country launches usually mean a bump in the member base. We knew we had to move our Subscription Renewal application from the Data Center to the Cloud so that we would not put this load on the Data Center version. So for these 6 new countries in the Cloud, we wrote a crawler that went through all the customers in Cassandra daily and came up with the members who were to be charged that day. This all-row iterator approach would work for now for these countries, but we knew it wouldn’t hold up when we migrated the other countries, and especially the US data (which had the majority of our members at that time), to the Cloud. But we went ahead with it for now to test the waters. This would be the only batch application that we would run from the Cloud in this stage.
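
As a rough illustration of the all-row iterator idea, the sketch below scans every customer row and keeps the ones that are due. The column names ("status", "next_billing_date") and the row shape are assumptions made for the example; the post does not describe the real schema.

```python
from datetime import date


def members_due(rows, today):
    """Scan every customer row and yield the ids that should be charged today.

    `rows` is any iterable of (customer_id, columns) pairs, standing in for
    a full scan of the customer data. Column names are hypothetical.
    """
    for customer_id, columns in rows:
        is_active = columns.get("status") == "active"
        if is_active and columns["next_billing_date"] <= today:
            yield customer_id


sample_rows = [
    ("c1", {"status": "active", "next_billing_date": date(2016, 7, 20)}),
    ("c2", {"status": "active", "next_billing_date": date(2016, 8, 1)}),
    ("c3", {"status": "cancelled", "next_billing_date": date(2016, 7, 20)}),
]
due_today = list(members_due(sample_rows, date(2016, 7, 20)))
```

The cost of this approach grows with the total number of customers rather than the number due that day, which is why it was only expected to hold for a small member base.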
We had chosen Cassandra as our data store to be able to write from any region and due to the fast replication of the writes it provides across regions. We defined a data model where we used the customerId as the key for the row and created a set of composite Cassandra columns to enable the relational aspect of the data. The picture below depicts the relationship between these entities and how we represented them in a single column family in Cassandra. Designing them to be a part of a single column family helped us achieve transactional support for these related entities.


We designed our application logic such that we read once at the beginning of any operation, updated objects in memory and persisted them to a single column family at the end of the operation. Reading from Cassandra or writing to it in the middle of the operation was deemed an anti-pattern. We wrote our own custom ORM using Astyanax (a Netflix-grown and open-sourced Cassandra client) to be able to read/write the domain objects from/to Cassandra.
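
The read-once, update-in-memory, persist-once pattern can be sketched as follows. This is an illustrative stand-in only: InMemoryColumnFamily is a hypothetical class that imitates a single column family keyed by customerId with tuple-shaped composite column names, and it is not the Astyanax-based ORM mentioned above.

```python
class InMemoryColumnFamily:
    """Stand-in for one column family: row key -> {composite column -> value}."""

    def __init__(self):
        self.rows = {}
        self.reads = 0
        self.writes = 0

    def read_row(self, key):
        self.reads += 1
        return dict(self.rows.get(key, {}))

    def write_row(self, key, columns):
        # All related entities for one customer go out in a single write.
        self.writes += 1
        self.rows.setdefault(key, {}).update(columns)


def change_plan(store, customer_id, new_plan, new_price):
    """Apply a plan change with exactly one read and one write."""
    row = store.read_row(customer_id)                       # 1. read once
    row[("subscription", "current", "plan")] = new_plan     # 2. update in memory
    row[("subscription", "current", "price")] = new_price
    history_index = sum(1 for column in row if column[0] == "history")
    row[("history", history_index, "event")] = f"plan->{new_plan}"
    store.write_row(customer_id, row)                       # 3. persist once
    return row
```

Because every related entity for a customer lives under the same row key, the final write touches a single row, which is the property the single column family design relies on.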

We launched in the new countries in the Cloud with this approach and after a couple of initial minor issues and bug fixes, we stabilized on it. So far so good!
The Billing system architecture at the end of Act I was as shown below:
[Figure: Canvas 2.png, Billing system architecture at the end of Act I]
Act II – Move all applications and migrate existing countries to the cloud
With Act I done successfully, we started focusing on moving the rest of the apps to the Cloud without moving the databases. Most of the business logic resides in the batch applications, which had matured over the years, and that meant digging into the code for every condition and spending time to rewrite it. We could not simply forklift these to the Cloud as-is. We used this opportunity to remove dead code where we could, break out functional parts into their own smaller applications and restructure existing code to scale. These legacy applications were coded to read from config files on disk on startup and use other static resources like reading messages from Weblogic queues - all anti-patterns in the Cloud due to the ephemeral nature of the instances. So we had to re-implement those modules to make the applications Cloud-ready. We had to change some APIs to follow an async pattern to allow moving the messages through the queues to the region where we had now opened a secure connection to the Data Center.
The Cloud Database Engineering (CDE) team set up a multi-node Cassandra cluster for our data needs. We knew that the all-row Cassandra iterator Renewal solution that we had implemented for renewing customers from the earlier 6 countries would not scale once we moved the entire Netflix member billing data to Cassandra. So we designed a system to use Aegisthus to pull the data from Cassandra SSTables and convert it to JSON-formatted rows that were staged out to S3 buckets. We then wrote Pig scripts to run MapReduce on the massive dataset every day to fetch the list of customers to renew and charge for that day. We also wrote Sqoop jobs to pull data from Cassandra and Oracle and write it to Hive in a queryable format, which enabled us to join these two datasets in Hive for faster troubleshooting.

To enable DVD servers to talk to us in the Cloud, we set up load balancer endpoints (with SSL client certification) for DVD to route calls to us through the Cloud proxy, which for now would pipe the call back to the Data Center, until we migrated the US. Once the US data migration was done, we would sever the Cloud to Data Center communication link.

To validate this huge data migration, we wrote a comparator tool to compare the data that was migrated to the Cloud with the existing data in the Data Center. We ran the comparator iteratively: we identified any bugs in the migration, fixed them, cleared out the data and re-ran. As the runs became cleaner and devoid of issues, our confidence in the data migration increased. We were excited to start with the migration of the countries. We chose a country with a small Netflix member base as the first country and migrated it to the Cloud with the following steps:

  • Disable the non-GET APIs for the country under migration. (This would not impact members, but would delay any updates to subscriptions in billing.)
  • Use Sqoop jobs to get the data from Oracle to S3 and Hive.
  • Transform it to the Cassandra format using Pig.
  • Insert the records for all members for that country into Cassandra.
  • Enable the non-GET APIs to now serve data from the Cloud for the country that was migrated.
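
The per-country steps above, followed by a comparator pass, can be sketched with in-memory stand-ins. Everything here is a simplified assumption for illustration: plain Python lists and dicts replace Oracle, Sqoop, Pig and Cassandra, and the record fields and function names are hypothetical.

```python
def to_cassandra_row(record):
    """Hypothetical transform: one relational record -> (row key, columns)."""
    key = record["customer_id"]
    columns = {
        ("subscription", "plan"): record["plan"],
        ("subscription", "country"): record["country"],
    }
    return key, columns


def migrate_country(country, source_records, target, write_enabled):
    """Run the five migration steps for one country; return rows written."""
    write_enabled[country] = False                          # 1. disable non-GET APIs
    exported = [r for r in source_records if r["country"] == country]  # 2. export
    rows = [to_cassandra_row(r) for r in exported]          # 3. transform
    for key, columns in rows:                               # 4. insert
        target[key] = columns
    write_enabled[country] = True                           # 5. re-enable non-GET APIs
    return len(rows)


def compare(source_records, target, country):
    """Return customer ids whose migrated row is missing or differs."""
    mismatched = []
    for record in source_records:
        if record["country"] != country:
            continue
        key, expected = to_cassandra_row(record)
        if target.get(key) != expected:
            mismatched.append(key)
    return mismatched
```

An empty result from compare is the signal to move on to the next country; a non-empty one points at the rows to investigate before clearing the data and re-running.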

After validating that everything looked good, we moved to the next country. We then ramped up to migrate sets of similar countries together. The last country that we migrated was the US, as it held most of our member base and also had the DVD subscriptions. With that, all of the customer-facing data for Netflix members was now being served through the Cloud. This was a big milestone for us!
After Act II, we were looking like this:
[Figure: Canvas 3.png, Billing system architecture after Act II]
Act III – Goodbye Data Center!
Now the only (and most important) thing remaining in the Data Center was the Oracle database. The dataset that remained in Oracle was highly relational and we did not think it was a good idea to remodel it in a NoSQL-esque paradigm. It was not possible to structure this data as a single column family as we had done with the customer-facing subscription data. So we evaluated Oracle and Aurora RDS as possible options. Licensing costs for Oracle as a Cloud database and Aurora still being in Beta didn’t help make the case for either of them.

While the Billing team was busy in the first two acts, our Cloud Database Engineering team was working on creating the infrastructure to migrate billing data to MySQL instances on EC2. By the time we started Act III, the database infrastructure pieces were ready, thanks to their help. We had to convert our batch application code base to be MySQL-compliant since some of the applications used plain JDBC without any ORM. We also got rid of a lot of the legacy PL/SQL code and rewrote that logic in the application, stripping off dead code when possible.
Our database architecture now consists of a MySQL master database deployed on EC2 instances in one of the AWS regions. We have a Disaster Recovery DB that gets replicated from the master and will be promoted to master if the master goes down. And we have slaves in the other AWS regions for read-only access by applications.
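
To make the topology concrete, here is a minimal sketch of how an application might pick an endpoint under this layout. The class, the endpoint names and the region names are hypothetical; the post does not describe how routing or promotion is actually implemented.

```python
class BillingDatabases:
    """Toy model of the topology: one master, one DR standby, regional read replicas."""

    def __init__(self, master, dr, replicas):
        self.master = master        # the only writable database
        self.dr = dr                # standby replicated from the master
        self.replicas = replicas    # region -> read-only replica

    def endpoint_for(self, region, write):
        if write:
            return self.master
        # Reads prefer the local replica and fall back to the master.
        return self.replicas.get(region, self.master)

    def fail_over(self):
        """Promote the DR database to master after the master goes down."""
        if self.dr is None:
            raise RuntimeError("no DR database available to promote")
        self.master, self.dr = self.dr, None


dbs = BillingDatabases(
    master="mysql-master",
    dr="mysql-dr",
    replicas={"region-b": "mysql-replica-b"},
)
```

After fail_over(), writes from every region go to the promoted DR database while reads continue to use the regional replicas.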
Our Billing Systems, now completely in the Cloud, look like this:
[Figure: Canvas 4.png, Billing systems completely in the Cloud]
Needless to say, we learned a lot from this huge project. We wrote a few tools along the way to help us debug/troubleshoot and improve developer productivity. We got rid of old and dead code, cleaned up some of the functionality and improved it wherever possible. We received support from many other engineering teams within Netflix. Engineers from Cloud Database Engineering, Subscriber and Account engineering, Payments engineering and Messaging engineering worked with us on this initiative for anywhere between 2 weeks and a couple of months. The great thing about the Netflix culture is that everyone has one goal in mind – to deliver a great experience for our members all over the world. If that means helping the Billing solution move to the Cloud, then everyone is ready to do that irrespective of team boundaries!
The road ahead …
With Billing in the Cloud, Netflix streaming infrastructure now completely runs in the Cloud. We can scale any Netflix service on demand, do predictive scaling based on usage patterns, do single-click deployments using Spinnaker and have consistent deployment architectures between various Netflix applications. Billing infrastructure can now make use of all the Netflix platform libraries and frameworks for monitoring and tooling support in the Cloud. Today we support billing for over 81 million Netflix members in 190+ countries. We generate and churn through terabytes of data every day to accomplish billing events. Our road ahead includes rearchitecting membership workflows for global scale and business challenges. As part of our new architecture, we will be redefining our services to scale natively in the Cloud. With the global launch, we have an opportunity to learn and redefine Billing and Payment methods in newer markets and integrate with many global partners and local payment processors in the regions. We are looking forward to architecting more functionality and scaling out further.

If you like to design and implement large-scale distributed systems for critical data and build automation/tooling for testing it, we have a couple of positions open and would love to talk to you! Check out the positions here:

Monday, July 18, 2016

Chelsea: Encoding in the Fast Lane

Back in May, Netflix launched its first global talk show: Chelsea. Delivering this new format was a first for us, and a fun challenge in many different aspects, which this blog post describes in more detail. Chelsea Handler's new Netflix talk show ushered in a Day-of-Broadcast (DOB) style of delivery that is demanding on multiple levels for our teams, with a lightning-fast turnaround time. We looked at all the activities that take place in the Netflix Digital Supply Chain, from source delivery to live-on-site, and gave a time budget for each activity, pushing on all the teams to squeeze their times, aiming at an aggressive overall goal. In this article we explain enhancements and techniques that the encoding team used to successfully process this show faster than ever.

Historically there was not as much pressure on encode times. Our system was optimized for throughput and robustness, paying less attention to speed. In the last few years we had worked to reduce the ingest and encode time to about 2.5 hours. This met the demands of our most stringent use cases like the Day-After-Broadcast delivery of Breaking Bad. Now, Chelsea was pushing us to reduce this time even further. The new aggressive time budget calls for us to ingest and encode a 30 minute title in under 30 minutes. Our solution ends up using about 5 minutes for source inspection and 25 minutes for encoding.
The Starting Point
Although Chelsea challenged us to encode with a significantly shorter turnaround time compared to other movies or shows in our catalog, our work over the last few years on developing a robust and scalable cloud-based system helped jumpstart our efforts to meet this challenge.
Parallel Encoding
In the early days of Netflix streaming, the entire video encode of a title would be generated on a single Windows machine. For some streams (for example, slower codecs or higher resolutions), generating a single encode would take more than 24 hours. We improved on our system a few years ago by rolling out a parallel encoding workflow, which breaks a title into “chunks” that can be processed in parallel on different machines. This allowed for shorter latency, especially as the number of machines scales up, and robustness to transient errors. If a machine is unexpectedly terminated, only a small amount of work is lost.
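
The chunked workflow can be sketched in a few lines. This is a toy illustration only: encode_chunk is a placeholder that returns a label instead of invoking a real encoder, and a local thread pool stands in for the separate machines.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(duration_s, chunk_s):
    """Return (start, end) second offsets that cover the whole title."""
    return [
        (start, min(start + chunk_s, duration_s))
        for start in range(0, duration_s, chunk_s)
    ]


def encode_chunk(chunk):
    """Placeholder for a real encoder invocation; returns a label."""
    start, end = chunk
    return f"encoded[{start}-{end}]"


def encode_title(duration_s, chunk_s, workers=4):
    """Encode all chunks in parallel and return them in playback order."""
    chunks = split_into_chunks(duration_s, chunk_s)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves input order, so the pieces can be stitched in sequence.
        return list(pool.map(encode_chunk, chunks))
```

If one worker fails, only its chunk needs to be redone, which is the robustness property mentioned above.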
Automated Parallel Inspections
To ensure that we deliver high quality video streams to our members, we have invested in developing automated quality checks throughout the encoding pipeline. We start with inspecting the source “mezzanine” file to make sure that a pristine source is ingested into the system. Types of inspections include detection of wrong metadata, picture corruption, insertion of extra content, frame rate conversion and interlacing artifacts. After generating a video stream, we verify the encodes by inspecting the metadata, comparing the output video to the mezzanine fingerprint and generating quality metrics. This enables us to detect issues caused by glitches on the cloud instances or software implementation bugs. Through automated inspections of the encodes we can detect output issues early on, without the video having to reach manual QC. Just as we do encoding in parallel by breaking the source into chunks, likewise we can run our automated inspections in parallel by chunking the mezzanine file or encoded video.
Internal Spot Market
Since automated inspections and encoding are enabled to run in parallel in a chunked model, increasing the number of available instances can greatly reduce end-to-end latency. We recently worked on a system to dynamically leverage unused Netflix-reserved AWS servers during off-peak hours. The additional cloud instances, not used by other Netflix services, allowed us to expedite and prioritize encoding of Chelsea’s show.
Priority Scheduling
Encoding jobs come in varying priorities, from highly urgent (e.g. DOB titles, or interactive jobs submitted by humans) to low-priority background backfill. Within the same title, certain codecs and bitrates rank higher in priority than others so that the bitrates required to go live are always processed first. To handle the fine-grained and dynamic nature of job priority, the encoding team developed a custom priority messaging service. Priorities are broadly classified by a priority class modeled after the US Postal Service classes of mail, and fine-grained job priority is expressed by a due date. Chelsea belongs to the highest priority class, Express (sorry, no Sunday delivery). With the axiom that “what’s important is needed yesterday”, all Chelsea show jobs are due 30 years ago!
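
One way to picture the two-level ordering (priority class first, due date second) is a heap keyed on both. This is a minimal sketch and not the custom priority messaging service itself; apart from Express, the class names are invented for the example.

```python
import heapq
from datetime import datetime, timedelta
from enum import IntEnum
from itertools import count


class PriorityClass(IntEnum):
    EXPRESS = 0    # named in the post
    PRIORITY = 1   # hypothetical
    STANDARD = 2   # hypothetical


class JobQueue:
    def __init__(self):
        self._heap = []
        self._seq = count()  # tie-breaker keeps insertion order for equal keys

    def submit(self, name, priority_class, due):
        heapq.heappush(self._heap, (priority_class, due, next(self._seq), name))

    def next_job(self):
        """Pop the job with the most urgent class, then the earliest due date."""
        return heapq.heappop(self._heap)[3]


now = datetime(2016, 7, 18, 12, 0)
queue = JobQueue()
queue.submit("backfill", PriorityClass.STANDARD, now + timedelta(days=7))
queue.submit("other-express", PriorityClass.EXPRESS, now)
queue.submit("chelsea-video", PriorityClass.EXPRESS, now - timedelta(days=365 * 30))
```

Giving a job a due date far in the past guarantees it sorts ahead of every other job in its class, which is the effect of the "due 30 years ago" trick.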
Innovations Motivated by Chelsea
As we analyzed our entire process looking for ways to make the process faster, it was apparent that DOB titles have different goals and characteristics than other titles. Some improvement techniques would only be practical on a DOB title, and others that might make sense on ordinary titles may only be practical on the smaller scale of DOB titles and not on the scale of the entire catalog. Low latency is often at odds with high throughput, and we still have to support an enormous throughput. So understand that the techniques described here are used selectively on the most urgent of titles.

When trying to make anything faster we consider these standard approaches:
  1. Use phased processing to postpone blocking operations
  2. Increase parallelism
  3. Make it plain faster
We will mention some improvements from each of these categories.
Phased Processing
Inspections
Most sources for Netflix originals go through a rigorous set of inspections after delivery, both manual and automated. First, manual inspections happen on the source delivered to us to check if it adheres to the Netflix source guidelines. With Chelsea, this inspection begins early with the pre-taped segments being inspected well before the show itself is taped. Then, inspections are done during taping and again during the editorial process, right on set. By the time it is delivered, we are confident that it needs no further manual QC because exhaustive QC was performed at post.
We have control over the source production; it is our studio and our crew and our editing process. It is well-rehearsed and well-known. If we assume the source is good, we can bypass the automated inspections that focus on errors introduced by the production process. Examples of inspections typically done on all sources are detection of telecine, interlacing, audio hits, silence in audio and bad channel mapping. Bypassing the most expensive inspections, such as deep audio inspections, allowed us to bring the execution time down from 30 minutes to about 5 minutes on average. Aside from detecting problems, the inspection stage generates artifacts that are necessary for the encoding processing. We maintain all inspections that produce these artifacts.
Complexity Analysis
A previous article described how we use an encoding recipe uniquely tailored to each title. The first step in this per-title encode optimization is complexity analysis, an expensive examination of large numbers of frames to decide on a strategy that is optimal for the title.
For a DOB title, we are willing to release it with a standard set of recipes and bitrates, the same way Netflix had delivered titles for years. This standard treatment is designed to give a good experience for any show and does an adequate job for something like Chelsea.
We launch an asynchronous job to do the complexity analysis on Chelsea, which will trigger a re-encode and produce streams with ultimate efficiency and quality. We are not blocked on this. If it is not finished by the show start date, the show will still go live with the standard streams. Sometime later the new streams will replace the old.
Increase Parallelism
Encoding in Chunks
As mentioned earlier, breaking up a video into small chunks and encoding different chunks in parallel can effectively reduce the overall encoding time. At the time the DOB project started, we still had a few codecs that were processed as a single chunk, such as h263. We took this opportunity to create a chunkable process for these remaining codecs.
Optimized Encoding Chunk Size
For DOB titles we went more extreme. After extensive testing with different chunk sizes, we discovered that by reducing the chunk size from our previous standard of 3 minutes to 30 seconds we can cut down the encoding time by 80% without noticeable video quality degradation.

More chunks mean more overhead, so for normal titles we stick to a 3-minute chunk size. For DOB titles we are willing to pay the increased overhead.
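
A quick back-of-the-envelope calculation shows why the smaller chunk size helps. It assumes, purely for illustration, that every chunk gets its own machine and that encode time is proportional to chunk length; under those assumptions the best possible reduction is a little above the 80% we measured, with the gap attributable to per-chunk overhead.

```python
import math


def chunks_per_stream(title_s, chunk_s):
    """Number of chunks needed to cover one stream of the title."""
    return math.ceil(title_s / chunk_s)


def ideal_time_reduction(old_chunk_s, new_chunk_s):
    """Upper bound on wall-clock reduction under the stated assumptions."""
    return 1 - new_chunk_s / old_chunk_s


title_s = 30 * 60                                # a 30 minute episode
old_chunks = chunks_per_stream(title_s, 180)     # 3 minute chunks
new_chunks = chunks_per_stream(title_s, 30)      # 30 second chunks
best_case = ideal_time_reduction(180, 30)        # 1 - 30/180, about 0.83
```

Each stream goes from 10 chunks to 60, so the number of parallel jobs per stream grows sixfold, which is the overhead we accept only for DOB titles.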
Reduce Dependency in Steps
Some older codec formats (for example, VC1), used by legacy TVs and Blu-ray players, were being encoded from lightly-compressed intermediate files. This meant we could not begin encoding these streams (which were among the slower processes in our pipeline) until we had finished the intermediate encode. We changed our process to encode the legacy streams directly from the source so that we did not have to wait for the intermediate step.
Make It Faster
Infrastructure Enhancements
Once an AV source is entered into the encoding system, we encode it with a number of codecs, resolutions, and bitrates for all Netflix playback devices. To meet the SLA for DOB encoding and be as fast as possible, we need to run all DOB encoding jobs in parallel without waiting. An extra challenge is that, with the finer chunk size used for DOB, there are more jobs that need to be run in parallel.
Right Sizing
The majority of the computing resources are spent on video encoding. A relatively small percentage of computing is spent on source inspection, audio, subtitle, and other assets. It is easy to pre-scale the production environment for these smaller activities. On the other hand, with a 30 second chunk size, we drastically increased the number of parallel video encoding activities. For a 30 minute Chelsea episode, we estimated a need of 1,000 video encoders to compute all codecs, resolutions, and bitrates at the same time. For the video encoders, we make use of the internal spot market, the unused Netflix reserved instances, to achieve this high instance count.
Warm Up
The resource scheduler normally samples the work queues and autoscales video encoders based on workload at the moment. Scaling Amazon EC2 instances takes time. The amount of time to scale depends on many factors and is something that could prevent us from achieving the proper SLA for encoding a DOB title. Pre-scaling 1,000 video encoders eliminates the scaling time penalty when a DOB title arrives. It is uneconomical to keep 1,000 video encoders 24x7 regardless of workload.
To strike a balance, we introduced a warm-up mechanism. We pre-scale 1,000 video encoders at the earliest signal of an imminent DOB title arrival, and keep them around for an hour. The Netflix ingest pipeline sends a notification to the resource scheduler whenever we start to receive a DOB title from the set. Upon receiving the notification, the resource scheduler immediately procures 1,000 video encoders spread out over many instance types and zones (e.g. r3.2xlarge on us-east-1e), which parallelizes instance acquisition and reduces the overall time. This strategy also mitigates the risk of running out of a specific instance type and availability zone combination.
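
Spreading a target count across instance type and zone combinations can be sketched as an even split. This only illustrates the idea: apart from r3.2xlarge and us-east-1e, the type and zone names are placeholders, and the real scheduler's allocation policy is not described in this post.

```python
from itertools import product


def plan_warm_up(total, instance_types, zones):
    """Split `total` encoders as evenly as possible over (type, zone) pools."""
    pools = list(product(instance_types, zones))
    base, extra = divmod(total, len(pools))
    # The first `extra` pools take one more instance so the counts sum to total.
    return {
        pool: base + (1 if index < extra else 0)
        for index, pool in enumerate(pools)
    }


plan = plan_warm_up(
    1000,
    instance_types=["r3.2xlarge", "type-b", "type-c"],   # last two are placeholders
    zones=["us-east-1e", "zone-b"],                       # second zone is a placeholder
)
```

Each pool can then be requested concurrently, and a shortage in any single pool affects only a fraction of the 1,000 encoders.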
Priority and Job Preemption
Since the warm-up comes in advance of having actual DOB jobs, the video encoders will busy themselves with existing encode jobs. By the time the DOB Express priority jobs arrive, it is possible that a video encoder already has a lower priority job in-flight. We can't afford to wait for these jobs to finish before beginning the Express priority jobs. To mitigate this scenario, we enhanced our custom-built priority messaging service with job preemption where a high priority job such as a Chelsea video encode interrupts a lower priority job.
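
Job preemption can be illustrated with a single encoder that swaps out its in-flight job when a more urgent one arrives. This is a simplified sketch under assumptions: priorities are plain integers where a lower number is more urgent, and the interrupted job is put back on a queue, which is one plausible behavior that this post does not confirm.

```python
class Encoder:
    def __init__(self):
        self.current = None  # (priority, job name); lower number = more urgent

    def offer(self, priority, name, requeue):
        """Start the job if idle or if it outranks the in-flight job.

        Returns True if the offered job was started.
        """
        if self.current is None:
            self.current = (priority, name)
            return True
        if priority < self.current[0]:
            # Assumption: the interrupted job goes back to the queue for later.
            requeue.append(self.current)
            self.current = (priority, name)
            return True
        return False
```

A job of equal priority does not preempt, so two Express jobs never interrupt each other.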
Empirical data shows that all DOB jobs are picked up within 60 seconds on average.
The Fast Lane
We examined all the interactions with other systems and teams and identified latencies that could be improved. Like the encoding system, other systems at Netflix were also designed for high throughput, while latency was a secondary concern. For all systems overall, throughput remains a top priority and we cannot sacrifice in this area. In many cases it was not practical to improve the latency of interactions for all titles while satisfying throughput demands so we developed special fast lane communications. Communications about DOB titles follow a different path, one that leads to lower latency.
Conclusion
We achieved our goal of reducing the time for ingest and encode to be approximately the runtime of the source, i.e. 30 minutes for a 30 minute source. We were pleased that the architecture we put in place in recent years that emphasizes flexibility and configurability provided a great foundation for building out the DOB process. This investment paid off by allowing us to quickly respond to business demands and effectively deliver our first talk show to members all around the world.

By Rick Wong, Zhan Chen, Anne Aaron, Megha Manohara, and Darrell Denlinger