# Execution Engine

We have already seen that the transform() function takes in an engine. There are three main ways to define the execution engine in Fugue. Remember that using an execution engine requires the corresponding library to be installed.
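For example, the backends are commonly installed through pip extras (the extra names below are assumptions; verify them against the Fugue install guide):

```shell
# Install Fugue together with a backend
# (extra names are assumptions; check the Fugue install docs)
pip install "fugue[spark]"   # Spark backend
pip install "fugue[dask]"    # Dask backend
pip install "fugue[ray]"     # Ray backend
```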

## Setup

We can test all of the engines using this dummy setup. Note that the output schema is already defined with a schema hint.

import pandas as pd
from fugue import transform

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# schema: *, col3:int
def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3=df["col1"] + df["col2"])
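The transformer body itself is plain pandas, so it can be sanity-checked locally before handing it to any engine:

```python
import pandas as pd

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# Same logic as the transformer above, run directly with pandas
out = df.assign(col3=df["col1"] + df["col2"])
print(out["col3"].tolist())  # [2, 4, 6, 8]
```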


## Passing a String

This is the easiest way to specify an engine. Normally, this means the engine will be spun up locally, using all cores of the machine.

Fugue can take in the following strings: "spark", "dask", and "ray".
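Conceptually, a string engine is a lookup from a name to an engine factory. The following is an illustrative sketch of that dispatch only, with stand-in names, not Fugue's actual internals:

```python
# Illustrative sketch only -- not Fugue's real registry or class names
ENGINES = {
    "spark": lambda conf: f"SparkEngine(conf={conf})",
    "dask": lambda conf: f"DaskEngine(conf={conf})",
    "ray": lambda conf: f"RayEngine(conf={conf})",
}

def resolve(engine, conf=None):
    """Map an engine name to a (stub) engine object."""
    if isinstance(engine, str):
        return ENGINES[engine](conf)
    return engine  # already an engine/Client/Session object

print(resolve("dask"))  # DaskEngine(conf=None)
```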

Spark

spark_df = transform(df, add_cols, engine="spark")
spark_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
|   3|   3|   6|
|   4|   4|   8|
+----+----+----+


Dask

dask_df = transform(df, add_cols, engine="dask")
dask_df.compute()

   col1  col2  col3
0     1     1     2
1     2     2     4
0     3     3     6
1     4     4     8

Ray

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)

2023-01-02 17:51:25,398	INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
Repartition:  25%|██▌       | 4/16 [00:00<00:01,  6.16it/s]
Map_Batches: 100%|██████████| 16/16 [00:01<00:00, 10.87it/s]

{'col1': 1, 'col2': 1, 'col3': 2}
{'col1': 2, 'col2': 2, 'col3': 4}
{'col1': 3, 'col2': 3, 'col3': 6}
{'col1': 4, 'col2': 4, 'col3': 8}




## Passing the Client or Session

Fugue will also know how to interpret the engine if a Client or Session is passed.
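One way to picture this is inference based on the type of the passed object. This is an illustrative sketch with stand-in classes (not the real SparkSession or distributed Client types, and not Fugue's actual logic):

```python
# Illustrative sketch only: inferring a backend from the object's type,
# using stand-in classes rather than the real SparkSession/Client types
class FakeSparkSession: ...
class FakeDaskClient: ...

def infer_backend(engine):
    """Pick a backend name based on the type of the passed object."""
    if isinstance(engine, FakeSparkSession):
        return "spark"
    if isinstance(engine, FakeDaskClient):
        return "dask"
    raise TypeError(f"cannot infer engine from {type(engine).__name__}")

print(infer_backend(FakeSparkSession()))  # spark
```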

Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

spark_df = transform(df, add_cols, engine=spark)
spark_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
|   3|   3|   6|
|   4|   4|   8|
+----+----+----+


Dask

from distributed import Client
client = Client()

dask_df = transform(df, add_cols, engine=client)
dask_df.compute()
   col1  col2  col3
0     1     1     2
1     2     2     4
2     3     3     6
3     4     4     8

Ray

This one is a bit different because Ray code doesn't create a Client object. Instead, ray.init() is called, and the engine picks up the running Ray instance.

import ray
ray.init(ignore_reinit_error=True)

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)

2023-01-02 17:52:30,226	INFO worker.py:1351 -- Calling ray.init() again after it has already been called.
Repartition:  25%|██▌       | 4/16 [00:00<00:00, 418.10it/s]
Map_Batches: 100%|██████████| 16/16 [00:00<00:00, 166.52it/s]

{'col1': 1, 'col2': 1, 'col3': 2}
{'col1': 2, 'col2': 2, 'col3': 4}
{'col1': 3, 'col2': 3, 'col3': 6}
{'col1': 4, 'col2': 4, 'col3': 8}




Fugue also has utilities to interact with clusters directly. For these, you need to be authenticated with a service such as Databricks, Coiled, or Anyscale.

More complete documentation can be found in the cloudprovider section.

Some examples:

# Databricks
transform(tdf, dummy, engine="db", engine_conf=conf)

# Coiled

# Anyscale


These will naturally require more configuration, so the Fugue Cloudprovider documentation has more details.

## Engine Conf

As seen in the Databricks example above, we can also pass a configuration to the execution engine. One of the most common settings is enabling Pandas UDFs on Spark:

spark_df = transform(df,
                     add_cols,
                     engine=spark,
                     engine_conf={"fugue.spark.use_pandas_udf": True})
spark_df.show(2)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
+----+----+----+
only showing top 2 rows