Schema#

Have questions? Chat with us on Github or Slack:

Homepage Slack Status

In the previous section, we saw how type hints are used in Fugue to provide more flexibility to express operations in different ways. In this section, we’ll talk about schema and why it’s an important requirement for distributed computing.

Explicit Schema#

The first thing to recognize is that explicit schema is pretty much a requirement for distributed computing. Frameworks like Spark and Dask support schema inference, but it can be expensive, and error prone. For more information, check the best practices section about it.

This is why we require it on the Fugue-level. In all cases, removing the need for schema inference will speed up code, which translates to meaningful cost reductions when dealing with big data. Also, some of the transformations we perform can be black-boxes to the underlying execution engine. Frameworks such as Spark, Dask, and Ray will need to know the output schema to handle custom transformations.

Fugue’s Schema#

Again, Fugue does not invent schema, it uses pyarrow schema. But Fugue creates a special syntax to represent schema. Each column is represented as : , separated by commas.

The Fugue project has a utility called triad, which contains the Schema class. In practice, you will just need to interact with the string representation.

from triad.collections.schema import Schema

s = Schema("a:int, b:str")
s == "a:int,b:str"
True

For this section we create a DataFrame to use that will be used throughout the examples provided below:

Schema Expressions#

import pandas as pd
df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})

Adding a column

When using the transform(), the * in a schema expression means all existing columns. From there we can add new columns by adding ",col:type".

from fugue import transform 

def add_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that creates a column with a value of column a + 1.
    """
    return df.assign(new_col=df["a"] + 1)

transform(
    df=df, 
    using=add_col, 
    schema="*,new_col:int"
    )
a b c new_col
0 1 1 1 2
1 2 2 2 3
2 3 3 3 4

Entirely new schema

There is no need to use the * operation. We can just specify all columns.

def new_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that creates a new DataFrame.
    """
    return pd.DataFrame({"x": [1,2,3]})

transform(
    df=df, 
    using=new_df, 
    schema="x:int"
    )
x
0 1
1 2
2 3

Dropping Columns

To drop a column, use -col without ",".

def drop_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function used to drop a column labelled 'b'.
    """
    return df.drop("b", axis=1)

transform(
    df=df, 
    using=drop_col, 
    schema="*-b"
    )
a c
0 1 1
1 2 2
2 3 3

Altering Types

If a column is remaining but the type is being altered, use +col:type.

def alter_col(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function that changes column a to string.
    """
    return df.assign(a=df['a'].astype("str")+"a")

transform(
    df=df, 
    using=alter_col, 
    schema="*+a:str"
    )
a b c
0 1a 1 1
1 2a 2 2
2 3a 3 3

Drop if Present

Use ~ to drop a column from the result if it is present.

def no_op(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that returns its input.
    """
    return df

transform(
    df=df, 
    using=no_op, 
    schema="*~b"
    )
a c
0 1 1
1 2 2
2 3 3

Schema Result Mismatch

If the transform() output has columns not in the defined schema, they will not be returned.

If the transform() output has an inconsistent type with the defined schema, it will be coerced.

def no_op(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that returns its input.
    """
    return df

transform(
    df=df, 
    using=no_op, 
    schema="a:float"
    )
a
0 1.0
1 2.0
2 3.0

Schema#

This leads us to schema. In this context, schema maps column fields with their corresponding data types. Schema is explicit in Fugue for a couple of reasons:

  1. It allows quick validation if the computation job contains all of the necessary columns.

  2. It guarantees that operations are performed as expected across all machines. As a DataFrame can be split across multiple machines, each machine only sees the data it holds. A strong schema prevents an operation intended for, say, an integer column from unexpectedly having to deal with string values.

  3. Inferring schema is an expensive and error-prone operation. To infer the schema, distributed computing frameworks have to go through at least one partition of data to figure out the possible schema. If the inferred schema is inconsistent across partitions, the result will be wrong.

  4. Schema is a requirement for using Spark and Dask.

Defining Schema#

There are a few ways to define the schema. Fugue has no preference and some of the approaches may be friendlier depending on the situation. Previously, we were passing it during the runtime with something like:

transform(
    df=df, 
    using=func,
    schema="*, new_col:int
    )

We can actually define the schema ahead of time during the function declaration as well.

Schema Hint#

In this case the comment is read and enforced by FugueWorkflow. This is the least invasive to code and is not even dependent on Fugue. If a user chooses to move away from Fugue, these are still helpful comments that can remain in the code.

in the above examples we supplied the schema as an argument in the transform() function. The schema can also be supplied on the function as a comment similar to the partition validation described earlier in this chapter. If done this way, the schema should not be supplied to the transform() function.

# schema: a:int
def no_op(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that returns its input.
    """
    return df

transform(
    df=df, 
    using=no_op
    )
a
0 1
1 2
2 3

Decorator#

One of the limitations of the schema hint is that linters often complain if there is a very long schema (past 70 or 80 characters). In that situation, users can import a long string into their script and pass it to the transformer decorator. This is also more explicit that this function is being wrapped into a Fugue transformer.

Using the decorator is a bit more invasive, but it can me used when the schema is very long. There is no functional difference, and it is discouraged because it injects a Fugue dependency.

from fugue import transformer

@transformer("a:int")
def no_op(df: pd.DataFrame) -> pd.DataFrame:
    """
    A function that returns its input.
    """
    return df

transform(
    df=df, 
    using=no_op
    )
a
0 1
1 2
2 3

Conclusion#

In this section, we have shown the different ways to define the schema for an operation. We also discussed why it’s important to be explicit about the schema when working with distributed computing. In the next section, we take a look at the different partitioning strategies Fugue provides to control how data is distributed across the cluster.