Engine Context#

So far we’ve used Fugue’s transform() function to port Pandas code to Spark, Dask, and Ray without any rewrites, and we also used the Fugue API functions save() and load(). In the previous section, we encountered code that looked like the following:

import fugue.api as fa

df = fa.load("/tmp/f.parquet", engine="dask")
res = fa.transform(df, dummy, schema="*", engine="dask")
fa.save(res, "/tmp/f_out.parquet", engine="dask")

We had to repeat the engine multiple times. To simplify this, we can use the engine_context() to set a default execution engine for all of the Fugue API functions called inside it. For example, all of the Fugue functions below will run on the Dask engine.

import pandas as pd
import fugue.api as fa 

df = pd.DataFrame({"a": [1,2]})
df.to_parquet("/tmp/f.parquet")

def dummy(df: pd.DataFrame) -> pd.DataFrame:
    return df

# all Fugue API calls inside this block default to the Dask engine
with fa.engine_context("dask"):
    df = fa.load("/tmp/f.parquet")
    res = fa.transform(df, dummy, schema="*")
    fa.show(res)
    fa.save(res, "/tmp/f_out.parquet")
DaskDataFrame
a:long
------
1     
2     
Total count: 2

From the output of the show() function, we can see that Dask was used to execute the operations. Using the engine_context() is not strictly required, but it can greatly simplify the code.

Overriding the Engine#

The engine_context() just sets a default engine, so it can be overridden where needed. In the example below, we use engine=None so that Pandas is the default, but we specify the Dask engine for the transform() call because it may be compute-intensive.

with fa.engine_context(engine=None):
    df = fa.load("/tmp/f.parquet")
    res = fa.transform(df, dummy, schema="*", engine="dask")
    fa.show(res)
DaskDataFrame
a:long
------
1     
2     
Total count: 2

Even though we passed no engine to the engine_context(), the Dask engine was used in the transform() step and a Dask DataFrame was returned.
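
If we need the result locally afterwards, we can bring it back to Pandas. Below is a minimal sketch, reusing res from above, that uses fa.as_pandas() to convert the distributed result into a Pandas DataFrame:

# res is the Dask DataFrame produced by the override above
print(type(res))           # <class 'dask.dataframe.core.DataFrame'>
local = fa.as_pandas(res)  # collect the result into a Pandas DataFrame
print(type(local))         # <class 'pandas.core.frame.DataFrame'>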

Functions and engine_context()#

The same behavior applies when a Python function calls Fugue API functions. This allows logic to be grouped into engine-agnostic functions.

def logic():
    df = fa.load("/tmp/f.parquet")
    res = fa.transform(df, dummy, schema="*")
    fa.show(res)

with fa.engine_context("dask"):
    logic()
DaskDataFrame
a:long
------
1     
2     
Total count: 2

We can also wrap the whole engine_context() block in a function and pass in the engine. The output DataFrame will match the engine passed in. In the example below, a Dask DataFrame is returned.

def logic(engine):
    with fa.engine_context(engine):
        df = fa.load("/tmp/f.parquet")
        res = fa.transform(df, dummy, schema="*")
    return res


out = logic("dask")
print(type(out))
<class 'dask.dataframe.core.DataFrame'>

Other Python Code#

The code inside the engine_context() block is not limited to Fugue API functions. For example, loops can be used when an operation needs to be applied multiple times.

from fugue.column import col, lit

with fa.engine_context():
    df = fa.load("/tmp/f.parquet")
    df = fa.assign(df, x=lit(1))  # add a constant column x
    for _ in range(4):  # double x four times: 1 * 2**4 = 16
        df = fa.assign(df, x=col("x") * lit(2))
    fa.show(df)
PandasDataFrame
a:long|x:long
------+------
1     |16    
2     |16    
Total count: 2
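
Ordinary Python control flow such as conditionals works the same way. Here is a small sketch (the keep_large flag is a hypothetical setting, not part of Fugue) that branches before calling fa.filter() inside the context:

keep_large = True

with fa.engine_context():
    df = fa.load("/tmp/f.parquet")
    if keep_large:  # plain Python branching inside the context
        df = fa.filter(df, col("a") > lit(1))
    fa.show(df)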

Decoupling of Logic and Execution#

This section illustrates how to piece together end-to-end workflows that can then be run on Pandas, Spark, Dask, or Ray. The logic is fully decoupled from the execution, which is one of the primary motivations of Fugue. This solves the following problems:

  1. Users have to learn an entirely new framework to work with distributed computing problems

  2. Logic written for a small data project is not reusable for a big data project

  3. Testing becomes a heavyweight process for distributed computing, especially Spark

  4. Along with number 3, iterations for distributed computing problems become slower and more expensive

Fugue’s core principle is to minimize code dependency on frameworks as much as possible, leading to flexibility and portability. By decoupling logic and execution, we can focus on our logic in a scale-agnostic way. In this section, we saw how to build end-to-end workflows with the Fugue API and the engine_context().
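
For example, here is a minimal sketch of that decoupling, reusing the dummy function and Parquet file from above (the pipeline and test_pipeline names are just illustrative). The same function is unit-tested quickly on Pandas and then run unchanged on Dask:

def pipeline(engine=None):
    # engine=None defaults to Pandas; any Fugue-supported engine works
    with fa.engine_context(engine):
        df = fa.load("/tmp/f.parquet")
        return fa.transform(df, dummy, schema="*")

def test_pipeline():
    # fast, lightweight test on the Pandas engine
    res = fa.as_pandas(pipeline(engine=None))
    assert list(res["a"]) == [1, 2]

test_pipeline()                # quick local iteration
big = pipeline(engine="dask")  # same logic, distributed execution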

Summary#

In this section we covered the engine_context() function, which sets the default execution engine for Fugue API calls. By wrapping it in functions or using it alongside them, it becomes easier to group pieces of logic into framework-agnostic workloads. This can also be extended to create workflows that utilize different engines. A common use case is heavy processing with Spark, Dask, or Ray, followed by post-processing with Pandas.
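
As a hedged sketch of that pattern (the heavy_step function is a placeholder, and Dask stands in for any distributed engine), the heavy work runs inside the context while post-processing happens locally:

def heavy_step(df: pd.DataFrame) -> pd.DataFrame:
    return df  # placeholder for an expensive transformation

with fa.engine_context("dask"):
    df = fa.load("/tmp/f.parquet")
    res = fa.transform(df, heavy_step, schema="*")  # distributed on Dask
    pdf = fa.as_pandas(res)  # collect for local post-processing

pdf["a"] = pdf["a"] + 1  # lightweight post-processing with Pandas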