Extensions#
The FugueWorkflow object creates a Directed Acyclic Graph (DAG) where the nodes are DataFrames that are connected by extensions. Extensions are code that creates, modifies, or outputs DataFrames. The Transformer we have been using is an example of an extension. In this section, we'll cover the other types of extensions: Creator, Processor, Outputter, and CoTransformer. For all of these, the output schema has to be defined. The OutputTransformer and OutputCoTransformer will be covered in the Deep Dive section.
We have actually already seen some built-in extensions that come with Fugue. For example, load is a Creator and save is an Outputter. There is a difference between driver-side and worker-side extensions, which will be covered at the end of this section. For now, we'll just look at the syntax and use case for each extension.
Creator#
A Creator is an extension that takes no DataFrame as input but returns a DataFrame as output. It is used to generate DataFrames. Custom Creators can be used to load data from different sources (think AWS S3, or a database through pyodbc). Similar to the Transformer in the previous section, Creators can be defined with the schema hint comment or with the @creator decorator. pd.DataFrame is a special output type that does not require a schema. For other output type hints, the schema is unknown, so it needs to be defined.
import pandas as pd
from fugue import FugueWorkflow
from typing import List, Dict, Any

# no need to define schema for pd.DataFrame
def create_data() -> pd.DataFrame:
    df = pd.DataFrame({'a': [1,2,3], 'b': [2,3,4]})
    return df

# schema: a:int, b:int
def create_data2() -> List[Dict[str, Any]]:
    df = [{'a':1, 'b':2}, {'a':2, 'b':3}]
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data)
    df2 = dag.create(create_data2)
    df2.show()
IterableDataFrame
a:int|b:int
-----+-----
1    |2
2    |3
Total count: 2
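The @creator decorator form works the same way, with the schema passed to the decorator instead of a comment. Below is a minimal sketch (create_data3 is an illustrative name, not from the original example):

from fugue import creator

# the schema is passed to the decorator instead of a schema hint comment
@creator("a:int, b:int")
def create_data3() -> List[Dict[str, Any]]:
    return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}]

with FugueWorkflow() as dag:
    dag.create(create_data3).show()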
Processor#
A Processor is an extension that takes in one or more DataFrames and outputs one DataFrame. Similar to the Creator, the schema does not need to be specified for a pd.DataFrame output because it is already known, but it does need to be specified for other output types. A Processor can be defined using the schema hint or the @processor decorator with the schema passed in.
from typing import List, Dict, Any

def concat(df1:pd.DataFrame, df2:pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1,df2]).reset_index(drop=True)

# schema: a:int, b:int
def fillna(df:List[Dict[str,Any]], n=0) -> List[Dict[str,Any]]:
    for row in df:
        if row['a'] is None:
            row['a'] = n
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data2)   # create_data2 is from earlier
    df2 = dag.create(create_data2)
    df3 = dag.process(df, df2, using=concat)
    df3 = dag.process(df3, using=fillna, params={'n': 10})
    df3.show()
IterableDataFrame
a:int|b:int
-----+-----
1    |2
2    |3
1    |2
2    |3
Total count: 4
Here we show an example of a fillna processor, but this is a common operation, so there is actually a built-in method for it.
with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    df = df.fillna(10, subset=["a"])
    df.show()
PandasDataFrame
a:int|b:int
-----+-----
1    |2
2    |3
Total count: 2
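The @processor decorator mentioned above works analogously. Below is a minimal sketch of the same fillna logic with the schema passed to the decorator (fillna_p is an illustrative name):

from fugue import processor

# schema goes into the decorator instead of a schema hint comment
@processor("a:int, b:int")
def fillna_p(df:List[Dict[str,Any]], n=0) -> List[Dict[str,Any]]:
    for row in df:
        if row['a'] is None:
            row['a'] = n
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    dag.process(df, using=fillna_p, params={'n': 10}).show()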
Outputter#
Outputters are extensions that take in one or more DataFrames and return no DataFrame. We mentioned earlier that save() is an example of an Outputter; show() is actually another one. Outputters can be used to write to S3 or upload to a database. The return type of an Outputter must be None. No schema is needed since it is a terminal operation. There is an @outputter decorator, but it doesn't do much because the return type is already None. Outputters are also used for plotting functions.
def head(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df[i])

with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    dag.output(df, using=head, params={'n': 2})
[1, 2]
[2, 3]
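For completeness, here is a minimal sketch of the same head logic using the @outputter decorator mentioned above (head_decorated is an illustrative name; since there is no output schema, the decorator takes no schema argument):

from fugue import outputter

@outputter()
def head_decorated(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df[i])

with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    dag.output(df, using=head_decorated, params={'n': 2})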
Transformer#
The Transformer is the most widely used extension. We have covered it in previous sections, but more formally, a Transformer is an extension that takes in a DataFrame and returns a DataFrame. The schema needs to be explicit. Most logic will go into Transformers. Below is an example that creates a new column.
# schema: *, c:int
def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
    df['c'] = df['a'] + df['b']
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data2).fillna(10)
    df = df.transform(using=sum_cols)
    df.show()
PandasDataFrame
a:int|b:int|c:int
-----+-----+-----
1    |2    |3
2    |3    |5
Total count: 2
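A Transformer can likewise be defined with the @transformer decorator instead of the schema hint comment. Below is a minimal sketch of the same sum_cols logic (sum_cols2 is an illustrative name):

from fugue import transformer

# "*, c:int" keeps all input columns and adds an int column c
@transformer("*, c:int")
def sum_cols2(df: pd.DataFrame) -> pd.DataFrame:
    df['c'] = df['a'] + df['b']
    return df

with FugueWorkflow() as dag:
    dag.create(create_data2).fillna(10).transform(using=sum_cols2).show()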
CoTransformer#
The CoTransformer is very similar to the Transformer, except that it is intended to execute on multiple DataFrames that are partitioned in the same way. In order to use a CoTransformer, the zip() method has to be used first to join the DataFrames by their common keys. There is also a @cotransformer decorator that can be used to define the CoTransformer, but it will still be invoked through the zip-transform syntax.
In the example below, we will do a merge as-of operation on different groups of data. In order to align the data with events as they get distributed across the cluster, we will partition them in the same way.
data = pd.DataFrame({'group': (["A"] * 5 + ["B"] * 5),
                     'year': [2015,2016,2017,2018,2019] * 2})

events = pd.DataFrame({'group': ["A", "A", "B", "B"],
                       'year': [2014, 2016, 2014, 2018],
                       "value": [1, 2, 1, 2]})

events.head()
  | group | year | value
--|-------|------|------
0 | A     | 2014 | 1
1 | A     | 2016 | 2
2 | B     | 2014 | 1
3 | B     | 2018 | 2
The pandas merge_asof function requires the on column to be sorted. To achieve this, we apply a Fugue partition strategy by group with a presort on year. By the time the data arrives in the CoTransformer, the DataFrames are sorted and grouped.
# schema: group:str,year:int,value:int
def merge_asof(data:pd.DataFrame, events:pd.DataFrame) -> pd.DataFrame:
    return pd.merge_asof(data, events, on="year", by="group")

with FugueWorkflow() as dag:
    data = dag.df(data)
    events = dag.df(events)
    data.zip(events, partition={"by": "group", "presort": "year"}).transform(merge_asof).show()
PandasDataFrame
group:str|year:int|value:int
---------+--------+---------
A        |2015    |1
A        |2016    |2
A        |2017    |2
A        |2018    |2
A        |2019    |2
B        |2015    |1
B        |2016    |1
B        |2017    |1
B        |2018    |2
B        |2019    |2
In this example, the important part to note is that each group uses the pandas merge_asof independently. This function is very flexible, allowing users to specify forward and backward merges along with a tolerance. This is tricky to implement well in Spark, but the CoTransformer lets us do it easily.
This operation was partitioned by the column group before the cotransform was applied. This was done through the zip command. CoTransform is a more advanced operation that may take some experience to get used to.
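For reference, here is a minimal sketch of the same merge_asof logic defined with the @cotransformer decorator mentioned above (merge_asof2 is an illustrative name); it would still be invoked through the same zip-transform call shown earlier:

from fugue import cotransformer

# the output schema moves from the comment into the decorator
@cotransformer("group:str,year:int,value:int")
def merge_asof2(data:pd.DataFrame, events:pd.DataFrame) -> pd.DataFrame:
    return pd.merge_asof(data, events, on="year", by="group")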
Summary#
In this section we have gone over Fugue extensions, the building blocks of a FugueWorkflow. Extensions are abstractions for the different kinds of operations done on DataFrames. Fugue has the most common extensions built in, but it is very common for users to write their own extensions (especially Transformers) to work with DataFrames.
[Optional] Driver Extensions vs Worker Extensions#
For those less familiar with distributed systems, the work is spread across multiple workers, often referred to as a cluster. The driver is the machine that orchestrates the work done by the workers. Among Fugue extensions, the Transformer and CoTransformer run on the workers. Actions that happen at the worker level are already agnostic to the ExecutionEngine.
On the other hand, driver-side extensions are ExecutionEngine-aware. This means that these extensions can use code written specifically for Spark or Dask. All we need to do is add a first argument with the ExecutionEngine type annotation.
from fugue import ExecutionEngine, DataFrame
from fugue_spark import SparkExecutionEngine, SparkDataFrame

# pay attention to the input and output annotations; both are the general DataFrame
def create(e:ExecutionEngine, n=1) -> DataFrame:
    assert isinstance(e, SparkExecutionEngine)  # this extension only works with SparkExecutionEngine
    sdf = e.spark_session.createDataFrame([[n]], schema="a:int")  # this is how you get the SparkSession
    return SparkDataFrame(sdf)  # you must wrap it as a Fugue SparkDataFrame to return

with FugueWorkflow(SparkExecutionEngine) as dag:
    dag.create(create, params={"n":2}).show()
SparkDataFrame
a:int
-----
2
Total count: 1
In the code above, we lose cross-platform execution, but this approach can be used when users need to write Spark-specific code; createDataFrame is a Spark-specific method. This is Fugue's way of exposing the underlying ExecutionEngine if users want to use it. Creators, Processors, and Outputters are all ExecutionEngine-aware. For users who are less familiar with Spark, the recommendation is to write ExecutionEngine-agnostic code, which offers the most benefit of Fugue because of the portability it provides.
Something very tempting for beginner Fugue users is to use something like scikit-learn's MinMaxScaler, which normalizes a column based on its minimum and maximum values. The behavior differs depending on whether the normalizing logic runs on the driver or on the workers. On the workers, it runs locally without access to the global dataset, so the min and max used for scaling are computed at the partition level. On the other hand, Spark's MinMaxScaler obtains the global min and max values for scaling.
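To make the distinction concrete, below is a minimal sketch of partition-local scaling written as a Transformer. The column names group and value are illustrative rather than taken from the examples above; the point is that the min and max are computed per partition, not over the global dataset:

# schema: *, scaled:double
def minmax_scale(df:pd.DataFrame) -> pd.DataFrame:
    # these statistics only see the rows of the current partition
    lo, hi = df['value'].min(), df['value'].max()
    df['scaled'] = 0.0 if hi == lo else (df['value'] - lo) / (hi - lo)
    return df

# invoked with something like:
# df.partition(by=["group"]).transform(minmax_scale)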
Think in terms of map and aggregate operations, where map is row-wise and aggregate is column-wise. For map operations, the behavior of Transformer and Processor will be equivalent in most use cases. For aggregate operations, you can get different values depending on whether the execution is driver-side or worker-side.