tsfresh on Large Data Samples - Part II


In the last post, we explored how tsfresh automatically extracts many time series features from your input data. We also discussed two ways to speed up the feature extraction: using multiple cores on your local machine (which is already turned on by default) or distributing the calculation over a cluster of machines.

In this post we will go one step further: what happens if your data is so large that loading it into your scheduling machine is not an option anymore? For many applications, this is not the case (and keeping everything local speeds up the development cycle and reduces the number of possible mistakes). However, sometimes you really need to think big.

Apache Spark and dask bindings

There are multiple solutions to the problem of distributing not only the work, but also the data - actually, the whole field of big data and data engineering is built around that. The general idea is to load only a fraction of the data into each machine and perform the calculation on this partition of the data - instead of loading all the data at the same time. The inputs and results are distributed either via network transmissions or via a distributed file system, such as HDFS.
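The partition idea can be illustrated without any framework. The following is a minimal sketch using only the Python standard library. The CSV file layout, the helper names `write_partitions` and `process_partition`, and the toy values are assumptions made for illustration, and a plain mean stands in for the real feature extraction.

```python
import csv
import statistics
import tempfile
from pathlib import Path


def write_partitions(rows, base_dir):
    """Spill rows to one CSV file per id, so each file is one partition."""
    by_id = {}
    for row in rows:
        by_id.setdefault(row["id"], []).append(row)
    for data_id, chunk in by_id.items():
        path = Path(base_dir) / f"{data_id}.csv"
        with path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["id", "time", "value"])
            writer.writeheader()
            writer.writerows(chunk)


def process_partition(path):
    """Load a single partition and reduce it to one small result."""
    with Path(path).open(newline="") as f:
        values = [float(row["value"]) for row in csv.DictReader(f)]
    return statistics.mean(values)


# Toy data, invented for this sketch
rows = [
    {"id": 1, "time": 0, "value": 1.0},
    {"id": 1, "time": 1, "value": 3.0},
    {"id": 2, "time": 0, "value": 10.0},
    {"id": 2, "time": 1, "value": 20.0},
]

with tempfile.TemporaryDirectory() as base_dir:
    write_partitions(rows, base_dir)
    # Only one partition is held in memory at any time. On a cluster, each
    # worker would run process_partition on its own share of the files.
    results = {
        int(path.stem): process_partition(path)
        for path in sorted(Path(base_dir).glob("*.csv"))
    }
```

The per-partition results are small, so collecting them on a single machine is cheap even when the raw data is not.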

Two of the best-known examples of this (at least in the Python world) are Apache Spark (with its Python bindings PySpark) and dask. We will cover the basics of neither Apache Spark nor dask here - they are both wonderful libraries and if you have never used them you should definitely check them out! If you do not want to use any of those frameworks, but you still have too much data to handle, have a look into the next section where we will discuss a simplified (but also less powerful) solution.

Since version 0.15, tsfresh contains convenience functions to input a Spark dataframe or a dask dataframe into tsfresh (remember: normally you can only use pandas dataframes). They are both defined in the tsfresh.convenience.bindings module (with documentation here) and we will cover them in the remainder of this section.

The idea is as follows:

As an example, we will again use the robot failure datasample from our Quickstart. Even though it is still small enough to fit into your memory, we will treat it as “big data” and spill it out to multiple parquet files on disk. Make sure to have a pandas parquet binding installed for this (such as pyarrow, see installation instructions) or use a different format.

from tsfresh.examples import robot_execution_failures
import os

robot_execution_failures.download_robot_execution_failures()
timeseries, _ = robot_execution_failures.load_robot_execution_failures()

def store_data(data_chunk):
    data_id = data_chunk["id"].iloc[0]

    os.makedirs(f"data/{data_id}", exist_ok=True)
    data_chunk.to_parquet(f"data/{data_id}/data.parquet", index=False)

timeseries.groupby("id").apply(store_data)

Let's start with Apache Spark.

(Py)Spark and tsfresh

Apache Spark is basically the framework for writing and distributing fault-tolerant data pipelines. Even though it is written in Scala (and runs on the JVM), it has very good and well-documented Python bindings called pyspark. After having installed Apache Spark (see here), you need to create a Spark cluster for distributing the work. As usual, there are many ways to do this (see here for a start), but as an example we will just use the “local” mode, which is what you get when you do not specify anything else. We will demonstrate the calculation in the following with an interactive pyspark console, but you can of course also write it into a Python script and submit it via spark-submit (check out the documentation).

Important: Spark leverages the arrow bindings for efficient transformation between pandas and Spark dataframes. Therefore, you need to have arrow (pyarrow) installed. Starting with arrow 0.15, the internal data format (the IPC format) has changed and is incompatible with Spark versions that were built against older arrow releases. For it to still work with such a Spark version, you need to add the line

  ARROW_PRE_0_15_IPC_FORMAT=1

to your Spark environment settings file $SPARK_HOME/conf/spark-env.sh as stated in the documentation.

Now let’s create a data pipeline! We will use Spark’s structured declarative dataframe API in the following. Remember that Spark will only build up the computation DAG until you trigger an action in the end.

So first, let's spin up an interactive pyspark shell and read in the parquet files. As we are running in local mode, we do not need to make sure that the data is available to all workers (as there is only one local worker). In a production environment you would probably use S3, HDFS or any other shared data storage.

$ pyspark
>>> df = spark.read \
...         .parquet("data/*/data.parquet")
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- time: integer (nullable = true)
|-- F_x: integer (nullable = true)
|-- F_y: integer (nullable = true)
|-- F_z: integer (nullable = true)
|-- T_x: integer (nullable = true)
|-- T_y: integer (nullable = true)
|-- T_z: integer (nullable = true)

>>> df.show()
+---+----+----+---+-----+----+----+---+
| id|time| F_x|F_y|  F_z| T_x| T_y|T_z|
+---+----+----+---+-----+----+----+---+
| 71|   0| -59| 44| -205|-305|-226| 37|
| 71|   1| -71| 55| -273|-368|-271| 42|
| 71|   2| -75| 58| -305|-403|-294| 48|
| 71|   3| -89| 66| -408|-471|-342| 51|
| 71|   4| -95| 64| -461|-503|-372| 47|
| 71|   5|-100| 67| -524|-545|-400| 51|
| 71|   6|-104| 55| -567|-547|-429| 56|
...

This is the format of the robot data. id is the identifier for each time series, time is the time (sorting) parameter and the F_* and T_* are the different value series (remember: what we call the kind of the time series). Please note that this data format and the pre-processing we will perform in the following are just an example - your data might look different and other data transformation steps might be necessary. For example, if your column names are different (e.g. the id column is not called “id”), you will need to edit the steps accordingly.

The feature extraction will later run on each of these series separately - so we first need to group it both by the timeseries identifier id and the kind. However, the data is not easily groupable by kind (data of the same kind are not separated) - so we first need to reshape the data. This transformation is usually called melting and there exist various ways to do this in PySpark. For example you could follow one of the answers to this StackOverflow discussion. Just make sure to change the column name identifiers if needed.

>>> # ... define your melt function here, e.g. as in the link above
>>> df_melted = melt(df, id_vars=["id", "time"],
...                  value_vars=["F_x", "F_y", "F_z", "T_x", "T_y", "T_z"],
...                  var_name="kind", value_name="value")
>>> df_melted.printSchema()
root
|-- id: integer (nullable = true)
|-- time: integer (nullable = true)
|-- kind: string (nullable = false)
|-- value: integer (nullable = true)
>>> df_melted.show()
+---+----+----+-----+
| id|time|kind|value|
+---+----+----+-----+
| 71|   0| F_x|  -59|
| 71|   0| F_y|   44|
| 71|   0| F_z| -205|
| 71|   0| T_x| -305|
| 71|   0| T_y| -226|
| 71|   0| T_z|   37|
| 71|   1| F_x|  -71|
| 71|   1| F_y|   55|
| 71|   1| F_z| -273|
| 71|   1| T_x| -368|
| 71|   1| T_y| -271|
| 71|   1| T_z|   42|
...
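
If the reshaping is unfamiliar, the following dependency-free sketch shows what melting does to each row. It is not PySpark code, and the helper name `melt_rows` is made up for illustration. A real Spark melt function has to express the same thing with Spark column operations. The two input rows are taken from the table above, reduced to two value columns.

```python
def melt_rows(rows, id_vars, value_vars, var_name="kind", value_name="value"):
    """Turn each wide row into one long row per value column."""
    melted = []
    for row in rows:
        for column in value_vars:
            # Keep the identifying columns, then add the column name
            # and its value as two new fields.
            long_row = {key: row[key] for key in id_vars}
            long_row[var_name] = column
            long_row[value_name] = row[column]
            melted.append(long_row)
    return melted


wide_rows = [
    {"id": 71, "time": 0, "F_x": -59, "F_y": 44},
    {"id": 71, "time": 1, "F_x": -71, "F_y": 55},
]
long_rows = melt_rows(wide_rows, id_vars=["id", "time"], value_vars=["F_x", "F_y"])
```

Two wide rows with two value columns become four long rows, each of which carries exactly one measurement together with its id, time and kind.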

After melting, the resulting dataframe is a concatenated set of each kind of time series, while keeping the id and time columns. When we now group by id and kind

>>> df_grouped = df_melted.groupby(["id", "kind"])

each grouped chunk of data only contains the data of one kind and one id. We are now ready to use tsfresh! The preprocessing part might look different for your data sample, but you should always end up with a dataset grouped by id and kind before using tsfresh.

With the given column names in the example, the call to tsfresh looks like this:

>>> from tsfresh.convenience.bindings import spark_feature_extraction_on_chunk
>>> from tsfresh.feature_extraction import ComprehensiveFCParameters
>>> features = spark_feature_extraction_on_chunk(df_grouped, column_id="id",
...                                              column_kind="kind",
...                                              column_sort="time",
...                                              column_value="value",
...                                              default_fc_parameters=ComprehensiveFCParameters())

Please remember that Spark will only trigger the calculation once you call an action, so it is still only building up the calculation DAG.

Internally, tsfresh will call the following on each grouped chunk:

In the end, it will return a Spark dataframe, where each line is the value for a different feature of a different time series id and kind.

>>> features.printSchema()
root
|-- id: long (nullable = true)
|-- variable: string (nullable = true)
|-- value: double (nullable = true)
>>> features.show()
+---+--------------------+-------------------+
| id|            variable|              value|
+---+--------------------+-------------------+
| 12|     F_x__sum_values|               -6.0|
| 12|         F_x__median|               -1.0|
| 12|           F_x__mean|               -0.4|
| 12|         F_x__length|               15.0|
...
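
To make the long output format concrete, here is a conceptual sketch of a per-group extraction in plain Python. It is not how tsfresh is implemented: the helper name `extract_long_features` is hypothetical, the input values are invented, and only four simple statistics stand in for the full set of feature calculators. What it does show is the naming scheme visible above, where each variable is the kind and the feature name joined by a double underscore.

```python
import statistics
from collections import defaultdict


def extract_long_features(rows):
    """Group by (id, kind), sort by time, and emit (id, variable, value) rows."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row["id"], row["kind"])].append(row)

    features = []
    for (data_id, kind), chunk in sorted(groups.items()):
        # Each group is processed on its own, so groups could be
        # handled by different workers.
        values = [r["value"] for r in sorted(chunk, key=lambda r: r["time"])]
        calculated = {
            "sum_values": float(sum(values)),
            "median": float(statistics.median(values)),
            "mean": float(statistics.mean(values)),
            "length": float(len(values)),
        }
        for name, value in calculated.items():
            features.append((data_id, f"{kind}__{name}", value))
    return features


# Toy input, invented for this sketch (deliberately not sorted by time)
rows = [
    {"id": 12, "time": 1, "kind": "F_x", "value": -1},
    {"id": 12, "time": 0, "kind": "F_x", "value": 1},
    {"id": 12, "time": 2, "kind": "F_x", "value": -3},
]
features = extract_long_features(rows)
```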

Depending on your use case, you might want to bring them into the usual tabular form (but please note that this might imply heavy shuffling and especially some immediate calculations as the column names need to be known):

>>> from pyspark.sql import functions as F
>>> pivoted_features = features.groupby("id").pivot("variable")
>>> feature_table = pivoted_features.agg(F.first("value"))
>>> feature_table.printSchema()
root
|-- id: long (nullable = true)
|-- F_x__length: double (nullable = true)
|-- F_x__maximum: double (nullable = true)
...
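
The reason the column names need to be known up front can be seen in a small plain-Python sketch of the same pivot. The helper name `pivot_features` is hypothetical, and the values for id 13 are invented. The rows for id 12 are taken from the output shown earlier.

```python
def pivot_features(long_rows):
    """Collect (id, variable, value) rows into one dict of features per id."""
    table = {}
    for data_id, variable, value in long_rows:
        table.setdefault(data_id, {})[variable] = value
    return table


long_rows = [
    (12, "F_x__length", 15.0),
    (12, "F_x__mean", -0.4),
    (13, "F_x__length", 15.0),  # invented
    (13, "F_x__mean", 0.2),     # invented
]

# The set of output columns is only known after a full pass over the data.
# This is why the pivot cannot stay completely lazy.
columns = sorted({variable for _, variable, _ in long_rows})
feature_table = pivot_features(long_rows)
```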

In any case, you can now apply additional transformations or write out the result of the calculation. As the feature extraction runs independently on each group, it scales with the number of workers you add.

Dask and tsfresh

In principle, interacting with tsfresh from dask follows the same steps as with Spark, so let's quickly walk through them:

  1. Spin up a dask cluster. There are again multiple ways to do this, depending on your environment. A very good starting point is the dask documentation. If you do not have a cluster to play around with, local mode will also work (which means you do not need to set up or import anything). For testing, you can use an interactive Python shell or a Jupyter notebook.
  2. Read in the data sample

     >>> from dask import dataframe as dd
     >>> # Make sure to set up your client here if you have a cluster
     >>> df = dd.read_parquet("data/*/data.parquet")
    
  3. Bring the data into the same format as above, where data with different id and kind can be separated easily. Fortunately, dask already has a melt method, so you just need to call it:

     >>> df_melted = df.melt(id_vars=["id", "time"],
     ...                     value_vars=["F_x", "F_y", "F_z", "T_x", "T_y", "T_z"],
     ...                     var_name="kind", value_name="value")
     >>> df_melted.columns
     Index(['id', 'time', 'kind', 'value'], dtype='object')
     >>> df_melted.head()
         id  time kind  value
     0   1     0  F_x     -1
     1   1     1  F_x      0
     2   1     2  F_x     -1
     3   1     3  F_x     -1
     4   1     4  F_x     -1
    
  4. Now separate the data for different id and kind by grouping

     >>> df_grouped = df_melted.groupby(["id", "kind"])
    
  5. The data is in the correct format - we can apply tsfresh! We are again using the full set of feature calculators as an example here, but have a look into the documentation for more information.

     >>> from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
     >>> from tsfresh.feature_extraction.settings import ComprehensiveFCParameters
     >>> features = dask_feature_extraction_on_chunk(df_grouped,
     ...                                             column_id="id",
     ...                                             column_kind="kind",
     ...                                             column_sort="time",
     ...                                             column_value="value",
     ...                                             default_fc_parameters=ComprehensiveFCParameters())
    

    Again, this will internally transform each chunk into a pandas dataframe, apply the feature extraction and transform back into a dask dataframe. The result will be a dataframe with one extracted feature for one id and kind per row.

     >>> features.columns
     Index(['id', 'variable', 'value'], dtype='object')
     >>> features.head()
                 id                 variable      value
     id kind
     11 T_x  0  11          T_x__sum_values -49.000000
             1  11              T_x__median  -5.000000
             2  11                T_x__mean  -3.266667
             3  11              T_x__length  15.000000
             4  11  T_x__standard_deviation   3.549022
    
  6. Continue with your calculation, e.g. transform the results into the usual tabular form (again: this might be computationally very intensive!):

     >>> features = features.categorize(columns=["variable"])
     >>> features = features.reset_index(drop=True)
     >>> feature_table = features.pivot_table(index="id", columns="variable",
     ...                                      values="value", aggfunc="sum")
    

    In case you are wondering: the aggregation function “sum” does not really matter here: there is only ever a single value per feature name and identifier.
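
The remark about the aggregation function can be checked with a few lines of plain Python. This is only an illustration of the argument, not dask code. The three rows are taken from the output in step 5.

```python
import statistics
from collections import defaultdict

long_rows = [
    (11, "T_x__sum_values", -49.0),
    (11, "T_x__median", -5.0),
    (11, "T_x__length", 15.0),
]

# Collect the values that would end up in each cell of the pivot table.
cells = defaultdict(list)
for data_id, variable, value in long_rows:
    cells[(data_id, variable)].append(value)

# Every cell holds exactly one value ...
assert all(len(values) == 1 for values in cells.values())

# ... so each of these aggregation functions returns that value unchanged.
aggregations = [sum, max, min, statistics.mean]
results = {
    cell: [agg(values) for agg in aggregations] for cell, values in cells.items()
}
```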

You might ask yourself what the difference is between using a ClusterDaskDistributor (which distributes to a dask cluster) as described in the last post and this dask binding.

The data preprocessing depends a lot on the shape of your data and the way shown here is most likely not the most efficient way for you (because it might lead to a lot of shuffling or re-partitioning). Please think about how you can transform your data efficiently into the input format for tsfresh.

Side Note: luigi - the simple way

You have a lot of data which does not fit into memory, but you do not want to add the burden of using a distributed framework (such as dask or PySpark) to your project? Then read on!

In the following, we will follow the ideas of a distributed framework such as dask or Apache Spark without actually using one (which of course also has some downsides, see below). The idea is:

For this part, we will use the powers of luigi, a task orchestrator framework. If you have never used luigi have a look into the documentation for a first overview before you continue. We will also try to walk through the basic ideas in the following.

We will also assume that your data is stored to disk (maybe on a distributed storage like HDFS) and already partitioned by id and kind. For example, it might be in the form

<path>/<id>/<kind>/input.parquet

If you want to generate some test data with the robot dataset, you can use the following python snippet:

from tsfresh.examples import robot_execution_failures
import os

robot_execution_failures.download_robot_execution_failures()
timeseries, _ = robot_execution_failures.load_robot_execution_failures()

def store_data(data_chunk):
    data_id = data_chunk["id"].iloc[0]
    data_kind = data_chunk["kind"].iloc[0]

    os.makedirs(f"data/{data_id}/{data_kind}", exist_ok=True)
    data_chunk.to_parquet(f"data/{data_id}/{data_kind}/input.parquet", index=False)

timeseries.melt(id_vars=["id", "time"],
                value_vars=["F_x", "F_y", "F_z", "T_x", "T_y", "T_z"],
                var_name="kind", value_name="value").groupby(["id", "kind"]).apply(store_data)

Please note that the data is basically already melted due to the way it is stored. It is also possible to store the data only separated by id and let tsfresh extract the features for all kinds simultaneously. Feel free to adjust the luigi script below to your needs.

The core building blocks of luigi are tasks. Tasks can have dependencies among each other, they define which output files they create and of course what to do when the task runs. They are controlled by parameters, which can be used to distinguish different instances of the same task. Let’s define a Task for the cycle of reading, extracting and writing the data as described above.

import luigi
import pandas as pd
from tsfresh import extract_features

# Where the input data is stored.
DATA_INPUT_NAME = "data/{data_id}/{data_kind}/input.parquet"
# Where the output data will be stored
DATA_OUTPUT_NAME = "data/{data_id}/{data_kind}/output.parquet"

class FeatureExtractorTask(luigi.Task):
    """
    Task to extract the features for one time series
    of given `id` and `kind`.
    Reads in the data, extracts the features and stores the data again.
    """
    data_id = luigi.Parameter()
    data_kind = luigi.Parameter()

    def output(self):
        """Define what this task will output"""
        return luigi.LocalTarget(DATA_OUTPUT_NAME.format(data_id=self.data_id,
                                                         data_kind=self.data_kind))


    def run(self):
        """Define what the task will actually do"""
        # 1. Read in the time series data from disk
        input_file = DATA_INPUT_NAME.format(data_id=self.data_id, data_kind=self.data_kind)
        df = pd.read_parquet(input_file)

        # 2. Extract the features.
        # Turn off multiprocessing - the parallelism comes from multiple luigi workers.
        features = extract_features(df, column_id="id",
                                    column_kind="kind",
                                    column_sort="time",
                                    column_value="value",
                                    n_jobs=0)

        # 3. Store the data
        features.to_parquet(self.output().path)

As you can see, the code is quite straightforward. We are again assuming a purely local setup for this example - in a real-world application the input and output paths will be on a shared file system (NFS, HDFS, S3) so that both the scheduler and the workers can read and write the files.

Once we have defined a task for this cycle, we just need to run it. There exist multiple ways to do this - you could use a local scheduler, a central scheduler, or even a batch system (maybe with the help of my package b2luigi ;-)). As an example, we will just define a list of tasks to run and give it to the luigi.build function. Also, we are only using a single worker and a local scheduler.

# ... continue from above

if __name__ == "__main__":
    task_list = []
    for data_kind in ["F_x", "F_y"]:
        for data_id in range(1, 5):
            task_list.append(FeatureExtractorTask(data_kind=data_kind, data_id=str(data_id)))
    luigi.build(task_list, local_scheduler=True)
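
Instead of hard-coding the ids and kinds, the list of tasks could also be derived from the directory layout described above. The following is a minimal sketch using only the standard library. The helper name `discover_partitions` is hypothetical, and the empty files created in the demo only stand in for real parquet files.

```python
import tempfile
from pathlib import Path


def discover_partitions(base_dir):
    """Return sorted (data_id, data_kind) pairs that have an input file."""
    pairs = []
    # Matches the layout <path>/<id>/<kind>/input.parquet
    for input_file in Path(base_dir).glob("*/*/input.parquet"):
        data_kind = input_file.parent.name
        data_id = input_file.parent.parent.name
        pairs.append((data_id, data_kind))
    return sorted(pairs)


# Build a tiny fake layout to demonstrate the helper.
with tempfile.TemporaryDirectory() as base_dir:
    for data_id in ["1", "2"]:
        for data_kind in ["F_x", "F_y"]:
            directory = Path(base_dir) / data_id / data_kind
            directory.mkdir(parents=True)
            (directory / "input.parquet").touch()
    partitions = discover_partitions(base_dir)
```

With the task defined above, the task list could then be built as `[FeatureExtractorTask(data_id=i, data_kind=k) for i, k in discover_partitions("data")]`. Note that the ids are directory names and therefore strings, which is also what the script above passes to the task.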

Running this script will give you a happy smiley output and your output data stored in the specified paths. For distributing the work among several machines, start a central scheduler

$ luigid

and remove the local_scheduler flag. Each call to the luigi script will now start another worker, which will connect to the central scheduler and start processing work (if your code is running on multiple machines, you need to give the scheduler hostname as well). The documentation describes more options. If you want to test it out, make sure to increase the number of ids it needs to process - otherwise it will run out of work quickly.

Using luigi comes with the benefit of a very simple code base and a simplified execution model. But of course, it cannot replace a distributed framework such as dask or Apache Spark with all their features:

But following the KISS principle, I would always prefer a small luigi application over running a complex distributed application (and have done so successfully several times in the past).

Summary

After all these possibilities, the question arises: When should you use what?

Let’s try to give some usage hints:

There is a lot more to discover when it comes to distributed computing or distributed data. If you find out about interesting bindings between your framework and tsfresh, or if you developed a cool distributor you want to share, we are always happy about pull requests. Happy coding!

Big thanks to @dotcsDE for great suggestions!

Nils Braun


high energy data engineer
