Fugue with Polars#

Polars is a Rust-based DataFrame library that supports multi-threaded and out-of-core operations. Polars performance is already very good on a local machine, so the focus of the Fugue-Polars integration is scaling out to a cluster. Fugue also has FugueSQL for running SQL on top of DataFrames, but this is a lower priority for Polars because the existing DuckDB integration already works well alongside Polars. Because both are based on Apache Arrow, they can be used together with zero copy, as will also be shown here.

Here are the use cases for this scenario:

  • Using DuckDB and Polars together seamlessly.

  • Scale out Polars code on top of a Spark, Dask, or Ray cluster to speed up computation.

  • In some cases, combinations like using Spark to run Polars jobs can be faster than native Spark itself (more on this later).

Note that for distributed computing operations, a big overhead comes from transferring data across the cluster. This means that even if the actual compute executes quickly, the transfer may make the total execution time longer. Users need to test whether bringing Polars code to a cluster will be worth it.

Setup#

First, we create a Polars DataFrame and a function named diff() to get the difference of the number column (note there is no grouping yet). The pl.DataFrame type annotation in the function below is needed by Fugue to eventually bring the code to Spark, Dask, or Ray. Note that the diff() function should be applied per group, but we can handle that at the Fugue level later; the Polars function is meant to handle one group at a time. This concept becomes more important when we start to use distributed engines like Spark, Dask, and Ray.

For those new to Fugue, the schema comment above the function tells Fugue the output schema when bringing the function to Spark, Dask, and Ray.

import polars as pl

data = {"id": ["A", "A", "A", "B", "B", "B", "C", "C", "C"],
        "number": [10, 20, 30, 15, 25, 35, 20, 30, 40]}
df = pl.DataFrame(data)

# schema: *, diff:float
def diff(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(pl.col("number").diff().alias("diff"))

diff(df).head()
shape: (5, 3)
id    number  diff
str   i64     i64
"A"   10      null
"A"   20      10
"A"   30      10
"B"   15      -15
"B"   25      10

Using DuckDB and Polars through FugueSQL#

There is great synergy between DuckDB and Polars because they are both based on Apache Arrow, which allows them to interoperate with zero copy. There is no performance degradation from using these tools together. DuckDB is used to perform the SQL operations, and Polars handles the Python transformations that may be more tedious to express in SQL.
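
As a quick aside, the two libraries can also be used together directly. The snippet below is a minimal sketch (assuming the duckdb Python package is installed; it is not part of the original example): duckdb.sql() queries the in-scope Polars DataFrame and returns an Arrow table that Polars reads back without copying.

import duckdb

# A minimal sketch (assumes the duckdb package is installed).
# DuckDB queries the Polars DataFrame defined above and returns Arrow,
# which Polars converts back with zero copy.
arrow_result = duckdb.sql("SELECT * FROM df WHERE id = 'A'").arrow()
pl.from_arrow(arrow_result)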

from fugue_jupyter import setup
setup()

%%fsql duckdb
SELECT *
  FROM df
 WHERE id IN ('B', 'C')

TRANSFORM PREPARTITION BY id USING diff 
PRINT
id:str number:long diff:float
0 B 15 NaN
1 B 25 10.0
2 B 35 10.0
3 C 20 NaN
4 C 30 10.0
5 C 40 10.0
PandasDataFrame: id:str,number:long,diff:float

Similarly, to use it in a script:

import fugue.api as fa
from fugue.api import fugue_sql

res = fugue_sql("""
                SELECT *
                FROM df
                WHERE id IN ('B', 'C')

                TRANSFORM PREPARTITION BY id USING diff 
                """, engine="duckdb")

# Bring it into Polars from Arrow if needed.
pl.from_arrow(fa.as_arrow(res))
shape: (6, 3)
id    number  diff
str   i64     f32
"B"   15      null
"B"   25      10.0
"B"   35      10.0
"C"   20      null
"C"   30      10.0
"C"   40      10.0

fa.as_arrow will convert whatever the SQL output is to Arrow. If the output is already an Arrow table, nothing will happen. Still, it's good to be explicit about the conversion. From there, Polars can read it in with pl.from_arrow.
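
As a small illustration of that behavior, the sketch below (assuming pyarrow is available) passes an Arrow table through fa.as_arrow and gets an Arrow table back:

import pyarrow as pa

# A minimal sketch: an Arrow table passed to fa.as_arrow comes back as
# a pyarrow.Table, so there is no conversion cost in this case.
tbl = pa.table({"x": [1, 2, 3]})
assert isinstance(fa.as_arrow(tbl), pa.Table)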

Running Polars Code Distributedly#

Polars already has amazing support for streaming larger-than-RAM datasets. Still, there are use cases for combining Polars with distributed backends like Spark, Dask, and Ray:

  • Instead of scaling compute vertically, we can scale horizontally by adding machines to the cluster. This can be more cost-effective if we only need extra workers for a few of the more expensive steps.

  • If operations take too long on a local machine, it may be worth using a cluster to accelerate them.

We can bring the diff() function to Spark without much code change; Dask and Ray are similar. Check the execution engine tutorial for more details.

from fugue.api import transform
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Output is Spark DataFrame
res = transform(df, diff, partition={"by": "id"}, engine=spark)

pl.from_arrow(fa.as_arrow(res))
shape: (9, 3)
id    number  diff
str   i64     f32
"B"   15      null
"B"   25      10.0
"B"   35      10.0
"C"   20      null
"C"   30      10.0
"C"   40      10.0
"A"   10      null
"A"   20      10.0
"A"   30      10.0

Tuning Performance#

As mentioned in the introduction, using Polars distributedly takes some tuning. This is true for distributed computing in general: if local data processing with Pandas or Polars is good enough, there is no need to introduce distributed computing. Bringing data and code to a cluster can just introduce overhead and slow down the work.

With that in mind, the best operations to scale out to Spark will likely be compute-bound operations that take a long time. For example, if you have 10 groups of data and processing each group takes 30 minutes, it may make sense to spin up a cluster of 10 machines and have each one process a group. But there is still an open question when it comes to partitioning.

Take the scenario where we have 100 distinct groups to operate on and 10 machines. We can:

  1. Create 100 partitions with Spark and run 100 Polars jobs

  2. Create 10 partitions in Spark (each holding 10 groups), and then each Polars job handles 10 groups

The Fugue team ran benchmarks and found that the second scenario can actually be significantly faster. In some cases (like calculating the z-score per group), it can even be faster than native Spark. Either way, the point of Fugue is not necessarily to optimize for speed, but to adjust to the grammar users want to use.

Still, this exact scenario led Fugue to the concept of “coarse partitioning”, where we have fewer but larger partitions. It can be enabled with one line of code, but bear in mind we need to adjust the function to handle multiple groups (note the .over("id") added below). Timings can be inaccurate on a local machine.

# schema: *, diff:float
def diff(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(pl.col("number").diff().over('id').alias("diff"))

res = transform(df, diff, partition={"by": "id", "algo": "coarse"}, engine=spark)

pl.from_arrow(fa.as_arrow(res))
shape: (9, 3)
id    number  diff
str   i64     f32
"B"   15      null
"B"   25      10.0
"B"   35      10.0
"C"   20      null
"C"   30      10.0
"C"   40      10.0
"A"   10      null
"A"   20      10.0
"A"   30      10.0

Other Input and Output Types#

To apply a function per group of data, we can use the Iterable[pl.DataFrame] input annotation and the Iterator[pl.DataFrame] output annotation. Note that the min_per_group() function below actually takes the max() value of each incoming DataFrame; the aggregation is done once for each partition (or group).

For this specific example, the logic can also be expressed directly in Polars, but for more complicated transformations on each logical group, this setup is an option for expressing the logic. It also scales to Spark, Dask, and Ray, as shown in the sketch after the output below.

from typing import Iterable, Iterator

# schema: *, c:int
def min_per_group(dfs: Iterable[pl.DataFrame]) -> Iterator[pl.DataFrame]:
    for df in dfs:
        tdf = df.max().with_columns(pl.lit(1, pl.Int32()).alias("c"))
        yield tdf

transform(df, min_per_group, partition={"by": "id"})
id number c
0 A 30 1
1 B 35 1
2 C 40 1
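
As mentioned above, this pattern scales to the distributed engines as well. Below is a minimal sketch using the SparkSession created earlier; only the engine argument changes.

# A minimal sketch: the same Iterable/Iterator function distributed with Spark.
res = transform(df, min_per_group, partition={"by": "id"}, engine=spark)
pl.from_arrow(fa.as_arrow(res))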