Wednesday, July 20, 2016

Netflix Billing Migration to AWS - Part II

This is a continuation in the series on Netflix Billing migration to the Cloud. An overview of the migration project was published earlier here. This post details the technical journey for the Billing applications and datastores as they were moved from the Data Center to AWS Cloud.

As you might have read in earlier Netflix Cloud Migration blogs, all of Netflix streaming infrastructure is now completely run in the Cloud. At the rate Netflix was growing, especially with the imminent Netflix Everywhere launch, we knew we had to move Billing to the Cloud sooner than later else our existing legacy systems would not be able to  scale.

There was no doubt that it would be a monumental task of moving highly sensitive applications and critical databases without disrupting the business, while at the same time continuing to build the new business functionality and features.

A few key responsibilities and challenges for Billing:

  • The Billing team is responsible for the financially critical data in the company. The data we generate on a daily basis for subscription charges, gift cards, credits, chargebacks, etc. is rolled up to finance and is reported into the Netflix accounting. We have stringent SLAs on our daily processing to ensure that the revenue gets booked correctly for each day. We cannot tolerate delays in processing pipelines.
  • Billing has zero tolerance for data loss.
  • For most parts, the existing data was structured with a relational model and necessitates use of transactions to ensure an all-or-nothing behavior. In other words we need to be ACID for some operations. But we also had use-cases where we needed to be highly available across regions with minimal replication latencies.
  • Billing integrates with the DVD business of the company, which has a different architecture than the Streaming component, adding to the integration complexity.
  • The Billing team also provides data to support Netflix Customer Service agents to answer any member billing issues or questions. This necessitates providing Customer Support with a comprehensive view of the data.

The way the Billing systems were, when we started this project, is shown below.
Canvas 1.png
  • 2 Oracle databases in the Data Center - One storing the customer subscription information and other storing the invoice/payment data.
  • Multiple REST-based applications - Serving calls from the and Customer support applications. These were essentially doing the CRUD operations
  • 3 Batch applications -
  • Subscription Renewal - A daily job that looks through the customer base to determine the customers to be billed that day and the amount to be billed by looking at their subscription plans, discounts, etc.
  • Order & Payment Processor - A series of  batch jobs that create an invoice to charge the customer to be renewed and process the invoice through various stages of the invoice lifecycle.
  • Revenue Reporting - A daily job that looks through billing data and generates reports for the Netflix Finance team to consume.
  • One Billing Proxy application (in the Cloud) - used to route calls from rest of Netflix applications in the Cloud to the Data Center.
  • Weblogic queues with legacy formats being used for communications between processes.

The goal was to move all of this to the Cloud and not have any billing applications or databases in the Data Center. All this without disrupting the business operations. We had a long way to go!
The Plan

We came up with a 3-step plan to do it:
  • Act I - Launch new countries directly in the Cloud on the billing side while syncing the data back to the Data Center for legacy batch applications to continue to work.
  • Act II - Model the user-facing data, which could live with eventual consistency and does not need to be ACID, to persist to Cassandra (Cassandra gave us the ability to perform writes in one region and make it available in the other regions with very low latency. It also gives us high-availability across regions).
  • Act III - Finally move the SQL databases to the Cloud.
In each step and for each country migration, learn from it, iterate and improve on it to make it better.
Act I – Redirect new countries to the Cloud and sync data to the Data Center
Netflix was going to launch in 6 new countries soon. We decided to take it as a challenge to launch these countries partly in the Cloud on the billing side. What that meant was the user-facing data and applications would be in the Cloud, but we would still need to sync data back to the Data Center so some of our batch applications which would continue to run in the Data Center for the time-being, could work without disruption. The customer for these new countries data would be served out of the Cloud while the batch processing would still run out of the Data Center. That was the first step.
We ported all the APIs from the 2 user-facing applications to a Cloud based application that we wrote using Spring Boot and Spring Integration. With Spring Boot, we were able to quickly jump-start building a new application, as it provided the infrastructure and plumbing we needed to stand it up out of the box and let us focus on the business logic. With Spring Integration we were able to write once and reuse a lot of the workflow style code. Also with headers and header-based routing support that it provided, we were able to implement a pub-sub model within the application to put a message in a channel and have all consumers consume it with independent tuning for each consumer. We were now able to handle the API calls for members in the 6 new countries in any AWS region with the data stored in Cassandra. This enabled Billing to be up for these countries even if an entire AWS region went down – the first time we were able to see the power of being on the Cloud!

