Extension Input Data Validation#

When using extensions in Fugue, you can add input data validation logic inside your own code. However, there is also a standard way to declare your validation rules. Here is a simple example:

from typing import List, Dict, Any

# partitionby_has: a
# schema: a:int,ct:int
def get_count(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]

The commented-out code below will fail, because the hint partitionby_has: a requires the input dataframe to be prepartitioned by at least column a.

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[1,1],[0,2]], "a:int,b:int")
    # df.transform(get_count).show()  # will fail because of no partition by
    df.partition(by=["a"]).transform(get_count).show()
    df.partition(by=["b","a"]).transform(get_count).show()  # b,a is a super set of a
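
The superset behavior in the last line boils down to a set-containment check. Here is a minimal pure-Python sketch of that logic (partitionby_has_ok is a hypothetical helper name, not Fugue's actual implementation):

```python
def partitionby_has_ok(required, actual):
    # partitionby_has passes when the actual partition keys
    # contain all required columns; order is ignored
    return set(required).issubset(actual)

# mirrors the workflow above
assert partitionby_has_ok(["a"], []) is False        # no partition by
assert partitionby_has_ok(["a"], ["a"]) is True      # exact match
assert partitionby_has_ok(["a"], ["b", "a"]) is True  # b,a is a superset of a
```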

You can also combine multiple rules. The following requires the partition keys to contain a, and the presort to be exactly b asc (b is equivalent to b asc):

from typing import List, Dict, Any

# partitionby_has: a
# presort_is: b
# schema: a:int,ct:int
def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[1,1],[0,2]], "a:int,b:int")
    # df.partition(by=["a"]).transform(get_count2).show()  # will fail because of no presort
    df.partition(by=["a"], presort="b asc").transform(get_count2).show()
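
Applying multiple rules means every rule must pass independently. A sketch of that combined check, under the assumption that a presort can be modeled as (column, ascending) pairs (validate is a hypothetical helper, not Fugue's API):

```python
def validate(rules, partition_keys, presort):
    # every declared rule must hold; partitionby_has is a containment
    # check, presort_is is an exact match
    if "partitionby_has" in rules:
        if not set(rules["partitionby_has"]).issubset(partition_keys):
            return False
    if "presort_is" in rules:
        if rules["presort_is"] != presort:
            return False
    return True

rules = {"partitionby_has": ["a"], "presort_is": [("b", True)]}
assert not validate(rules, ["a"], [])              # no presort -> fails
assert not validate(rules, [], [("b", True)])      # no partition by -> fails
assert validate(rules, ["a"], [("b", True)])       # both rules satisfied
```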

Supported Validations#

The following are all supported validations. Compile-time validations happen when you construct the FugueWorkflow, while runtime validations happen during execution. Compile-time validations are very useful for quickly identifying logical issues. Runtime validations take longer to surface, but they are still useful. At the Fugue level, we try to move runtime validations to compile time as much as we can.

| Rule | Description | Compile Time | Order Matters | Examples |
| --- | --- | --- | --- | --- |
| partitionby_has | assert the input dataframe is prepartitioned, and the partition keys contain these values | Yes | No | partitionby_has: a,b means the partition keys must contain columns a and b |
| partitionby_is | assert the input dataframe is prepartitioned, and the partition keys are exactly these values | Yes | Yes | partitionby_is: a,b means the partition keys must contain and only contain columns a and b |
| presort_has | assert the input dataframe is prepartitioned and presorted, and the presort keys contain these values | Yes | No | presort_has: a,b desc means the presort contains a asc and b desc (a is equivalent to a asc) |
| presort_is | assert the input dataframe is prepartitioned and presorted, and the presort keys are exactly these values | Yes | Yes | presort_is: a,b desc means the presort is exactly a asc, b desc |
| schema_has | assert the input dataframe schema has certain keys or key-type pairs | No | No | schema_has: a,b:str means the schema contains column a regardless of type, and column b of type string; order doesn't matter, so b:str,a:int is valid, b:int,a:int is invalid (wrong type for b), and b:str is invalid (a is missing) |
| schema_is | assert the input dataframe schema is exactly this value (the value must be a schema expression) | No | Yes | schema_is: a:int,b:str, so b:str,a:int is invalid (wrong order) and a:str,b:str is invalid (wrong type for a) |
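
The schema_has / schema_is examples in the table can be mimicked with a small sketch that treats a schema expression as an ordered list of (name, type) pairs (parse_schema, schema_has, and schema_is are hypothetical helpers, not Fugue's real schema class):

```python
def parse_schema(expr):
    # "a:int,b:str" -> [("a", "int"), ("b", "str")]; a bare name has no type
    cols = []
    for part in expr.split(","):
        name, _, typ = part.partition(":")
        cols.append((name.strip(), typ.strip() or None))
    return cols

def schema_has(required_expr, schema_expr):
    # order ignored; a required column without a type matches any type
    schema = dict(parse_schema(schema_expr))
    for name, typ in parse_schema(required_expr):
        if name not in schema:
            return False
        if typ is not None and schema[name] != typ:
            return False
    return True

def schema_is(required_expr, schema_expr):
    # exact match: same columns, same types, same order
    return parse_schema(required_expr) == parse_schema(schema_expr)

# the cases from the table
assert schema_has("a,b:str", "b:str,a:int")         # valid, order ignored
assert not schema_has("a,b:str", "b:int,a:int")     # wrong type for b
assert not schema_has("a,b:str", "b:str")           # a is missing
assert schema_is("a:int,b:str", "a:int,b:str")      # exact match
assert not schema_is("a:int,b:str", "b:str,a:int")  # order matters
assert not schema_is("a:int,b:str", "a:str,b:str")  # wrong type for a
```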

Extensions Compatibility#

| Extension Type | Supported | Not Supported |
| --- | --- | --- |
| Transformer | partitionby_has, partitionby_is, presort_has, presort_is, schema_has, schema_is | None |
| CoTransformer | None | partitionby_has, partitionby_is, presort_has, presort_is, schema_has, schema_is |
| OutputTransformer | partitionby_has, partitionby_is, presort_has, presort_is, schema_has, schema_is | None |
| OutputCoTransformer | None | partitionby_has, partitionby_is, presort_has, presort_is, schema_has, schema_is |
| Creator | N/A | N/A |
| Processor | partitionby_has, partitionby_is, presort_has, presort_is, schema_has, schema_is | None |
| Outputter | partitionby_has, partitionby_is, presort_has, presort_is, schema_has, schema_is | None |

How To Add Validations#

You can add validations by comment, by decorator, or by interface, depending on how you write your extension. Feature-wise, the three approaches are equivalent.

By Comment#

from typing import List, Dict, Any

# partitionby_has: a
# presort_is: b
# schema: a:int,ct:int
def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]

By Decorator#

import pandas as pd
from typing import List, Dict, Any
from fugue import processor, transformer

@transformer(schema="*", partitionby_has=["a","d"], presort_is="b, c desc")
def example1(df:pd.DataFrame) -> pd.DataFrame:
    return df

@transformer(schema="*", partitionby_has="a,d", presort_is=["b",("c",False)])
def example2(df:pd.DataFrame) -> pd.DataFrame:
    return df

# partitionby_has: a
# presort_is: b
@transformer(schema="*")
def example3(df:pd.DataFrame) -> pd.DataFrame:
    return df

@processor(partitionby_has=["a","d"], presort_is="b, c desc")
def example4(df:pd.DataFrame) -> pd.DataFrame:
    return df
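
Note that example1 and example2 above express the same presort in two forms: the string "b, c desc" and the list ["b", ("c", False)], where a bare column name means ascending and a (name, False) tuple means descending. A sketch of normalizing both forms to (column, ascending) pairs (normalize_presort is a hypothetical helper, not Fugue's actual parser):

```python
def normalize_presort(presort):
    # string form: comma-separated "col [asc|desc]" items,
    # defaulting to ascending
    if isinstance(presort, str):
        pairs = []
        for part in presort.split(","):
            tokens = part.strip().split()
            asc = len(tokens) < 2 or tokens[1].lower() == "asc"
            pairs.append((tokens[0], asc))
        return pairs
    # list form: a bare column name means ascending
    return [(p, True) if isinstance(p, str) else (p[0], bool(p[1]))
            for p in presort]

# the decorator examples above express the same presort
assert normalize_presort("b, c desc") == [("b", True), ("c", False)]
assert normalize_presort(["b", ("c", False)]) == [("b", True), ("c", False)]
```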

By Interface#

In every extension, you can override the validation_rules property:

from fugue import Transformer

class T(Transformer):
    @property
    def validation_rules(self):
        return {
            "partitionby_has": ["a"]
        }

    def get_output_schema(self, df):
        return df.schema
    
    def transform(self, df):
        return df