Outputter#
Outputter represents a terminal piece of logic in a workflow. It is the only Fugue extension that does not return a DataFrame, and it is called Outputter because it is normally used to save data or print on the console. An Outputter is used on entire dataframes and executes on the driver. Fugue’s save is an example of an Outputter.
This tutorial covers the methods to define an Outputter. There is no preferred method; Fugue is flexible so users can choose whichever interface works for them. The four ways are the native approach, schema hint, decorator, and class interface, in order of simplicity.
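For instance, the built-in save can be invoked like this (a minimal sketch; the path is illustrative and save’s exact arguments may vary by Fugue version):
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    # save is an Outputter: it consumes the dataframe and returns nothing
    dag.df([[0,1]],"a:int,b:int").save("/tmp/example.parquet")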
Example Use Cases#
Pretty printers for console and Jupyter
Writing data to a database
Unit test assertions, by taking in a DataFrame and checking its values (a sketch follows below)
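A minimal sketch of the unit-test use case, written with the native approach explained later in this tutorial (assert_row_count is a hypothetical name, not part of Fugue’s API):
from typing import Any, List
from fugue import FugueWorkflow

def assert_row_count(df:List[List[Any]], expected:int=1) -> None:
    # an Outputter used as a unit test assertion: check values, return nothing
    assert len(df)==expected

with FugueWorkflow() as dag:
    dag.df([[0,1],[0,2]],"a:int,b:int").output(assert_row_count, params={"expected":2})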
Quick Notes on Usage#
ExecutionEngine aware
Outputters run on the driver, so they are aware of the ExecutionEngine being used. Passing a parameter with the ExecutionEngine annotation will pass in the current ExecutionEngine. There is an example of this later.
Acceptable input DataFrame types
DataFrame, LocalDataFrame, pd.DataFrame, List[List[Any]], Iterable[List[Any]], EmptyAwareIterable[List[Any]], List[Dict[str, Any]], Iterable[Dict[str, Any]], EmptyAwareIterable[Dict[str, Any]]
Input can also be Fugue DataFrames, which is a collection of Fugue DataFrame objects.
Acceptable output DataFrame types
Outputter can’t output anything. The annotation has to be None.
Further notes
ArrayDataFrame and other local dataframes can’t be used as the annotation; you must use LocalDataFrame or DataFrame.
Variations of LocalDataFrame will bring the entire dataset onto the driver. For an Outputter this may be an expected operation, but you need to be careful.
Iterable-like input may have different execution plans to bring data to the driver; in some cases it can be less optimal (slower), so you need to be careful (see the sketch below).
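A minimal sketch contrasting the two annotation styles (both function names are hypothetical; which plan is faster depends on the engine and data size):
from typing import Any, Iterable, List

def out_all(df:List[List[Any]]) -> None:
    # List annotation: the entire dataset is materialized on the driver first
    print(len(df))

def out_stream(df:Iterable[List[Any]]) -> None:
    # Iterable annotation: rows are consumed one at a time on the driver
    for row in df:
        print(row)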
Native Approach#
The native approach uses a regular function without any edits beyond type annotations. You just need acceptable type annotations for the input DataFrames, and the output annotation must be None.
from typing import Iterable, Dict, Any, List
import pandas as pd
from fugue import FugueWorkflow

def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

def out2(df1:pd.DataFrame, df2:List[List[Any]]) -> None:
    print(df1)
    print(df2)

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.output(out, params={"n":2})
    dag.output(df,df,using=out2)
[[0, 1], [0, 2], [1, 3], [1, 1]]
[[0, 1], [0, 2], [1, 3], [1, 1]]
a b
0 0 1
1 0 2
2 1 3
3 1 1
[[0, 1], [0, 2], [1, 3], [1, 1]]
It’s also important to know how to use DataFrames as the input annotation, because this is the only way to accept a dynamic number of input DataFrames.
from fugue import DataFrames

def out(dfs:DataFrames) -> None:
    for k, v in dfs.items():
        v.show(title=k)

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1]],"a:int,b:int")
    df2 = dag.df([[0,2],[1,3]],"a:int,b:int")
    df3 = dag.df([[1,1]],"a:int,b:int")
    dag.output(df1,df2,df3,using=out)
_0
ArrayDataFrame
a:int|b:int
-----+-----
0 |1
Total count: 1
_1
ArrayDataFrame
a:int|b:int
-----+-----
0 |2
1 |3
Total count: 2
_2
ArrayDataFrame
a:int|b:int
-----+-----
1 |1
Total count: 1
Schema Hint#
The schema hint does not apply to the output of Outputter because the output annotation has to be None and there is no DataFrame returned. A schema hint of schema: None can be used, but it does not do anything.
from fugue import outputter

# schema: None
def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

with FugueWorkflow() as dag:
    dag.df([[0,1]],"a:int,b:int").output(out)
[[0, 1]]
Decorator Approach#
Similar to the schema hint, there is no obvious advantage to using the decorator for Outputter. Because there is no output schema, the decorator doesn’t do much besides making the code more explicit.
from fugue import outputter

@outputter()
def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

with FugueWorkflow() as dag:
    dag.df([[0,1]],"a:int,b:int").output(out)
[[0, 1]]
Interface Approach (Advanced)#
All the previous methods are just wrappers of the interface approach. They cover most use cases and simplify usage, but if you need access to the full execution context, such as partition information, use the interface.
In the interface approach, type annotations are not necessary, but again, it’s good practice to have them.
from fugue import Outputter
from fugue_spark import SparkExecutionEngine

class Save(Outputter):
    def process(self, dfs:DataFrames) -> None:
        assert len(dfs)==1
        assert isinstance(self.execution_engine, SparkExecutionEngine)
        session = self.execution_engine.spark_session
        # get the partition information set by df.partition(...)
        by = self.partition_spec.partition_by
        df = self.execution_engine.to_df(dfs[0])
        path = self.params.get_or_throw("path",str)
        df.native.write.partitionBy(*by).format("parquet").mode("overwrite").save(path)

with FugueWorkflow(SparkExecutionEngine) as dag:
    df = dag.df([[0,1],[0,3],[1,2],[1,1]],"a:int,b:int")
    df.partition(by=["a"]).output(Save, params=dict(path="/tmp/x.parquet"))
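A quick way to sanity-check the result (a sketch, not part of the workflow above; assumes pandas with pyarrow installed, which can usually read a partitioned parquet directory written by Spark):
import pandas as pd

# the partition column a is recovered from the directory names
print(pd.read_parquet("/tmp/x.parquet"))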
Using the ExecutionEngine#
In some cases, the Outputter has to be aware of the ExecutionEngine. This is an example of how to write native Spark code inside Fugue.
from fugue import ExecutionEngine, DataFrame

# pay attention to the input annotations
def out(e:ExecutionEngine, df:DataFrame) -> None:
    assert isinstance(e, SparkExecutionEngine) # this extension only works with SparkExecutionEngine
    df = e.to_df(df) # convert to a Spark DataFrame if it isn't one already
    df.native.show()

with FugueWorkflow(SparkExecutionEngine) as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.output(out)
+---+---+
| a| b|
+---+---+
| 0| 1|
| 0| 2|
| 1| 3|
| 1| 1|
+---+---+