# Fugue Configurations
| Config | Default | Description |
|---|---|---|
| `fugue.spark.use_pandas_udf` | `True` | Automatically use pandas UDFs on the Spark engine when possible |
| `fugue.sql.compile.ignore_case` | `False` | When this is `True`, FugueSQL keywords become case-insensitive |
| `fugue.rpc.server` | `NativeRPCServer` | Full path to a subclass of `RPCServer` to use for callbacks |
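These configs are typically passed in as engine configs. Here is a minimal runnable sketch (the function name `double_b` and the sample data are illustrative, not from this tutorial):

```python
# A sketch: Fugue configs like the ones above are passed as engine configs.
import pandas as pd
import fugue.api as fa

# schema: a:int,b:int
def double_b(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b=df["b"] * 2)

conf = {
    "fugue.spark.use_pandas_udf": True,     # already the default
    "fugue.sql.compile.ignore_case": True,  # FugueSQL keywords become case-insensitive
}

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
fa.show(fa.transform(df, double_b, engine_conf=conf))
```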
## Use Pandas UDF on SparkExecutionEngine
Notice: you may not see the expected performance on Binder. It is recommended to run this tutorial in Docker on a multi-core machine to get decent performance.
If you don't know pandas UDFs, read this. With PyArrow and pandas, Spark can accelerate certain operations. Spark 3.0 also started to support some type annotations, but Fugue is more flexible on type annotations: besides `pd.DataFrame`, you can also use other annotations such as `List` and `Iterable`.
In certain cases you will see a significant performance gain no matter which input type you specify. But to maximize the gain, it is suggested to use `pd.DataFrame` as both the input and the output annotation to remove conversion overhead. Note that doing so may hurt performance on other execution engines, or on Spark without pandas_udf support, so you need to understand the trade-off. The best way is to experiment and decide.
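For instance, the median logic used later in this tutorial could equally be written with row-based annotations (a sketch; `median_rows` is an illustrative name, not from this tutorial):

```python
# A sketch: the same per-partition logic with row-based annotations.
# Fugue hands each partition to the function as an iterable of rows,
# so no pandas is needed, but conversion overhead is higher under pandas_udf.
from statistics import median as stat_median
from typing import Any, Iterable, List

# schema: a:int,b:double
def median_rows(rows: Iterable[List[Any]]) -> List[List[Any]]:
    a, bs = None, []
    for r in rows:
        a = r[0]          # each partition has a single value of "a"
        bs.append(r[1])
    return [[a, stat_median(bs)]]
```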
Fugue uses pandas_udf only when all of the following are satisfied; otherwise it falls back to the common approach:

- the config `fugue.spark.use_pandas_udf` is set to `True` (the default)
- the `partition_spec` has non-empty partition keys
- the output schema has no nested types
- the Spark config `spark.sql.execution.arrow.pyspark.enabled` is set to `"true"`
In addition, for pyspark < 3, the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1` must be set on the driver and all executors, otherwise errors will be thrown.
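For example, on pyspark 2.x you could set it like this (a sketch; `spark.executorEnv.*` is Spark's standard mechanism for setting executor environment variables, and `spark2` is an illustrative name):

```python
# pyspark < 3 only: set the variable on the driver process and
# propagate it to all executors via spark.executorEnv.*
import os
from pyspark.sql import SparkSession

os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"  # driver side

spark2 = SparkSession.builder \
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1") \
    .getOrCreate()
```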
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
```
```python
import pandas as pd
import numpy as np
from timeit import timeit
from typing import Iterable, List, Any
import fugue.api as fa

def helper(ct=2000000) -> pd.DataFrame:
    # a reproducible random dataset with two integer columns
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0, 10, size=(ct, 2)), columns=list("ab"))

# the special comment below is a Fugue schema hint for the output schema
# schema: a:int,b:double
def median(df: pd.DataFrame) -> List[List[Any]]:
    b = df["b"].median()
    # each partition arrives with a fresh default index and a single "a" value
    return [[df.loc[0, "a"], b]]

def run(engine, conf=None):
    with fa.engine_context(engine):
        res = fa.transform(helper(),
                           median,
                           partition={"by": "a"},
                           engine_conf=conf)
        fa.show(res, 5, title="pandas.median")
```
```python
print(timeit(lambda: run(spark), number=1))

conf = {"fugue.spark.use_pandas_udf": True}
print(timeit(lambda: run(spark, conf=conf), number=1))
```
```
23/01/03 00:16:30 WARN TaskSetManager: Stage 6 contains a task of very large size (3910 KiB). The maximum recommended task size is 1000 KiB.
pandas.median
SparkDataFrame
a:int|b:double
-----+--------
2    |4.0
9    |4.0
3    |4.0
7    |5.0
4    |5.0
1.0500444139999985
23/01/03 00:16:31 WARN TaskSetManager: Stage 9 contains a task of very large size (3910 KiB). The maximum recommended task size is 1000 KiB.
pandas.median
SparkDataFrame
a:int|b:double
-----+--------
2    |4.0
9    |4.0
3    |4.0
7    |5.0
4    |5.0
0.9417272339999982
```
## Ignore Case in Fugue SQL
Normally, when writing FugueSQL, you must uppercase the keywords yourself:
```python
from fugue.api import fugue_sql_flow

fugue_sql_flow("""
CREATE [[0]] SCHEMA a:int
PRINT
""").run();
```
```
ArrayDataFrame
a:int
-----
0
Total count: 1
```
But you can pass `fsql_ignore_case=True`:
```python
fugue_sql_flow("""
create [[0]] schema a:int
print
""", fsql_ignore_case=True).run();
```
```
ArrayDataFrame
a:int
-----
0
Total count: 1
```
This can make the SQL less readable and make you less aware of syntax ambiguity or errors, but it may be handy if you want to migrate other SQL queries into FugueSQL.
If there are many `fugue_sql_flow` calls, it might be easier to set `fugue.sql.compile.ignore_case` on the execution engine, as shown in the sketch below.
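For example (a sketch; `NativeExecutionEngine` is used here only to keep it runnable locally, and the same conf works on other engines):

```python
# A sketch: set the compile-time config once on the engine instead of
# passing fsql_ignore_case=True to every fugue_sql_flow call.
from fugue import NativeExecutionEngine
from fugue.api import fugue_sql_flow

engine = NativeExecutionEngine({"fugue.sql.compile.ignore_case": True})

fugue_sql_flow("""
create [[0]] schema a:int
print
""").run(engine);
```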
## RPCServer settings
- If you do not have any callbacks in your workflow, don't set this config.
- For testing callbacks on a local machine, don't set this config; `NativeRPCServer` will be used.
- Only when you use a distributed execution engine and want to use callbacks should you set it to a server that works in a distributed environment.
- `FlaskRPCServer` can be used with a distributed execution engine. Unless you have special needs, you just need to follow the example below.
```python
conf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",
    "fugue.rpc.flask_server.host": "0.0.0.0",
    "fugue.rpc.flask_server.port": "1234",
    "fugue.rpc.flask_server.timeout": "2 sec",
}
```
To use `fugue.rpc.flask.FlaskRPCServer`, you must set `fugue.rpc.flask_server.host` and `fugue.rpc.flask_server.port`, and it's suggested to also set `fugue.rpc.flask_server.timeout` to a reasonable timeout for your own case.
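Putting it together, here is a hedged sketch of how such a conf might be used with a callback. The `callback` parameter of `fa.transform` and the extra `Callable`-annotated parameter on the transformer follow Fugue's callback convention; the function names here are illustrative, not from this tutorial.

```python
from typing import Callable
import pandas as pd
import fugue.api as fa

def print_cb(msg: str) -> None:
    # runs on the driver; workers reach it through the RPC server
    print(msg)

# schema: *
def report_sizes(df: pd.DataFrame, cb: Callable) -> pd.DataFrame:
    cb(f"partition rows: {df.shape[0]}")
    return df

conf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",
    "fugue.rpc.flask_server.host": "0.0.0.0",
    "fugue.rpc.flask_server.port": "1234",
    "fugue.rpc.flask_server.timeout": "2 sec",
}

fa.transform(
    pd.DataFrame({"a": [0, 0, 1], "b": [1.0, 2.0, 3.0]}),
    report_sizes,
    partition={"by": "a"},
    callback=print_cb,
    engine="spark",  # a distributed engine is where FlaskRPCServer matters
    engine_conf=conf,
)
```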