Outputter
Outputter#
Outputter represents a terminal piece of logic in a workflow. It is the only Fugue extension that does not return a DataFrame. It is called Outputter because it is normally used to save data or print to the console. An Outputter operates on entire DataFrames and executes on the driver. Fugue’s save is an example of an Outputter.
This tutorial covers the ways to define an Outputter. There is no preferred method; Fugue leaves users free to choose whichever interface works for them. The four ways, in order of simplicity, are the native approach, the schema hint, the decorator, and the class interface.
Example Use Cases#
Pretty printers for console and Jupyter
Writing data to a database
Unit test assertions can be done by taking in a DataFrame and checking the values.
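As a minimal sketch of the unit-test use case, the function below checks the row count and that one column contains no nulls. The name `assert_rows` and its parameters are hypothetical and not part of Fugue. It is written as a plain annotated function with a `None` return, so it can be exercised directly.

```python
from typing import Any, List


def assert_rows(df: List[List[Any]], expected_count: int, not_null_col: int = 0) -> None:
    """Hypothetical assertion helper: raises AssertionError if the data looks wrong."""
    # The whole dataframe arrives as a list of rows
    assert len(df) == expected_count, f"expected {expected_count} rows, got {len(df)}"
    # Every row must have a value at the given column position
    for i, row in enumerate(df):
        assert row[not_null_col] is not None, f"row {i} has a null in column {not_null_col}"


# The function can be exercised directly, without a workflow
assert_rows([[0, 1], [0, 2], [1, 3], [1, 1]], expected_count=4)
```

Inside a workflow, such a function would presumably be attached in the same way as the `out` function in the Native Approach section, for example `df.output(assert_rows, params={"expected_count": 4})`.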
Quick Notes on Usage#
ExecutionEngine aware
Outputters run on the driver, so they are aware of the ExecutionEngine being used. Passing a parameter with the ExecutionEngine annotation will pass in the current ExecutionEngine. There is an example of this later.
Acceptable input DataFrame types
DataFrame, LocalDataFrame, pd.DataFrame, List[List[Any]], Iterable[List[Any]], EmptyAwareIterable[List[Any]], List[Dict[str, Any]], Iterable[Dict[str, Any]], EmptyAwareIterable[Dict[str, Any]]
Input can also be Fugue DataFrames, which is a collection of Fugue DataFrame objects.
Acceptable output DataFrame types
Outputter can’t output anything. The annotation has to be None.
Further notes
ArrayDataFrame and other local dataframes can’t be used as annotations; you must use LocalDataFrame or DataFrame.
Variations of LocalDataFrame will bring the entire dataset onto the driver. For an Outputter this might be an expected operation, but you need to be careful.
Iterable-like input may use a different execution plan to bring data to the driver. In some cases it can be less optimal (slower), so you need to be careful.
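To illustrate what an Iterable-like annotation looks like from the function's side, here is a small sketch of a printer that reads only the first few rows. The name `peek` is hypothetical, and a plain generator stands in for the rows. How the data actually reaches the driver depends on the execution engine, which this sketch does not show.

```python
from itertools import islice
from typing import Any, Dict, Iterable


def peek(df: Iterable[Dict[str, Any]], n: int = 2) -> None:
    """Hypothetical helper: print at most n rows, then stop reading."""
    # islice stops after n items, so the rest of the iterable is never consumed
    for row in islice(df, n):
        print(row)


# A generator stands in for the rows that would be handed to the function
rows = ({"a": i, "b": i * 2} for i in range(1000))
peek(rows, n=2)
```

Because the function never calls `list()` or `len()` on its input, it does not need the full dataset in memory at once.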
Native Approach#
The native approach is using a regular function without any edits beyond type annotations. You just need to have acceptable type annotations for the input DataFrames and the output annotation should be None.
from typing import Iterable, Dict, Any, List
import pandas as pd
from fugue import FugueWorkflow

def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

def out2(df1:pd.DataFrame, df2:List[List[Any]]) -> None:
    print(df1)
    print(df2)

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.output(out, params={"n":2})
    dag.output(df,df,using=out2)
[[0, 1], [0, 2], [1, 3], [1, 1]]
[[0, 1], [0, 2], [1, 3], [1, 1]]
a b
0 0 1
1 0 2
2 1 3
3 1 1
[[0, 1], [0, 2], [1, 3], [1, 1]]
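The same pattern can cover the database use case listed earlier. The sketch below writes rows to SQLite using only the standard library. The function name `save_to_sqlite`, its parameters, and the fixed two-column table layout are assumptions made for illustration. The table name is interpolated directly into the SQL, so it should only come from trusted code.

```python
import sqlite3
from typing import Any, List


def save_to_sqlite(df: List[List[Any]], path: str, table: str = "t") -> None:
    """Hypothetical writer: store rows of two integers (a, b) in a SQLite table."""
    conn = sqlite3.connect(path)
    try:
        # The connection context manager commits on success and rolls back on error
        with conn:
            conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (a INTEGER, b INTEGER)")
            conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", df)
    finally:
        conn.close()


# Direct call with the same rows as the example above (in-memory database)
save_to_sqlite([[0, 1], [0, 2], [1, 3], [1, 1]], path=":memory:")
```

In a workflow the extra arguments would go through `params`, as with `n` above. Since an Outputter executes on the driver, a file path here would refer to the driver's filesystem.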
It’s also important to know how to use DataFrames as an input annotation, because this is the only way to accept a dynamic number of input DataFrames.
from fugue import DataFrames

def out(dfs:DataFrames) -> None:
    for k, v in dfs.items():
        v.show(title=k)

with FugueWorkflow() as dag:
    df1 = dag.df([[0,1]],"a:int,b:int")
    df2 = dag.df([[0,2],[1,3]],"a:int,b:int")
    df3 = dag.df([[1,1]],"a:int,b:int")
    dag.output(df1,df2,df3,using=out)
_0
ArrayDataFrame
a:int|b:int
-----+-----
0 |1
Total count: 1
_1
ArrayDataFrame
a:int|b:int
-----+-----
0 |2
1 |3
Total count: 2
_2
ArrayDataFrame
a:int|b:int
-----+-----
1 |1
Total count: 1
Schema Hint#
The schema hint does not apply to the output of Outputter because the output annotation has to be None and there is no DataFrame returned. A schema hint with schema: None can be used but it does not do anything.
from fugue import outputter

# schema: None
def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

with FugueWorkflow() as dag:
    dag.df([[0,1]],"a:int,b:int").output(out)
[[0, 1]]
Decorator Approach#
Similar to the schema hint, there is no obvious advantage to using the decorator for an Outputter. There is no output schema, so the decorator doesn’t do much besides making the code more explicit.
from fugue import outputter

@outputter()
def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

with FugueWorkflow() as dag:
    dag.df([[0,1]],"a:int,b:int").output(out)
[[0, 1]]
Interface Approach (Advanced)#
All the previous methods are just wrappers of the interface approach. They cover most use cases and simplify usage. But if you need the full execution context, such as partition information, use the interface.
In the interface approach, type annotations are not necessary, but it’s good practice to have them.
from fugue import Outputter
from fugue_spark import SparkExecutionEngine

class Save(Outputter):
    def process(self, dfs:DataFrames) -> None:
        assert len(dfs)==1
        assert isinstance(self.execution_engine, SparkExecutionEngine)
        session = self.execution_engine.spark_session
        # we get the partition information from Outputter
        by = self.partition_spec.partition_by
        df = self.execution_engine.to_df(dfs[0])
        path = self.params.get_or_throw("path",str)
        df.native.write.partitionBy(*by).format("parquet").mode("overwrite").save(path)

with FugueWorkflow(SparkExecutionEngine) as dag:
    df = dag.df([[0,1],[0,3],[1,2],[1,1]],"a:int,b:int")
    df.partition(by=["a"]).output(Save, params=dict(path="/tmp/x.parquet"))
Using the ExecutionEngine#
In some cases, the Outputter has to be aware of the ExecutionEngine. This is an example of how to write native Spark code inside Fugue.
from fugue import ExecutionEngine, DataFrame

# pay attention to the input annotations
def out(e:ExecutionEngine, df:DataFrame) -> None:
    assert isinstance(e,SparkExecutionEngine) # this extension only works with SparkExecutionEngine
    df = e.to_df(df) # to make sure df is Spark DataFrame, or conversion is done here
    df.native.show()

with FugueWorkflow(SparkExecutionEngine) as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.output(out)
+---+---+
| a| b|
+---+---+
| 0| 1|
| 0| 2|
| 1| 3|
| 1| 1|
+---+---+