We deployed our application on EC2 instances in AWS in multiple regions. We added a redirection layer in our existing Cloud proxy application to switch billing calls for users in the new countries to go to the new billing APIs in the Cloud and billing calls for the users in the existing countries to continue to go to the old billing APIs in the Data Center. We opened direct connectivity from one of the AWS regions to the existing Oracle databases in the Data Center and wrote an application to sync the data from Cassandra via SQS in the 3 regions back to this region. We used SQS queues and Dead Letter Queues (DLQs) to move the data between regions and process failures.
New country launches usually mean a bump in member base. We knew we had to move our Subscription Renewal application from the Data Center to the Cloud so that we don’t put the load on the Data Center one. So for these 6 new countries in the Cloud, we wrote a crawler that went through all the customers in Cassandra daily and came up with the members who were to be charged that day. This all row iterator approach would work for now for these countries, but we knew it wouldn’t hold ground when we migrated the other countries and especially the US data (which had majority of our members  at that time) to the Cloud. But we went ahead with it for now to test the waters. This would be the only batch application that we would run from the Cloud in this stage.
We had chosen Cassandra as our data store to be able to write from any region and due to the fast replication of the writes it provides across regions. We defined a data model where we used the customerId as the key for the row and created a set of composite Cassandra columns to enable the relational aspect of the data. The picture below depicts the relationship between these entities and how we represented them in a single column family in Cassandra. Designing them to be a part of a single column family helped us achieve transactional support for these related entities.

We designed our application logic such that we read once at the beginning of any operation, updated objects in memory and persisted it to a single column family at the end of the operation. Reading from Cassandra or writing to it in the middle of the operation was deemed an anti-pattern.  We wrote our own custom ORM using Astyanax (a Netflix grown and open-sourced Cassandra client) to be able to read/write the domain objects from/to Cassandra.

We launched in the new countries in the Cloud with this approach and after a couple of initial minor issues and bug fixes, we stabilized on it. So far so good!
The Billing system architecture at the end of Act I was as shown below:
Canvas 2.png
Act II – Move all applications and migrate existing countries to the cloud
With Act I done successfully, we started focusing on moving the rest of the apps to the Cloud without moving the databases. Most of the business logic resides in the batch applications, which had matured over years and that meant digging into the code for every condition and spending time to rewrite it. We could not simply forklift these to the Cloud as is. We used this opportunity to remove dead code where we could, break out functional parts into their own smaller applications and restructure existing code to scale. These legacy applications were coded to read from config files on disk on startup and use other static resources like reading messages from Weblogic queues -  all anti-patterns in the Cloud due to the ephemeral nature of the instances. So we had to re-implement those modules to make the applications Cloud-ready. We had to change some APIs to follow an async pattern to allow moving the messages through the queues to the region where we had now opened a secure connection to the Data Center.
The Cloud Database Engineering (CDE) team setup a multi node Cassandra cluster for our data needs. We knew that the all row Cassandra iterator Renewal solution that we had implemented for renewing customers from earlier 6 countries would not scale once we moved the entire Netflix member billing data to Cassandra. So we designed a system to use Aegisthus to pull the data from Cassandra SSTables and convert it to JSON formatted rows that were staged out to S3 buckets. We then wrote Pig scripts to run mapreduce on the massive dataset everyday to fetch customer list to renew and charge for that day. We also wrote Sqoop jobs to pull data from Cassandra and Oracle and write to Hive in a queryable format which enabled us to join these two datasets in Hive for faster troubleshooting.

To enable DVD servers to talk to us in the Cloud, we setup load balancer endpoints (with SSL client certification) for DVD to route calls to us through the Cloud proxy, which for now would pipe the call back to the Data Center, until we migrated US. Once US data migration was done, we would sever the Cloud to Data Center communication link.

