Fugue Interface#

In the previous section, we talked about the motivation of Fugue and showed a simple example of FugueWorkflow. Here we provide more details about the Fugue syntax needed in actual applications. We also explain some of the underlying concepts that appeared in the previous section but were not fully explained.

The Directed Acyclic Graph (DAG)#

In the last section, we had a code block that used with FugueWorkflow() as dag:. Below is another simple example: a transformer that takes in two columns and adds them to create a third column called col3, which is then used in the FugueWorkflow.

import pandas as pd
from fugue import FugueWorkflow

data = pd.DataFrame({'col1': [1,2,3], 'col2':[2,3,4]})
data2 = data.copy()

def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df 

with FugueWorkflow() as dag:
    df = dag.df(data2)
    df = df.transform(make_new_col, schema="*, col3:int")
    df.show()
PandasDataFrame
col1:long|col2:long|col3:int
---------+---------+--------
1        |2        |3       
2        |3        |5       
3        |4        |7       
Total count: 3

Nothing here is new; it should all be familiar from the last section. However, we did not really dive into what FugueWorkflow actually does. FugueWorkflow is responsible for constructing a Directed Acyclic Graph, also called a DAG. A lot of people associate the DAG concept with workflow orchestration tools like Airflow, Prefect, or Dagster. While these tools also use DAGs, they use them differently than distributed computing frameworks such as Spark and Dask. Orchestration frameworks use the DAG to manage dependencies between scheduled tasks, while computing frameworks use the DAG as a computation graph that is built, validated, and then executed. DAGs are used because distributed computing operations are very expensive, have a lot of room to be optimized, and make mistakes costly to recover from.

Fugue follows these distributed computing frameworks in using the DAG for validation before execution. The DAG can catch errors significantly earlier, in a way similar to compiling the computing job. For Fugue specifically, the built DAG validates schema and provides the basis for further optimizations. For example, Fugue can detect which DataFrames are re-used in the computation graph and then persist them automatically to avoid recomputation. The DAG is a graph in which the nodes are DataFrames connected by Fugue extensions. We already introduced the most common extension, the transformer; more extensions will be introduced later. Schema is tracked throughout the DAG.
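To make the re-use point concrete, below is a minimal sketch (reusing make_new_col and data from above) in which one DataFrame node feeds two branches of the graph. This shared node is the kind of thing Fugue can detect and persist so that it is not recomputed for each branch.

with FugueWorkflow() as dag:
    df = dag.df(data.copy())
    # both branches consume the same upstream node in the DAG
    branch1 = df.transform(make_new_col, schema="*, col3:int")
    branch2 = df.transform(make_new_col, schema="*, col3:int")
    branch1.show()
    branch2.show()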

Schema#

This leads us to schema. In this context, a schema maps column names to their corresponding data types. Schema is explicit in Fugue for a few reasons (Fugue's schema expression syntax is shown right after this list):

  1. It allows quick validation of whether the computation job contains all of the necessary columns.

  2. It guarantees that operations are performed as expected across all machines. As a DataFrame can be split across multiple machines, each machine only sees the data it holds. A strong schema prevents an operation intended for, say, an integer column from unexpectedly having to deal with string values.

  3. Inferring schema is an expensive and error-prone operation. To infer the schema, distributed computing frameworks have to go through at least one partition of data to figure out the possible schema. If the inferred schema is inconsistent across partitions, the result will be wrong.

  4. Schema is a requirement for using Spark and Dask.
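Fugue expresses schema as a compact string of comma-separated name:type pairs. In a transformer's output schema, * means "all input columns", so new columns can be appended without re-listing everything. A couple of illustrative expressions (the column names here are hypothetical):

"col1:int,col2:str"    # two columns with explicit types
"*, col3:int"          # all input columns, plus a new int column col3

The next three methods show the different places this schema string can be attached to a transformer.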

Method 1: Schema Hint

With this method, the schema is defined in a comment above the function, which FugueWorkflow reads and enforces. This is the least invasive approach and does not even depend on Fugue: if a user chooses to move away from Fugue, these comments can remain in the code as helpful documentation.

# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df 
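With the hint in place, the transform call no longer needs an explicit schema argument. A minimal sketch, using the data DataFrame defined earlier:

with FugueWorkflow() as dag:
    df = dag.df(data.copy())
    df = df.transform(make_new_col)  # schema comes from the hint comment
    df.show()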

Method 2: Decorator

One limitation of the schema hint is that linters often complain when the schema is very long (past 70 or 80 characters). In that situation, users can define or import a long string in their script and pass it to the transformer decorator. This approach also makes it more explicit that the function is being wrapped into a Fugue transformer.

from fugue import transformer

@transformer(schema="*, col3:int")
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df 
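For long schemas, the string can live in a named constant (OUTPUT_SCHEMA below is a hypothetical name) or be imported from another module, keeping the decorated function readable:

from fugue import transformer

# hypothetical constant; in practice this could be a much longer string
# imported from elsewhere in the project
OUTPUT_SCHEMA = "*, col3:int"

@transformer(schema=OUTPUT_SCHEMA)
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df

As with the schema hint, a function decorated this way can be passed to transform without a schema argument.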

Method 3: FugueWorkflow

If users don’t want to use either of the above options, they can provide the schema in the transform call inside the FugueWorkflow context manager, as in the example below. This converts the function to a transformer during execution, leaving the original function untouched; it is only brought to Spark or Dask at runtime when needed.

data2 = data.copy()

def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df 
    
with FugueWorkflow() as dag:
    df = dag.df(data2)
    df = df.transform(make_new_col, schema="*, col3:int")
    df.show()
PandasDataFrame
col1:long|col2:long|col3:int
---------+---------+--------
1        |2        |3       
2        |3        |5       
3        |4        |7       
Total count: 3

Passing Parameters#

We saw in the first section how to pass parameters into the transform() function, but we haven’t yet seen how to do this with FugueWorkflow. Passing parameters is identical in both approaches: we pass them as a dictionary when we use the transform() method.

data2 = data.copy()

# schema: *, col3:int
def make_new_col(df: pd.DataFrame, n=1) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2'] + n
    return df 
    
with FugueWorkflow() as dag:
    df = dag.df(data2)
    df = df.transform(make_new_col, params={'n': 10})  # Pass parameters
    df.show()
PandasDataFrame
col1:long|col2:long|col3:int
---------+---------+--------
1        |2        |13      
2        |3        |15      
3        |4        |17      
Total count: 3

Loading and Saving Data#

Load and save operations are done inside the FugueWorkflow and use the appropriate saver/loader for the file extension (.csv, .json, .parquet, .avro) and the ExecutionEngine (Pandas, Spark, or Dask). For distributed computing, parquet and avro tend to be the most used formats due to compression.

with FugueWorkflow() as dag:
    df = dag.df(data2)
    # single=True writes one file instead of a partitioned directory
    df.save('/tmp/data.parquet', mode="overwrite", single=True)
    df.save("/tmp/data.csv", mode="overwrite", header=True)
    df2 = dag.load('/tmp/data.parquet')
    # columns applies column names and types when loading the csv
    df3 = dag.load("/tmp/data.csv", header=True, columns="col1:int,col2:int")
    df3.show()
PandasDataFrame
col1:int|col2:int
--------+--------
1       |2       
2       |3       
3       |4       
Total count: 3

Summary#

In this section we covered some concepts, such as the DAG, and why explicit schema is needed. We also covered how to define schema and how to pass parameters. Combined with loading and saving files, users can already start using Fugue to work with data.