Manifold Blog

Using Dask in Machine Learning: Preprocessing

Posted by Jason Carpenter on Apr 25, 2019 6:00:00 AM

Introduction

This is the second post in a five-part series about using Dask in machine learning workflows:

  • Using Dask in Machine Learning: Best Practices
  • Using Dask in Machine Learning: Preprocessing
  • Using Dask in Machine Learning: Feature Engineering
  • Using Dask in Machine Learning: Model Training
  • Using Dask in Machine Learning: Model Evaluation

Starting with this post, each installment will include data snapshots and code snippets that illustrate the problem we are working on. The code lives in a public, self-contained GitHub repo; you can clone it, run the code yourself, and follow along more closely.

Additionally, we presented a summary of this blog series at the 2019 Anaconda conference, with the title Scalable Machine Learning Pipelines with Dask. You can download the slides from the talk here.

Preprocessing, or manipulating raw data into a format you can work with for feature engineering, is often the most time-consuming phase of dealing with data. This is particularly problematic for large datasets that do not fit in memory or have long processing times. Dask can help address these challenges.

In this post, we will preprocess the abstracts of journal articles from the arXiv repository so that we can classify them into subject categories. We will describe specific design patterns and best practices to avoid out-of-memory errors, and use Dask to accelerate the preprocessing phase.

Although we're going to use the arXiv NLP project to convey best practices, these design patterns generalize to many other use-cases (such as image preprocessing and data cleaning).

arXiv NLP Project

As is often the case in real-world machine learning projects, our data is in a raw format, which requires some preprocessing. We have a series of JSON files, which contain information captured in the following snapshot. We will be taking a closer look at the highlighted record.
[Figure: raw_data_df snapshot]

We are interested in using just the abstracts to predict which category or categories the paper is classified into. So let's look at the raw abstract for this article.

In [7]:" ".join(raw_df.loc["0704.0304", "abstract"].split("\n"))
Out[7]:
'This paper discusses the benefits of describing the world as information,
especially in the study of the evolution of life and cognition. Traditional
studies encounter problems because it is difficult to describe life and cognition
in terms of matter and energy, since their laws are valid only at the physical
scale. However, if matter and energy, as well as life and cognition, are described
in terms of information, evolution can be described consistently as information
becoming more complex. The paper presents eight tentative laws of information,
valid at multiple scales, which are generalizations of Darwinian, cybernetic,
thermodynamic, psychological, philosophical, and complexity principles.
These are further used to discuss the notions of life, cognition and their
evolution.'

It seems as though this paper discusses a cross-section of a few different fields. Let's find out which categories arXiv assigns to the paper.

In [8]: raw_df.loc["0704.0304", "categories"]
Out[8]:
{'category-0': {'main_category': 'Computer Science',
'sub_category': 'Information Theory'},
'category-1': {'main_category': 'Computer Science',
'sub_category': 'Artificial Intelligence'},
'category-2': {'main_category': 'Mathematics',
'sub_category': 'Information Theory'},
'category-3': {'main_category': 'Quantitative Biology',
'sub_category': 'Populations and Evolution'}}

It turns out that this paper not only discusses topics from different fields, but is also classified into three different main categories. In machine learning, this is referred to as a multi-label classification problem. When creating our response variables, we will transform it into several binary classification problems by one-hot encoding the main categories and running a model for every category.
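To make the one-hot encoding idea concrete, here is a minimal sketch. The function name below is hypothetical and the details are illustrative only; the repo's actual implementation (get_ohced_main_category) may differ.

import pandas as pd

def one_hot_main_categories(raw_df, category_list):
    """Return 0/1 indicator columns, one per main category."""
    # Each record's "categories" value is a dict of {"category-N": {...}} entries.
    main_cats = raw_df["categories"].map(
        lambda cats: {c["main_category"] for c in cats.values()}
    )
    return pd.DataFrame(
        {cat: main_cats.map(lambda cat_set: int(cat in cat_set)) for cat in category_list}
    )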

Preprocessing

We need to convert the data from this raw form into a format we can use for feature engineering. In the context of the arXiv project, that means two separate tasks:

  1. Tokenization of the abstracts
  2. One-hot encoding of the categories

The result of our preprocessing will have the form shown below; the token_abs column contains the tokenized version of each abstract.

[Figure: tokenized_df snapshot]
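For intuition, a minimal tokenizer could look like the sketch below. This is an illustrative assumption only; the tokenize function in the repo may use an NLP library, stopword removal, or stemming.

import re

def tokenize(abstract):
    """Lowercase an abstract and split it into alphabetic word tokens."""
    return re.findall(r"[a-z]+", abstract.lower())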


Since our dataset is large and consists of several files, we don't want to process every file serially in pandas. We can use Dask to parallelize this workload. Imagine we have a function, preprocess_input_json, which preprocesses one file and looks similar to the code below. For brevity, we won't walk through the full tokenize and get_ohced_main_category implementations; if you want to see the complete code, check out the preprocessing notebook in our GitHub repo.
import os

import pandas as pd

# tokenize and get_ohced_main_category are defined in the repo's preprocessing notebook.
def preprocess_input_json(jfile, preprocessed_dir, category_list, return_df=False):
    processed_df = pd.DataFrame()

    # Read one raw JSON file of arXiv records into a DataFrame.
    with open(jfile, "r") as f:
        raw_df = pd.read_json(f, orient="index")

    # Tokenize each abstract and one-hot encode the main categories.
    processed_df["token_abs"] = raw_df["abstract"].map(tokenize)
    ohced_main_cat = get_ohced_main_category(raw_df, category_list)
    processed_df = pd.concat([processed_df, ohced_main_cat], axis=1)

    # Write the preprocessed records to a new file with the same basename.
    out_file = os.path.join(preprocessed_dir, os.path.basename(jfile))
    processed_df.to_json(out_file, orient="records", lines=True)

    if return_df:
        return processed_df
Here we've written code to tokenize one file, but we have several files we want to preprocess in parallel. So we can use the dask.bag data structure to preprocess several files at once. Since we chose one JSON file to be the "unit" over which we are parallelizing, Dask's API makes it painless to write a wrapper function like the one below.
import pathlib
import dask.bag as db

def dask_preprocess(input_dir, preprocessed_dir, glob_pattern, category_list):
    input_files = list(pathlib.Path(input_dir).glob(glob_pattern))

    # Map the per-file preprocessing function over the bag; the extra
    # arguments are passed through to preprocess_input_json on each call.
    json_db = db.from_sequence(input_files).map(
        preprocess_input_json, preprocessed_dir, category_list
    )

    json_db.compute()

The screenshot below shows the state of our local cluster when the above function is called. On the left, we see the asterisk on the currently executing cell. On the right, we see the dashboard: there are four workers in our cluster, working in parallel, each processing one JSON file. This gives us 4x parallelism compared to a serial loop of pure pandas code. If we replace the local cluster with a Dask cluster running on Kubernetes and store our files on a cloud storage system such as S3, we can increase this parallelism significantly, gaining further speedup and handling even larger datasets.

[Figure: Dask dashboard during dask_preprocess]
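For reference, one way to stand up a local cluster like the one in the screenshot is sketched below. The worker settings, paths, and category list are illustrative assumptions rather than values from the repo.

from dask.distributed import Client

# Start a local cluster with four worker processes (illustrative settings).
client = Client(n_workers=4, threads_per_worker=1)
print(client.dashboard_link)  # open this URL to watch the workers

# Placeholder arguments for this sketch.
category_list = ["Computer Science", "Mathematics", "Quantitative Biology"]
dask_preprocess("data/raw", "data/preprocessed", "*.json", category_list)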

Design Patterns

Build incrementally

From our experience working on a variety of machine learning projects, we believe the best path to achieving an effective machine learning solution is one that starts with building an end-to-end system quickly. Once we have an end-to-end system off the ground, we can assess model performance and improve it using quantitative metrics. The quality of our final model then becomes a function of how quickly we can iterate on our approaches at each step in the ML workflow.

Said another way, the more time we spend fine-tuning code to be perfect from the get-go, the longer we go without knowing how well it actually works. Particularly early in the machine learning process, e.g., in preprocessing, we need to get up and running as quickly as possible so that we can make tweaks and iterate rapidly.

In our case, we started with simple, non-parallelized code to process one file and then scaled out by writing a Dask wrapper to execute many data-parallel jobs simultaneously. Once we had simple pandas code that worked well, we parallelized it by putting a list of filepaths into a Dask bag, mapping the preprocessing transformation function over each filepath, and writing each transformed file back to disk in a new directory. In this way, we were able to write parallelized code in a much shorter amount of time. While one key motivation for using Dask is to reduce compute time, another important consideration is to spend our development time wisely as well. If we spend hours trying to get more complicated parallelized code up and running, we are inherently limiting the amount of time we can devote to rapid iteration.

Identify the parallelizable unit

The implicit notion in building incrementally is that the code can be written such that there is a unit-parallelizable process. That is, we can write simple code to execute the process for one unit of data (e.g. files, images, unique keys in a pandas dataframe, elements in a list, etc.) and then write code to parallelize this process over all units of data in the dataset.

The choice of which unit to parallelize over is an important consideration. Broadly speaking, there are two options in this type of scenario, as illustrated in the figure below. File-level parallelism works well for this situation because:

a) The overhead of spinning up worker processes is shared by processing of many records
b) We wanted to have output as separate files, one for each input file, which gives us better observability as data flows through our pipeline

In a different situation, ID-level parallelism might be a better fit. For example, imagine a situation where we have a tabular dataset that corresponds to customers visiting a website. There may be multiple records per customer in our table. A common operation in those types of datasets is to compute some metric per customer. In this scenario, it is natural to partition the dataset by customer ID as illustrated below and our operations would be parallelized over customers. In an extreme case of this ID-level parallelism, each row can have a unique ID. In that case, our operations would be parallelized over rows.

[Figure: file-level vs. ID-level parallelism]
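To make the ID-level pattern concrete, here is a minimal sketch using dask.dataframe. The visits dataset, file paths, and column names are hypothetical and only meant to show the pattern.

import dask.dataframe as dd

# Hypothetical customer-visit data split across many Parquet files.
visits = dd.read_parquet("visits/*.parquet")

# The aggregation is computed per customer, in parallel across partitions,
# so the customer ID is the unit of parallelism here.
mean_session_length = visits.groupby("customer_id")["session_length"].mean()
result = mean_session_length.compute()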

Conclusion

In this blog post, we learned how to use Dask for a simple NLP preprocessing step. Next, we will use it for feature engineering, after which we will be ready for model training, before finally concluding with model evaluation.


Topics: Data engineering, Machine learning
