Execution Engine#

We have already seen that the transform() function takes in an engine. There are three main ways to define the engine for Fugue. Remember that using an execution engine requires the corresponding library to be installed.
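
For reference, each backend can be installed through the corresponding Fugue extra (shown here as a convenience; these are the standard extras names rather than something covered on this page):

pip install "fugue[spark]"   # Spark backend
pip install "fugue[dask]"    # Dask backend
pip install "fugue[ray]"     # Ray backend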

Setup#

We can test all of the engines with this dummy setup. Note that the output schema is already defined with a schema hint (the comment above the function).

import pandas as pd
from fugue import transform 

df = pd.DataFrame({"col1": [1,2,3,4], "col2": [1,2,3,4]})

# schema: *, col3:int
def add_cols(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3 = df['col1'] + df['col2'])
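
Before distributing the work, it can help to sanity-check the function locally. Calling transform() with no engine runs on the default local (pandas-based) backend and returns a pandas DataFrame (a quick check, not part of the original walkthrough):

# With no engine specified, transform() runs locally and returns a pandas DataFrame
local_df = transform(df, add_cols)
print(local_df)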

Passing a String#

This is the easiest way to specify an engine. Normally it means the engine is spun up locally and uses all cores of the machine.

Fugue can take in the following strings: "spark", "dask", "ray"

Spark

spark_df = transform(df, add_cols, engine="spark")
spark_df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
|   3|   3|   6|
|   4|   4|   8|
+----+----+----+

Dask

dask_df = transform(df, add_cols, engine="dask")
dask_df.compute().head()
   col1  col2  col3
0     1     1     2
1     2     2     4
0     3     3     6
1     4     4     8

Ray

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)
2023-01-02 17:51:25,398	INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Repartition:  25%|██▌       | 4/16 [00:00<00:01,  6.16it/s]
Map_Batches: 100%|██████████| 16/16 [00:01<00:00, 10.87it/s]
{'col1': 1, 'col2': 1, 'col3': 2}
{'col1': 2, 'col2': 2, 'col3': 4}
{'col1': 3, 'col2': 3, 'col3': 6}
{'col1': 4, 'col2': 4, 'col3': 8}
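
Note that each engine returns its own native output: a Spark DataFrame, a Dask DataFrame, or a Ray Dataset. If you want the result brought back as a local DataFrame regardless of the engine, transform() also accepts an as_local flag; a minimal sketch:

# Collect the distributed result back into a local (pandas) DataFrame
local_result = transform(df, add_cols, engine="dask", as_local=True)
print(local_result)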

Passing the Client or Session#

Fugue also knows how to interpret the engine if a Client or Session object is passed directly.

Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

spark_df = transform(df, add_cols, engine=spark)
spark_df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
|   3|   3|   6|
|   4|   4|   8|
+----+----+----+

Dask

from distributed import Client
dask_client = Client()

dask_df = transform(df, add_cols, engine=dask_client)
dask_df.compute().head()
   col1  col2  col3
0     1     1     2
1     2     2     4
2     3     3     6
3     4     4     8

Ray

Ray is a bit different because there is no Client or Session object to pass to transform(). Instead, initialize Ray with ray.init() and keep passing the "ray" string as the engine; Fugue will use the already-running Ray instance.

import ray
ray.init(ignore_reinit_error=True)

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)
2023-01-02 17:52:30,226	INFO worker.py:1351 -- Calling ray.init() again after it has already been called.
Repartition:  25%|██▌       | 4/16 [00:00<00:00, 418.10it/s]
Map_Batches: 100%|██████████| 16/16 [00:00<00:00, 166.52it/s]
{'col1': 1, 'col2': 1, 'col3': 2}
{'col1': 2, 'col2': 2, 'col3': 4}
{'col1': 3, 'col2': 3, 'col3': 6}
{'col1': 4, 'col2': 4, 'col3': 8}
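
Since the output here is a Ray Dataset, it can be converted back to pandas with the standard Ray Dataset API, and the local Ray instance can be shut down once you are finished (an optional cleanup step, not part of the original walkthrough):

# Bring the result back to a local pandas DataFrame
print(ray_df.to_pandas())

# Tear down the local Ray instance when done
ray.shutdown()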

Passing a Cluster Address#

Fugue also has utilities to interact with clusters directly. For these, you need to be authenticated with the service (Databricks, Coiled, or Anyscale).

More complete documentation can be found in the Fugue Cloudprovider section.

Some examples:

# Databricks
transform(df, add_cols, engine="db", engine_conf=conf)

# Coiled
transform(df, add_cols, engine="coiled:my_cluster")

# Anyscale
transform(df, add_cols, engine="anyscale://project/cluster-1")

These naturally require more configuration, so the Fugue Cloudprovider documentation has more details.

Engine Conf#

As seen in the Databricks example above, we can also pass a configuration dict to the execution engine through engine_conf. One of the most common settings is fugue.spark.use_pandas_udf, which lets Fugue use Spark's Pandas UDFs where applicable:

spark_df = transform(df, 
                     add_cols, 
                     engine=spark, 
                     engine_conf={"fugue.spark.use_pandas_udf":True})
spark_df.show(2)
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
+----+----+----+
only showing top 2 rows
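
The same engine_conf argument also works when the engine is given as a string rather than a Session, for example (a sketch reusing the configuration above):

# engine_conf can be combined with the string form of the engine as well
spark_df = transform(df,
                     add_cols,
                     engine="spark",
                     engine_conf={"fugue.spark.use_pandas_udf": True})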

For a full list of configurations, check this page.