Fugue Configurations#

Have questions? Chat with us on Github or Slack:

Homepage Slack Status

Config

Default

Description

fugue.spark.use_pandas_udf

True

Automatically use pandas udf for groupBY apply semantic, see details

fugue.sql.compile.ignore_case

False

When this is True, keywords in FugueSQL will be case insensitive

fugue.rpc.server

NativeRPCServer

Full path to a subclass of RPCServer

Use Pandas UDF on SparkExecutionEngine#

Notice: you may not see the expected performance on binder, it’s recommended to run this tutorial on docker on a multiple core machine to get decent performance

If you don’t know pandas UDF, read this. With PyArrow and pandas, Spark is able to accelerate certain operations.

In Spark 3.0 it also starts to support some type annotations. But Fugue is more flexibile on type annotations. Besides pd.DataFrame you can also use other annotations including List and Iterable, etc.

For certain cases, no matter what input type you specify, we can see great performance gain. But to maximize the gain, it’s suggested to use pd.DataFrame as the input and output to remove conversion overhead. By doing this, it may hurt the performance on other execution engines, or on Spark without pandas_udf support. So you need to understand the pros and cons. The best way is to experiment and decide.

In Fugue, only when all of the following are satisfied, it uses pandas_udf, otherwise, it will fall back to the common way.

  • config fugue.spark.use_pandas_udf is set to True (default)

  • partition_spec has to have non empty partition keys

  • output schema can’t have nested types

  • Spark config spark.sql.execution.arrow.pyspark.enabled is set to "true"

Plus, for pyspark < 3 this environment variable must be set on driver and all executors:

ARROW_PRE_0_15_IPC_FORMAT=1

otherwise errors will be thrown.

from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .getOrCreate()
import pandas as pd
import numpy as np
from timeit import timeit
from typing import Iterable, List, Any
import fugue.api as fa

def helper(ct=2000000) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 2)), columns=list('ab'))

# schema: a:int,b:double
def median(df:pd.DataFrame) -> List[List[Any]]:
    b = df["b"].median()
    return [[df.loc[0,"a"], b]]

def run(engine, conf=None):
    with fa.engine_context(engine):
        res = fa.transform(helper(), 
                           median,
                           partition={"by": "a"}, 
                           engine_conf=conf
                           )
        fa.show(res, 5, title="pandas.median");

print(timeit(lambda: run(spark), number=1))

conf = {"fugue.spark.use_pandas_udf":True}
print(timeit(lambda: run(spark, conf=conf), number=1))
23/01/03 00:16:30 WARN TaskSetManager: Stage 6 contains a task of very large size (3910 KiB). The maximum recommended task size is 1000 KiB.
pandas.median
SparkDataFrame
a:int|b:double
-----+--------
2    |4.0     
9    |4.0     
3    |4.0     
7    |5.0     
4    |5.0     
1.0500444139999985
23/01/03 00:16:31 WARN TaskSetManager: Stage 9 contains a task of very large size (3910 KiB). The maximum recommended task size is 1000 KiB.
pandas.median
SparkDataFrame
a:int|b:double
-----+--------
2    |4.0     
9    |4.0     
3    |4.0     
7    |5.0     
4    |5.0     
0.9417272339999982

Ignore Case in Fugue SQL#

Normally, when writing FugueSQL, you upper case keywords by yourself

from fugue.api import fugue_sql_flow

fugue_sql_flow("""
               CREATE [[0]] SCHEMA a:int
               PRINT
               """).run();
ArrayDataFrame
a:int
-----
0    
Total count: 1

But you can turn pass fsql_ignore_case=True

fugue_sql_flow("""
               create [[0]] schema a:int
               print
               """, fsql_ignore_case=True).run();
ArrayDataFrame
a:int
-----
0    
Total count: 1

This can make the sql less readable and make you less aware of syntax ambiguity or errors, but it may be handy if you want to migrate other SQL queries into FugueSQL.

If there are many fugue_sql_flow calls, it might be easier to set fugue.sql.compile.ignore_case on the execution engine.

RPCServer settings#

If you do not have any callbacks in your workflow, don’t set this config.

For testing callbacks on local machine, don’t set this config. NativeRPCServer Will be used.

Only when you use a distributed execution engine, and you want to use callbacks, set to a server that is distributable.

FlaskRPCServer can be used with a distributed execution engine. Unless you have special needs, you just need to follow the example below.

conf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",
    "fugue.rpc.flask_server.host": "0.0.0.0",
    "fugue.rpc.flask_server.port": "1234",
    "fugue.rpc.flask_server.timeout": "2 sec",
}

To use fugue.rpc.flask.FlaskRPCServer, you must set fugue.rpc.flask_server.host and fugue.rpc.flask_server.port, and it’s suggested to also set fugue.rpc.flask_server.timeout to a reasonable timeout for your own case.