Extensions#

The FugueWorkflow object creates a Directed Acyclic Graph (DAG) where the nodes are DataFrames connected by extensions. Extensions are the pieces of code that create, modify, or output DataFrames. The Transformer we have been using is one example of an extension. In this section, we’ll cover the other types of extensions: Creator, Processor, Outputter, and CoTransformer. For all extensions that return DataFrames, the output schema has to be defined. Below are the types of extensions.

[Figure: overview of the Fugue extension types]

OutputTransformer and OutputCoTransformer will be covered in the Deep Dive section.

We have already seen some of the built-in extensions that come with Fugue. For example, load() is a Creator and save() is an Outputter. There is also a difference between driver-side and worker-side extensions, which will be covered at the end of this section. For now, we’ll just look at the syntax and use case for each extension.
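As a quick sketch of these built-ins (the parquet paths here are just placeholders), loading a file is a Creator step and saving the result is an Outputter step:

with FugueWorkflow() as dag:
    df = dag.load("/tmp/input.parquet")               # load is a built-in Creator
    df.save("/tmp/output.parquet", mode="overwrite")  # save is a built-in Outputter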

Creator#

A Creator is an extension that takes no DataFrame as input and returns a DataFrame as output. It is used to generate DataFrames. Custom Creators can be used to load data from different sources (think AWS S3, or a database through pyodbc). Similar to the Transformer in the previous section, Creators can be defined with the schema hint comment or with the @creator decorator. pd.DataFrame is a special output type that does not require a schema because it already carries one. For other output type hints, the schema is unknown, so it needs to be defined.

import pandas as pd
from fugue import FugueWorkflow
from typing import List, Dict, Any

# no need to define schema for pd.DataFrame
def create_data() -> pd.DataFrame:
    df = pd.DataFrame({'a': [1,2,3], 'b': [2,3,4]})
    return df

# schema: a:int, b:int
def create_data2() -> List[Dict[str, Any]]:
    df = [{'a':1, 'b':2}, {'a':2, 'b':3}]
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data)
    df2 = dag.create(create_data2)
    df2.show()
IterableDataFrame
a:int|b:int
-----+-----
1    |2    
2    |3    
Total count: 2
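The same data could also be created with the @creator decorator mentioned above. This is a minimal sketch, assuming the decorator can be imported from the top-level fugue package:

from fugue import creator

@creator("a:int, b:int")
def create_data3() -> List[Dict[str, Any]]:
    return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}]

with FugueWorkflow() as dag:
    dag.create(create_data3).show()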

Processor#

A Processor is an extension that takes in one or more DataFrames and outputs one DataFrame. Similar to the Creator, the schema does not need to be specified for pd.DataFrame output because it is already known, but it does need to be specified for other output types. A Processor can be defined using the schema hint or the @processor decorator with the schema passed in.

from typing import List, Dict, Any

def concat(df1:pd.DataFrame, df2:pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1,df2]).reset_index(drop=True)

# schema: a:int, b:int
def fillna(df:List[Dict[str,Any]], n=0) -> List[Dict[str,Any]]:
    for row in df:
        if row['a'] is None:
            row['a'] = n
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data2)    # create_data2 is from earlier
    df2 = dag.create(create_data2)
    df3 = dag.process(df, df2, using=concat)
    df3 = dag.process(df3, using=fillna, params={'n': 10})
    df3.show()
IterableDataFrame
a:int|b:int
-----+-----
1    |2    
2    |3    
1    |2    
2    |3    
Total count: 4

Here we show an example of a fillna Processor, but this is such a common operation that there is already a built-in method for it.

with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    df = df.fillna(10, subset=["a"])
    df.show()
PandasDataFrame
a:int|b:int
-----+-----
1    |2    
2    |3    
Total count: 2
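For completeness, the fillna logic above could also be written with the @processor decorator and the schema passed in. This is a minimal sketch, assuming the decorator can be imported from the top-level fugue package:

from fugue import processor

@processor("a:int, b:int")
def fillna_decorated(df:List[Dict[str,Any]], n=0) -> List[Dict[str,Any]]:
    for row in df:
        if row['a'] is None:
            row['a'] = n
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    dag.process(df, using=fillna_decorated, params={'n': 10}).show()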

Outputter#

Outputters are extensions that take one or more DataFrames as input and return no DataFrame. We mentioned earlier that save() is an example of an Outputter; show() is another one. Outputters can be used to write to S3 or load into a database. The return type of an Outputter must be None. No schema is needed since this is a terminal operation. There is an @outputter decorator, but it doesn’t do much because the return type is already None. Outputters are also commonly used for plotting functions.

def head(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df[i])

with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    dag.output(df, using=head, params={'n': 2})
[1, 2]
[2, 3]
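Because the return type is always None, an Outputter is also a natural home for side effects such as plotting. Below is a minimal sketch using matplotlib (assuming it is installed); the columns come from create_data2 above:

import matplotlib.pyplot as plt

def plot_scatter(df:pd.DataFrame) -> None:
    df.plot.scatter(x='a', y='b')   # plot column a against column b
    plt.show()

with FugueWorkflow() as dag:
    df = dag.create(create_data2)
    dag.output(df, using=plot_scatter)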

Transformer#

The Transformer is the most widely used extension. We have covered it in previous sections, but more formally, a Transformer is an extension that takes in a DataFrame and returns a DataFrame. The schema needs to be explicit. Most logic will go into Transformers. Below is an example that creates a new column.

# schema: *, c:int
def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
    df['c'] = df['a'] + df['b']
    return df

with FugueWorkflow() as dag:
    df = dag.create(create_data2).fillna(10)
    df = df.transform(using=sum_cols)
    df.show()
PandasDataFrame
a:int|b:int|c:int
-----+-----+-----
1    |2    |3    
2    |3    |5    
Total count: 2

CoTransformer#

The CoTransformer is very similar to the Transformer, except that it is intended to execute on multiple DataFrames that are partitioned in the same way. In order to use a CoTransformer, the zip() method has to be used first to pair the DataFrames by their common keys. There is also a @cotransformer decorator that can be used to define a CoTransformer, but it will still be invoked through the zip-transform syntax (see the sketch at the end of this subsection).

In the example below, we will do a merge-as-of operation on different groups of data. In order to keep the data aligned with the events as they get distributed across the cluster, we will partition both DataFrames in the same way.

data = pd.DataFrame({'group': (["A"] * 5 + ["B"] * 5),
                     'year': [2015,2016,2017,2018,2019] * 2})

events = pd.DataFrame({'group': ["A", "A", "B", "B"],
                       'year': [2014, 2016, 2014, 2018],
                       "value": [1, 2, 1, 2]})

events.head()
  group  year  value
0     A  2014      1
1     A  2016      2
2     B  2014      1
3     B  2018      2

The pandas merge_asof function requires that the on column is sorted. To achieve this, we apply a partition strategy in Fugue: partition by group and presort by year. By the time the data arrives in the CoTransformer, each DataFrame is already grouped and sorted.

# schema: group:str,year:int,value:int
def merge_asof(data:pd.DataFrame, events:pd.DataFrame) -> pd.DataFrame:
    return pd.merge_asof(data, events, on="year", by="group")

with FugueWorkflow() as dag:
    data = dag.df(data)
    events = dag.df(events)

    data.zip(events, partition={"by": "group", "presort": "year"}).transform(merge_asof).show()
PandasDataFrame
group:str|year:int|value:int
---------+--------+---------
A        |2015    |1        
A        |2016    |2        
A        |2017    |2        
A        |2018    |2        
A        |2019    |2        
B        |2015    |1        
B        |2016    |1        
B        |2017    |1        
B        |2018    |2        
B        |2019    |2        

In this example, the important thing to note is that each group runs pandas merge_asof independently. This function is very flexible, allowing users to specify forward and backward merges along with a tolerance. This is tricky to implement well in Spark, but the CoTransformer lets us do it easily.

This operation was partitioned by the group column before the cotransform was applied, which was done through the zip() call. The CoTransformer is a more advanced operation that may take some experience to get used to.
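For reference, here is a sketch of the same logic written with the @cotransformer decorator; it is still invoked through the zip-transform syntax. This assumes the decorator can be imported from the top-level fugue package, and it rebuilds the pandas DataFrames because the names data and events were rebound to workflow DataFrames in the block above:

from fugue import cotransformer

@cotransformer("group:str, year:int, value:int")
def merge_asof2(data:pd.DataFrame, events:pd.DataFrame) -> pd.DataFrame:
    return pd.merge_asof(data, events, on="year", by="group")

data_pdf = pd.DataFrame({'group': (["A"] * 5 + ["B"] * 5),
                         'year': [2015,2016,2017,2018,2019] * 2})
events_pdf = pd.DataFrame({'group': ["A", "A", "B", "B"],
                           'year': [2014, 2016, 2014, 2018],
                           "value": [1, 2, 1, 2]})

with FugueWorkflow() as dag:
    zipped = dag.df(data_pdf).zip(dag.df(events_pdf),
                                  partition={"by": "group", "presort": "year"})
    zipped.transform(merge_asof2).show()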

Summary#

In this section we have gone over the building blocks of a FugueWorkflow: Fugue extensions. Extensions are abstractions for the different kinds of operations done on DataFrames. Fugue has the most common extensions built in, but it is very common for users to write their own extensions (especially Transformers) to work with DataFrames.

[Optional] Driver Extensions vs Worker Extensions#

For those less familiar with distributed systems: the work is spread across multiple workers, collectively referred to as a cluster, and the driver is the machine that orchestrates the work done by the workers. Among Fugue extensions, the Transformer and CoTransformer run on the workers. Operations at the worker level are agnostic to the ExecutionEngine.

Driver-side extensions, on the other hand, are ExecutionEngine-aware. This means that these extensions can use code written specifically for Spark or Dask. All we need to do is make the first argument of the function have the ExecutionEngine type annotation.

from fugue import ExecutionEngine, DataFrame
from fugue_spark import SparkExecutionEngine, SparkDataFrame

# pay attention to the input and output annotations: they are both the general Fugue DataFrame
def create(e:ExecutionEngine, n=1) -> DataFrame:
    assert isinstance(e, SparkExecutionEngine)  # this extension only works with SparkExecutionEngine
    sdf = e.spark_session.createDataFrame([[n]], schema="a:int")  # this is how to get the SparkSession
    return SparkDataFrame(sdf)  # wrap as a Fugue SparkDataFrame to return

with FugueWorkflow(SparkExecutionEngine) as dag:
    dag.create(create, params={"n":2}).show()
SparkDataFrame
a:int
-----
2    
Total count: 1

In the code above, we lose cross-platform execution, but this approach can be used when users need to write Spark-specific code; createDataFrame is a Spark-specific method. This is Fugue’s way of exposing the underlying ExecutionEngine for users who want it. Creators, Processors, and Outputters are all ExecutionEngine-aware. For users who are not as familiar with Spark, the recommendation is to write ExecutionEngine-agnostic code, which offers the most benefit of using Fugue because of the portability it provides.

Something very tempting for beginner Fugue users is to use something like sklearn’s MinMaxScaler, which normalizes a column based on its minimum and maximum values. The behavior differs depending on whether the normalization happens on the driver or on the workers. On the workers, it happens locally without access to the global dataset, so the min and max used for scaling are computed at the partition level. Using the Spark MinMaxScaler, on the other hand, obtains the global min and max values for scaling.
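To make this concrete, here is a hedged sketch of partition-level scaling with a Transformer. The column names and the partitioning by group are made up for illustration, and it assumes scikit-learn is installed and that workflow DataFrames expose the partition() call:

from sklearn.preprocessing import MinMaxScaler

# schema: group:str, value:double
def scale(df:pd.DataFrame) -> pd.DataFrame:
    # the scaler only sees the rows of the current partition,
    # so the min and max are partition-level, not global
    df['value'] = MinMaxScaler().fit_transform(df[['value']])[:, 0]
    return df

with FugueWorkflow() as dag:
    df = dag.df(pd.DataFrame({'group': ["A", "A", "B", "B"],
                              'value': [1.0, 2.0, 10.0, 20.0]}))
    df.partition(by=["group"]).transform(scale).show()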

Think in terms of map and aggregate operations, where map is row-wise and aggregate is column-wise. For map operations, the behavior of a Transformer and a Processor will be equivalent in most use cases. For aggregate operations, you can get different results depending on whether the execution is driver-side or worker-side.
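A small sketch of that difference, using made-up data and the same partition() call as above: the Transformer computes a max per partition on the workers, while the Processor sees the whole DataFrame and computes a single global max.

# schema: group:str, max_value:double
def partition_max(df:pd.DataFrame) -> pd.DataFrame:
    # runs once per partition on the workers
    return pd.DataFrame({'group': [df['group'].iloc[0]],
                         'max_value': [df['value'].max()]})

# no schema hint needed: the output is a pd.DataFrame
def global_max(df:pd.DataFrame) -> pd.DataFrame:
    # runs on the driver with the full DataFrame
    return pd.DataFrame({'max_value': [df['value'].max()]})

with FugueWorkflow() as dag:
    df = dag.df(pd.DataFrame({'group': ["A", "A", "B", "B"],
                              'value': [1.0, 2.0, 10.0, 20.0]}))
    df.partition(by=["group"]).transform(partition_max).show()  # one row per group
    dag.process(df, using=global_max).show()                    # a single global row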