Output Transformer (Advanced)
Contents
Output Transformer (Advanced)#
OutputTransfomer
is in general similar to Transformer
. And any Transformer
can be used as OutputTransformer
. It is important to understand the difference between the operations transform
and out_transform
.
transform
is lazy, Fugue does not ensure the compute immediately. For example, if usingSparkExecutionEngine
, the real compute oftransform
happens only when hitting an action, for examplesave
.out_transform
is an action, Fugue ensures the compute happening immediately, regardless of what execution engine is used.transform
outputs a transformed dataframe for the following steps to useout_transform
is the last compute of a branch in the DAG, it outputs nothing.
You may find that transform().persist()
can be an alternative to out_transform
, it’s in general ok, but you must notice that, the output dataframe of a transformation can be very large, if you persist or checkpoint it, it can take up great portion of memory or disk space. In contrast, out_transform
does not take any space. Plus, it is a more explicit way to show what you want to do.
A typical use case of output_transform is to save the dataframe in a custom way, for example, pushing to redis.
In this tutorial are the methods to define an OutputTransformer
. There is no preferred method and Fugue makes it flexible for users to choose whatever interface works for them. The three ways are native approach, decorator, and the class interface in order of simplicity. Note schema hints do not work.
Native Approach#
An OutputTransformer
normally returns nothing, so the default schema is None
. Because of this, it will work if no schema is specified. The OutputTransformer
is not meant to mutate schema so it will not respect any schema hint.
from typing import Iterable, Dict, Any, List
from fugue import FugueWorkflow
def push_to_redis(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
for row in df:
print("pushing1", row)
return df
with FugueWorkflow() as dag:
df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
df.out_transform(push_to_redis)
pushing1 {'a': 0, 'b': 1}
pushing1 {'a': 0, 'b': 2}
pushing1 {'a': 1, 'b': 3}
pushing1 {'a': 1, 'b': 1}
Decorator Approach#
There is no obvious advantage to use decorator for OutputTransformer
from fugue.extensions import output_transformer
@output_transformer()
def push_to_redis(df:Iterable[Dict[str,Any]]) -> None:
for row in df:
print("pushing2", row)
continue
with FugueWorkflow() as dag:
df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
df.partition(by=["a"], presort="b").out_transform(push_to_redis)
pushing2 {'a': 0, 'b': 1}
pushing2 {'a': 0, 'b': 2}
pushing2 {'a': 1, 'b': 1}
pushing2 {'a': 1, 'b': 3}
Interface Approach (Advanced)#
Just like the interface approach of Transformer
, you get all the flexibilities and control over your transformation
from fugue.extensions import OutputTransformer
class Push(OutputTransformer):
# Notice OutputTransformer has different interface than Transformer
def process(self, df):
print("pushing2", self.cursor.key_value_dict)
with FugueWorkflow() as dag:
df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
df.partition(by=["a"], presort="b").out_transform(Push)
pushing2 {'a': 0}
pushing2 {'a': 1}