Transformer#
This is the most used extension in Fugue. The Transformer represents the logic unit executing on partitions of the input dataframe. Because the Transformer is concerned with logic at the logical partition level, it is unaware of the ExecutionEngine and is executed on the workers as opposed to the driver. Partitioning logic is also not a concern of the Transformer and should be specified in a previous step.
Fugue’s partition-transform semantic is similar to the groupby-apply semantic of Pandas. The main difference is that the partition-transform semantic is scalable to distributed compute because the distribution of logical groups across workers is accounted for. For more information, read about partitioning in Fugue.
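To make the analogy concrete, here is a rough comparison sketch. The b_max helper and its columns are made up for illustration; the Fugue counterpart is shown as a comment because the exact calls are covered later in this tutorial.
import pandas as pd

pdf = pd.DataFrame([[0, 1], [0, 2], [1, 3], [1, 1]], columns=["a", "b"])

# Pandas groupby-apply: the function runs once per group, on a single machine
def b_max(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b_max=df["b"].max())

print(pdf.groupby("a", group_keys=False).apply(b_max))

# The Fugue counterpart is a partition-transform, where each logical partition
# (each group of "a") can be processed by a different worker:
#   df.partition(by=["a"]).transform(b_max, schema="*,b_max:long")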
This tutorial covers the methods to define a Transformer. There is no preferred method and Fugue makes it flexible for users to choose whatever interface works for them. The four ways are the native approach, schema hint, decorator, and the class interface, in order of simplicity. All of these methods are also compatible with Fugue’s transform function.
Example Use Cases#
Shift and diff for each group in a timeseries (see the sketch after this list)
Training separate ML models for each group of data
Applying different validations for each partition
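As a rough sketch of the first use case, the function below computes a shift and diff within each group. It uses the schema hint covered later in this tutorial, and the column names (id, time, value) and the shift_diff helper are hypothetical.
import pandas as pd

# schema: *, value_shift:double, value_diff:double
def shift_diff(df: pd.DataFrame) -> pd.DataFrame:
    # called once per logical partition (one timeseries), with rows presorted by time
    df["value_shift"] = df["value"].shift(1)
    df["value_diff"] = df["value"].diff(1)
    return df

# hypothetical usage: partition by series id and presort by time before transforming
# df.partition(by=["id"], presort="time").transform(shift_diff)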
Quick Notes on Usage#
ExecutionEngine unaware
Transformers are executed on the workers, meaning that they are unaware of the ExecutionEngine.
Acceptable input DataFrame types
LocalDataFrame, pd.DataFrame, List[List[Any]], Iterable[List[Any]], EmptyAwareIterable[List[Any]], List[Dict[str, Any]], Iterable[Dict[str, Any]], EmptyAwareIterable[Dict[str, Any]]
Acceptable output DataFrame types
LocalDataFrame, pd.DataFrame, List[List[Any]], Iterable[List[Any]], EmptyAwareIterable[List[Any]], List[Dict[str, Any]], Iterable[Dict[str, Any]], EmptyAwareIterable[Dict[str, Any]]
Further notes
Notice that ArrayDataFrame and other local dataframes can’t be used as annotations; you must use LocalDataFrame.
Transformer requires more explicitness on the output schema compared to Processor. This is because schema inference on workers is expensive and unreliable. The schema can be specified through the schema hint, the decorator, or in the Fugue code.
All valid transformers can be used with Fugue’s transform function in cases where users just want to bring one function to Spark or Dask.
Native Approach#
The native approach uses a regular function without any edits beyond type annotations for both the input dataframes and the output. It is converted to a Fugue extension at runtime. Since the schema needs to be explicit, it has to be supplied when the transformer is used.
The example below also shows how to partition a DataFrame before applying a transformer on it. This will apply the transformer on each partition.
from typing import Iterable, Dict, Any, List
import pandas as pd
from fugue import FugueWorkflow

def add(df: pd.DataFrame, n=1) -> pd.DataFrame:
    df["b"] += n
    return df

def get_top(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    yield next(df)
    return

with FugueWorkflow() as dag:
    df = dag.df([[0, 1], [0, 2], [1, 3], [1, 1]], "a:int,b:int")
    # without a schema hint, you have to specify the schema in the Fugue code
    df = df.transform(add, schema="*").transform(add, schema="*", params=dict(n=2))
    # get the smallest b of each partition
    df.partition(by=["a"], presort="b").transform(get_top, schema="*").show()
    # get the largest b of each partition
    df.partition(by=["a"], presort="b DESC").transform(get_top, schema="*").show()
PandasDataFrame
a:int|b:int
-----+-----
0 |4
1 |4
Total count: 2
PandasDataFrame
a:int|b:int
-----+-----
0 |5
1 |6
Total count: 2
Schema Hint#
The schema can also be provided during the function definition through the use of the schema hint comment. Providing it during definition means it does not need to be provided inside the FugueWorkflow.
# schema: *
def add(df: pd.DataFrame, n=1) -> pd.DataFrame:
    df["b"] += n
    return df

# schema: *
def get_top(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    yield next(df)
    return

with FugueWorkflow() as dag:
    df = dag.df([[0, 1], [0, 2], [1, 3], [1, 1]], "a:int,b:int")
    # syntax for setting parameters
    df = df.transform(add).transform(add, params=dict(n=2))
    df.partition(by=["a"], presort="b").transform(get_top).show()
PandasDataFrame
a:int|b:int
-----+-----
0 |4
1 |4
Total count: 2
Schema Hint Syntax#
There is a special syntax for schema that is only available to Transformers. Please read this for detailed syntax; here we only show some examples.
# schema: *,c:int
def with_c(df: pd.DataFrame) -> pd.DataFrame:
    df["c"] = 1
    return df

# schema: *-b
def drop_b(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop("b", axis=1)

# schema: *~b,c
def drop_b_c_if_exists(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop(["b", "c"], axis=1, errors='ignore')

with FugueWorkflow() as dag:
    df = dag.df([[0, 1], [0, 2]], "a:int,b:int")
    df = df.transform(with_c)
    df.show()
    df = df.transform(drop_b)
    df.show()
    df = df.transform(drop_b_c_if_exists)
    df.show()
PandasDataFrame
a:int|b:int|c:int
-----+-----+-----
0 |1 |1
0 |2 |1
Total count: 2
PandasDataFrame
a:int|c:int
-----+-----
0 |1
0 |1
Total count: 2
PandasDataFrame
a:int
-----
0
0
Total count: 2
Decorator Approach#
The decorator approach supports the special schema syntax and can also take a function that generates the schema. This can be used to create new column names or types based on transformer parameters (a sketch of this follows the example output below).
from fugue import transformer

# df is the zipped DataFrames, **kwargs is the parameters passed in from Fugue
# the syntax below is equivalent to @transformer("*,c:int")
@transformer(lambda df, **kwargs: df.schema + "c:int")
def with_c(df: pd.DataFrame) -> pd.DataFrame:
    df["c"] = 1
    return df

with FugueWorkflow() as dag:
    df = dag.df([[0, 1], [0, 2], [1, 3], [1, 1]], "a:int,b:int")
    df = df.transform(with_c)
    df.show()
PandasDataFrame
a:int|b:int|c:int
-----+-----+-----
0 |1 |1
0 |2 |1
1 |3 |1
1 |1 |1
Total count: 4
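The schema function becomes more useful when the output columns depend on the transformer parameters. The following is only a sketch of that idea and is not part of the original examples; the col parameter name is made up, and the schema lambda builds the new column name from the parameters passed in from Fugue.
# the schema function receives the input schema and the parameters from Fugue,
# so the new column name can come from the (hypothetical) "col" parameter
@transformer(lambda df, **kwargs: df.schema + (kwargs.get("col", "c") + ":int"))
def with_named_col(df: pd.DataFrame, col: str = "c") -> pd.DataFrame:
    df[col] = 1
    return df

with FugueWorkflow() as dag:
    df = dag.df([[0, 1], [0, 2]], "a:int,b:int")
    df.transform(with_named_col, params={"col": "d"}).show()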
Interface Approach (Advanced)#
All the previous methods are just wrappers of the interface approach. They cover most of the use cases and simplify the usage, but for certain cases, implementing the interface approach significantly improves performance. Example scenarios for using the interface approach are:
The output schema needs partition information, such as partition keys, schema, and current values of the keys.
The transformer has an expensive but common initialization step for processing each logical partition. Initialization should then happen when initializing the physical partition, meaning it isn’t unnecessarily repeated.
The biggest advantage of the interface approach is that you can customize physical-partition-level initialization, and you have all the up-to-date context variables to use. In the interface approach, type annotations are not necessary, but again, it’s good practice to have them.
From here onwards, we will be using a create_helper function that creates a random pandas DataFrame for us.
import numpy as np

def create_helper(ct=20) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0, 10, size=(ct, 3)), columns=list('abc'))
The following examples focus on performance comparisons. To see how to use context variables, see the CoTransformer example. In the example below, pay attention to the get_output_schema method and the on_init method. The on_init method calls the expensive_init function, which just sleeps for the given amount of time. This represents an operation with significant overhead.
from fugue import Transformer, PandasDataFrame, DataFrame, LocalDataFrame
from time import sleep

def expensive_init(sec=5):
    sleep(sec)

class Median(Transformer):
    # this is invoked on driver side
    def get_output_schema(self, df):
        return df.schema + (self.params.get_or_throw("col", str), float)

    # on initialization of the physical partition
    def on_init(self, df: DataFrame) -> None:
        self.col = self.params.get_or_throw("col", str)
        expensive_init(self.params.get("sec", 0))

    def transform(self, df):
        pdf = df.as_pandas()
        pdf[self.col] = pdf["b"].median()
        return PandasDataFrame(pdf)

with FugueWorkflow() as dag:
    df = dag.create(create_helper)
    df.partition(by=["a"]).transform(Median, params={"col": "med", "sec": 1}).show(rows=5)
PandasDataFrame
a:long|b:long|c:long|med:double
------+------+------+----------
0 |4 |7 |4.0
1 |3 |3 |4.0
1 |9 |9 |4.0
1 |4 |9 |4.0
2 |3 |8 |1.5
As a side note, this example shows that parameters can be retrieved using self.params.get or self.params.get_or_throw. self.params is a dictionary, so the get method works just like accessing a dictionary. self.params.get_or_throw throws an error if the param does not match the given type.
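For a concrete feel of the difference, here is a small standalone sketch. It assumes self.params is backed by triad’s ParamDict and that ParamDict can be imported from triad’s top level; the keys mirror the Median example above.
from triad import ParamDict  # assumption: the dict type behind self.params

params = ParamDict({"col": "med", "sec": 1})
print(params.get("col", "m"))           # "med": plain dictionary-style access with a default
print(params.get("missing", 0))         # 0: the default is returned when the key is absent
print(params.get_or_throw("col", str))  # "med": the value exists and is a str
# params.get_or_throw("col", int)       # would raise, because "med" does not match int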
In order to show the benefit of on_init, we also create another version of the Median transformer using the schema hint. This version calls expensive_init inside the transform function, once for each logical partition. Also, in the run function, we set num=2 to show the effect when using 2 workers. So for the Median transformer that uses the interface, expensive_init will be called at most twice, but for the version that uses the schema hint, it will be called many more times.
The numbers may be off if you run this on binder, but focus on the difference in magnitude.
from fugue_spark import SparkExecutionEngine
from timeit import timeit

# schema: *, m:double
def median(df: pd.DataFrame, sec=0) -> pd.DataFrame:
    expensive_init(sec)
    df["m"] = df["b"].median()
    return df

def run(engine, interfaceless, sec):
    with FugueWorkflow(engine) as dag:
        df = dag.create(create_helper)
        if interfaceless:
            df.partition(by=["a"], num=2).transform(median, params={"sec": sec}).show(rows=5)
        else:
            df.partition(by=["a"], num=2).transform(Median, params={"col": "m", "sec": sec}).show(rows=5)

engine = SparkExecutionEngine()
print(f"Interfaceless Execution time: {timeit(lambda: run(engine, True, 1), number=1)}")
print(f"Interface Execution time: {timeit(lambda: run(engine, False, 1), number=1)}")
SparkDataFrame
a:long|b:long|c:long|m:double
------+------+------+--------
2 |3 |8 |1.5
2 |0 |0 |1.5
4 |7 |6 |5.0
4 |3 |0 |5.0
4 |5 |5 |5.0
Interfaceless Execution time: 10.233126380999238
SparkDataFrame
a:long|b:long|c:long|m:double
------+------+------+--------
2 |3 |8 |1.5
2 |0 |0 |1.5
4 |7 |6 |5.0
4 |3 |0 |5.0
4 |5 |5 |5.0
Interface Execution time: 1.8927552820005076
Using the on_init method tremendously sped up the operation because expensive_init was not unnecessarily repeated.
Fugue transform#
All of the transformers above can be used with the Fugue transform function. The transform function takes in a function or transformer and applies it to the dataframe immediately. As with other Transformers, the schema needs to be explicit, so it either needs to be supplied during the transformer definition, or at runtime with the schema argument.
The transform function is useful for parallelizing one function over Spark or Dask.
# schema not supplied, so it is passed later
def median1(df: pd.DataFrame, sec=0) -> pd.DataFrame:
    df["m"] = df["b"].median()
    return df

# schema: *, m:double
def median2(df: pd.DataFrame, sec=0) -> pd.DataFrame:
    df["m"] = df["b"].median()
    return df
These two transformers can then be used. median1 was written with the Native Approach, so the schema will be required for the transform call below. median2, on the other hand, uses the schema hint to provide the schema, so it does not need to be provided at runtime.
In both cases below, we pass in the SparkExecutionEngine, which converts the initial df into a Spark DataFrame and executes the median functions in a distributed way. This also returns a Spark DataFrame because compute was run on the SparkExecutionEngine. It can be converted back to pandas using the toPandas() method of Spark DataFrames, but this method is only performant for smaller data.
from fugue import transform

# sample pandas DataFrame
df = create_helper()

# schema is supplied at runtime for median1
df1 = transform(df,
                median1,
                schema="*, m:double",
                engine=SparkExecutionEngine,
                partition=dict(by="a"))

# schema is known for median2
df2 = transform(df,
                median2,
                engine=SparkExecutionEngine,
                partition=dict(by="a"))

df1.show(2)

# convert back to pandas
df2.toPandas().head(2)
+---+---+---+---+
| a| b| c| m|
+---+---+---+---+
| 0| 4| 7|4.0|
| 6| 7| 7|7.5|
+---+---+---+---+
only showing top 2 rows
   a  b  c    m
0  0  4  7  4.0
1  6  7  7  7.5