Time-Series Missing Data Imputation In Apache Spark
In a recent project, I needed to do some time-based imputation across a large set of data. I tried to implement my own solution with moderate success before scouring the internet for an existing one. After an hour or so, I came across this article about the Spark-TS package. The package is built around time-series analysis in Spark, which wasn't exactly my use case: I had all the time-series work done and just wanted to fill in missing data. After digging through the documentation, I found a set of imputation techniques within the package. This post is some of my notes reformulated into a blog post because I thought people might find them helpful.
The Missing Data Problem
Missing data is far from uncommon when working with datasets. Whether a sensor failed to collect data, a database shut off before a write could complete, or a human made an error, missing data happens. In some analyses, simply ignoring missing data is a satisfactory approach. In a dataset with one billion rows, ignoring one row is hardly likely to have a statistically significant impact. However, if missing data occurs in some systematic way (say, data is missing every Friday), that presents a bigger problem for an analysis. Systematic missing data can introduce a great deal of bias and creates operational overhead for the analyst. Often, especially when data is not missing in a systematic way, it's worth imputing, or filling in, values for the missing data.
Naturally, the question becomes how to impute. It's easy to imagine ways imputation could be used to skew results, or could make an otherwise straightforward analysis complicated. I can't get into all the details of imputation and the ethics surrounding it here, but I can recommend reading a textbook chapter on it.
The Imputation Problem
The laziest imputation is replacing all missing data with zeros. For a lot of applications this is wholly acceptable, but in a portion of applications we can do better. There is a large literature on imputation techniques, from re-sampling to deep learning. The problem remains: how should we impute? Should we take an average? Should we do a regression and try to predict the missing values? Should we cluster? The answer is, it depends. In the time-series world, it makes the most sense to impute based on previous or future data. Spark-TS supports a handful of methods for doing this out of the box. To illustrate how each of them works, I've included a couple of illustrations.
Linear
A linear imputation treats time as the x value and whatever you are measuring over time as the y value. Rather than fitting a single regression line to the whole series, Spark-TS's linear fill draws a straight line between the nearest known values on either side of each gap and reads the missing values off that line. This is a reasonable approach for time-series data; however, there are scenarios in which a line cannot be drawn, such as a window with only one observation or a gap at the very start or end of the series, where a neighbor is missing on one side.
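To make the idea concrete, here is a minimal, library-free sketch of a linear fill over an array in which NaN marks a missing value. The object and method names are illustrative only, and this is not the Spark-TS source; it simply interpolates between neighboring known points and leaves leading or trailing gaps untouched.

```scala
object LinearFill {
  /** Linearly interpolates interior runs of NaN; leading/trailing NaN are left as-is. */
  def fillLinear(values: Array[Double]): Array[Double] = {
    val result = values.clone()
    // Indices of the observed (non-missing) points, in time order.
    val known = result.indices.filter(i => !result(i).isNaN)
    // Walk over consecutive pairs of known points and fill the gap between them.
    known.zip(known.drop(1)).foreach { case (left, right) =>
      val step = (result(right) - result(left)) / (right - left)
      for (i <- left + 1 until right) {
        result(i) = result(left) + step * (i - left)
      }
    }
    result
  }
}
```

For example, `LinearFill.fillLinear(Array(10.0, Double.NaN, Double.NaN, 16.0))` fills the gap with 12.0 and 14.0.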
Nearest
The goal of the “nearest” approach to imputation is to order the data by time and, for each missing value, impute it with the closest non-missing point. For example, if you had data ordered by time that looked like:
12, 12, 11, 10, 15, NA, NA, NA, 13
The first NA from left to right would be imputed with 15 because it's the closest value that is not missing, and the last NA would be imputed with 13 for the same reason. In case of a tie, as with the middle NA here, you can either choose at random or lean toward the previous or the next value. There are some problems with this approach, some more obvious than others. One of the greater dangers is imputing bad data and spreading it across more points. Each of the approaches has this problem, but it's more pronounced here.
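The example above can be sketched in plain Scala as follows. This is an illustration rather than the package's implementation: the names are made up, and breaking ties toward the earlier value is an assumption of this sketch, not a documented Spark-TS behavior. It scans all known points for every gap, which is fine for a demonstration but not tuned for long series.

```scala
object NearestFill {
  /** Replaces each NaN with the closest non-missing value; ties go to the earlier value. */
  def fillNearest(values: Array[Double]): Array[Double] = {
    // Indices of the observed points, in time order.
    val known = values.indices.filter(i => !values(i).isNaN)
    values.indices.map { i =>
      if (!values(i).isNaN || known.isEmpty) values(i)
      // minBy keeps the first minimum, so an exact tie resolves to the earlier index.
      else values(known.minBy(k => math.abs(k - i)))
    }.toArray
  }
}
```

Applied to the series above, the three NAs become 15, 15 and 13: the middle one is a tie and falls back to the earlier neighbor.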
Splines
Splines fill in missing data points by fitting a smooth piecewise polynomial curve (a cubic spline) through the known values. Wikipedia has a great explanation of how splines work. Essentially, it is an adaptive approach rather than the more prescriptive one offered by a straight-line fit. This can be a handy method if you have data spread across a period of time, because it can smooth out some of the irregularities that the previous methods would not.
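As a rough illustration of what a spline fill involves, the sketch below fits a natural cubic spline (second derivative set to zero at both ends) through the known points and evaluates it at the missing interior positions. It is written from the textbook algorithm, with hypothetical names, and is an assumption about the general technique rather than a copy of what Spark-TS does internally.

```scala
object SplineFill {
  /** Fills interior NaN values using a natural cubic spline through the known points. */
  def fillSpline(values: Array[Double]): Array[Double] = {
    val xs = values.indices.filter(i => !values(i).isNaN).map(_.toDouble).toArray
    val n = xs.length - 1 // number of spline segments
    if (n < 1) values.clone() // fewer than two known points: nothing to fit
    else {
      val a = xs.map(x => values(x.toInt))
      val h = Array.tabulate(n)(i => xs(i + 1) - xs(i))

      // Forward sweep of the tridiagonal system for the curvature coefficients.
      val mu = new Array[Double](n + 1)
      val z = new Array[Double](n + 1)
      for (i <- 1 until n) {
        val alpha = 3 / h(i) * (a(i + 1) - a(i)) - 3 / h(i - 1) * (a(i) - a(i - 1))
        val l = 2 * (xs(i + 1) - xs(i - 1)) - h(i - 1) * mu(i - 1)
        mu(i) = h(i) / l
        z(i) = (alpha - h(i - 1) * z(i - 1)) / l
      }

      // Back substitution; c(0) = c(n) = 0 is the "natural" boundary condition.
      val b = new Array[Double](n)
      val c = new Array[Double](n + 1)
      val d = new Array[Double](n)
      for (j <- n - 1 to 0 by -1) {
        c(j) = z(j) - mu(j) * c(j + 1)
        b(j) = (a(j + 1) - a(j)) / h(j) - h(j) * (c(j + 1) + 2 * c(j)) / 3
        d(j) = (c(j + 1) - c(j)) / (3 * h(j))
      }

      // Evaluate the segment that contains each missing interior index.
      val result = values.clone()
      for (j <- 0 until n; i <- xs(j).toInt + 1 until xs(j + 1).toInt) {
        val dx = i - xs(j)
        result(i) = a(j) + b(j) * dx + c(j) * dx * dx + d(j) * dx * dx * dx
      }
      result
    }
  }
}
```

When the known points happen to lie on a straight line the spline reduces to that line; on curved data it bends to follow the neighbors, which is where it differs from the linear fill.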
Previous
Previous is a variant of “nearest”: instead of looking for the nearest value, it looks for the previous non-missing value. All of the weaknesses of nearest are inherited here.
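A small sketch of this carry-forward behavior in plain Scala, using an illustrative name and NaN as the missing marker. Leaving leading gaps as NaN is a choice made for this sketch, since there is no earlier value to copy.

```scala
object PreviousFill {
  /** Carries the last observed value forward into each NaN; leading NaN stay missing. */
  def fillPrevious(values: Array[Double]): Array[Double] =
    // scanLeft threads the most recent observed value through the series;
    // tail drops the NaN seed that scanLeft puts in front.
    values.scanLeft(Double.NaN)((last, v) => if (v.isNaN) last else v).tail
}
```

On the series from the nearest example, all three NAs become 15.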
Next
Next is also a child of nearest and works as the opposite of previous: each gap is filled with the next non-missing value.
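The mirror image can be sketched by scanning from the right instead of the left. As before, the name is illustrative and trailing gaps are left as NaN in this sketch because there is no later value to copy.

```scala
object NextFill {
  /** Pulls the next observed value back into each NaN; trailing NaN stay missing. */
  def fillNext(values: Array[Double]): Array[Double] =
    // scanRight threads the upcoming observed value backwards through the series;
    // init drops the NaN seed that scanRight appends at the end.
    values.scanRight(Double.NaN)((v, next) => if (v.isNaN) next else v).init
}
```

On the series from the nearest example, all three NAs become 13.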
Zero
Zero is simply filling missing data with zero. It is the laziest form of imputation but in some cases may be appropriate.
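For completeness, a zero fill is a one-line map over the array. The sketch below, with hypothetical names, routes it through a slightly more general constant fill so the same helper can be reused with another default.

```scala
object ZeroFill {
  /** Replaces every NaN with the given constant. */
  def fillValue(values: Array[Double], filler: Double): Array[Double] =
    values.map(v => if (v.isNaN) filler else v)

  /** Replaces every NaN with zero. */
  def fillZero(values: Array[Double]): Array[Double] = fillValue(values, 0.0)
}
```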
An Example
My data looks like this: I've got a date column that's actually a string in the format “YYYY-mm-dd”, a sales column which is a FLOAT, and an id (String) column for each row. From here I need to get to a TimeSeriesRDD, defined as:
TimeSeriesRDD(index: DateTimeIndex, parent: RDD[(K, Vector)])(implicit kClassTag: ClassTag[K])
That might not be too helpful, but Spark-TS has a built-in method for making this conversion. We just need to create a DateTime index over the window of time we're looking at. The implementation is not too bad:
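As a library-free sketch of what a daily index over a window amounts to, the snippet below enumerates every calendar day between two dates using java.time. It is not Spark-TS's DateTimeIndex, and the object and method names are hypothetical; it only shows the idea of one slot per day, whether or not a row exists for that day.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DailyIndex {
  /** All calendar days from start to end, inclusive; empty if end precedes start. */
  def between(start: LocalDate, end: LocalDate): Vector[LocalDate] = {
    val days = ChronoUnit.DAYS.between(start, end)
    (0L to days).map(d => start.plusDays(d)).toVector
  }
}
```

The positions in this index are what the imputation methods treat as the time axis.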
I'll also convert my string date to a Timestamp:
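Outside of Spark, the same conversion can be sketched with the JDK alone. This assumes the strings are ISO dates such as 2017-03-05 and that midnight local time is an acceptable timestamp for a daily series; the helper name is made up for this example.

```scala
import java.sql.Timestamp
import java.time.LocalDate

object DateParsing {
  /** Parses an ISO "yyyy-MM-dd" string into a Timestamp at midnight local time. */
  def toTimestamp(date: String): Timestamp =
    Timestamp.valueOf(LocalDate.parse(date).atStartOfDay())
}
```

A malformed string makes `LocalDate.parse` throw a `DateTimeParseException`, so bad rows surface early instead of silently becoming gaps.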
With these transformations, we can create a TimeSeriesRDD and take a look at the different imputation techniques. Here is all the code required:
Before we look at imputation, let's take a peek at the data:
This is a key-value pair, where the key is the id and the value is our sales data ordered by time. The value is an array, and the NaN entries are the missing data. Imputing will replace each NaN with another value.
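To show how rows end up in that shape, here is a plain-Scala sketch that groups (id, date, sales) observations by id and lines each group up against a date index, writing NaN wherever a day has no row. The `Obs` case class and `toSeries` helper are hypothetical stand-ins for illustration; in Spark-TS the equivalent work is done by the package when it builds the TimeSeriesRDD.

```scala
import java.time.LocalDate

object SeriesLayout {
  /** One input row: which series it belongs to, when, and the measured value. */
  final case class Obs(id: String, date: LocalDate, sales: Double)

  /** Groups observations by id and aligns each group to the index, NaN where no row exists. */
  def toSeries(index: Seq[LocalDate], rows: Seq[Obs]): Map[String, Array[Double]] =
    rows.groupBy(_.id).map { case (id, obs) =>
      val byDate = obs.map(o => o.date -> o.sales).toMap
      id -> index.map(d => byDate.getOrElse(d, Double.NaN)).toArray
    }
}
```

Every array has the same length as the index, which is what lets the fill methods reason purely about array positions.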
If we use the zero imputation technique the data is what you'd expect:
For something more interesting, we can look at splines:
You'll notice these printouts are all RDDs and not DataFrames. We can quickly get back to a DataFrame with the TimeSeriesRDD API:
That will return a DataFrame with the listed columns.
That's it
As we demonstrated here, it's fairly trivial to impute time-series data with Spark. There are some additional imputation techniques that could be interesting, and it's worth submitting pull requests to the maintainer if you come up with a good one!
Bonus
You can build your own custom imputation if you're not satisfied with those offered by the package. Extending the imputation implementation isn't hard, and you are just manipulating an array which is a standard collection in Scala. Here is an example of using the max value in a series:
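A minimal sketch of such a max-value fill, written against a plain array so it runs without the package. The name `fillMax` is made up for this example, and how it would be plugged into Spark-TS is left out here; returning an all-missing series unchanged is a choice of this sketch.

```scala
object MaxFill {
  /** Replaces each NaN with the largest observed value; an all-NaN series is returned unchanged. */
  def fillMax(values: Array[Double]): Array[Double] = {
    // Drop missing entries before taking the max so NaN cannot poison the result.
    val observed = values.filterNot(_.isNaN)
    if (observed.isEmpty) values.clone()
    else {
      val max = observed.max
      values.map(v => if (v.isNaN) max else v)
    }
  }
}
```

The same pattern (compute a statistic over the observed values, then map it over the gaps) works for a mean or median fill as well.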