To validate this huge data migration, we wrote a comparator tool to compare and validate the data that was migrated to the Cloud, with the existing data in the Data Center. We ran the comparator in an iterative format, where we were able to identify any bugs in the migration, fix them, clear out the data and re-run. As the runs became clearer and devoid of issues, it increased our confidence in the data migration. We were excited to start with the migration of the countries. We chose a country with a small Netflix member base as the first country and migrated it to the Cloud with the following steps:

  • Disable the non-GET apis for the country under migration. (This would not impact members, but delay any updates to subscriptions in billing)
  • Use Sqoop jobs to get the data from Oracle to S3 and Hive.
  • Transform it to the Cassandra format using Pig.
  • Insert the records for all members for that country into Cassandra.
  • Enable the non-GET apis to now serve data from the Cloud for the country that was migrated.

After validating that everything looked good, we moved to the next country. We then ramped up to migrate set of similar countries together. The last country that we migrated was US, as it held most of our member base and also had the DVD subscriptions. With that, all of the customer-facing data for Netflix members was now being served through the Cloud. This was a big milestone for us!
After Act II, we were looking like this:
Canvas 3.png
Act III  – Good bye Data Center!
Now the only (and most important) thing remaining in the Data Center was the Oracle database. The dataset that remained in Oracle was highly relational and we did not feel it to be a good idea to model it to a NoSQL-esque paradigm. It was not possible to structure this data as a single column family as we had done with the customer-facing subscription data. So we evaluated Oracle and Aurora RDS as possible options. Licensing costs for Oracle as a Cloud database and Aurora still being in Beta didn’t help make the case for either of them.

While the Billing team was busy in the first two acts, our Cloud Database Engineering team was working on creating the infrastructure to migrate billing data to MySQL instances on EC2. By the time we started Act III, the database infrastructure pieces were ready, thanks to their help. We had to convert our batch application code base to be MySQL-compliant since some of the applications used plain jdbc without any ORM. We also got rid of a lot of the legacy pl-sql code and rewrote that logic in the application, stripping off dead code when possible.
Our database architecture now consists of a MySQL master database deployed on EC2 instances in one of the AWS regions. We have a Disaster Recovery DB that gets replicated from the master and will be promoted to master if the master goes down. And we have slaves in the other AWS regions for read only access to applications.
Our Billing Systems, now completely in the Cloud, look like this:
Canvas 4.png
Needless to say, we learned a lot from this huge project. We wrote a few tools along the way to help us debug/troubleshoot and improve developer productivity. We got rid of old and dead code, cleaned up some of the functionality and improved it wherever possible. We received support from many other engineering teams within Netflix. We had engineers from the Cloud Database Engineering, Subscriber and Account engineering, Payments engineering, Messaging engineering worked with us on this initiative for anywhere between 2 weeks to a couple of months. The great thing about the Netflix culture is that everyone has one goal in mind – to deliver a great experience for our members all over the world. If that means helping Billing solution move to the Cloud, then everyone is ready to do that irrespective of team boundaries!
The road ahead …
With Billing in the Cloud, Netflix streaming infrastructure now completely runs in the Cloud. We  can scale any Netflix service on demand, do predictive scaling based on usage patterns, do single-click deployments using Spinnaker and have consistent deployment architectures between various Netflix applications. Billing infrastructure can now make use of all the Netflix platform libraries and frameworks for monitoring and tooling support in the Cloud. Today we support billing for over 81 million Netflix members in 190+ countries. We generate and churn through terabytes of data everyday to accomplish  billing events. Our road ahead includes rearchitecting membership workflows for a global scale and business challenges. As part of our new architecture, we would be redefining our services to scale natively in the Cloud.  With the global launch, we have an opportunity to learn and redefine Billing and Payment methods in newer markets and integrate with many global  partners and local payment processors in the regions. We are looking forward to architect more functionality and scale out further.

If you like to design and implement large-scale distributed systems for critical data and build automation/tooling for testing it, we have a couple of positions open and would love to talk to you! Check out the positions here :