Creator
Contents
Creator#
Creator
represents the logic unit to generate a DataFrame. It is used at the start of workflows. The built-in load
of Fugue is an example of a Creator.
In this tutorial are the methods to define a Creator
. There is no preferred method and Fugue makes it flexible for users to choose whatever interface works for them. The four ways are native approach, schema hint, decorator, and the class interface in order of simplicity.
Example Use Cases#
Reading special data sources like constructing a DataFrame using an API.
Querying a database using
pyodbc
and returning a DataFrameCreate mock data for unit tests.
Quick Notes on Usage#
ExecutionEngine aware
Creators run on the driver so they are aware of the
ExecutionEngine
being used. Passing a parameter with theExecutionEngine
annotation will pass in the currentExecutionEngine
. There is an example of this later.
Acceptable input DataFrame types
Creator
can’t take DataFrames in, but can take other parameters.
Acceptable output DataFrame types
DataFrame
,LocalDataFrame
,pd.DataFrame
,List[List[Any]]
,Iterable[List[Any]]
,EmptyAwareIterable[List[Any]]
,List[Dict[str, Any]]
,Iterable[Dict[str, Any]]
,EmptyAwareIterable[Dict[str, Any]]
Further notes
If the output type is NOT one of Fugue
DataFrame
,LocalDataFrame
orpd.DataFrame
, the output schema must be specified because it can’t be inferred.
Native Approach#
The native approach is using a regular function without any edits beyond type annotations. It is converted to a Fugue extension during runtime. In the example below, we have two create functions. The first one has an output type of pd.DataFrame
, which means that the schema is already known. The second one has an output type of List[List[Any]]
, which does hold schema so it has to be provided during the create
call inside FugueWorkflow
.
from typing import Iterable, Dict, Any, List
import pandas as pd
from fugue import FugueWorkflow
# fugue knows the schema because the output in pd.DataFrame
def create1(n=1) -> pd.DataFrame:
return pd.DataFrame([[n]],columns=["a"])
# schema is not known so it has to be provided later
def create2(n=1) -> List[List[Any]]:
return [[n]]
with FugueWorkflow() as dag:
dag.create(create1, params={"n":2}).show()
dag.create(create2, schema="a:int", params={"n":2}).show()
PandasDataFrame
a:long
------
2
Total count: 1
ArrayDataFrame
a:int
-----
2
Total count: 1
Schema Hint#
The schema can also be provided during the function definition through the use of the schema hint comment. Providing it during definition means it does not need to be provided inside the FugueWorkflow
.
# schema: a:int
def create2(n=1) -> List[List[Any]]:
return [[n]]
with FugueWorkflow() as dag:
dag.create(create2).show()
ArrayDataFrame
a:int
-----
1
Total count: 1
Decorator Approach#
There is no obvious advantage to use the decorator approach for defining a Creator
. In general, the decorator is good if the schema is too long to type out as a comment in one line.
from fugue import creator
@creator("a:int")
def create(n=1) -> List[List[Any]]:
return [[n]]
with FugueWorkflow() as dag:
dag.create(create).show()
ArrayDataFrame
a:int
-----
1
Total count: 1
Interface Approach (Advanced)#
All the previous methods are just wrappers of the interface approach. They cover most of use cases and are simpler to use. But if you want to get all execution context such as partition information, use interface approach.
In the interface approach, type annotations are not necessary but it’s good practice to have them.
from fugue import Creator, DataFrame
class Array(Creator):
def create(self) -> DataFrame:
engine = self.execution_engine
n = self.params.get_or_throw("n",int)
return engine.to_df([[n]],"a:int")
with FugueWorkflow() as dag:
dag.create(Array, params=dict(n=1)).show()
ArrayDataFrame
a:int
-----
1
Total count: 1
Using the ExecutionEngine#
In some cases, the Creator
has to be aware of the ExecutionEngine
. This is an example of how to write native Spark code inside Fugue.
from fugue import ExecutionEngine
from fugue_spark import SparkExecutionEngine, SparkDataFrame
# pay attention to the input and output annotations, they are both general DataFrame
def create(e:ExecutionEngine, n=1) -> DataFrame:
assert isinstance(e,SparkExecutionEngine) # this extension only works with SparkExecutionEngine
sdf= e.spark_session.createDataFrame([[n]], schema="a:int") # this is how you get spark session
return SparkDataFrame(sdf) # you must wrap as Fugue SparkDataFrame to return
with FugueWorkflow(SparkExecutionEngine) as dag:
dag.create(create, params={"n":2}).show()
SparkDataFrame
a:int
-----
2
Total count: 1