# Distributed Machine Learning Model Sweeping

There are two use cases for training machine learning models in a distributed setting, and the [dask-ml](https://ml.dask.org/) documentation does a good job of describing them. The first is for memory-bound problems, where the dataset has to be divided across multiple machines because it doesn't fit on a single one. A single model is then trained on the distributed dataset.

The second use case is for compute-bound problems. Here a data scientist has datasets that fit in memory, but wants to train multiple models and evaluate their performance in parallel. Extending this idea, big data often has logical groupings that can be used to partition the data, and we can then train multiple models on each partition.

This tutorial focuses on the compute-bound problem. For example, if we have a large dataset that spans 5 regions, we can train region-specific models. If we train 5 different models for each region, that results in 25 model training runs. Because these 25 runs are independent, they can be distributed across a cluster and run in parallel.
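The arithmetic above can be sketched as a list of run specifications, one per (region, model) pair. This is a minimal illustration only: the region names and the extra model names are hypothetical placeholders, not part of the tutorial's data.

```python
from itertools import product

# Hypothetical region and model names, used only to enumerate the runs.
regions = [f"region{i}" for i in range(5)]
models = ["LinearRegression", "SVR", "Ridge", "Lasso", "RandomForest"]

# Every (region, model) pair is one independent training run.
runs = [
    {"index": f"{region}-{model}", "region": region, "model_name": model}
    for region, model in product(regions, models)
]

print(len(runs))  # 25
```

Since no run depends on another, this list can be handed to any scheduler in any order.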

## Training Multiple Models for One Dataset

**Creating a Dataset**

We start by creating a `make_data()` function using scikit-learn's `make_regression()` function. It takes a number of `groups` and generates a new dataset for each group. Because each group is generated separately, the groups have different properties and benefit from separate models. For now, though, we focus on the training logic for a single group.

```python
import pandas as pd
from sklearn.datasets import make_regression

def make_data(groups=1):
    result = []
    for i in range(groups):
        # Each group gets its own randomly generated regression problem
        X, y = make_regression(n_samples=100, n_features=8, n_informative=5, noise=10)
        df = pd.DataFrame({"group": [f"group{i}"]*100})
        df = pd.concat([df, pd.DataFrame(X, columns=[f"x{n}" for n in range(8)])], axis=1)
        df = df.assign(y=y)
        result.append(df)
    # ignore_index=True avoids repeating the 0-99 index for every group
    return pd.concat(result, axis=0, ignore_index=True)
```

Using this with one group gives us a dataset with 8 features and a target variable called `y`. The head of the DataFrame is shown below.

```python
df = make_data(1)
df.head()
```

| | group | x0 | x1 | x2 | x3 | x4 | x5 | x6 | x7 | y |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | group0 | 0.971978 | -0.473416 | -2.221027 | 2.066071 | -0.371581 | -1.806569 | 0.154867 | -2.329968 | -241.530733 |
| 1 | group0 | -1.178425 | 3.071536 | -0.682251 | 2.039425 | 0.695598 | 0.340194 | 0.147516 | -0.891319 | -14.439953 |
| 2 | group0 | 0.128803 | -0.204161 | 0.581653 | -0.76944 | 0.725207 | 0.192606 | -1.250861 | 0.732403 | 78.005286 |
| 3 | group0 | -1.38155 | 1.391747 | 0.748421 | 0.187281 | 0.66058 | 1.665103 | -1.303355 | -1.869867 | 31.560241 |
| 4 | group0 | 1.575359 | 0.926507 | -1.364626 | -0.267989 | -1.179624 | -1.589561 | 1.051079 | -0.50754 | -17.56424 |


**Training Setup**

We import the libraries needed for model training and define the feature and target columns.

```python
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error

# Column names shared by the functions below
feature_cols = [f"x{n}" for n in range(8)]
target = "y"
```

**Creating the training manifest**

For this single dataset, we use the `create_training_manifest()` function below to build a training manifest that will orchestrate our training runs. We call it a training manifest because each row holds the metadata for one training run: the group, the `X` and `y` of our dataset, and the `model` to train. This may look like extra work for a single dataset, but later on we'll see how this setup scales easily to Spark or Dask.


```python
from typing import List, Dict, Any
import pickle

def create_training_manifest(df: pd.DataFrame, models: List[Any]) -> List[Dict[str,Any]]:
    # Assumes df holds exactly one group
    group = df.iloc[0]["group"]
    # Serialize features and target once; every run for this group reuses the same bytes
    X_cols = pickle.dumps(df[feature_cols])
    y_col = pickle.dumps(df[target])
    result = []
    for model in models:
        model_binary = pickle.dumps(model)
        model_name = type(model).__name__
        # Unique per run; used later to partition one run per call
        index = f"{group}-{model_name}"
        result.append({"index": index, "group": group, "X": X_cols, "y": y_col,
                       "model_name": model_name, "model": model_binary})
    return result
```

**Why use pickle to serialize?**

There are several reasons to use `pickle` to serialize our data and models. First, distributed operations pass data through the network, where everything has to be serialized anyway. Pickling up front turns each DataFrame and model into a self-contained byte payload that the engine can ship without needing to understand its contents. Second, it lets us store a `pd.DataFrame` inside a single cell of our training manifest. Finally, pickle produces `bytes`, which maps to the `binary` column type supported by pandas, Spark, and Dask.
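A small standard-library sketch of what one manifest row carries. Assumption for illustration: a fitted `statistics.NormalDist` stands in for a trained scikit-learn estimator, and a dict of lists stands in for the feature DataFrame. The pickle round trip works the same way for the real objects.

```python
import pickle
from statistics import NormalDist

# A fitted NormalDist stands in for a trained estimator (illustrative assumption)
model = NormalDist.from_samples([1.0, 2.0, 3.0, 4.0, 5.0])
features = {"x0": [0.1, 0.3], "x1": [0.2, 0.4]}

# One manifest row: plain metadata next to opaque byte payloads
row = {
    "index": "group0-NormalDist",
    "model_name": type(model).__name__,
    "X": pickle.dumps(features),
    "model": pickle.dumps(model),
}

# pickle.dumps returns bytes, which fits a binary column
assert isinstance(row["X"], bytes) and isinstance(row["model"], bytes)

# A worker restores the payloads into usable Python objects
restored_model = pickle.loads(row["model"])
restored_X = pickle.loads(row["X"])
print(restored_model.mean, restored_X["x0"])  # 3.0 [0.1, 0.3]
```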

We can test `create_training_manifest()` by simply invoking it.

```python
create_training_manifest(df, models=[LinearRegression(), SVR()])
```

[{'index': 'group0-LinearRegression',
  'group': 'group0',
  'X': b'\x80\x03cpandas.core.frame\nDataFrame\nq\x00)\x81q\x01}q\x02(X\x04\x00\x00\x00_mgrq\x03cpandas.core.internals.managers\nBlockManager\nq\x04cfunctools\npartial\nq\x05cpandas.core.internals.blocks\nnew_block\nq\x06\x85q\x07Rq\x08(h\x06)}q\tX\x04\x00\x00\x00ndimq\nK\x02sNtq\x0bbcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x08Kd\x86q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f8q\x14\x89\x88\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89B\x00\x19\x00\x00\xfe*Sbr\x1a\xef?e\x18\xdf\x17\xd4\xda\xf2\xbf0tpr\xa1|\xc0?y\xa5\xb0\x89\xd4\x1a\xf6\xbf\x04\xda\n\xa9\xab4\xf9?H\x04<\xfc\xec[\xe7?\xaf4\xd7\xec\x81\x10\xed?\x8e\x13\xa6\x82\xb0\x81\xec\xbf\xde\xdf&m0\xc9\xf6?\x98+9\x04\xdf\xe0\xb8\xbf\xd0\x87\x0e\xa1\x86\x9b\xe6\xbf\xa8-\x9d\xb7\x82\xbf\xc3?\x83\\@\xdd8\xb9\xdb?\x96\xeaD\xaa*3\xe1?\xc2$~?\xaf7\xda?#\x82\xf4L\x

**Using Fugue transform()**

Now that we know the function works, we can use Fugue's `transform()` function to generate the `training_manifest`. `transform()` interprets the `List[Dict[str,Any]]` output annotation of `create_training_manifest()` and, together with the `schema` we pass in, converts the result into a pandas DataFrame (and later into a Spark or Dask DataFrame).

```python
from fugue import transform

training_manifest = transform(df,
                              create_training_manifest,
                              schema="index:str,group:str,X:binary,y:binary,model_name:str,model:binary",
                              params=dict(models=[LinearRegression(), SVR()]))
training_manifest.head()
```

| | index | group | X | y | model_name | model |
|---|---|---|---|---|---|---|
| 0 | group0-LinearRegression | group0 | `b'\x80\x03cpandas.core.frame\nDataFrame\nq\x00...` | `b'\x80\x03cpandas.core.series\nSeries\nq\x00)\...` | LinearRegression | `b'\x80\x03csklearn.linear_model._base\nLinearR...` |
| 1 | group0-SVR | group0 | `b'\x80\x03cpandas.core.frame\nDataFrame\nq\x00...` | `b'\x80\x03cpandas.core.series\nSeries\nq\x00)\...` | SVR | `b'\x80\x03csklearn.svm._classes\nSVR\nq\x00)\x...` |
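To make the role of the `schema` string more concrete, here is a toy sketch of turning `List[Dict[str,Any]]` rows into typed columns. The helper names (`parse_schema`, `rows_to_columns`) and the type mapping are hypothetical and cover only the types used in this tutorial. This is not how Fugue implements `transform()`.

```python
from typing import Any, Dict, List

# Hypothetical mapping from the schema type names used here to Python types
PY_TYPES = {"str": str, "binary": bytes, "double": float, "float": float}

def parse_schema(schema: str) -> Dict[str, type]:
    """Split 'name:type,name:type' into an ordered {name: python_type} dict."""
    fields = {}
    for item in schema.split(","):
        name, type_name = (part.strip() for part in item.split(":"))
        fields[name] = PY_TYPES[type_name]
    return fields

def rows_to_columns(rows: List[Dict[str, Any]], schema: str) -> Dict[str, list]:
    """Check every row against the schema, then pivot the rows into columns."""
    fields = parse_schema(schema)
    columns: Dict[str, list] = {name: [] for name in fields}
    for row in rows:
        for name, py_type in fields.items():
            if not isinstance(row[name], py_type):
                raise TypeError(f"column {name!r} expects {py_type.__name__}")
            columns[name].append(row[name])
    return columns

rows = [{"index": "group0-SVR", "group": "group0", "model_name": "SVR", "model": b"\x80\x03"}]
cols = rows_to_columns(rows, "index:str, group:str,model_name:str,model:binary")
print(list(cols))  # ['index', 'group', 'model_name', 'model']
```

The schema is what lets an engine build a typed DataFrame from plain dictionaries without inspecting the pickled payloads.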


**One training run**

Each row in our `training_manifest` represents one model training run. All we need to do is pull the metadata, unpickle the data and model, and then call the model's `.fit()` and `.predict()` methods. We then store the mean absolute error (MAE) of the run so that we can compare the runs later. This is done in the `training_run()` function below.
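The error stored for each run is the mean absolute error, `MAE = (1/n) * Σ |y_i - ŷ_i|`. A plain-Python version, written only to show the formula (the tutorial itself uses scikit-learn's `mean_absolute_error`):

```python
from typing import Sequence

def mae(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Mean absolute error: the average of |y_true[i] - y_pred[i]|."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

# Absolute errors are 0.5, 0.5, 0.0 and 1.0, so the mean is 2.0 / 4
print(mae([3.0, -0.5, 2.0, 7.0], [2.5, 0.0, 2.0, 8.0]))  # 0.5
```

MAE is reported in the same units as `y`, which makes the errors of different models directly comparable.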

Because Fugue lets us be flexible about type annotations, we can use the `List[Dict[str,Any]]` structure for both input and output to make our logic easier to write.

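```python
def training_run(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
    # Partitioning by a unique index means each call receives exactly one row
    training_sample = df[0]
    index = training_sample["index"]
    group = training_sample["group"]
    # Restore the pickled features, target, and untrained model
    X = pickle.loads(training_sample["X"])
    y = pickle.loads(training_sample["y"])
    # No random_state is set, so the split (and the error) changes between calls
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = pickle.loads(training_sample["model"])
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    # Return the error together with the fitted model, serialized again for transport
    return [dict(index=index,
                 group=group,
                 model_name=training_sample["model_name"],
                 error=float(mae),
                 model=pickle.dumps(model))]
```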


We can then test it by passing the first row of our `training_manifest` as a `List[Dict[str,Any]]`.

```python
training_run([training_manifest.iloc[0].to_dict()])
```

[{'index': 'group0-LinearRegression',
  'group': 'group0',
  'model_name': 'LinearRegression',
  'error': 8.757634740955517,
  'model': b'\x80\x03csklearn.linear_model._base\nLinearRegression\nq\x00)\x81q\x01}q\x02(X\r\x00\x00\x00fit_interceptq\x03\x88X\t\x00\x00\x00normalizeq\x04\x89X\x06\x00\x00\x00copy_Xq\x05\x88X\x06\x00\x00\x00n_jobsq\x06NX\x08\x00\x00\x00positiveq\x07\x89X\x0e\x00\x00\x00n_features_in_q\x08K\x08X\x05\x00\x00\x00coef_q\tcnumpy.core.multiarray\n_reconstruct\nq\ncnumpy\nndarray\nq\x0bK\x00\x85q\x0cC\x01bq\r\x87q\x0eRq\x0f(K\x01K\x08\x85q\x10cnumpy\ndtype\nq\x11X\x02\x00\x00\x00f8q\x12\x89\x88\x87q\x13Rq\x14(K\x03X\x01\x00\x00\x00<q\x15NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x16b\x89C@\\=<\x9d\x19kQ@\x05\xa4!\x86\xfa\x82E@qW/nc\x9cX@p\xc7\xc1I\x83O\xfe\xbfhC\x16\x0c)R\xf0\xbfu\xf4\x99\x90o\n\'@H\x91\x98\xd2\xaa\x8f\t@\xf3\x9a\xce)\xd1\x020@q\x17tq\x18bX\t\x00\x00\x00_residuesq\x19cnumpy.core.multiarray\nscalar\nq\x1ah\x14C\x08\x1a4\x0e\x9bL\x0c\xc1@q\x1b\x86q\x1

We can use `transform()` again to execute the training runs. Note that we pass `partition={"by": "index"}`. Because `index` is unique per row, each partition holds exactly one row, so `training_run()` is called once per training run. Fugue then takes care of collecting the results into a DataFrame. Since the train/test split is random, the errors differ slightly from the single test run above.

```python
training_results = transform(training_manifest,
                             training_run,
                             schema="index:str,group:str,model_name:str,error:double,model:binary",
                             partition={"by": "index"})

training_results.head()
```

| | index | group | model_name | error | model |
|---|---|---|---|---|---|
| 0 | group0-LinearRegression | group0 | LinearRegression | 11.190594 | `b'\x80\x03csklearn.linear_model._base\nLinearR...` |
| 1 | group0-SVR | group0 | SVR | 76.397862 | `b'\x80\x03csklearn.svm._classes\nSVR\nq\x00)\x...` |
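The `partition` behavior can be modeled with a toy, sequential helper that groups rows by a key and calls the function once per group. `run_partitioned` and `count_rows` are hypothetical names, and this is a simplified model rather than Fugue's implementation. A real engine such as Spark would run these calls on different workers.

```python
from itertools import groupby
from typing import Any, Callable, Dict, List

Rows = List[Dict[str, Any]]

def run_partitioned(rows: Rows, key: str, fn: Callable[[Rows], Rows]) -> Rows:
    """Call fn once per distinct value of rows[key] and concatenate the outputs."""
    output: Rows = []
    ordered = sorted(rows, key=lambda r: r[key])  # groupby needs sorted input
    for _, group in groupby(ordered, key=lambda r: r[key]):
        output.extend(fn(list(group)))
    return output

def count_rows(rows: Rows) -> Rows:
    # Reports how many rows this call received
    return [{"index": rows[0]["index"], "n_rows": len(rows)}]

manifest = [{"index": "group0-LinearRegression", "group": "group0"},
            {"index": "group0-SVR", "group": "group0"}]

print(run_partitioned(manifest, "index", count_rows))
# [{'index': 'group0-LinearRegression', 'n_rows': 1}, {'index': 'group0-SVR', 'n_rows': 1}]
```

Partitioning the same rows by `group` instead would produce a single call that sees both rows.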


## Multiple Models for Each Group

Everything above is scale-agnostic, so we can bring the same logic to Spark. So far we ran on Fugue's default `NativeExecutionEngine`, which executes the runs sequentially on a single machine. In practice, a grid search can take multiple hours, so parallelizing the training runs over Spark or Dask lets us iterate faster and make full use of the available compute.
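Because the runs share no state, they map cleanly onto any parallel executor. In the sketch below a local thread pool stands in for the cluster, and `fake_training_run` is a hypothetical placeholder for `training_run()`. For CPU-heavy Python training, real speedups usually come from multiple processes or machines, which is what Spark and Dask provide.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List

def fake_training_run(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for training_run(): one row in, one result row out."""
    row = rows[0]
    return [{"index": row["index"], "error": 0.0}]  # dummy error value

# 10 groups x 2 models = 20 independent runs, mirroring the Spark example
manifest = [{"index": f"group{g}-{m}"} for g in range(10) for m in ("LinearRegression", "SVR")]

# Each run gets its own single-row "partition"; map() yields results in input order
with ThreadPoolExecutor(max_workers=4) as pool:
    batches = pool.map(fake_training_run, ([row] for row in manifest))
    results = [result for batch in batches for result in batch]

print(len(results))  # 20
```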

We make a new larger DataFrame that contains 10 distinct groups. We want to train the models for each of these groups separately.

```python
df = make_data(groups=10)
```

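All of our logic remains the same. We simply pass a `SparkSession` as the `engine`, and Fugue runs the same functions on its `SparkExecutionEngine`. The same can be done with Dask through the `DaskExecutionEngine`. The one addition is `partition={"by": "group"}` on the first call: `create_training_manifest()` reads the group name from the first row, so each invocation must receive the rows of exactly one group.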

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
```

```python
# Stage 1: build the manifest, one call per group
training_manifest = transform(df,
                              create_training_manifest,
                              schema="index:str,group:str,X:binary,y:binary,model_name:str,model:binary",
                              params=dict(models=[LinearRegression(), SVR()]),
                              partition={"by": "group"},
                              engine=spark)

# Stage 2: one training run per manifest row
results = transform(training_manifest,
                    training_run,
                    schema="index:str,group:str,model_name:str,error:float,model:binary",
                    partition={"by": "index"},
                    engine=spark)

results.show()
```

```text
+--------------------+------+----------------+----------+--------------------+
|               index| group|      model_name|     error|               model|
+--------------------+------+----------------+----------+--------------------+
|group3-LinearRegr...|group3|LinearRegression|  9.379544|[80 03 63 73 6B 6...|
|          group2-SVR|group2|             SVR|  75.86907|[80 03 63 73 6B 6...|
|group9-LinearRegr...|group9|LinearRegression|  9.286413|[80 03 63 73 6B 6...|
|group4-LinearRegr...|group4|LinearRegression| 6.1316595|[80 03 63 73 6B 6...|
|group6-LinearRegr...|group6|LinearRegression|  9.684757|[80 03 63 73 6B 6...|
|group1-LinearRegr...|group1|LinearRegression| 5.3616314|[80 03 63 73 6B 6...|
|          group9-SVR|group9|             SVR|114.082726|[80 03 63 73 6B 6...|
|          group1-SVR|group1|             SVR| 141.20859|[80 03 63 73 6B 6...|
|          group6-SVR|group6|             SVR|  91.27704|[80 03 63 73 6B 6...|
|          group5-SVR|group5|             SVR|  85.6
```

## Extending this Setup and Fugue Tune

There are two hyperparameter tuning approaches that can be used with this setup. The first is grid search, where we try different combinations of hyperparameters. This can easily be added to the `create_training_manifest()` function to generate more runs. The second is Bayesian optimization (with packages like [Optuna](https://optuna.readthedocs.io/en/stable/)), where the search starts from an initial set of hyperparameters and refines them over successive trials. This type of logic can go into the `training_run()` function we defined above.
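For the grid search case, a grid can be expanded into one manifest entry per combination, for example with `itertools.product`. `expand_grid` is a hypothetical helper and the `SVR` grid values are arbitrary; scikit-learn's `ParameterGrid` offers the same kind of expansion.

```python
from itertools import product
from typing import Any, Dict, List

def expand_grid(model_name: str, grid: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Return one {model_name, params} entry per hyperparameter combination."""
    keys = sorted(grid)  # sorted keys give a deterministic order
    return [
        {"model_name": model_name, "params": dict(zip(keys, values))}
        for values in product(*(grid[k] for k in keys))
    ]

svr_runs = expand_grid("SVR", {"C": [0.1, 1.0, 10.0], "kernel": ["linear", "rbf"]})
print(len(svr_runs))  # 6
print(svr_runs[0])    # {'model_name': 'SVR', 'params': {'C': 0.1, 'kernel': 'linear'}}
```

Inside `create_training_manifest()`, each entry could then become a model such as `SVR(**entry["params"])`. The `index` would also need to include something that distinguishes the combinations (for example a counter), since partitioning by `index` relies on it being unique per run.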

Fugue actually provides a much simpler interface for this type of hyperparameter tuning with the Fugue [tune](https://github.com/fugue-project/tune) library that is built on top of the Fugue core. This library can be used for more serious hyperparameter tuning.