Output Transformer (Advanced)
OutputTransformer is in general similar to Transformer, and any Transformer can be used as an OutputTransformer. It is important to understand the difference between the operations transform and out_transform:
- transform is lazy: Fugue does not guarantee that the compute happens immediately. For example, with SparkExecutionEngine, the real compute of transform happens only when an action, such as save, is hit.
- out_transform is an action: Fugue ensures the compute happens immediately, regardless of which execution engine is used.
- transform outputs a transformed dataframe for the following steps to use.
- out_transform is the last compute of a branch in the DAG; it outputs nothing.
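The lazy/eager contrast can be illustrated with a toy model in plain Python. This is a conceptual sketch only. ToyFrame, add_one and sink are hypothetical names, and this is not how Fugue is implemented. In the model, transform only records a step, while out_transform runs the whole chain at once and throws away the result.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Row = Dict[str, Any]


class ToyFrame:
    """Toy lazy dataframe (NOT Fugue's API): it only records pending steps."""

    def __init__(self, source: List[Row], steps: Optional[List[Callable]] = None) -> None:
        self.source = source
        self.steps = list(steps or [])

    def transform(self, fn: Callable[[Iterable[Row]], Iterable[Row]]) -> "ToyFrame":
        # Lazy: nothing runs here; we return a new frame with one more pending step.
        return ToyFrame(self.source, self.steps + [fn])

    def _compute(self) -> Iterable[Row]:
        rows: Iterable[Row] = iter(self.source)
        for fn in self.steps:
            rows = fn(rows)
        return rows

    def out_transform(self, fn: Callable[[Iterable[Row]], Any]) -> None:
        # Eager: the whole chain runs now, and whatever fn returns is discarded.
        fn(self._compute())


log: List[str] = []


def add_one(rows: Iterable[Row]) -> List[Row]:
    log.append("add_one ran")
    return [{**r, "b": r["b"] + 1} for r in rows]


def sink(rows: Iterable[Row]) -> None:
    for r in rows:
        log.append(f"push {r}")


df = ToyFrame([{"a": 0, "b": 1}, {"a": 1, "b": 3}])
df2 = df.transform(add_one)       # only records the step
assert log == []                  # nothing has been computed yet
result = df2.out_transform(sink)  # runs add_one, then sink, right away
```

In a real engine such as Spark, the pending steps would form an execution plan instead of a Python list. The observable difference is the same: the transform call alone triggers no work.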
You may find that transform().persist() can serve as an alternative to out_transform. That is generally fine, but note that the output dataframe of a transformation can be very large: if you persist or checkpoint it, it can take up a large portion of memory or disk space. In contrast, out_transform keeps no output, so it takes no extra space. It is also a more explicit way to show what you want to do.
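The space argument can be made concrete with plain Python generators. This is a rough analogy, not a model of how Fugue or Spark actually persist data. The helper names generate_rows, persist_then_push, stream_push and count_sink are hypothetical. The "persist" path keeps a full copy of the rows after pushing them. The streaming path handles each row and then lets it go.

```python
from typing import Any, Callable, Dict, Iterator, List

Row = Dict[str, Any]


def generate_rows(n: int) -> Iterator[Row]:
    # Stand-in for a large transformation output, produced one row at a time.
    for i in range(n):
        yield {"a": i % 2, "b": i}


def persist_then_push(rows: Iterator[Row], sink: Callable[[Row], None]) -> List[Row]:
    # Like transform().persist(): all rows are materialized and kept afterwards.
    cached = list(rows)
    for row in cached:
        sink(row)
    return cached  # this cache keeps occupying memory


def stream_push(rows: Iterator[Row], sink: Callable[[Row], None]) -> None:
    # Like out_transform: each row is handled and then dropped; nothing is kept.
    for row in rows:
        sink(row)


pushed = {"count": 0}


def count_sink(row: Row) -> None:
    pushed["count"] += 1


cached = persist_then_push(generate_rows(1000), count_sink)
kept = stream_push(generate_rows(1000), count_sink)
```

Both paths push the same 1000 rows. Only the first one leaves a 1000-row cache behind.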
A typical use case of out_transform is to save the dataframe in a custom way, for example, pushing it to Redis.
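Below is a minimal sketch of such a custom save, assuming a key-value client that exposes set(name, value), as redis.Redis does. The key scheme "<prefix>:<a>:<b>", the push_rows function and the FakeClient class are all hypothetical. The fake client lets the sketch run without a Redis server.

```python
import json
from typing import Any, Dict, Iterable, Protocol


class KeyValueClient(Protocol):
    # Assumption: any client exposing set(name, value), such as redis.Redis.
    def set(self, name: str, value: str) -> Any: ...


def push_rows(df: Iterable[Dict[str, Any]], client: KeyValueClient, prefix: str = "row") -> None:
    # Hypothetical key scheme "<prefix>:<a>:<b>"; adapt it to your own data model.
    for row in df:
        key = f"{prefix}:{row['a']}:{row['b']}"
        client.set(key, json.dumps(row))


class FakeClient:
    """In-memory stand-in so the sketch runs without a Redis server."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}

    def set(self, name: str, value: str) -> bool:
        self.store[name] = value
        return True


fake = FakeClient()
push_rows([{"a": 0, "b": 1}, {"a": 1, "b": 3}], fake)
```

With a real server, a redis.Redis instance could in principle be passed in place of FakeClient. How and where that client is created on each worker is left open here.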
This tutorial covers the methods for defining an OutputTransformer. There is no preferred method; Fugue lets users choose whichever interface works for them. In order of simplicity, the three ways are the native approach, the decorator, and the class interface. Note that schema hints do not work for an OutputTransformer.
Native Approach
An OutputTransformer normally returns nothing, so its default schema is None. Because of this, it works even if no schema is specified. An OutputTransformer is not meant to change the schema, so it does not respect any schema hint.
from typing import Iterable, Dict, Any
from fugue import FugueWorkflow
def push_to_redis(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    # Stand-in for a real push: print each row
    for row in df:
        print("pushing1", row)
    # out_transform does not use this return value
    return df
with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.out_transform(push_to_redis)
pushing1 {'a': 0, 'b': 1}
pushing1 {'a': 0, 'b': 2}
pushing1 {'a': 1, 'b': 3}
pushing1 {'a': 1, 'b': 1}
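Because the rows arrive as an iterable, a native output function can also group them before sending, which is common when each round trip to the external store is costly. The sketch below is under stated assumptions. BATCH_SIZE, SENT_BATCHES and push_in_batches are hypothetical names, and SENT_BATCHES stands in for the real store. The signature Iterable[Dict[str, Any]] -> None follows the native style. Here the function is called directly rather than through out_transform.

```python
from itertools import islice
from typing import Any, Dict, Iterable, List

BATCH_SIZE = 2  # hypothetical batch size; tune it for the real sink
SENT_BATCHES: List[List[Dict[str, Any]]] = []  # records what would be sent


def push_in_batches(df: Iterable[Dict[str, Any]]) -> None:
    it = iter(df)
    while True:
        # Pull at most BATCH_SIZE rows without materializing the whole input.
        batch = list(islice(it, BATCH_SIZE))
        if not batch:
            break
        # Stand-in for one round trip to the external store.
        SENT_BATCHES.append(batch)


push_in_batches([{"a": 0, "b": 1}, {"a": 0, "b": 2}, {"a": 1, "b": 3}, {"a": 1, "b": 1}])
```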
Decorator Approach
There is no obvious advantage to using the decorator for an OutputTransformer.
from typing import Any, Dict, Iterable
from fugue import FugueWorkflow
from fugue.extensions import output_transformer
@output_transformer()
def push_to_redis(df:Iterable[Dict[str,Any]]) -> None:
    # Called per partition; rows arrive sorted by b within each partition
    for row in df:
        print("pushing2", row)
with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.partition(by=["a"], presort="b").out_transform(push_to_redis)
pushing2 {'a': 0, 'b': 1}
pushing2 {'a': 0, 'b': 2}
pushing2 {'a': 1, 'b': 1}
pushing2 {'a': 1, 'b': 3}
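The row order in the output above comes from partition(by=["a"], presort="b"). The following plain-Python emulation is only an illustration, not Fugue's execution path. It sorts by the partition key and then by the presort column, groups by a, and visits each group in turn. Under that assumption, it reproduces the printed lines. On a distributed engine, the order in which partitions are processed is generally not something to rely on. Only the order within a partition is set by presort.

```python
from itertools import groupby
from operator import itemgetter
from typing import Any, Dict, List

rows: List[Dict[str, Any]] = [
    {"a": 0, "b": 1}, {"a": 0, "b": 2}, {"a": 1, "b": 3}, {"a": 1, "b": 1},
]

# Sort by the partition key first, then by the presort column, so that
# groupby sees each partition as one contiguous, presorted run.
ordered = sorted(rows, key=itemgetter("a", "b"))
partitions = {key: list(group) for key, group in groupby(ordered, key=itemgetter("a"))}

pushed: List[str] = []
for key, part in partitions.items():
    for row in part:  # what the decorated function iterates over
        pushed.append(f"pushing2 {row}")
```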
Interface Approach (Advanced)
Just like the interface approach of Transformer, this gives you full flexibility and control over your transformation.
from fugue import FugueWorkflow
from fugue.extensions import OutputTransformer
class Push(OutputTransformer):
    # Notice OutputTransformer has a different interface than Transformer:
    # process() receives one partition's dataframe and returns nothing
    def process(self, df):
        # self.cursor describes the current partition, e.g. {'a': 0}
        print("pushing2", self.cursor.key_value_dict)
with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.partition(by=["a"], presort="b").out_transform(Push)
pushing2 {'a': 0}
pushing2 {'a': 1}
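Push prints once per partition rather than once per row. This is because process is invoked per partition, and the cursor exposes that partition's key values. The toy runner below mimics that calling pattern in plain Python. ToyCursor, ToyPush and run_per_partition are hypothetical stand-ins, not Fugue classes, and only key_value_dict is modeled.

```python
from itertools import groupby
from operator import itemgetter
from typing import Any, Dict, List

Row = Dict[str, Any]


class ToyCursor:
    """Stand-in for the partition cursor; only key_value_dict is modeled."""

    def __init__(self, key_value_dict: Dict[str, Any]) -> None:
        self.key_value_dict = key_value_dict


class ToyPush:
    """Mimics the shape of Push: process() is called once per partition."""

    def __init__(self) -> None:
        self.cursor = ToyCursor({})
        self.seen: List[Dict[str, Any]] = []

    def process(self, df: List[Row]) -> None:
        # Record the partition keys instead of pushing anywhere.
        self.seen.append(dict(self.cursor.key_value_dict))


def run_per_partition(transformer: ToyPush, rows: List[Row], by: List[str]) -> None:
    key = itemgetter(*by)
    for _, group in groupby(sorted(rows, key=key), key=key):
        part = list(group)
        # Partition keys are read from the first row of the partition.
        transformer.cursor = ToyCursor({k: part[0][k] for k in by})
        transformer.process(part)


push = ToyPush()
run_per_partition(push, [{"a": 0, "b": 1}, {"a": 0, "b": 2}, {"a": 1, "b": 3}, {"a": 1, "b": 1}], ["a"])
```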