Engine Context#
So far we’ve used Fugue’s transform()
function to port Pandas code to Spark, Dask, and Ray without any rewrites. We also used the Fugue API functions save()
and load()
in the previous section, where we encountered code that looked like the following:
import fugue.api as fa
df = fa.load("/tmp/f.parquet", engine="dask")
res = fa.transform(df, dummy, schema="*", engine="dask")
fa.save(res, "/tmp/f_out.parquet", engine="dask")
We had to repeat the engine multiple times. To simplify this, we can use engine_context()
to set a default execution engine for all of the Fugue API functions used inside it. For example, all of the Fugue functions below will run on the Dask engine.
import pandas as pd
import fugue.api as fa
df = pd.DataFrame({"a": [1,2]})
df.to_parquet("/tmp/f.parquet")
def dummy(df: pd.DataFrame) -> pd.DataFrame:
    return df

with fa.engine_context("dask"):
    df = fa.load("/tmp/f.parquet")
    res = fa.transform(df, dummy, schema="*")
    fa.show(res)
    fa.save(res, "/tmp/f_out.parquet")
DaskDataFrame
a:long
------
1
2
Total count: 2
From the output of the show()
function, we can see that Dask was used to execute the operations. Using engine_context()
is not strictly required, but it can greatly simplify the code.
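Conceptually, engine_context() works like a scoped default: API functions look up the ambient engine when no engine argument is passed explicitly. Below is a minimal sketch of that pattern using Python's contextvars (the names here are hypothetical; this is not Fugue's actual implementation):

```python
import contextlib
import contextvars

# Hypothetical holder for the default engine -- a sketch of the
# pattern, not Fugue's real internals.
_default_engine = contextvars.ContextVar("fugue_engine", default=None)

@contextlib.contextmanager
def engine_context(engine=None):
    # Set the default engine for the enclosed block and restore the
    # previous value on exit.
    token = _default_engine.set(engine)
    try:
        yield
    finally:
        _default_engine.reset(token)

def current_engine(engine=None):
    # An explicit engine argument always overrides the context default.
    return engine if engine is not None else _default_engine.get()

with engine_context("dask"):
    print(current_engine())         # dask
    print(current_engine("spark"))  # spark (explicit override)
print(current_engine())             # None -> would fall back to Pandas
```

This also mirrors the override behavior shown in the next section: an explicit engine passed to a single call wins over the context default.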
Overriding the Engine#
The engine_context()
just sets a default engine, so it can be overridden if needed. In the example below, we use engine=None
to use Pandas, but we’ll specify the engine for the transform()
call because it may be compute intensive.
with fa.engine_context(engine=None):
    df = fa.load("/tmp/f.parquet")
    res = fa.transform(df, dummy, schema="*", engine="dask")
    fa.show(res)
DaskDataFrame
a:long
------
1
2
Total count: 2
Even though we passed no engine to the engine_context()
, the Dask engine was used in the transform()
step and returned a Dask DataFrame.
Functions and engine_context()#
The same behavior will apply if a Python function calls Fugue API functions. This allows for grouping of logic into engine-agnostic functions.
def logic():
    df = fa.load("/tmp/f.parquet")
    res = fa.transform(df, dummy, schema="*")
    fa.show(res)

with fa.engine_context("dask"):
    logic()
DaskDataFrame
a:long
------
1
2
Total count: 2
We can also wrap the whole engine_context()
block in a function and pass in the engine. The output DataFrame will match the engine passed in. In the example below, a Dask DataFrame is returned.
def logic(engine):
    with fa.engine_context(engine):
        df = fa.load("/tmp/f.parquet")
        res = fa.transform(df, dummy, schema="*")
        return res

out = logic("dask")
print(type(out))
<class 'dask.dataframe.core.DataFrame'>
Other Python Code#
The code inside the engine_context()
block is not limited to Fugue API functions. For example, a loop can be used when an operation is applied multiple times.
from fugue.column import col, lit
with fa.engine_context():
    df = fa.load("/tmp/f.parquet")
    df = fa.assign(df, x=lit(1))
    for i in range(4):
        df = fa.assign(df, x=col("x")*lit(2))
    fa.show(df)
PandasDataFrame
a:long|x:long
------+------
1 |16
2 |16
Total count: 2
Decoupling of Logic and Execution#
This section illustrates how to piece together end-to-end workflows that can then be run on Pandas, Spark, Dask, or Ray. The logic is fully decoupled from the execution, which is one of the primary motivations of Fugue. This solves the following problems:
Users have to learn an entirely new framework to work with distributed computing problems
Logic written for a small data project is not reusable for a big data project
Testing becomes a heavyweight process for distributed computing, especially Spark
Along with number 3, iterations for distributed computing problems become slower and more expensive
Fugue’s core principle is to minimize code dependency on frameworks as much as possible, which leads to flexibility and portability. By decoupling logic and execution, we can focus on our logic in a scale-agnostic way. In this section, we saw how to build end-to-end workflows with the Fugue API and engine_context().
Summary#
In this section we covered the engine_context()
function, which sets the default execution engine for Fugue API function calls. Wrapping functions in it, or calling them inside it, makes it easier to group pieces of logic into framework-agnostic workloads. This can also be extended to create workflows that utilize different engines. A common use case is doing heavy processing with Spark, Dask, or Ray, and then post-processing with Pandas.