Fugue in 10 minutes#

Have questions? Chat with us on GitHub or Slack.

This is a short introduction to Fugue geared towards new users. The Fugue project aims to make big data effortless by accelerating iteration speed and providing a simpler interface for users to utilize distributed computing engines.

This tutorial covers the Python interface only. For SQL, check the FugueSQL in 10 minutes section.

Fugue’s intended audience includes, but is not limited to:

  1. Data scientists who need to bring business logic written in Python or Pandas to bigger datasets

  2. Data practitioners looking to parallelize existing code with distributed computing

  3. Data teams that want to reduce the maintenance and testing of boilerplate Spark code

Setup#

For this tutorial, we first need to run through some quick setup to instantiate a Spark session:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Bringing a function to Spark, Dask, or Ray#

The simplest way to scale pandas-based code to Spark, Dask, or Ray is the transform() function. With this minimal wrapper, we can bring existing Pandas and Python code to distributed execution with minimal refactoring. The transform() function also provides quality-of-life enhancements that eliminate boilerplate code for users.

Let’s quickly demonstrate how this concept can be applied. In the following code snippets, we will train a model using scikit-learn and pandas, and then perform predictions with this model in parallel on top of Spark through Fugue.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2":[1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3
reg = LinearRegression().fit(X, y)

After training our model, we wrap it in a predict() function. Bear in mind that this function is still written in pandas, making it easy to test on the input_df that we create. Wrapping our model in predict() will allow us to bridge execution to Spark or Dask.

# define our predict function
def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    """
    Function to predict results using a pre-built model
    """
    return df.assign(predicted=model.predict(df))

# create test data
input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2":[3, 3, 6, 6]})

# test the predict function
predict(input_df, reg)
x_1 x_2 predicted
0 3 3 12.0
1 4 3 13.0
2 6 6 21.0
3 6 6 21.0

Now this is where it starts to get interesting: let’s bring the same code defined above to Spark using Fugue transform(). We take our DataFrame and apply the predict() function to it using the Spark, Dask, or Ray engine. The transform() function parameters will be explained in detail later on, but for now, notice how we made no modifications to the predict() function in order to switch the execution from Pandas to Spark. All we have to do is pass in the SparkSession as the engine.

# import Fugue
from fugue import transform

# create a spark dataframe
sdf = spark.createDataFrame(input_df)

# use Fugue transform to switch execution to Spark
result = transform(
    df=sdf,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark
)

# display results
print(type(result))
result.show()
<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+---------+
|x_1|x_2|predicted|
+---+---+---------+
|  3|  3|     12.0|
|  4|  3|     13.0|
|  6|  6|     21.0|
|  6|  6|     21.0|
+---+---+---------+

The transform() function provides much more flexibility than what we have just shown. This is a simple use case designed to give you a flavor of what Fugue has to offer. For this example, the transform() function took the following arguments:

  • df - input DataFrame (can be a pandas, Spark, Dask, or Ray DataFrame)

  • using - a Python function with valid input and output types

  • schema - output schema of the operation

  • params - a dictionary of parameters to pass in the function

  • engine - the execution engine to run the operation on (pandas, Spark, Dask, or Ray)

We will delve into these arguments in more detail later on, including an explanation of the roles that type hints and schema play. For now, the most important thing to discuss is the engine.

Execution Engines#

The transform() function is able to infer the execution engine from the DataFrame type. If a Spark, Dask, or Ray DataFrame is passed in, the function will run on the respective engine by default. Because we supplied the spark variable as the engine, the predict() function was applied to the Spark DataFrame sdf using Spark. Similarly, passing a Dask or Ray client as the engine will run the operation on Dask or Ray. This provides further flexibility and ease of use.

# No need to specify engine for these guys
transform(spark_df, fn, ...)
transform(dask_df, fn, ...)
transform(ray_df, fn, ...)

On the other hand, a Pandas DataFrame requires developers to specify the execution engine for the function to run on.

transform(df, fn, ..., engine="spark")
transform(df, fn, ..., engine="dask")
transform(df, fn, ..., engine="ray")

The above calls will convert the Pandas DataFrame to that engine’s DataFrame class before applying the operation. Below is an example with a Pandas DataFrame input and a Dask DataFrame output. This is the same operation as illustrated above; note how we use the "dask" string to spin up a Dask client.

# using transform to bring predict to dask execution
result = transform(
    df=input_df.copy(),
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="dask"                 # ensure you have dask installed before running -> pip install dask
)

# display results
print(type(result))
result.compute().head()
<class 'dask.dataframe.core.DataFrame'>
x_1 x_2 predicted
0 3 3 12.0
0 4 3 13.0
0 6 6 21.0
0 6 6 21.0

If no engine is supplied to transform() when using a Pandas DataFrame, the function will be executed on the default Pandas-based engine.
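
For example, reusing the predict() function and the reg model from earlier, the call below is a minimal sketch of running entirely on the default Pandas-based engine (the local_prediction variable name is just illustrative):

# no engine supplied, so this runs on the default Pandas-based engine
# and returns a pandas DataFrame
local_prediction = transform(
    df=input_df,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg)
)

print(type(local_prediction))   # <class 'pandas.core.frame.DataFrame'>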

Returning a Local DataFrame#

While Fugue can convert Pandas DataFrames to Spark, Dask, or Ray DataFrames, it will not convert a distributed DataFrame back to a local one unless explicitly specified.

It sometimes makes more sense to save the output data as a parquet file after transformations. Returning a local Pandas DataFrame from transform() is only recommended for smaller data.

# use as_local=True to return a Pandas DataFrame
local_result = transform(
    df=input_df,
    using=predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine="dask",
    as_local=True
)

print(type(local_result))
local_result.head()
<class 'pandas.core.frame.DataFrame'>
x_1 x_2 predicted
0 3 3 12.0
1 4 3 13.0
2 6 6 21.0
3 6 6 21.0

Type Hint Conversion#

In the previous section, we showed how Fugue can facilitate distributed execution by passing a Pandas-based function with valid input and output types to the Spark, Dask, or Ray engines. The predict() function we defined above accepted a pd.DataFrame as input and returned a pd.DataFrame as output. Fugue uses these type annotations as a guide to convert the distributed partitions before the function is applied. Loosely speaking, Spark partitions will be converted to multiple Pandas DataFrames.

Fugue can also handle other DataFrame-like input and output types. Take the following function that sums each row and appends the total as a new column. We add a special condition: if the value in the summed column is greater than 10, we drop the row.

df = pd.DataFrame({"a": [1,2,3,4], "b": [1,2,3,4], "c": [1,2,3,4]})

def add_row(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that sums each row in a dataframe and drops the row
    if the sum is greater than 10.
    """
    df = df.assign(total=df.sum(axis=1))
    df = df.loc[df["total"] < 10]
    return df

This can be run using transform() on all engines.

transform(
    df=df, 
    using=add_row, 
    schema="*,total:int"
    )
a b c total
0 1 1 1 3
1 2 2 2 6
2 3 3 3 9

This same logic can be represented in multiple ways:

from typing import List, Iterable, Any, Dict

def add_row2(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
    """
    A function that sums each row in a dataframe and drops the row
    if the sum is greater than 10.
    """
    result = []
    for row in df:
        row["total"] = row["a"] + row["b"] + row["c"]
        if row["total"] < 10:
            result.append(row)
    return result

def add_row3(df: List[List[Any]]) -> Iterable[List[Any]]:
    """
    A function that sums each row in a dataframe and drops the row
    if the sum is greater than 10.
    """
    for row in df:
        row.append(sum(row))
        if row[-1] < 10:
            yield row

The input type annotation tells Fugue what to convert the input data to before the function is applied, whereas the output type annotation informs Fugue how to convert the result back to a Pandas, Spark, Dask, or Ray DataFrame. Notice that these functions are not even dependent on Pandas and can be tested easily. For example:

print(add_row2([{"a": 1, "b": 2, "c": 3}]))
print(list(add_row3([[1,2,3]])))
[{'a': 1, 'b': 2, 'c': 3, 'total': 6}]
[[1, 2, 3, 6]]

This is one of the core offerings of Fugue. Testing code that uses Spark, Dask, or Ray is hard because of the dependency on the hardware. Even when running the tests locally, iteration speed is significantly slower than with Pandas. This setup allows developers to unit test Python or Pandas code and bring it to Spark or Dask when ready.
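
For instance, a plain unit test for add_row2 needs no Spark at all. The pytest-style test function below is a minimal sketch and not part of Fugue:

def test_add_row2():
    # a row summing to 6 is kept
    assert add_row2([{"a": 1, "b": 2, "c": 3}]) == [{"a": 1, "b": 2, "c": 3, "total": 6}]
    # a row summing to 15 is dropped
    assert add_row2([{"a": 5, "b": 5, "c": 5}]) == []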

These definitions are compatible with transform() across all execution engines. For example, we can use add_row2 with the Spark engine.

transform(
    df=df, 
    using=add_row2, 
    schema="*,total:int", 
    engine=spark
    ).show()
+---+---+---+-----+
|  a|  b|  c|total|
+---+---+---+-----+
|  1|  1|  1|    3|
|  2|  2|  2|    6|
|  3|  3|  3|    9|
+---+---+---+-----+

The full list of acceptable input and output types for transform() can be found in the Transformer section.

Schema#

We have seen a couple of transform() calls by now, and each of them has had a schema passed in. The schema is a requirement for Spark and heavily recommended for Dask and Ray. When data lives across multiple machines, schema inference can be very expensive and slow, and data processing can end up inconsistent without an explicit schema.

Fugue enforces best practices so that code can run effectively at scale. Here we see how to use Fugue’s representation, which is minimal compared to Spark’s.

For this section, we create a DataFrame that will be used throughout the examples below:

df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})

Adding a column

When using transform(), the * in a schema expression means all existing columns. From there, we can add new columns by appending ",column_name:type".

def add_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that creates a column with a value of column a + 1.
    """
    return df.assign(new_col=df["a"] + 1)

transform(
    df=df, 
    using=add_col, 
    schema="*,new_col:int"
    )
a b c new_col
0 1 1 1 2
1 2 2 2 3
2 3 3 3 4

Entirely new schema

There is no need to use the * operation. We can just specify all columns.

def new_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that creates a new DataFrame.
    """
    return pd.DataFrame({"x": [1,2,3], "y": ["a","b","c"]})

transform(
    df=df, 
    using=new_df, 
    schema="x:int,y:str"
    )
x y
0 1 a
1 2 b
2 3 c
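
Removing a column

Columns can also be removed from the output schema with the "-" operation, the same expression used in the File IO example later in this tutorial. A minimal sketch (drop_b is an illustrative helper, not part of Fugue):

def drop_b(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that drops column b.
    """
    return df.drop("b", axis=1)

transform(
    df=df, 
    using=drop_b, 
    schema="*-b"
    )
# only columns a and c remain in the output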

Partitioning#

The type hint conversion we saw earlier is not applied at the DataFrame level but rather at the partition level. If no partitioning is supplied, the default engine partitions are used. To get a better sense of partitions, look at the following function that calculates the size of a DataFrame.

# create a DataFrame
df = pd.DataFrame({"a": [1,2,3,4], "b": [1,2,3,4], "c": [1,2,3,4]})

def size(df: pd.DataFrame) -> Iterable[Dict[str,Any]]:
    """
    Function that calculates the size of a DataFrame.
    """
    yield {"size":df.shape[0]}

Then we run it on Dask using the transform() function.

transform(
    df=df, 
    using=size, 
    schema="size:int", 
    engine="dask",
    as_local=True
    )
size
0 1
1 1
2 1
3 1

The results display 4 rows of data, each with a value of 1. This indicates that the size() function defined above ran on 4 partitions of the data, each of which contained one row. This demonstrates that the type hint conversion happens on each partition. The concept of partitioning is important for distributed computing. When dealing with big data, it’s more effective to find logically independent groups of data that can serve as the partitioning strategy. Take the following DataFrame:

df = pd.DataFrame({"col1": ["a","a","a","b","b","b"], 
                   "col2": [1,2,3,4,5,6]})
df
col1 col2
0 a 1
1 a 2
2 a 3
3 b 4
4 b 5
5 b 6

Suppose we want to get the min and max values of col2 for each group in col1. First, we define the logic for one group of data. Again, we take advantage of the type hint conversions.

def min_max(df:pd.DataFrame) -> List[Dict[str,Any]]:
    """
    Calculates the min and max of a given column based
    on the grouping of a separate column.
    """
    return [{"group": df.iloc[0]["col1"], 
             "max": df['col2'].max(), 
             "min": df['col2'].min()}]

We can specify the partitioning strategy in the transform() call as follows:

transform(
    df=df, 
    using=min_max, 
    schema="group:str, max:int, min:int",
    partition={"by": "col1"}
    )
group max min
0 a 3 1
1 b 6 4

On Pandas, the partition-transform semantic is close to a groupby-apply. The difference is that the partition-transform paradigm also extends to distributed computing, where we control the physical location of the data. Again, the expression above will also work on Spark and Dask by supplying the engine.

transform(
    df=df,
    using=min_max,
    schema="group:str,max:int,min:int",
    partition={"by": "col1"},
    engine="dask",
    as_local=True
    )
group max min
0 b 6 4
1 a 3 1
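
For intuition, the partition-transform call above computes roughly what the following pandas-only groupby would (a rough comparison; column naming differs slightly):

# a rough pandas-only equivalent of the partition-transform above
df.groupby("col1")["col2"].agg(["max", "min"]).reset_index()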

Presort#

During the partition operation, we can specify a presort so that the data comes in sorted before the function is applied. For example, we can get the top 2 rows of each group using:

def top_two(df:List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    """
    Function that returns the top 2 rows of an iterable.
    """
    n = 0
    while n < 2:
        yield df[n]
        n = n + 1

transform(
    df=df, 
    using=top_two, 
    schema="*", 
    partition={"by":"col1", "presort": "col2 desc"}
    )        
col1 col2
0 a 3
1 a 2
2 b 6
3 b 5

File IO with transform()#

Input file

The transform() function also supports loading parquet files as input. With this, users don’t need to load a Pandas or Spark DataFrame before calling transform(); the file will be loaded by the execution engine.

Output file

Users can also write out the result of the transform() operation as a parquet file. This can be done by providing a save_path.

Only parquet files are supported because CSVs often require additional configuration that makes them harder to deal with. Parquet files are also a best practice for big data because they are partitioned and hold schema information.

Absolute paths are also a best practice for big data.

df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})
df.to_parquet("/tmp/df.parquet")
def drop_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that drops a column labelled 'b'.
    """
    return df.drop("b", axis=1)

transform(
    df="/tmp/df.parquet",
    using=drop_col,
    schema="*-b",
    engine=spark,
    save_path="/tmp/processed.parquet"
    )

pd.read_parquet("/tmp/processed.parquet/").head()
a c
0 1 1
1 2 2
2 3 3

This expression makes it easy for users to toggle between running Pandas on sampled data and using Spark or Dask on the full dataset.
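
A minimal sketch of that toggle, reusing the drop_col function and the parquet file from above (the use_spark flag is purely illustrative):

# purely illustrative toggle: None runs on the default Pandas-based engine,
# while the SparkSession runs the same code on the full dataset with Spark
use_spark = False
transform(
    df="/tmp/df.parquet",
    using=drop_col,
    schema="*-b",
    engine=spark if use_spark else None,
    save_path="/tmp/processed.parquet"
    )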

Conclusion#

The Fugue transform() function is the simplest interface for Fugue. It handles the execution of a single function across Pandas, Spark, Dask, and Ray. Most users can adopt this minimal wrapper to parallelize code they have already written. For end-to-end workflows, see FugueWorkflow.

For any questions, feel free to reach out on Slack.