# Execution Engine

We have already seen that the transform() function takes in an engine. There are three main ways to define the execution engine in Fugue. Remember that using an execution engine requires the corresponding library to be installed.
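For example, the backends are commonly installed through pip extras (the extra names below are assumptions; verify them against the Fugue install guide):

```shell
# Install Fugue together with a backend
# (extra names are assumptions; check the Fugue install docs)
pip install "fugue[spark]"   # Spark backend
pip install "fugue[dask]"    # Dask backend
pip install "fugue[ray]"     # Ray backend
```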

## Setup

We can test all of the engines using this dummy setup. Note that the output schema is already defined with a schema hint.

import pandas as pd
from fugue import transform

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# schema: *, col3:int
def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3=df["col1"] + df["col2"])
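The transformer body itself is plain pandas, so it can be sanity-checked locally before handing it to any engine:

```python
import pandas as pd

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# Same logic as the transformer above, run directly with pandas
out = df.assign(col3=df["col1"] + df["col2"])
print(out["col3"].tolist())  # [2, 4, 6, 8]
```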


## Passing a String

This is the easiest way to specify an engine. Normally, this means the engine will be spun up locally, using all cores of the machine.

Fugue can take in the following strings: "spark", "dask", and "ray".
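Conceptually, a string engine is a lookup from a name to an engine factory. The following is an illustrative sketch of that dispatch only, with stand-in names, not Fugue's actual internals:

```python
# Illustrative sketch only -- not Fugue's real registry or class names
ENGINES = {
    "spark": lambda conf: f"SparkEngine(conf={conf})",
    "dask": lambda conf: f"DaskEngine(conf={conf})",
    "ray": lambda conf: f"RayEngine(conf={conf})",
}

def resolve(engine, conf=None):
    """Map an engine name to a (stub) engine object."""
    if isinstance(engine, str):
        return ENGINES[engine](conf)
    return engine  # already an engine/Client/Session object

print(resolve("dask"))  # DaskEngine(conf=None)
```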

Spark

spark_df = transform(df, add_cols, engine="spark")
spark_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
|   3|   3|   6|
|   4|   4|   8|
+----+----+----+


Dask

dask_df = transform(df, add_cols, engine="dask")
dask_df.compute()

   col1  col2  col3
0     1     1     2
1     2     2     4
0     3     3     6
1     4     4     8

Ray

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)

2023-01-02 17:51:25,398	INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
Repartition:  25%|██▌       | 4/16 [00:00<00:01,  6.16it/s]
Map_Batches: 100%|██████████| 16/16 [00:01<00:00, 10.87it/s]

{'col1': 1, 'col2': 1, 'col3': 2}
{'col1': 2, 'col2': 2, 'col3': 4}
{'col1': 3, 'col2': 3, 'col3': 6}
{'col1': 4, 'col2': 4, 'col3': 8}




## Passing the Client or Session

Fugue will also know how to interpret the engine if a Client or Session is passed.
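One way to picture this is inference based on the type of the passed object. This is an illustrative sketch with stand-in classes (not the real SparkSession or distributed Client types, and not Fugue's actual logic):

```python
# Illustrative sketch only: inferring a backend from the object's type,
# using stand-in classes rather than the real SparkSession/Client types
class FakeSparkSession: ...
class FakeDaskClient: ...

def infer_backend(engine):
    """Pick a backend name based on the type of the passed object."""
    if isinstance(engine, FakeSparkSession):
        return "spark"
    if isinstance(engine, FakeDaskClient):
        return "dask"
    raise TypeError(f"cannot infer engine from {type(engine).__name__}")

print(infer_backend(FakeSparkSession()))  # spark
```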

Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

spark_df = transform(df, add_cols, engine=spark)
spark_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
|   3|   3|   6|
|   4|   4|   8|
+----+----+----+


Dask

from distributed import Client
client = Client()

dask_df = transform(df, add_cols, engine=client)
dask_df.compute()
   col1  col2  col3
0     1     1     2
1     2     2     4
2     3     3     6
3     4     4     8

Ray

This one is a bit different because Ray code doesn't create a Client object. Instead, ray.init() is called, and the engine picks up the running Ray instance.

import ray
ray.init(ignore_reinit_error=True)

ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)

2023-01-02 17:52:30,226	INFO worker.py:1351 -- Calling ray.init() again after it has already been called.
Repartition:  25%|██▌       | 4/16 [00:00<00:00, 418.10it/s]
Map_Batches: 100%|██████████| 16/16 [00:00<00:00, 166.52it/s]

{'col1': 1, 'col2': 1, 'col3': 2}
{'col1': 2, 'col2': 2, 'col3': 4}
{'col1': 3, 'col2': 3, 'col3': 6}
{'col1': 4, 'col2': 4, 'col3': 8}




Fugue also has utilities to interact with clusters directly. For these, you need to be authenticated with a service such as Databricks, Coiled, or Anyscale.

More complete documentation can be found in the cloudprovider section.

Some examples:

# Databricks
transform(tdf, dummy, engine="db", engine_conf=conf)

# Coiled

# Anyscale


These will naturally require more configuration, so the Fugue Cloudprovider documentation has more details.

## Engine Conf

As seen in the Databricks example above, we can also pass a configuration to the execution engine. One of the most common settings is enabling Pandas UDFs on Spark:

spark_df = transform(df,
                     add_cols,
                     engine=spark,
                     engine_conf={"fugue.spark.use_pandas_udf": True})
spark_df.show(2)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   2|   2|   4|
+----+----+----+
only showing top 2 rows