Fugue API in 10 minutes#

Have questions? Chat with us on GitHub or Slack.


This is a short introduction to the Fugue API geared towards new users. The Fugue project aims to make big data effortless by accelerating iteration speed and providing a simpler interface for users to utilize distributed computing engines.

This tutorial covers the Python interface only. For SQL, check the FugueSQL in 10 minutes section.

Fugue’s intended audience includes, but is not limited to:

  1. Data scientists who need to bring business logic written in Python or Pandas to bigger datasets

  2. Data practitioners looking to parallelize existing code with distributed computing

  3. Data teams that want to reduce the maintenance and testing of Spark/Dask/Ray code

Setup#

For this tutorial, we first need to run through some quick setup to instantiate a Spark session for later use.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Bringing a function to Spark, Dask, or Ray#

The simplest way to scale Pandas-based code to Spark or Dask is with the transform() function. With the addition of this minimal wrapper, we can bring existing Pandas and Python code to distributed execution with minimal refactoring. The transform() function also provides quality of life enhancements that can eliminate boilerplate code for users.

Let’s quickly demonstrate how this concept can be applied. In the code snippets below, we will train a model using scikit-learn and Pandas, and then use Fugue to run predictions with this model in parallel on top of Spark.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2":[1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3
reg = LinearRegression().fit(X, y)

After training our model, we wrap it in a predict() function. Bear in mind that this function is still written in Pandas, making it easy to test on the input_df that we create. Wrapping our model in predict() will allow us to bridge execution to Spark, Dask, or Ray.

# define our predict function
def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    """
    Function to predict results using a pre-built model
    """
    return df.assign(predicted=model.predict(df))

# create test data
input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2":[3, 3, 6, 6]})

# test the predict function
predict(input_df, reg)
   x_1  x_2  predicted
0    3    3       12.0
1    4    3       13.0
2    6    6       21.0
3    6    6       21.0

Now this is where it starts to get interesting: let’s bring the same code defined above to Spark using Fugue’s transform(). We take our DataFrame and apply the predict() function to it using the Spark, Dask, or Ray engine. The transform() parameters will be explained in detail later on, but for now, notice that we made no modifications to the predict() function to switch execution from Pandas to Spark. All we have to do is pass in the SparkSession as the engine.

# import Fugue
from fugue import transform

# use Fugue transform to switch execution to spark
result = transform(
    df=input_df,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark
)

# result is a Spark DataFrame
print(type(result))
result.show()
<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

The transform() function provides much more flexibility for users than what we have just described above. This is just a simple use case designed to give you a flavor of what Fugue has to offer. For this example, the transform() function took in the following arguments:

  • df - input DataFrame (can be a Pandas, Spark, Dask, or Ray DataFrame)

  • using - a Python function with valid input and output types

  • schema - output schema of the operation

  • params - a dictionary of parameters to pass in the function

  • engine - the execution engine to run the operation on (Pandas, Spark, Dask, or Ray)

We will delve into these arguments in more detail later on, including the roles that type hints and schema play. For now, the most important thing to discuss is the engine.

Execution Engines#

In the example above, we supplied the spark variable as the engine, so the predict() function was applied to input_df using Spark.

To provide flexibility, a Pandas DataFrame input is automatically converted to the engine’s DataFrame class before the operation is applied. The input is converted in each of the examples below.

transform(df, fn, ..., engine=spark_session)  # output is Spark DataFrame
transform(df, fn, ..., engine=dask_client)    # output is Dask DataFrame
transform(df, fn, ..., engine="ray")          # output is Ray Dataset

We can also use the "dask" or "spark" strings to spin up a Dask Client or SparkSession.
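
For example, the earlier predict() call can be written with a string engine name. This is a minimal sketch; it assumes the corresponding backend package (pyspark or dask) is installed in your environment.

# same call as before, but with a string engine name;
# Fugue spins up (or reuses) the corresponding backend for us
result = transform(
    df=input_df,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="spark"   # or "dask" / "ray"
)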

As of Fugue 0.8.0, if no engine is supplied to transform(), the engine associated with the input DataFrame is used by default.

transform(df, fn, ...)          # runs on Pandas
transform(spark_df, fn, ...)    # runs on Spark
transform(dask_df, fn, ...)     # runs on Dask
transform(ray_df, fn, ...)      # runs on Ray

Returning a Local DataFrame#

While Fugue can convert Pandas DataFrames to Spark, Dask, or Ray DataFrames, it will not convert a distributed DataFrame back to Pandas unless explicitly told to with as_local=True.

It sometimes makes more sense to save the output data as a parquet file after the transformations. Returning a local Pandas DataFrame from transform() is only recommended for smaller data because it can overload the driver node of the cluster.

# use as_local=True to return a Pandas DataFrame
local_result = transform(
    df=input_df,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark,
    as_local=True
)

print(type(local_result))
local_result.head()
<class 'pandas.core.frame.DataFrame'>
   x_1  x_2  predicted
0    3    3       12.0
1    4    3       13.0
2    6    6       21.0
3    6    6       21.0

Type Hint Conversion#

In the previous section, we showed how Fugue enables distributed execution by passing a Pandas-based function with valid input and output types to the Spark, Dask, or Ray engine. The predict() function we defined above accepted a pd.DataFrame as input and returned a pd.DataFrame as output. Fugue uses these type annotations as a guide to convert the distributed partitions before the function is applied. For those less experienced with Spark, think of it as Spark partitions being converted to multiple Pandas DataFrames.

In practice, not all data problems fit Pandas semantics. Fugue can also handle other DataFrame-like input and output types. Take the following function, which sums each row into a new total column. We add a special condition: if the row total is 10 or greater, we drop the row.

df = pd.DataFrame({"a": [1,2,3,4], "b": [1,2,3,4], "c": [1,2,3,4]})

def add_row(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that sums each row of a DataFrame into a total column
    and drops the row if the sum is 10 or greater.
    """
    df = df.assign(total=df.sum(axis=1))
    df = df.loc[df["total"] < 10]
    return df

This can be run using transform() on all engines. In the example below, we don’t pass an engine, so it runs on Pandas.

transform(
    df=df, 
    using=add_row, 
    schema="*,total:int"
    )
   a  b  c  total
0  1  1  1      3
1  2  2  2      6
2  3  3  3      9

This same logic can be represented in multiple ways, and Fugue will be able to handle these due to the type annotation.

from typing import List, Iterable, Any, Dict

def add_row2(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
    result = []
    for row in df:
        row["total"] = row["a"] + row["b"] + row["c"]
        if row["total"] < 10:
            result.append(row)
    return result

def add_row3(df: List[List[Any]]) -> Iterable[List[Any]]:
    for row in df:
        row.append(sum(row))
        if row[-1] < 10:
            yield row

The input type annotation tells Fugue what to convert the input data to before the function is applied whereas the output type annotation informs Fugue how to convert it back to a Pandas, Spark, Dask, or Ray DataFrame. Notice that these functions are not even dependent on Pandas and can be tested easily. For example:

print(add_row2([{"a": 1, "b": 2, "c": 3}]))
print(list(add_row3([[1,2,3]])))
[{'a': 1, 'b': 2, 'c': 3, 'total': 6}]
[[1, 2, 3, 6]]

This is one of the core offerings of Fugue. Testing code that uses Spark, Dask or Ray is hard because of the dependency on the hardware. Even if running the tests locally, iteration speed is significantly slower than using Pandas. This setup allows developers to unit test Python or Pandas code and bring it to the distributed setting when ready.
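
As a small illustration of that workflow, the row-wise functions above can be unit tested with plain pytest and no Spark, Dask, or Ray dependency. This is a hypothetical test module; the import path is an assumption.

# test_add_row.py -- hypothetical test module; adjust the import to your project layout
from my_project.transforms import add_row2   # assumption: add_row2 lives in this module

def test_add_row2_keeps_small_rows():
    rows = [{"a": 1, "b": 2, "c": 3}]
    assert add_row2(rows) == [{"a": 1, "b": 2, "c": 3, "total": 6}]

def test_add_row2_drops_large_rows():
    rows = [{"a": 5, "b": 5, "c": 5}]
    assert add_row2(rows) == []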

These definitions are compatible with transform() across all execution engines. For example, we can use add_row2 with the Spark engine.

transform(
    df=df, 
    using=add_row2, 
    schema="*,total:int", 
    engine=spark
    ).show()
+---+---+---+-----+
|  a|  b|  c|total|
+---+---+---+-----+
|  1|  1|  1|    3|
|  2|  2|  2|    6|
|  3|  3|  3|    9|
+---+---+---+-----+

The full list of acceptable input and output types for transform() can be found in the Transformer section.
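
As one more variant, here is a minimal sketch that mixes two of the annotation combinations already shown above: a List[Dict[str,Any]] input with a lazily yielded Iterable[Dict[str,Any]] output.

def add_row4(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    # rows arrive as plain dictionaries and are yielded back lazily
    for row in df:
        row["total"] = row["a"] + row["b"] + row["c"]
        if row["total"] < 10:
            yield row

# works on any engine, just like add_row2 and add_row3
transform(df=df, using=add_row4, schema="*,total:int")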

Schema#

We have seen a few transform() calls by now, and each of them had the schema passed in. The schema is a requirement for Spark and strongly recommended for Dask and Ray. When data lives across multiple machines, schema inference can be computationally expensive, and data processing can end up inconsistent without an explicit schema.

Fugue enforces best practices so that code can run effectively at scale. Here we see how to use Fugue’s representation, which is minimal compared to Spark’s.

For this section, we create a DataFrame that will be used throughout the examples below:

df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})

Adding a column

When using transform(), the * in a schema expression means all existing columns. From there, we can add new columns by appending ",column_name:type".

def add_col(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(new_col=df["a"] + 1)

transform(
    df=df, 
    using=add_col, 
    schema="*,new_col:int"
    )
   a  b  c  new_col
0  1  1  1        2
1  2  2  2        3
2  3  3  3        4

Entirely new schema

There is no need to use the * operation. We can just specify all columns.

def new_df(df: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"x": [1,2,3], "y": ["a","b","c"]})

transform(
    df=df, 
    using=new_df, 
    schema="x:int,y:str"
    )
   x  y
0  1  a
1  2  b
2  3  c

There are more schema operations available. For a deeper look, check the Schema section.
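
As one hedged sketch, we believe the schema expression can also drop columns with a minus sign, e.g. "*-b" for all columns except b; verify the exact syntax in the Schema section before relying on it.

# assumes the "*-b" drop expression described in the Schema section
def drop_col_b(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop(columns=["b"])

transform(df=df, using=drop_col_b, schema="*-b")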

Partitioning#

The type hint conversion we saw earlier is not applied at the DataFrame level but at the partition level. If no partitioning scheme is supplied, the default engine partitions are used. To get a better sense of partitions, look at the following data.

df = pd.DataFrame({"col1": ["a","a","a","b","b","b"], 
                   "col2": [1,2,3,4,5,6]})
df.head()
  col1  col2
0    a     1
1    a     2
2    a     3
3    b     4
4    b     5

First, we create a function that gets the min and max of each group. By the time this function is run, the data will already be split such that there is one group per partition. Note the output is a List[Dict[str,Any]] but we are taking advantage of Fugue to handle the conversion.

def min_max(df:pd.DataFrame) -> List[Dict[str,Any]]:
    return [{"group": df.iloc[0]["col1"], 
             "max": df['col2'].max(), 
             "min": df['col2'].min()}]

We can then pass the partitioning strategy to the transform() function. In this example, we use Dask as the engine.

transform(df, 
          min_max, 
          schema="group:str,max:int,min:int", 
          engine="dask",
          partition={"by":"col1"},
          as_local=True)
  group  max  min
0     a    3    1
1     b    6    4

On Pandas, the partition-transform semantic is close to a groupby-apply. The difference is that the partition-transform paradigm also extends to distributed computing, where we control the physical location and movement of the data. Again, the expression above will also work on Spark, Dask, and Ray by supplying the engine.
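
For intuition only, here is the Pandas-only analogue of the call above, written as an explicit split-apply-combine; the transform() version is what scales this pattern out.

# split by col1, apply min_max to each group, then combine the results
pd.DataFrame(
    [row for _, group in df.groupby("col1") for row in min_max(group)]
)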

Presort#

During the partition operation, we can specify a presort so that the data comes in sorted before the function is applied. For example, we can get the top 2 rows of each group using the function below. This is needed because distributed engines do not guarantee order is preserved when data is partitioned.

def top_two(df:List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    n = 0
    while n < 2:
        yield df[n]
        n = n + 1

transform(
    df=df, 
    using=top_two, 
    schema="*", 
    partition={"by":"col1", "presort": "col2 desc"}
    )        
  col1  col2
0    a     3
1    a     2
2    b     6
3    b     5

Loading and Saving Files#

The transform() function is the most minimal function of Fugue, allowing users to distribute one step in their workflow. However, it will not be enough to express end-to-end data workflows that are agnostic to the execution engine. For example, users may still have code that looks like this:

result = transform(df, fn, engine=spark)
result.write.parquet("/tmp/out.parquet")

This still has a dependency on Spark. To support end-to-end workflows, Fugue has other functions compatible with any backend. Here, we look at loading and saving. First we create an example file:

df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})
df.to_parquet("/tmp/df.parquet")

Then we use the Fugue API to call the engine-agnostic load and save functions. The fa.transform() function seen below is the same transform() function used earlier.

import fugue.api as fa

def add_col(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(sum=df['a']+df['b']+df['c'])

df = fa.load("/tmp/df.parquet", engine=spark)
out = fa.transform(df, add_col, schema="*,sum:int", engine=spark)
fa.save(out, "/tmp/out.parquet", engine=spark)

This gives us an end-to-end workflow where we can pass in the engine as a variable. If the engine is not passed or is None, all the operations will run on Pandas.
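
For example, the whole pipeline can be wrapped in a function that takes the engine as a parameter (run_pipeline is a hypothetical name used for illustration):

def run_pipeline(engine=None):
    # engine=None runs every step on Pandas; pass spark, "dask", or "ray" to scale out
    df = fa.load("/tmp/df.parquet", engine=engine)
    out = fa.transform(df, add_col, schema="*,sum:int", engine=engine)
    fa.save(out, "/tmp/out.parquet", engine=engine)

run_pipeline()        # local Pandas execution
run_pipeline(spark)   # distributed execution on Spark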

Engine Context#

The load, transform, and save operations above all use spark as the engine. Instead of writing out the engine multiple times, it is more convenient to specify the execution engine once by using the engine_context.

with fa.engine_context(spark):
    df = fa.load("/tmp/df.parquet")
    out = fa.transform(df, add_col, schema="*,sum:int")
    fa.save(out, "/tmp/out.parquet")

Now all of these operations will use the Spark Execution Engine. As with the earlier ways of specifying the engine, passing nothing will use Pandas, and Dask and Ray can also be used with the engine_context. This lets users split their logic into functional groups based on the execution engine, allowing workflows that elegantly combine Pandas and Spark (for example).

The engine specified in the engine_context is a default, meaning it can be overridden inside the context by passing an engine to a Fugue API function. For example:

with fa.engine_context(spark):
    df = fa.load("/tmp/df.parquet", engine="pandas")         # run this step on Pandas
    out = fa.transform(df, add_col, schema="*,sum:int")
    fa.save(out, "/tmp/out.parquet")
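
To illustrate the point above about splitting logic into functional groups, here is a minimal sketch that keeps a light step on Pandas and moves the heavier group of operations to Spark:

# light step on Pandas (no engine specified)
small = fa.load("/tmp/df.parquet")

# heavier group of operations on Spark
with fa.engine_context(spark):
    big = fa.transform(small, add_col, schema="*,sum:int")
    fa.save(big, "/tmp/out.parquet")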

Conclusion#

The Fugue transform() function is the simplest interface to Fugue. It handles the execution of a single function across Pandas, Spark, Dask, and Ray. Most users can easily adopt this minimal wrapper to parallelize code they have already written. It is also minimally invasive, and a lot of users just use transform() for a single step they want to distribute.

For end-to-end workflows that are engine-agnostic, see the Fugue API tutorial. There are many more engine-agnostic functions available for users to create end-to-end workflows that can be run on Pandas, Spark, Dask, or Ray just by changing the engine.

To see the other functions Fugue offers, check the top-level API docs.

For any questions, feel free to reach out on Slack.