Testing PySpark Applications#

Testing Spark applications is a very common pain point for big data developers. Because of this difficulty, developers often avoid writing robust tests. Using Fugue helps testing by doing the following:

  1. Lessening the amount of boilerplate code needed for testing

  2. Eliminating the need for a Spark cluster to unit test

  3. Accelerating development by enabling rapid testing of code snippets

In this walkthrough, we’ll go through each of these items in more detail.

Reducing Boilerplate Code#

Recall in the beginner tutorial that in order to bring a simple prediction function to PySpark for execution using mapInPandas(), we need to construct two helper functions. Using the same example as the introduction, we train a LinearRegression model and then create a predict() function that will apply this model to a DataFrame. The following code is just pandas for now.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2":[1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3
reg = LinearRegression().fit(X, y)

def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    return df.assign(predicted=model.predict(df))

We can then quickly test this with another DataFrame.

input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2":[3, 3, 6, 6]})
predict(input_df.copy(), reg)
x_1 x_2 predicted
0 3 3 12.0
1 4 3 13.0
2 6 6 21.0
3 6 6 21.0

Now, in order to bring this to Spark, we need to wrap it with some helper functions, as seen below. This is the same code as in the introduction section, so there is no need to spend too much time looking at it. The goal here is just to see how much boilerplate we need to introduce to run predict() on Spark. We are adding two functions.

from typing import Iterator, Union

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, DoubleType

spark_session = SparkSession.builder.getOrCreate()

def predict_wrapper(dfs: Iterator[pd.DataFrame], model):
    for df in dfs:
        yield predict(df, model)

def run_predict(input_df: Union[DataFrame, pd.DataFrame], model):
    # conversion
    if isinstance(input_df, pd.DataFrame):
        sdf = spark_session.createDataFrame(input_df.copy())
    else:
        sdf = input_df  # Spark DataFrames are immutable; no copy needed

    schema = StructType(list(sdf.schema.fields))
    schema.add(StructField("predicted", DoubleType()))
    return sdf.mapInPandas(lambda dfs: predict_wrapper(dfs, model), 
                           schema=schema)

result = run_predict(input_df.copy(), reg)
result.show()
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

Using Fugue, the equivalent code is a single call to the transform() function.

from fugue import transform
from fugue_spark import SparkExecutionEngine

result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=SparkExecutionEngine(spark_session)
)
result.show()
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

In order to write unit tests for our logic, we just need to test the predict() function, because the transform() function is already extensively tested on the Fugue side. With the native Spark approach, we would need to write two additional unit tests, one for each helper function. If you have 10 pandas-based functions that you are bringing to Spark, this could add a lot of tests just for the boilerplate code.
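As a rough sketch of what that single unit test could look like (the test_predict.py file name and the my_model module are hypothetical stand-ins for wherever predict() lives in your project), a plain pytest test needs nothing but pandas and scikit-learn:

# test_predict.py -- hypothetical test module, no Spark required
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

from my_model import predict  # hypothetical import path for the function above


def test_predict_appends_prediction_column():
    # train the same toy model used in this walkthrough
    X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2": [1, 2, 2, 3]})
    y = np.dot(X, np.array([1, 2])) + 3
    model = LinearRegression().fit(X, y)

    test_df = pd.DataFrame({"x_1": [3, 4], "x_2": [3, 3]})
    result = predict(test_df.copy(), model)

    # original columns are preserved and a predicted column is appended
    assert list(result.columns) == ["x_1", "x_2", "predicted"]
    np.testing.assert_allclose(result["predicted"], [12.0, 13.0])

Because transform() is tested inside Fugue itself, this single pandas-level test covers all of the logic we actually own.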

Decoupling of Spark Hardware#

Many Spark users rely on databricks-connect to run their PySpark jobs on Databricks. This allows developers to write code on their local machine and then run it against the cluster. While databricks-connect is convenient for leveraging cluster compute resources, it also makes it difficult to conduct tests quickly. Spinning up a cluster takes around 5 minutes, slowing down iterations. Development also becomes very expensive because the databricks-connect configuration is attached to a specific cluster, which makes it inconvenient to switch to a smaller cluster (or local Spark) for smaller tests.

Why do we need to test on Spark at all, though? Recall the predict() function that we brought to Spark. We can now unit test it locally with pandas because it is written without any dependency on Spark. In fact, we already did so earlier.

input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2":[3, 3, 6, 6]})
predict(input_df.copy(), reg)
x_1 x_2 predicted
0 3 3 12.0
1 4 3 13.0
2 6 6 21.0
3 6 6 21.0

For Spark users who manually submit their jobs with spark-submit, this reduces the number of times the code has to be sent to the cluster for testing. Even testing Spark code on local Spark takes a significant amount of time to spin up compared to testing on pandas.

Rapid Prototyping#

This brings us to the last point. It is very common to test simple code snippets when working on projects. With Spark code and big data, testing code snippets becomes a significant bottleneck to development because of the repeated Spark spin-up required (even locally). By using Fugue to decouple logic from execution, we can test quickly on a local machine and then bring the code to the cluster for integration tests when ready, as shown in the sketch below. This increases the velocity of big data projects.
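As a sketch of what that workflow looks like with the transform() call from earlier (assuming the Fugue API shown above, where omitting the engine runs the function on pandas), the only thing that changes between local testing and the cluster run is the engine argument:

# during development: no engine specified, so transform() runs locally
# on pandas and returns a pandas DataFrame -- no Spark involved
local_result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
)

# when ready for an integration test, point the same call at Spark
spark_result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=SparkExecutionEngine(spark_session),
)
spark_result.show()

The business logic in predict() never changes; only the engine argument does, which is what keeps both testing and the eventual cluster run cheap to set up.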