Fugue Configurations#

Config | Default | Description
--- | --- | ---
fugue.workflow.auto_persist | False | Whether to auto persist outputs that are used by multiple following steps
fugue.workflow.auto_persist_value | None | Parameter for auto persist
fugue.workflow.concurrency | 1 | Max number of tasks that can run in parallel in a DAG (if they do not depend on each other)
fugue.spark.use_pandas_udf | False | Automatically use pandas UDF for the groupBy-apply semantic (see details below)
fugue.sql.compile.ignore_case | False | When True, keywords in Fugue SQL are case insensitive
fugue.rpc.server | NativeRPCServer | Full path to a subclass of RPCServer
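All of these configs are passed through the conf dict of an execution engine, which is how the examples in this tutorial set them. A minimal sketch (the values here are only illustrative):

from fugue_spark import SparkExecutionEngine

# enable auto persist and allow up to 10 DAG tasks to run in parallel
engine = SparkExecutionEngine(conf={
    "fugue.workflow.auto_persist": True,
    "fugue.workflow.concurrency": 10,
})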

Auto Persist#

Notice: you may not see the expected performance on binder; it is recommended to run this tutorial in Docker on a multi-core machine to get decent performance.

Let’s see an example first

from fugue import FugueWorkflow, LocalDataFrame
from fugue_spark import SparkExecutionEngine
from time import sleep
from timeit import timeit

# schema: *
def tf(df:LocalDataFrame, sec=5) -> LocalDataFrame:
    sleep(sec)
    return df


dag = FugueWorkflow(SparkExecutionEngine())
a=dag.df([[0]],"a:int").transform(tf)
b=a.transform(tf)
b.show()
c=a.transform(tf)
c.show()

print(timeit(lambda: dag.run(), number=1))

Surprisingly, it runs for 20+ seconds on Spark. Actually, the execution order is a -> b -> a -> c. This is because Spark is lazy: it materializes only when there is an action, and when there is an action, all transformations before it will run. If the transformation took 5 hours instead of 5 seconds, you can imagine how much time would be wasted.

To make the order a -> b -> c (15 seconds), you can persist a (there can be a bit of overhead to run Spark, so expect roughly 16 seconds).

dag = FugueWorkflow(SparkExecutionEngine())
a=dag.df([[0]],"a:int").transform(tf).persist()
b=a.transform(tf)
b.show()
c=a.transform(tf)
c.show()

print(timeit(lambda: dag.run(), number=1))

This can be annoying because you may add the second transform c later on and forget to add persist to a. Whenever you change the code, you need to go through the whole logic again to make sure persists are added appropriately.

Fugue has a config to turn on auto persist. When it is True, any output that is used by multiple following steps will be persisted automatically.

dag = FugueWorkflow(SparkExecutionEngine(conf={"fugue.workflow.auto_persist":True}))
a=dag.df([[0]],"a:int").transform(tf)
b=a.transform(tf)
b.show()
c=a.transform(tf)
c.show()

print(timeit(lambda: dag.run(), number=1))

Moreover, you can set how to auto persist. For example, to use MEMORY_ONLY:

dag = FugueWorkflow(SparkExecutionEngine(
    conf={"fugue.workflow.auto_persist":True,
          "fugue.workflow.auto_persist_value":"MEMORY_ONLY"}))
a=dag.df([[0]],"a:int").transform(tf)
b=a.transform(tf)
b.show()
c=a.transform(tf)
c.show()

print(timeit(lambda: dag.run(), number=1))

So why not set auto persist True as default?

  • Being explicit is great

  • Fugue has not implemented auto unpersist, so if you have a lot of steps, the memory usage can be high (though in practice this is rarely an issue)

  • Sometimes you want a to be recalculated for certain reasons.

That being said, for many users it is reasonable to make this a default config; the points above are edge cases for advanced users.

Parallel Run#

Notice: you may not see the expected performance on binder; it is recommended to run this tutorial in Docker on a multi-core machine to get decent performance.

Let's keep looking at the same example with persist:

dag = FugueWorkflow(SparkExecutionEngine())
a=dag.df([[0]],"a:int").transform(tf).persist()
b=a.transform(tf)
b.show()
c=a.transform(tf)
c.show()

print(timeit(lambda: dag.run(), number=1))

Steps b and c do not depend on each other, so why can't they run in parallel? With the native Spark approach, this can't be automated because Spark can't foresee the following steps. But because Fugue builds a DAG, it has the context of both previous and following steps, so this is possible.

dag = FugueWorkflow(SparkExecutionEngine(conf={"fugue.workflow.concurrency":10}))
a=dag.df([[0]],"a:int").transform(tf).persist()
b=a.transform(tf)
b.show()
c=a.transform(tf)
c.show()

print(timeit(lambda: dag.run(), number=1))

fugue.workflow.concurrency is the maximum number of DAG tasks that can run in parallel at any time. When you set it to <=1, execution is sequential in the order of your Python code; otherwise, all steps that can run in parallel will do so.

This config should normally be turned on for distributed frameworks, but turned off for NativeExecutionEngine: when you use NativeExecutionEngine you usually want to validate or debug a certain piece of code, and running it on a single thread makes the error messages and output much easier to understand.
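For example, a sketch of a debugging setup that keeps execution sequential on NativeExecutionEngine (1 is already the default, so this is only being explicit):

from fugue import FugueWorkflow, NativeExecutionEngine

# run steps one at a time, in the order they appear in the code
dag = FugueWorkflow(NativeExecutionEngine(conf={"fugue.workflow.concurrency": 1}))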

So compared with the original example, we can cut the total run time in half without changing the logic, only by changing configs.

dag = FugueWorkflow(SparkExecutionEngine(
    conf={"fugue.workflow.concurrency":10,
          "fugue.workflow.auto_persist":True}))
a=dag.df([[0]],"a:int").transform(tf)
b=a.transform(tf)
b.show()
c=a.transform(tf)
c.show()

print(timeit(lambda: dag.run(), number=1))

Use Pandas UDF on SparkExecutionEngine#

Notice: you may not see the expected performance on binder; it is recommended to run this tutorial in Docker on a multi-core machine to get decent performance.

If you are not familiar with pandas UDFs, read this. With PyArrow and pandas, Spark can accelerate certain operations.

Spark 3.0 also starts to support some type annotations. But Fugue is more flexible on type annotations: besides pd.DataFrame, you can also use other annotations such as List and Iterable.

For certain cases, no matter what input type you specify, you can see a great performance gain. But to maximize the gain, it is suggested to use pd.DataFrame as both the input and the output to remove conversion overhead. Doing so may hurt performance on other ExecutionEngines, or on SparkExecutionEngine without pandas_udf enabled, so you need to understand the pros and cons; the best way is to experiment and decide.
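As an illustration of that advice, here is a sketch of a transformer written in the pd.DataFrame-in, pd.DataFrame-out style (the name median_pdf is hypothetical; compare it with the median example further below, which returns List[List[Any]]):

import pandas as pd

# schema: a:int,b:double
def median_pdf(df: pd.DataFrame) -> pd.DataFrame:
    # pandas in, pandas out: no conversion overhead when pandas_udf is enabled
    return df.head(1)[["a"]].assign(b=df["b"].median())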

In Fugue, pandas_udf is used only when all of the following are satisfied; otherwise, it falls back to the common approach.

  • the config fugue.spark.use_pandas_udf is set to true

  • the partition_spec has non-empty partition keys

  • the output schema has no nested types

In addition, for pyspark < 3 this environment variable must be set on the driver and all executors, otherwise errors will be thrown:

ARROW_PRE_0_15_IPC_FORMAT=1
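One way to set it for a local run is shown below (a sketch; on a real cluster you would typically pass it to the executors through your cluster configuration, for example Spark's spark.executorEnv.* settings):

import os

# assumption: this runs on the driver before the Spark session is created
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"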

from fugue import FugueWorkflow, DataFrame, ArrayDataFrame, DataFrames
from fugue_spark import SparkExecutionEngine
import pandas as pd
import numpy as np
from timeit import timeit
from typing import Iterable, List, Any

def helper(ct=2000000) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 2)), columns=list('ab'))

# schema: a:int,b:double
def median(df:pd.DataFrame) -> List[List[Any]]:
    b = df["b"].median()
    return [[df.loc[0,"a"], b]]

dag = FugueWorkflow()
dag.create(helper).partition(by="a").transform(median).show(title="pandas.median")

engine = SparkExecutionEngine() # normal way
print(timeit(lambda: dag.run(engine), number=1))

engine = SparkExecutionEngine(conf={"fugue.spark.use_pandas_udf":True}) # use pandas_udf in the workflow
print(timeit(lambda: dag.run(engine), number=1))

Ignore Case in Fugue SQL#

Normally, when writing Fugue SQL, you have to uppercase the keywords yourself:

from fugue_sql import FugueSQLWorkflow
from fugue import NativeExecutionEngine

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0]] SCHEMA a:int
    PRINT
    """)

But you can turn on fugue.sql.compile.ignore_case

with FugueSQLWorkflow(NativeExecutionEngine(conf={"fugue.sql.compile.ignore_case":True})) as dag:
    dag("""
    create [[0]] schema a:int
    print
    """)

This can make the SQL less readable and make you less aware of syntax ambiguity or errors, but it may be handy if you want to migrate other SQL into Fugue SQL.

RPCServer settings#

If you do not have any callbacks in your workflow, don’t set this config.

For testing callbacks on a local machine, don't set this config; NativeRPCServer will be used.

Only when you use a distributed execution engine and you want to use callbacks should you set it to a server implementation that can work in a distributed environment.

FlaskRPCServer can be used with a distributed execution engine. Unless you have special needs, you can just follow the example below.

conf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",
    "fugue.rpc.flask_server.host": "0.0.0.0",
    "fugue.rpc.flask_server.port": "1234",
    "fugue.rpc.flask_server.timeout": "2 sec",
}

To use fugue.rpc.flask.FlaskRPCServer, you must set fugue.rpc.flask_server.host and fugue.rpc.flask_server.port, and it’s suggested to also set fugue.rpc.flask_server.timeout to a reasonable timeout for your own case.