transform() Function#


Pandas is great for small datasets but unfortunately does not scale well to large datasets. The primary reason is that Pandas is single-core and does not take advantage of all available computing resources. Many operations also generate intermediate copies of data, using more memory than necessary. To handle data effectively with Pandas, users preferably need 5 to 10 times as much RAM as the dataset.

Spark and Dask allow us to split computing jobs across multiple machines. They can also handle datasets that don’t fit into memory by spilling data over to disk in some cases. But moving to Spark or Dask still requires significant code changes to port existing Pandas code. Beyond the code changes, a lot of knowledge is also required to use these frameworks effectively. Ray is a newer engine seeing increased adoption. How can we avoid being locked into a framework so that we have the flexibility to switch in the future?

Fugue is a framework designed to unify the interface between Pandas, Spark, Dask, and Ray, allowing one codebase to be used across all of these compute engines.

Fugue transform()#

The simplest way Fugue can be used to scale Pandas-based code to Spark, Dask, or Ray is with the transform() function. In the example below, we’ll train a model using scikit-learn and Pandas and then perform the model predictions in parallel on top of the Spark execution engine.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2":[1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3
reg = LinearRegression().fit(X, y)

After training our model, we wrap it in a predict() function. This function is still written in Pandas, so we can easily test it on the input_df that we create below. Wrapping it will allow us to bring it to Spark. Type hints are a Fugue requirement; we’ll discuss them more in future sections.

def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    return df.assign(predicted=model.predict(df))

input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2":[3, 3, 6, 6]})

# test the function
predict(input_df.copy(), reg)
   x_1  x_2  predicted
0    3    3       12.0
1    4    3       13.0
2    6    6       21.0
3    6    6       21.0

Now we bring it to Spark using Fugue’s transform() function. This function takes in a DataFrame and applies a function to it using the Pandas, Spark, Dask, or Ray engine. The transform() inputs will be explained later, but for now, notice that we did not make any modifications to the Pandas-based predict() function in order to use it on Spark. This function can now scale to big datasets through the Spark, Dask, or Ray execution engines.

All we have to do to bring it to Spark is pass a SparkSession as the engine.

from pyspark.sql import SparkSession
from fugue import transform

spark = SparkSession.builder.getOrCreate()

result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark
)
print(type(result))
result.show()
<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

The transform() function takes in the following arguments:

  • df - input DataFrame (can be a Pandas, Spark, or Dask DataFrame)

  • using - a Python function with valid type annotations

  • schema - required output schema of the operation

  • params - a dictionary of parameters to pass in the function

  • engine - the execution engine to run the operation on (Spark, Dask, Ray)

Because we supplied spark as the engine, the predict() function is applied to input_df on top of the Spark ExecutionEngine. Fugue handles the conversion from a Pandas DataFrame to a Spark DataFrame. Similarly, a Spark DataFrame can be passed to the transform() call. Supplying no engine uses the Pandas-based engine. Fugue also has Dask and Ray engines available.
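
For example, here is a minimal sketch of the same call with no engine, reusing the predict(), input_df, and reg objects defined above:

# No engine supplied: runs locally on the default Pandas-based engine
local_result = transform(
    input_df,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
)
print(type(local_result))  # expected: <class 'pandas.core.frame.DataFrame'>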

An explicit schema is a hard requirement in distributed computing frameworks, so we need to supply the output schema of the operation. When compared to the Spark equivalent (see the optional section below), this is a much simpler interface for handling the schema.

Testability and Maintainability#

Is the native Python or Pandas implementation of our logic better, or is the native Spark implementation better?

The main concern of Fugue is clear, readable code. Users can write code in whatever way expresses their logic best. The computing efficiency lost by using Fugue is unlikely to be significant, especially compared to the developer efficiency gained through more rapid iteration and easier maintenance. In fact, Fugue code often sees more speed-ups than native Spark code written by inexperienced users, because Fugue handles a lot of the tricks necessary to use Spark effectively.

Fugue code also becomes easily testable because the function contains logic that is portable across Pandas, Spark, and Dask. We can test code without spinning up computing resources (such as Spark or Dask clusters). Such hardware often takes time to spin up just for a simple test, making it painful to run unit tests on Spark. Now, we can test quickly with native Python or Pandas and then execute on Spark when needed. Developers who use Fugue benefit from more rapid iteration in their data projects.
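
Since the logic lives in a plain Pandas function, a unit test is just a regular Python test. Below is an illustrative sketch (the test name and assertions are ours, assuming the predict() function and trained reg model from earlier):

import pandas as pd

def test_predict():
    test_df = pd.DataFrame({"x_1": [3, 4], "x_2": [3, 3]})
    result = predict(test_df.copy(), reg)
    # the original columns are kept and a "predicted" column is added
    assert list(result.columns) == ["x_1", "x_2", "predicted"]
    assert len(result) == 2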

If we use a pure Python function, such as the one below, all we have to do to test it is run some values through the defined function.

from typing import List, Dict, Any

# An illustrative area-code mapping with only the entry needed for the example below
_area_code_map = {"407": "Orlando, FL"}

def map_phone_to_location2(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
    for row in df:
        row["location"] = _area_code_map[row["phone"][1:4]]
    return df

# Remember the input was List[Dict[str,Any]]
map_phone_to_location2([{'phone': '(407)-234-5678'}, 
                       {'phone': '(407)-234-5679'}])
[{'phone': '(407)-234-5678', 'location': 'Orlando, FL'},
 {'phone': '(407)-234-5679', 'location': 'Orlando, FL'}]

Even though the output here is a List[Dict[str,Any]], Fugue takes care of converting it back to a DataFrame.
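
As a sketch, the same function can be passed to transform() with an appropriate schema string; the phone numbers below are the same illustrative values used above:

# Fugue converts the Pandas input to List[Dict[str,Any]] to match the type
# hints, then converts the returned list back to a DataFrame.
phone_df = pd.DataFrame({"phone": ["(407)-234-5678", "(407)-234-5679"]})

located = transform(
    phone_df,
    map_phone_to_location2,
    schema="*,location:str",  # all input columns plus the new string column
)
print(located)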

Conclusion#

Fugue’s transform() function can scale Pandas-written code to Spark or Dask without altering the existing functions. In the next section, we’ll take a deeper look at type hints and the role they play in Fugue. While we used Pandas here, we’ll also show that native Python functions can be used across the different execution engines.

Fugue as a Mindset#

Fugue is a framework, but more importantly, it is a mindset.

  1. Fugue believes that the framework should adapt to the user, not the other way around.

  2. Fugue lets users express logic in a scale-agnostic way, with the tools they prefer.

  3. Fugue values the readability and maintainability of code over deep framework-specific optimizations.

Using distributed computing is currently harder than it needs to be. However, these systems often follow similar patterns, which Fugue abstracts so that users can focus on defining their logic. We cover these concepts in the rest of the tutorials. If you’re new to distributed computing, Fugue is the perfect place to get started.

[Optional] Spark Equivalent of transform()#

Below is an example of how the predict() function would be brought to Spark without the transform() function. This implementation uses Spark’s mapInPandas() method, available since Spark 3.0. Note how the schema has to be handled inside the run_predict() function. This is the schema requirement we mentioned earlier that Fugue provides a simpler interface for.

from typing import Iterator, Union
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.sql import DataFrame, SparkSession

def predict_wrapper(dfs: Iterator[pd.DataFrame], model):
    for df in dfs:
        yield predict(df, model)

def run_predict(input_df: Union[DataFrame, pd.DataFrame], model):
    # conversion
    if isinstance(input_df, pd.DataFrame):
        sdf = spark.createDataFrame(input_df.copy())
    else:
        sdf = input_df  # Spark DataFrames are immutable, so no copy is needed

    schema = StructType(list(sdf.schema.fields))
    schema.add(StructField("predicted", DoubleType()))
    return sdf.mapInPandas(lambda dfs: predict_wrapper(dfs, model), 
                           schema=schema)

result = run_predict(input_df.copy(), reg)
result.show()
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

It’s easy to see why bringing a Pandas codebase to Spark becomes very difficult with this approach. We had to define two additional functions, predict_wrapper() and run_predict(), to bring predict() to Spark. If this had to be done for tens of functions, it could easily fill the codebase with boilerplate, making it hard to focus on the logic. These extra functions also require additional unit tests.

[Optional] Comparison to Modin and Koalas#

Fugue is often compared to Modin and Koalas. Modin is a Pandas interface for execution on Dask, and Koalas is a Pandas interface for execution on Spark. Fugue, Modin, and Koalas share the goal of making the distributed computing experience easier. The main difference is that Modin and Koalas use Pandas as the grammar for distributed computing, while Fugue uses native Python and SQL as the grammar (though Pandas is also supported). For more information, check this page.

The clearest example of Pandas not being compatible with Spark is its acceptance of mixed-type columns: a single Pandas column can hold both numeric and string values. Spark, on the other hand, is strongly typed and enforces the schema. Beyond that, Pandas relies heavily on the index for operations. As users transition to Spark, the index mindset does not hold as well. Order is not always guaranteed in a distributed system, there is overhead to maintaining a global index, and, moreover, it is often not necessary.
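
As a quick sketch of that first point (plain Pandas, no Fugue API involved):

import pandas as pd

# Pandas happily stores mixed types in a single column (the dtype becomes "object")
mixed = pd.DataFrame({"value": [1, "two", 3.0]})
print(mixed["value"].dtype)  # object

# Spark requires one declared type per column, so a column like this cannot be
# represented without first coercing every value to a single type.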