# Execution Engine

The execution engine is the heart of Fugue. It is the layer that unifies the core concepts of distributed computing and separates the underlying computing frameworks from the user’s high-level logic. Normally you don’t operate on execution engines directly, but it’s good to understand the basics.

In Fugue, the only dataset type is the schemaed dataframe, so although there are other important concepts such as RDD, we don’t cover them. More options may bring more flexibility, but also more confusion. You can read these 1 2 to get more insights.

However, it is important to understand that you still have full access to the underlying computing frameworks and can use any of their specific features, including RDD. We unify certain things to make them easy and consistent, but we don’t block anything else.

## Initialization

Although there are no hard rules for initializing an ExecutionEngine, the general way is to initialize it from configs. The config should describe each component, such as which logger to use, which SQLEngine to use, and other properties.
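To illustrate what describing components through a config can look like, here is a minimal sketch in plain Python. The `get_or_throw` function below is a hypothetical helper that only mimics the typed lookup used in the examples on this page, and the keys `myconfig` and `workers` are made up for illustration. It is not Fugue's implementation.

```python
from typing import Any, Dict, Type, TypeVar

T = TypeVar("T")


def get_or_throw(conf: Dict[str, Any], key: str, expected_type: Type[T]) -> T:
    """Hypothetical helper: return conf[key] as expected_type, or raise."""
    if key not in conf:
        raise KeyError(f"{key} is not in the config")
    value = conf[key]
    if isinstance(value, expected_type):
        return value
    # Convert simple values such as "4" -> 4; a failed conversion raises ValueError
    return expected_type(value)


# Assumed example config; the keys are illustrative only
conf = {"myconfig": "abc", "workers": "4"}
name = get_or_throw(conf, "myconfig", str)
workers = get_or_throw(conf, "workers", int)
print(name, workers)
```

A typed lookup like this fails early when a required setting is missing, which is usually what you want while an engine is being constructed.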

Here are the best practices for initializing each built-in ExecutionEngine.

### Native Python

from fugue import NativeExecutionEngine

engine = NativeExecutionEngine({"myconfig": "abc"})
assert engine.conf.get_or_throw("myconfig", str) == "abc"


### Dask

from distributed import Client
client = Client() # without this, dask is not in distributed mode



### Spark

from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine

# This is where you get a Spark session; it works the same way as without Fugue.
# Notice that you can configure almost everything this way, including the running mode (local mode or another mode).
# In my experience, the best approach is to use only this way plus spark-defaults.conf to initialize SparkSessions.
spark_session = (SparkSession
    .builder
    .config("spark.executor.cores", 4)
    .config("fugue.dummy", "dummy")
    .getOrCreate())

# create the engine from the session; the Spark configs become part of engine.conf
engine = SparkExecutionEngine(spark_session)
assert engine.conf.get_or_throw("spark.executor.cores", int) == 4
assert engine.conf.get_or_throw("fugue.dummy", str) == "dummy"


A special feature of Fugue is that engine.conf is also accessible on workers (different machines) within all types of Fugue extensions. The engine itself is only accessible on the driver or in driver-side extensions.

## Create DataFrame

With an ExecutionEngine, you only need to tell the system that you want to create a DataFrame from raw data or from existing dataframes. Different ExecutionEngines create different types of DataFrames. to_df is the common interface across ExecutionEngines; use it to create DataFrames.

from fugue import NativeExecutionEngine, ArrayDataFrame
from fugue_spark import SparkExecutionEngine

engine1 = NativeExecutionEngine()
engine2 = SparkExecutionEngine() # if spark_session is not provided, it will get the current active session

df1 = engine1.to_df([[0]],"a:int")
df2 = engine2.to_df([[0]],"a:int")
df3 = engine1.to_df(df2)  # see, NativeExecutionEngine can also take SparkDataFrame and convert to a local dataframe

print(type(df1))
print(type(df2))
assert df1.as_array() == df2.as_array() # both are materialized on the driver, then compared
assert df1.as_array() == df3.as_array()


Every general method that ExecutionEngine supports takes an arbitrary DataFrame object and calls to_df to make sure it becomes an engine-compatible dataframe, so you may not need to call to_df directly.
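The normalize-first pattern described above can be sketched in plain Python. `ToyFrame` and `ToyEngine` are hypothetical stand-ins invented for this illustration; they are not Fugue classes, and a real engine does far more than this.

```python
from typing import Any, List, Optional


class ToyFrame:
    """Hypothetical stand-in for an engine-compatible dataframe."""

    def __init__(self, rows: List[List[Any]], schema: str):
        self.rows = rows
        self.schema = schema


class ToyEngine:
    """Hypothetical engine: every public method normalizes its input first."""

    def to_df(self, data: Any, schema: Optional[str] = None) -> ToyFrame:
        if isinstance(data, ToyFrame):
            return data  # already compatible, nothing to convert
        if schema is None:
            raise ValueError("schema is required for raw data")
        return ToyFrame([list(r) for r in data], schema)

    def count(self, data: Any, schema: Optional[str] = None) -> int:
        df = self.to_df(data, schema)  # callers never have to convert themselves
        return len(df.rows)


toy_engine = ToyEngine()
raw_count = toy_engine.count([[0], [1]], "a:int")
frame_count = toy_engine.count(ToyFrame([[0], [1], [2]], "a:int"))
print(raw_count, frame_count)
```

Because the conversion happens inside each method, the caller can pass raw data or an existing frame without caring which one it is.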

## Repartition

This partitions a DataFrame with a PartitionSpec (read this). Normally it prepares for your next operation. Calling it directly may or may not have an effect.

from fugue_spark import SparkExecutionEngine
from fugue import ArrayDataFrame, PartitionSpec

engine = SparkExecutionEngine()
df = engine.repartition(ArrayDataFrame([[0],[1]],"a:int"), PartitionSpec(by=["a"]))
df.show()
# as you can see df becomes a SparkDataFrame because of the engine, but the repartition does not have any effect


## Map

Before reading this, read the Partition tutorial first. Map in Fugue works on the logical partition level. You can pass in an on_init function, which is called when a physical partition starts (the following example does not cover on_init).

It’s not important to learn how to use map directly, because there is a better programming interface on top of it. What matters is understanding what this method means.
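To make the idea of a logical partition concrete, here is a minimal sketch in plain Python that does not use Fugue at all. The function `logical_map` and its parameters `by` and `presort_desc` are hypothetical names chosen for this illustration. The sketch groups rows by the partition keys, optionally sorts each group in descending order, and calls the function once per group.

```python
from itertools import groupby
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def logical_map(
    rows: List[Row],
    by: List[str],
    func: Callable[[Dict[str, Any], List[Row]], List[Row]],
    presort_desc: str = "",
) -> List[Row]:
    """Group rows by the key columns and call func once per logical partition."""

    def key_of(r: Row):
        return tuple(r[k] for k in by)

    result: List[Row] = []
    # groupby only merges adjacent rows, so sort by the partition keys first
    for key, group in groupby(sorted(rows, key=key_of), key=key_of):
        part = list(group)
        if presort_desc:
            part.sort(key=lambda r: r[presort_desc], reverse=True)
        result.extend(func(dict(zip(by, key)), part))
    return result


def describe(keys: Dict[str, Any], part: List[Row]) -> List[Row]:
    # one output row per logical partition
    return [{"key": repr(keys), "data": repr([r["c"] for r in part])}]


rows = [
    {"a": 0, "c": 1},
    {"a": 1, "c": 5},
    {"a": 0, "c": 3},
]
out = logical_map(rows, by=["a"], func=describe, presort_desc="c")
for r in out:
    print(r)
```

On a distributed engine several logical partitions may share one physical partition, which this single-process sketch does not model.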

from fugue_spark import SparkExecutionEngine
from fugue import ArrayDataFrame, PandasDataFrame, PartitionSpec
import pandas as pd
import numpy as np

def map_func(cursor, df):
    # cursor describes the current logical partition, df holds its rows
    key = repr(cursor.key_value_dict)
    sub = repr(df.as_array())
    return ArrayDataFrame([[key, sub]], "key:str,data:str")

df = PandasDataFrame(pd.DataFrame(np.random.randint(0,4,size=(20, 3)), columns=list('abc')))
engine = SparkExecutionEngine()

engine.map(
    df,
    map_func,
    output_schema="key:str,data:str",
    partition_spec=PartitionSpec(by=["a"])).show()

engine.map(
    df,
    map_func,
    output_schema="key:str,data:str",
    partition_spec=PartitionSpec(by=["a","b"], presort="c DESC")).show(rows=100)


## Persist & Broadcast

Similar to Spark, Fugue is lazy, so persist is a very important operation for controlling the execution plan. Notice that, unlike in Spark, calling persist in Fugue materializes the dataframe immediately. Broadcast is also similar to Spark, read this.

Both persist and broadcast operate on Fugue dataframes. If you call persist or broadcast multiple times on one dataframe, only the first call takes effect.

If you want auto broadcast, it’s supported on Spark (read this), so you can configure the Fugue SparkExecutionEngine with these parameters.

If you want auto persist, it’s supported on Fugue, read this.

from fugue_spark import SparkExecutionEngine
from fugue import ArrayDataFrame, PandasDataFrame, PartitionSpec
from time import sleep
import timeit

def map_func(cursor, df):
    sleep(5)  # simulate an expensive step
    return df

engine = SparkExecutionEngine()

def run(persist):
    df = engine.map(
        ArrayDataFrame([[0]],"a:int"),
        map_func,
        output_schema="a:int",
        partition_spec=PartitionSpec())
    if persist:
        df = engine.persist(df)
    df.as_array()
    df.as_array() # without persist, this will trigger map to run again

print(timeit.timeit(lambda: run(False), number=3)) # 30 sec + overhead
print(timeit.timeit(lambda: run(True), number=3)) # 15 sec + overhead
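The timing difference above comes from recomputation. The following plain-Python sketch shows the same effect without Spark by counting how often the expensive step runs. `LazyFrame` is a hypothetical class written for this illustration, not a Fugue type.

```python
from typing import Callable, List, Optional


class LazyFrame:
    """Hypothetical lazy dataframe: the compute function runs on every read
    unless the result has been persisted."""

    def __init__(self, compute: Callable[[], List[List[int]]]):
        self._compute = compute
        self._cache: Optional[List[List[int]]] = None

    def persist(self) -> "LazyFrame":
        # Materialize immediately; repeated calls keep the first result
        if self._cache is None:
            self._cache = self._compute()
        return self

    def as_array(self) -> List[List[int]]:
        return self._cache if self._cache is not None else self._compute()


calls = {"n": 0}


def expensive() -> List[List[int]]:
    calls["n"] += 1  # count how many times the "map" step actually runs
    return [[0]]


def count_runs(persist: bool) -> int:
    calls["n"] = 0
    df = LazyFrame(expensive)
    if persist:
        df.persist()
    df.as_array()
    df.as_array()
    return calls["n"]


print(count_runs(False), count_runs(True))
```

Without persist the expensive step runs once per read; with persist it runs once in total, which matches the roughly 2:1 ratio in the timings above.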


## Join

Join is a very common operation in big data problems, and it is highly optimized by all computing frameworks. So if you can convert a task into a join operation, it will normally be faster and also scale agnostic.

Currently, these join types are supported: CROSS, LEFT SEMI, LEFT ANTI, INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER.

If you directly use SQL to interact with the underlying frameworks, you may be able to use more join types.

from fugue import NativeExecutionEngine, ArrayDataFrame

df1 = ArrayDataFrame([["k1","a"],["k2","b"]],"a:str,b:str")
df2 = ArrayDataFrame([["k2","c"],["k3","d"]],"a:str,c:str")
df3 = ArrayDataFrame([["k4","c"],["k5","d"]],"d:str,e:str")

engine = NativeExecutionEngine()

# in joins, except for on columns, all other columns must be different in df1 and df2
engine.join(df1,df2,how="semi",on=["a"]).show(title="LEFT SEMI")
engine.join(df1,df2,how="anti",on=["a"]).show(title="LEFT ANTI")
engine.join(df1,df2,how="inner",on=["a"]).show(title="INNER")
engine.join(df1,df2,how="left_outer",on=["a"]).show(title="LEFT OUTER")
engine.join(df1,df2,how="right_outer",on=["a"]).show(title="RIGHT OUTER")
engine.join(df1,df2,how="full_outer",on=["a"]).show(title="FULL OUTER")

# on parameter can be inferred, so you don't really need to specify that
engine.join(df1,df2,how="semi").show(title="LEFT SEMI")
engine.join(df1,df2,how="anti").show(title="LEFT ANTI")
engine.join(df1,df2,how="inner").show(title="INNER")
engine.join(df1,df2,how="left_outer").show(title="LEFT OUTER")
engine.join(df1,df2,how="right_outer").show(title="RIGHT OUTER")
engine.join(df1,df2,how="full_outer").show(title="FULL OUTER")
engine.join(df2,df3,how="cross").show(title="CROSS")

# but what if we want to join two dfs that share columns which are not join keys?
df1_ = df1.rename({"b":"c"}) # rename these columns before join
engine.join(df1, df1_, how="inner").show()
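If LEFT SEMI and LEFT ANTI are unfamiliar, the sketch below spells out their semantics in plain Python on the same data as df1 and df2 above. The function `toy_join` is a hypothetical single-key helper written for this illustration; it covers only three join types and says nothing about how any engine implements joins.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def toy_join(left: List[Row], right: List[Row], how: str, on: str) -> List[Row]:
    """Toy single-key join covering semi, anti and inner only."""
    right_keys = {r[on] for r in right}
    if how == "semi":
        # keep left rows that have a match; no right columns are added
        return [lrow for lrow in left if lrow[on] in right_keys]
    if how == "anti":
        # keep left rows that have no match
        return [lrow for lrow in left if lrow[on] not in right_keys]
    if how == "inner":
        return [
            {**lrow, **rrow}
            for lrow in left
            for rrow in right
            if lrow[on] == rrow[on]
        ]
    raise NotImplementedError(how)


left_rows = [{"a": "k1", "b": "a"}, {"a": "k2", "b": "b"}]
right_rows = [{"a": "k2", "c": "c"}, {"a": "k3", "c": "d"}]

semi = toy_join(left_rows, right_rows, "semi", "a")
anti = toy_join(left_rows, right_rows, "anti", "a")
inner = toy_join(left_rows, right_rows, "inner", "a")
print(semi, anti, inner, sep="\n")
```

Semi and anti joins act as filters on the left side only, which is why their output keeps just the left columns.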


## Zip & Comap

Sometimes you need to partition multiple dataframes in the same way (on the same keys), and then process them together as a collection on each logical partition. This requires you to zip multiple dataframes partitioned in the same way, and then process them using comap.

Read through the following code and run it to see the results. It’s not important to learn how to use zip and comap directly, because there will be a programming interface on top of them. What matters is seeing the results and understanding the basic ideas of zip and comap.

from fugue import NativeExecutionEngine, ArrayDataFrame, PartitionSpec

df1 = ArrayDataFrame([["k1","a"],["k1","b"],["k2","b"],["k2","c"]],"a:str,b:str")
df2 = ArrayDataFrame([["k2","c"],["k2","d"],["k3","d"],["k3","e"]],"a:str,b:str")
df3 = ArrayDataFrame([["k4","c"]],"d:str,e:str")

engine = NativeExecutionEngine()
# unlike engine.join, engine.zip can accept two dataframes with common columns that are not partition keys
# so it's good practice to be explicit about the partition spec
print(engine.zip(df1,df2, how="inner", partition_spec = PartitionSpec(by=["a"])).schema)
# if you let zip infer the keys, it will use the common columns as the keys
print(engine.zip(df1,df2, how="inner").schema) # notice the difference?
print(engine.zip(df1,df2, how="left_outer", partition_spec = PartitionSpec(by=["a"])).schema)

# dfs is of type DataFrames, it always contains all zipped dataframes in order,
# in an outer join some dataframes can be empty instead of None
def map_func(cursor, dfs):
    key = repr(cursor.key_value_dict)
    value = [repr(x.as_array()) for x in dfs.values()]
    return ArrayDataFrame([[key, value]], "key:str,value:[str]")

# comap must operate on zipped dataframe, or error will be thrown

df = engine.zip(df1,df2, how="left_outer", partition_spec = PartitionSpec(by=["a"]))
# for this left outer join case, df2 has no key k1, so in the map_func, it will receive an empty dataframe as df2 on the key k1
engine.comap(df, map_func, output_schema = "key:str,value:[str]", partition_spec = PartitionSpec()).show()

df = engine.zip(df1,df2, how="inner")
engine.comap(df, map_func, output_schema = "key:str,value:[str]", partition_spec = PartitionSpec()).show()
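The idea behind zip and comap can also be sketched without Fugue. In the plain-Python sketch below, `zip_by` and `toy_comap` are hypothetical helpers invented for this illustration. They group each dataframe by one key, align the groups per key value, and then call a function once per key with all the aligned groups.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def zip_by(dfs: List[List[Row]], key: str, how: str = "inner") -> Dict[Any, List[List[Row]]]:
    """Group each dataframe by key and line the groups up per key value."""
    grouped = []
    for df in dfs:
        g: Dict[Any, List[Row]] = defaultdict(list)
        for row in df:
            g[row[key]].append(row)
        grouped.append(g)
    if how == "inner":
        keys = set(grouped[0])
        for g in grouped[1:]:
            keys &= set(g)
    elif how == "left_outer":
        keys = set(grouped[0])
    else:
        raise NotImplementedError(how)
    # a dataframe without rows for a key contributes an empty list, not None
    return {k: [g.get(k, []) for g in grouped] for k in sorted(keys)}


def toy_comap(zipped: Dict[Any, List[List[Row]]], func: Callable) -> List[Any]:
    """Call func once per key with all co-partitioned groups."""
    return [func(k, parts) for k, parts in zipped.items()]


def sizes(key: Any, parts: List[List[Row]]):
    return (key, [len(p) for p in parts])


rows1 = [{"a": "k1", "b": "a"}, {"a": "k1", "b": "b"}, {"a": "k2", "b": "b"}, {"a": "k2", "b": "c"}]
rows2 = [{"a": "k2", "b": "c"}, {"a": "k2", "b": "d"}, {"a": "k3", "b": "d"}, {"a": "k3", "b": "e"}]

left_result = toy_comap(zip_by([rows1, rows2], "a", how="left_outer"), sizes)
inner_result = toy_comap(zip_by([rows1, rows2], "a", how="inner"), sizes)
print(left_result)
print(inner_result)
```

In the left outer case the key k1 still reaches the function, with an empty second group, which mirrors the empty dataframe described in the comments above.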


## Load & Save

Load and save are also part of the common interface of ExecutionEngines. Across all ExecutionEngines, parquet, csv and json are supported.

Fugue depends on PyFilesystem2. For details, read this

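from fugue_spark import SparkExecutionEngine
from fugue import ArrayDataFrame
from triad.collections.fs import FileSystem  # assumption: FileSystem comes from triad, which Fugue builds on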

df1 = ArrayDataFrame([["1","a"],["1","b"],["2","b"],["2","c"]],"a:str,b:str")

fs = FileSystem()

# simplest examples
engine = SparkExecutionEngine()
engine.save_df(df1,"/tmp/t1.parquet")
print(fs.listdir("/tmp/t1.parquet"))

# you can use format hint to specify file types
engine.save_df(df1,"/tmp/t1", format_hint="parquet")
print(fs.listdir("/tmp/t1"))
# you can load just certain columns, this can be faster for certain execution engines and certain file types
engine.load_df("/tmp/t1", format_hint="parquet", columns=["a"]).show()

# by default, the save mode is overwrite, you can also use append and error (throw if the file exists)
engine.save_df(df1,"/tmp/t1.parquet", mode="append")
# engine.save_df(df1,"/tmp/t1.parquet", mode="error")  # this will throw exception

# distributed engines can save files as a folder, but sometimes you need a single file, here is how
fs.removetree("/tmp/t1.parquet")
engine.save_df(df1,"/tmp/t1.parquet", force_single = True)
print(fs.isfile("/tmp/t1.parquet"))

# here is how you work with csv
engine.save_df(df1, "/tmp/t1.csv") # by default header will not be saved
# you can use columns to define the column types; if a header is saved, the column names have to match it
engine.load_df("/tmp/t1.csv", columns="a:int,b:str").show() # if header is not saved, you must specify the schema
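Why a headerless csv needs a schema can be shown with a short plain-Python sketch. `parse_schema` and `load_headerless_csv` are hypothetical helpers written for this illustration, and the sketch assumes only the three type names `int`, `str` and `double`. It is not how Fugue parses schemas.

```python
import csv
import io
from typing import Any, Callable, Dict, List, Tuple

# Assumed subset of type names, for illustration only
CASTS: Dict[str, Callable[[str], Any]] = {"int": int, "str": str, "double": float}


def parse_schema(schema: str) -> List[Tuple[str, Callable[[str], Any]]]:
    """Turn "a:int,b:str" into [("a", int), ("b", str)]."""
    pairs = []
    for field in schema.split(","):
        name, type_name = field.strip().split(":")
        pairs.append((name, CASTS[type_name]))
    return pairs


def load_headerless_csv(text: str, schema: str) -> List[Dict[str, Any]]:
    """Without a header, the schema is the only source of names and types."""
    fields = parse_schema(schema)
    rows = []
    for record in csv.reader(io.StringIO(text)):
        rows.append({name: cast(value) for (name, cast), value in zip(fields, record)})
    return rows


csv_text = "1,a\n1,b\n2,b\n2,c\n"
loaded = load_headerless_csv(csv_text, "a:int,b:str")
print(loaded)
```

A csv file stores everything as text, so the schema both names the columns and decides that the first column is read back as integers.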


## SQLEngine

A SQLEngine works on top of an ExecutionEngine, but it is decoupled from it. You can write your own SQLEngine to work with different ExecutionEngines.

For most users, you only need to know that you can switch the SQLEngine after choosing a certain ExecutionEngine. A real use case: if you have a PrestoSQLEngine that sends queries to Presto and gets dataframes back, you can use it alongside the built-in SparkSQLEngine to get data from multiple sources and process it in SparkExecutionEngine.

from fugue import SqliteEngine # default engine for NativeExecutionEngine and DaskExecutionEngine
from fugue_spark.execution_engine import SparkSQLEngine # default engine for SparkExecutionEngine
from fugue import ArrayDataFrame, DataFrames, NativeExecutionEngine
from fugue_spark import SparkExecutionEngine

df1 = ArrayDataFrame([[0,1],[1,2]],"a:long,b:long")
df2 = ArrayDataFrame([[1,1],[2,2]],"a:long,c:long")
dfs = DataFrames(x=df1,y=df2)

engine = NativeExecutionEngine()
sql_engine = SqliteEngine(engine)
sql_engine.select(dfs,"SELECT x.*,c FROM x INNER JOIN y ON x.a=y.a").show()

engine = SparkExecutionEngine()
sql_engine = SparkSQLEngine(engine)
sql_engine.select(dfs,"SELECT x.*,c FROM x INNER JOIN y ON x.a=y.a").show()
# spark execution engine can also use SqliteEngine, it's just to show you can switch SQLEngines
sql_engine = SqliteEngine(engine)
sql_engine.select(dfs,"SELECT x.*,c FROM x INNER JOIN y ON x.a=y.a").show()
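As a rough picture of what a SQLite-backed engine has to do, the sketch below registers named tables in an in-memory SQLite database using Python's standard `sqlite3` module and runs the same query as above. The function `toy_select` and the `Table` alias are hypothetical names for this illustration; this is a conceptual sketch, not the implementation of SqliteEngine.

```python
import sqlite3
from typing import Any, Dict, List, Tuple

Table = Tuple[List[str], List[Tuple[Any, ...]]]  # (column names, rows)


def toy_select(tables: Dict[str, Table], statement: str) -> List[Tuple[Any, ...]]:
    """Register each named table in an in-memory SQLite database, then query."""
    conn = sqlite3.connect(":memory:")
    try:
        for name, (columns, rows) in tables.items():
            # table and column names come from trusted code in this sketch
            conn.execute(f"CREATE TABLE {name} ({', '.join(columns)})")
            placeholders = ", ".join("?" for _ in columns)
            conn.executemany(f"INSERT INTO {name} VALUES ({placeholders})", rows)
        return conn.execute(statement).fetchall()
    finally:
        conn.close()


tables = {
    "x": (["a", "b"], [(0, 1), (1, 2)]),
    "y": (["a", "c"], [(1, 1), (2, 2)]),
}
result = toy_select(tables, "SELECT x.*, c FROM x INNER JOIN y ON x.a = y.a")
print(result)
```

The names in the dictionary play the role of the keys in DataFrames(x=df1, y=df2): they are what the SQL statement refers to.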