Friday, February 12, 2016

Distributed Time Travel for Feature Generation

We want to make it easy for Netflix members to find great content to fulfill their unique tastes. To do this, we follow a data-driven algorithmic approach based on machine learning, which we have described in past posts and other publications. We aspire to a day when anyone can sit down, turn on Netflix, and the absolute best content for them will automatically start playing. While we are still way off from that goal, it sets a vision for us to improve the algorithms that span our service: from how we rank videos to how we construct the homepage to how we provide search results. To make our algorithms better, we follow a two-step approach. First, we try an idea offline using historical data to see if it would have made better recommendations. If so, we then deploy a live A/B test to see if it performs well in reality, which we measure through statistically significant improvements in core metrics such as member engagement, satisfaction, and retention.

While there are many ways to improve machine learning approaches, arguably the most critical is to provide better input data. A model can only be as good as the data we give it. Thus, we spend a lot of time experimenting with new kinds of input signals for our models. Most machine learning models expect input to be represented as a vector of numbers, known as a feature vector. Somehow we need to take an arbitrary input entity (e.g. a tuple of member profile, video, country, time, device, etc.), with its associated, richly structured data, and provide a feature vector representing that entity for a machine learning algorithm to use. We call this transformation feature generation and it is central to providing the data needed for learning. Example features include how many minutes a member has watched a video, the popularity of the video, its predicted rating, what genre a video belongs to, or how many videos are in a row. We use the term feature broadly, since a feature could be a simple indicator or have a full model behind it, such as a Matrix Factorization.

We will describe how we built a time machine for feature generation using Apache Spark that enables our researchers to quickly try ideas for new features on historical data such that running offline experiments and transitioning to online A/B tests is seamless.

Why build a time machine?

There are many ways to approach feature generation, several of which we’ve used in the past. One way is to use logged event data that we store on S3 and access via Hive by running queries on these tables to define features. While this is flexible for exploratory analysis, it has several problems. First, to run an A/B test we need the feature calculation to run within our online microservice architecture. We run the models online because we know that freshness and responsiveness of our recommendations are important to the member experience. This means we would need to re-implement feature generation to retrieve data from online services instead of Hive tables. It is difficult to match two such implementations exactly, especially since any discrepancies between offline and online data sources can create unexpected differences in the model output. In addition, not all of our data is available offline, particularly output of recommendation models, because these involve a sparse-to-dense conversion that creates a large volume of data.

On the other extreme, we could log our features online where a model would be used. While this removes the offline/online discrepancy and makes transitioning to an A/B test easy, it means we need to deploy each idea for a new feature into production and wait for the data to collect before we can determine if a feature is useful. This slows down the iteration cycle for new ideas. It also requires all the data for a feature to be available online, which could mean building new systems to serve that data, again before we have determined if it is valuable. We also need to compute features for many more members or requests than we may actually need for training based on how we choose label data.

We’ve also tried a middle ground where we use feature code that calls online services, such as the one that provides viewing history, and filters out all the data with timestamps past a certain point in time. However, this only works for situations where a service records a log of all historical events; services that just provide the current state cannot be used. It also places additional load on the online services each time we generate features.
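
The timestamp filter in this middle-ground approach can be sketched in a few lines. This is only an illustration in Python: the event shape and the function name `history_as_of` are assumptions, not the actual viewing history service API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ViewEvent:
    # Hypothetical shape of one logged viewing event.
    video_id: int
    timestamp: int  # seconds since epoch
    minutes_watched: float


def history_as_of(events, as_of):
    """Keep only the events that had already happened at time `as_of`."""
    return [e for e in events if e.timestamp <= as_of]


events = [
    ViewEvent(video_id=1, timestamp=100, minutes_watched=42.0),
    ViewEvent(video_id=2, timestamp=200, minutes_watched=5.0),
    ViewEvent(video_id=3, timestamp=300, minutes_watched=90.0),
]
visible = history_as_of(events, as_of=200)
```

The filter depends on every event carrying its own timestamp, which is why a service that returns only its current state cannot be handled this way.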

Throughout these approaches, management of time is extremely important. We want an approach that balances the benefits of all the above approaches without the drawbacks. In particular, we want a system that:
  • Enables quick iteration from idea to modeling to running an A/B test
  • Uses the data provided by our online microservices, without overloading them
  • Accurately represents input data for a model at a point in time to simulate online use
  • Handles our scale of data with many researchers running experiments concurrently, without using more than 1.21 gigawatts of power
  • Works well in an interactive environment, such as using a notebook for experimentation, and also reliably in a batch environment, such as for doing periodic retraining
  • Requires writing feature code only once, so that we don’t need to spend time verifying that two implementations are exactly equivalent
  • Most importantly, no paradoxes are allowed (e.g. the label can’t be in the features)
When faced with tough problems one often wishes for a time machine to solve them. So that is what we decided to build. Our time machine snapshots online services and uses the snapshot data offline to reconstruct the inputs that a model would have seen online to generate features. Thus, when experimenters design new feature encoders — functions that take raw data as input and compute features — they can immediately use them to compute new features for any time in the past, since the time machine can retrieve the appropriate snapshots and pass them to the feature encoders.

How to build a Time Machine

Here are the various components needed in a time machine that snapshots online services:
  • Select contexts to snapshot
  • Snapshot data of various micro services for the selected context
  • Build APIs to serve this data for a given time coordinate in the past

Context Selection

Snapshotting data for all contexts (e.g. all member profiles, devices, times of day) would be very expensive. Instead, we select samples of contexts to snapshot periodically (typically daily), though different algorithms may need to train on different distributions. For example, some use stratified samples based on properties such as viewing patterns, devices, time spent on the service, region, etc. To handle this, we use Spark SQL to select an appropriate sample of contexts for each experiment from Hive. We merge the context set across experiments and persist it into S3 along with the corresponding experiment identifiers.
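
As a rough illustration of the two steps above (stratified sampling per experiment, then merging the samples while keeping the experiment identifiers), here is a plain-Python sketch. It stands in for the Spark SQL queries, which are not shown in this post; the function names, the context shape, and the experiment ids are all hypothetical.

```python
import random
from collections import defaultdict


def stratified_sample(contexts, stratum_of, per_stratum, seed=0):
    """Pick up to `per_stratum` contexts from each stratum."""
    rng = random.Random(seed)
    by_stratum = defaultdict(list)
    for ctx in contexts:
        by_stratum[stratum_of(ctx)].append(ctx)
    sample = []
    for stratum in sorted(by_stratum):
        members = by_stratum[stratum]
        sample.extend(rng.sample(members, min(per_stratum, len(members))))
    return sample


def merge_context_sets(samples_by_experiment):
    """Union the samples, remembering which experiments asked for each context."""
    merged = defaultdict(set)
    for experiment_id, sample in samples_by_experiment.items():
        for ctx in sample:
            merged[ctx].add(experiment_id)
    return dict(merged)


# Hypothetical contexts: (profile_id, device) tuples.
contexts = [(i, "tv" if i % 2 else "phone") for i in range(10)]
by_device = stratified_sample(contexts, lambda c: c[1], per_stratum=2)
merged = merge_context_sets({"exp_a": by_device, "exp_b": contexts[:3]})
```

Merging before snapshotting means a context requested by several experiments is fetched once, and the stored experiment identifiers let each experiment recover its own sample later.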

Data Snapshots

The next component in the time machine fetches data from various online services and saves a snapshot of the returned data for the selected contexts. Netflix embraces a fine-grained Service Oriented Architecture for our cloud-based deployment model. There are hundreds of such micro services that are collectively responsible for handling the member experience. Data from various such services like Viewing History, My List, and Predicted Ratings are used as input for the features in our models.

We use Netflix-specific components such as Eureka, Hystrix, and Archaius to fetch data from online services through their client libraries. However, some of these client libraries bulk-load data, so they have a high memory footprint and a large startup time. Spark is not well suited for loading such components inside its JVM. Moreover, the requirement of creating an uber jar to run Spark jobs can cause runtime jar incompatibility issues with other Netflix libraries. To alleviate this problem, we used Prana, which runs outside the Spark JVM, as a data proxy to the Netflix ecosystem.

Spark parallelizes the calls to Prana, which internally fetches data from various micro services for each of these contexts. We chose Thrift as the binary communication protocol between Spark and Prana. We store the snapshotted data in S3 using Parquet, a compressed column-oriented binary format, for both time and space efficiency, and persist the location of the S3 data in Cassandra.

Ensuring pristine data quality of these snapshots is critical for us to correctly evaluate our models. Hence, we store the confidence level for each snapshot service, which is the percentage of successful data fetches from the micro services excluding any fallbacks due to timeouts or service failures. We expose it to our clients, who can choose to use this information for their experimentation.
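
The confidence level is simple arithmetic, sketched below under the assumption that each fetch is recorded with a status string. The status values and the function name are hypothetical; only the definition (successful fetches as a percentage, with fallbacks not counted as successes) comes from the description above.

```python
def confidence_level(fetch_results):
    """Percentage of fetches that returned real data.

    Each result is a hypothetical (context_id, status) pair where status is
    "ok", "fallback", or "error". Fallbacks are not counted as successes.
    """
    if not fetch_results:
        return 0.0
    successes = sum(1 for _, status in fetch_results if status == "ok")
    return 100.0 * successes / len(fetch_results)


results = [(1, "ok"), (2, "ok"), (3, "fallback"), (4, "error")]
level = confidence_level(results)
```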

For both snapshotting and context selection, we needed to schedule several Spark jobs to run on a periodic basis, with dependencies between them. To that end, we built a general purpose workflow orchestration and scheduling framework called Meson, which is optimized for machine learning pipelines, and used it to run the Spark jobs for the components of the time machine. We intend to open source Meson in the future and will provide more detail about it in an upcoming blog post.

APIs for Time Travel

We built APIs that enable time travel and fetch the snapshot data from S3 for a given time in the past. Here is a sample API to get the snapshot data for the Viewing History service.

Given a destination time in the past, the API fetches the associated S3 location of the snapshot data from Cassandra and loads the snapshot data in Spark. In addition, when given an A/B test identifier, the API filters the snapshot data to return only those contexts selected for that A/B test. The system transforms the snapshot data back into the respective services’ Java objects (POJOs) so that the feature encoders operate on the exact same POJOs for both offline experimentation and online feature generation in production.
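
The lookup flow described above can be sketched as follows. This is not the actual API: a dictionary stands in for the Cassandra location table, another for S3, and all class, function, and location names are made up for illustration. The conversion back to POJOs is not modeled.

```python
from datetime import date


class SnapshotIndex:
    """Stand-in for the Cassandra table mapping (service, day) to a location."""

    def __init__(self):
        self._locations = {}

    def register(self, service, day, location):
        self._locations[(service, day)] = location

    def locate(self, service, day):
        return self._locations[(service, day)]


def load_snapshot(index, storage, service, destination_day, test_id=None):
    """Return {context: data} for a service as it was on `destination_day`.

    `storage` is a dict standing in for S3: location -> list of rows, where
    each row is (context, test_ids, data). When `test_id` is given, only the
    contexts selected for that A/B test are returned.
    """
    rows = storage[index.locate(service, destination_day)]
    return {
        context: data
        for context, test_ids, data in rows
        if test_id is None or test_id in test_ids
    }


index = SnapshotIndex()
day = date(2016, 2, 1)
location = "snapshots/viewing_history/2016-02-01"  # hypothetical location key
index.register("viewing_history", day, location)
storage = {
    location: [
        (("profile-1", "US"), {"test-42"}, {"watched": [10, 20]}),
        (("profile-2", "BR"), {"test-7"}, {"watched": [30]}),
    ]
}
everything = load_snapshot(index, storage, "viewing_history", day)
only_42 = load_snapshot(index, storage, "viewing_history", day, test_id="test-42")
```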

The following diagram shows the overall architecture of the time machine and where Spark is used in building it: from selecting members for experimentation, snapshotting data of various services for the selected members, to finally serving the data for a time in the past.

DeLorean: Generating Features via Time Travel

DeLorean is our internal project to build the system that takes an experiment plan, travels back in time to collect all the necessary data from the snapshots, and generates a dataset of features and labels for that time in the past to train machine learning models. Of course, the first step is to select the destination time and bring it up to 88 miles per hour; then DeLorean takes care of the rest.

Running an Experiment

DeLorean allows a researcher to run a wide range of experiments by automatically determining how to launch the time machine, what time coordinates are needed, what data to retrieve, and how to structure the output. Thus, to run a new experiment, an experimenter only needs to provide the following:
  • Label data: A blueprint for obtaining a set of contexts with associated time coordinates, items, and labels for each. This is typically created by a Hive, Pig, or Spark SQL query
  • A feature model containing the required feature encoder configurations
  • Implementations of any new feature encoders that do not already exist in our library
DeLorean provides a capability for writing and modifying a new feature encoder during an experiment, for example, in a Zeppelin Notebook or in Spark Shell, so that it can be used immediately for feature generation. If we find that new feature encoder useful, we can later productionize it by adding it to our library of feature encoders.

The high-level process to generate features is depicted in the following diagram, where the blocks highlighted in light green are typically customized for new experiments. In this scenario, experimenters can also implement new feature encoders that are used in conjunction with existing ones.

Label Data and Feature Encoders

One of the primary inputs to DeLorean is the label data, which contains information about the contexts, items, and associated labels for which to generate features. Contexts, as the name suggests, describe the setting in which a model is to be used (e.g. tuples of member profiles, country, time, device, etc.). Items are the elements which are to be trained on, scored, and/or ranked (e.g. videos, rows, search entities). Labels are typically the targets used in supervised learning for each context-item combination. For unsupervised learning approaches, the label is not required. As an example, for personalized ranking the context could be defined as the member profile ID, country code, and time, the item as the video, and the labels as plays or non-plays. In this example, the label data is created by joining the set of snapshotted contexts to the logged play actions.
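
A minimal sketch of the personalized ranking example, joining contexts to logged plays to produce play/non-play labels. The candidate list per context and the labeling window are assumptions added to make the example concrete; the post does not say how non-plays are chosen or how long after the context time a play still counts.

```python
def build_label_data(contexts, candidates, plays, window):
    """Join snapshotted contexts to logged plays.

    contexts:   list of (profile_id, country, time) tuples
    candidates: dict mapping each context to the video ids to label (assumed)
    plays:      list of (profile_id, video_id, played_at) logged actions
    window:     how long after the context time a play counts (assumed)
    """
    play_times = {}
    for profile_id, video_id, played_at in plays:
        play_times.setdefault((profile_id, video_id), []).append(played_at)

    rows = []
    for context in contexts:
        profile_id, _country, time = context
        for video_id in candidates[context]:
            times = play_times.get((profile_id, video_id), [])
            # Only plays strictly after the context time count, so the label
            # can never leak into features computed as of that time.
            label = int(any(time < t <= time + window for t in times))
            rows.append((context, video_id, label))
    return rows


ctx = (7, "US", 100)
label_rows = build_label_data(
    contexts=[ctx],
    candidates={ctx: [1, 2, 3]},
    plays=[(7, 1, 150), (7, 2, 90), (7, 3, 500)],
    window=200,
)
```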

Once we have this label dataset, we need to compute features for each context-item combination in the dataset by using the desired set of feature encoders. Each feature encoder takes a context and each of the target items associated with the context, together with some raw data elements in the form of POJOs, to compute one or more features.

Each type of item, context variable, or data element has a data key associated with it. Every feature encoder has a method that returns the set of keys for the data it consumes. DeLorean uses these keys to identify the required data types, retrieves the data, and passes it to the feature encoder as a data map, which is a map from data keys to data objects.
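
The data key contract can be illustrated with a small sketch. The encoder classes, method names, and key strings below are hypothetical and written in Python for brevity; the real encoders operate on the services' POJOs.

```python
class MinutesWatchedEncoder:
    """Hypothetical encoder: minutes the profile has watched each item."""

    def data_keys(self):
        return {"viewing_history"}

    def encode(self, context, items, data_map):
        history = data_map["viewing_history"]  # {video_id: minutes}
        return {item: [float(history.get(item, 0.0))] for item in items}


class PopularityEncoder:
    """Hypothetical encoder: global popularity of each item."""

    def data_keys(self):
        return {"video_popularity"}

    def encode(self, context, items, data_map):
        popularity = data_map["video_popularity"]  # {video_id: score}
        return {item: [float(popularity.get(item, 0.0))] for item in items}


def generate_features(encoders, context, items, fetch):
    """Fetch only the data the encoders declare, then run every encoder."""
    needed = set().union(*(e.data_keys() for e in encoders))
    data_map = {key: fetch(key, context) for key in sorted(needed)}
    features = {item: [] for item in items}
    for encoder in encoders:
        for item, values in encoder.encode(context, items, data_map).items():
            features[item].extend(values)
    return features


DATA = {
    "viewing_history": {10: 30.0},
    "video_popularity": {10: 0.9, 20: 0.1},
    "my_list": [20],  # never requested, so never fetched
}
fetched = []


def fetch(key, context):
    fetched.append(key)
    return DATA[key]


features = generate_features(
    [MinutesWatchedEncoder(), PopularityEncoder()], ("p1", "US"), [10, 20], fetch
)
```

Because each encoder declares its keys up front, the caller can decide where the data comes from without the encoder knowing, and unused data is never retrieved.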

We made DeLorean flexible enough to allow the experiments to use different types of contexts and items without needing to customize the feature generation system. DeLorean can be used not only for recommendations, but also for a row ordering experiment which has profile-device tuple as context and rows of videos as items. Another use case may be a search experiment which has the query-profile-country tuple as context and individual videos as items. To achieve this, DeLorean automatically infers the type of contexts and items from the label data and the data keys required by the feature encoders.

Data Elements

Data elements are the ingredients that get transformed into features by a feature encoder. Some of these are context-dependent, such as viewing history for a profile, and others are shared by all contexts, such as metadata of the videos. We handle these two types of data elements differently.

For context-dependent data elements, we use the snapshots described above, and associate each one with a data key. We bring all the required snapshot data sources together with the values, items, and labels for each context, so that the data for a single context is sent to a single Spark executor. Different contexts are broken up to enable distributed feature generation. The snapshots are loaded as an RDD of (context, Map(data key -> data element)) in a lazy fashion and a series of joins between the label data and all the necessary context-dependent data elements are performed using Spark.
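
The shape of that join can be shown without Spark. The sketch below groups label rows by context and attaches a per-context data map; it is a single-process stand-in for the distributed joins, with hypothetical names and data.

```python
from collections import defaultdict


def join_by_context(label_rows, snapshots):
    """Bring items, labels, and snapshot data together per context.

    label_rows: iterable of (context, item, label)
    snapshots:  {data_key: {context: data_element}}
    Returns {context: (list of (item, label), data_map)}.
    """
    grouped = defaultdict(list)
    for context, item, label in label_rows:
        grouped[context].append((item, label))

    joined = {}
    for context, items_and_labels in grouped.items():
        data_map = {
            key: per_context[context]
            for key, per_context in snapshots.items()
            if context in per_context
        }
        joined[context] = (items_and_labels, data_map)
    return joined


p1, p2 = ("p1", "US"), ("p2", "BR")
label_rows = [(p1, 10, 1), (p1, 20, 0), (p2, 10, 0)]
snapshots = {
    "viewing_history": {p1: {10: 30.0}, p2: {}},
    "my_list": {p1: [20]},
}
joined = join_by_context(label_rows, snapshots)
```

Each entry of the result holds everything needed to generate features for one context, which is what allows different contexts to be processed independently.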

For context-independent data elements, DeLorean broadcasts these bulk data elements to each executor. Since these data elements have manageable sizes and often have a slow rate of change over time, we keep a record of each update that we use to rewind back to the appropriate previous version. These are kept in memory as singleton objects and made available to the feature generators for each context processed by an executor. Thus, a complete data map is created for each context containing the context data, context-dependent snapshot data elements, and shared data singletons.
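
One way to picture rewinding to the appropriate previous version is a time-ordered update log with a binary search. This is an illustrative sketch with a hypothetical class name, not a description of how DeLorean stores these records.

```python
import bisect


class VersionedData:
    """Keeps every update of a shared data element so it can be rewound."""

    def __init__(self):
        self._times = []
        self._values = []

    def record(self, updated_at, value):
        """Record an update; updates must arrive in time order."""
        if self._times and updated_at < self._times[-1]:
            raise ValueError("updates must be recorded in time order")
        self._times.append(updated_at)
        self._values.append(value)

    def as_of(self, time):
        """Return the latest version recorded at or before `time`."""
        i = bisect.bisect_right(self._times, time)
        if i == 0:
            raise LookupError("no version existed at that time")
        return self._values[i - 1]


metadata = VersionedData()
metadata.record(100, {"genre": "drama"})
metadata.record(200, {"genre": "thriller"})
rewound = metadata.as_of(150)
```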

Once the features are generated in Spark, the data is represented as a Spark DataFrame with an embedded schema. For many personalization applications, we need to rank a number of items for each context. To avoid shuffling in the ranking process, item features are grouped by context in the output. The final features are stored in Hive using a Parquet format.

Model Training, Validation, and Testing

We use features generated using our time machine to train the models that we use in various parts of our recommendation systems. We use a standardized schema for passing the DataFrames of training features to machine learning algorithms, as well as computing predictions and metrics for trained models on the validation and test feature DataFrames. We also standardized a format to serialize the models that we use for publishing the models to be later consumed by online applications or in other future experiments.

The following diagram shows how we run a typical machine learning experiment. Once the experiment is designed, we collect the dataset of contexts, items, and labels. Next the features for the label dataset are generated. We then train models using either single machine, multi-core, or distributed algorithms and perform parameter tuning by computing metrics on a validation set. Then we pick the best models and compare them on a testing set. When we see a significant improvement in the offline metrics over the production model, and the outputs are different enough, we design an A/B test using variations of the model and run it online. If the A/B test shows a statistically significant increase in core metrics, we roll it out broadly. Otherwise, we learn from the results to iterate on the next idea.

Going Online

One of the primary motivations for building DeLorean is to share the same feature encoders between offline experiments and online scoring systems to ensure that there are no discrepancies between the features generated for training and those computed online in production. When an idea is ready to be tested online, the model is packaged with the same feature configuration that was used by DeLorean to generate the features.

To compute features in the production system, we directly call our online microservices to collect the data elements required by all the feature encoders used in a model, instead of obtaining them from snapshots as we do offline. We then assemble them into data maps and pass them to the feature encoders. The feature vector is then passed to the offline-trained model for computing predictions, which are used to create our recommendations. The following diagram shows the high-level process of transitioning from an offline experiment to an online production system where the blocks highlighted in yellow are online systems, and the ones highlighted in blue are offline systems. Note that the feature encoders are shared between online and offline to guarantee the consistency of feature generation.
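
The sharing of encoders between the two paths can be sketched as one encoder function fed by two interchangeable data sources. All names here are hypothetical, the "model" is a stand-in function, and a plain callable plays the role of an online service client.

```python
def popularity_feature(context, item, data_map):
    """The single feature encoder used by both paths (hypothetical)."""
    return float(data_map["video_popularity"].get(item, 0.0))


class SnapshotSource:
    """Offline path: reads data elements from stored snapshots."""

    def __init__(self, snapshots):
        self._snapshots = snapshots

    def data_map(self, context):
        return self._snapshots[context]


class LiveSource:
    """Online path: calls a service client (here a plain function)."""

    def __init__(self, client):
        self._client = client

    def data_map(self, context):
        return {"video_popularity": self._client(context)}


def score(source, model, context, items):
    """Assemble the data map, run the shared encoder, then apply the model."""
    data_map = source.data_map(context)
    return {
        item: model(popularity_feature(context, item, data_map)) for item in items
    }


def model(feature):
    # Stand-in for an offline-trained model.
    return 2.0 * feature


popularity = {10: 0.9, 20: 0.1}
context = ("p1", "US")
offline = SnapshotSource({context: {"video_popularity": popularity}})
online = LiveSource(lambda ctx: popularity)
offline_scores = score(offline, model, context, [10, 20])
online_scores = score(online, model, context, [10, 20])
```

Only the source of the data map differs between the two calls, so given the same underlying data the features and scores are identical.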

Conclusion and Future work

By collecting the state of the online world at a point in time for a select set of contexts, we were able to build a mechanism for turning back time. Spark’s distributed, resilient computation power enabled us to snapshot millions of contexts per day and to implement feature generation, model training and validation at scale. DeLorean is now being used in production for feature generation in some of the latest A/B tests for our recommender system.

However, this is just a start and there are many ways in which we can improve this approach. Instead of batch snapshotting on a periodic cadence, we can drive the snapshots based on events, for example at a time when a particular member visits our service. To avoid duplicate data collection, we can also capture data changes instead of taking full snapshots each time. We also plan on using the time machine capability for other needs in evaluating new algorithms and testing our systems. Of course, we leave the ability to travel forward in time as future work.

Fast experimentation is the hallmark of a culture of innovation. Reducing the time to production for an idea is a key metric we use to measure the success of our infrastructure projects. We will continue to build on this foundation to bring better personalization to Netflix in our effort to delight members and win moments of truth. If you are interested in these types of time-bending engineering challenges, join us.