Introduction

Utilizing Distributed Compute

pandas is great for small datasets but unfortunately does not scale well to large datasets. The primary reason is that pandas is single-core and does not take advantage of all available computing resources. Many operations also generate intermediate copies of data, using more memory than necessary. To handle data effectively in pandas, users ideally need 5 to 10 times as much RAM as the size of the dataset.

Spark and Dask allow us to split computing jobs across multiple machines. They can also handle datasets that don’t fit into memory by spilling data over to disk in some cases. Still, moving to Spark or Dask requires significant code changes to port existing pandas code, and beyond the code changes, a lot of knowledge is needed to use these frameworks effectively.

Fugue is a framework designed to unify the interface between pandas, Spark, and Dask, allowing a single codebase to be used across all three engines.

Fugue transform()

The simplest way to use Fugue to scale pandas-based code to Spark or Dask is the transform() function. In the example below, we’ll train a model using scikit-learn and pandas and then run inference in parallel on top of the Spark execution engine.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2": [1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3  # y = x_1 + 2*x_2 + 3
reg = LinearRegression().fit(X, y)

After training our model, we wrap it in a predict() function. This function is still written in pandas, so we can easily test it on the input_df that we create. Wrapping it will allow us to bring it to Spark.

def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    return df.assign(predicted=model.predict(df))

input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2": [3, 3, 6, 6]})

# test the function
predict(input_df.copy(), reg)
   x_1  x_2  predicted
0    3    3       12.0
1    4    3       13.0
2    6    6       21.0
3    6    6       21.0

Now we bring it to Spark using Fugue’s transform() function, which takes in a DataFrame and applies a function to it using the pandas, Spark, or Dask engine. The transform() inputs will be explained later, but for now, notice that we did not modify the pandas-based predict() function in order to use it on Spark. The function can now scale to big datasets through the Spark or Dask execution engines.

Even if there is no Spark cluster available, the SparkExecutionEngine will start a local Spark instance and parallelize the job across all cores of the machine. Note that importing fugue_spark registers the "spark" string.

from fugue import transform
import fugue_spark

result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="spark"
)
print(type(result))
result.show()
<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

The transform() function takes in the following arguments:

  • df - input DataFrame (can be a pandas, Spark, or Dask DataFrame)

  • using - a Python function with valid type annotations (Fugue uses these to determine how to pass data into the function; see the sketch after this list)

  • schema - output schema of the operation

  • params - a dictionary of parameters to pass in the function

  • engine - the execution engine to run the operation on: pandas, Spark, or Dask
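
Because Fugue uses the type annotations of the using function to determine how data is passed in, the logic does not have to be written against pandas at all. Below is a minimal sketch with a hypothetical add_flag() function that operates on native Python dictionaries instead; the next section covers this in more depth.

from typing import Any, Dict, Iterable

# hypothetical example: based on the annotations, Fugue feeds each
# partition to the function as an iterable of row dictionaries
def add_flag(rows: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    for row in rows:
        row["flag"] = row["x_1"] > 3
        yield row

flagged = transform(input_df, add_flag, schema="*,flag:bool")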

Because we supplied "spark" as the engine, the predict() function is applied to input_df on the SparkExecutionEngine, and Fugue handles the conversion from a pandas DataFrame to a Spark DataFrame. Similarly, a Spark DataFrame can be passed directly to the transform() call. Supplying no engine uses the pandas-based NativeExecutionEngine, and Fugue also has a DaskExecutionEngine available.
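
As a minimal sketch of the other engines: with no engine supplied, the same call runs on pandas, and assuming Fugue’s Dask extras are installed, importing fugue_dask registers the "dask" string the same way fugue_spark registered "spark".

import fugue_dask

# no engine supplied: runs on the pandas-based NativeExecutionEngine
# and returns a pandas DataFrame
local_result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
)

# engine="dask" runs the same unmodified function on the DaskExecutionEngine
dask_result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="dask",
)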

An explicit schema is a hard requirement in distributed computing frameworks, so we need to supply the output schema of the operation. Compared to the Spark equivalent (see the optional section below), this is a much simpler interface for handling the schema.
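
To illustrate the schema expression used above: Fugue schema strings are comma-separated name:type pairs, and * expands to all of the input columns. For our input_df (two long integer columns, based on the data created earlier), the following two expressions describe the same output schema:

schema = "*,predicted:double"                  # all input columns plus a new one
schema = "x_1:long,x_2:long,predicted:double"  # the same schema, spelled out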

Conclusion

Fugue’s transform() function can scale code written in pandas to Spark or Dask without altering the existing functions. In the next section, we’ll take a deeper look at transform(). While we used pandas here, we’ll also show that native Python functions can be used across the different execution engines.

[Optional] Spark Equivalent of transform()

Below is an example of how the predict() function would be brought to Spark without the transform() function. This implementation uses Spark’s mapInPandas() method, available since Spark 3.0. Note how the schema has to be constructed by hand inside the run_predict() function; this is the schema requirement mentioned earlier that Fugue provides a simpler interface for.

from typing import Iterator, Union
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.sql import DataFrame, SparkSession

spark_session = SparkSession.builder.getOrCreate()

# mapInPandas passes each partition as an iterator of pandas DataFrames
def predict_wrapper(dfs: Iterator[pd.DataFrame], model):
    for df in dfs:
        yield predict(df, model)

def run_predict(input_df: Union[DataFrame, pd.DataFrame], model):
    # convert pandas input to a Spark DataFrame if necessary
    if isinstance(input_df, pd.DataFrame):
        sdf = spark_session.createDataFrame(input_df.copy())
    else:
        sdf = input_df

    # build the output schema by hand: the input fields plus the new column
    schema = StructType(list(sdf.schema.fields))
    schema.add(StructField("predicted", DoubleType()))
    return sdf.mapInPandas(lambda dfs: predict_wrapper(dfs, model),
                           schema=schema)

result = run_predict(input_df.copy(), reg)
result.show()
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

It’s easy to see why bringing a pandas codebase to Spark becomes difficult with this approach: we had to define two additional functions, predict_wrapper() and run_predict(), just to run one pandas function on Spark. If this had to be done for tens of functions, it could easily fill the codebase with boilerplate code, making it hard to focus on the logic.