Extension Input Data Validation
When using extensions in Fugue, you may add input data validation logic inside your own code. However, there is a standard way to declare validation rules. Here is a simple example:
from typing import List, Dict, Any
# partitionby_has: a
# schema: a:int,ct:int
def get_count(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]
The code inside the try block will fail, because the hint partitionby_has: a requires the input DataFrame to be prepartitioned by at least column a.
import pandas as pd
from fugue import transform
df = pd.DataFrame({"a": [0,1,2], "b": [1,1,2]})
try:
    transform(df, get_count)
except Exception as e:
    print(e)
transform(df, get_count, partition={"by": "a"})
transform(df, get_count, partition={"by": ["b","a"]}) # ["a"] is a subset of ["b", "a"]
required partition key a is not in PartitionSpec(num='0', by=[], presort='')
  | a | ct |
---|---|---|
0 | 0 | 1 |
1 | 1 | 1 |
2 | 2 | 1 |
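The subset rule mentioned in the code comment above can be pictured with a small standalone sketch. It does not use Fugue: `check_partitionby_has` is a hypothetical helper written only for illustration, and Fugue's actual implementation and error messages may differ.

```python
from typing import List


def check_partitionby_has(required: List[str], partition_by: List[str]) -> None:
    """Raise if any required key is missing from the partition keys.

    Hypothetical helper for illustration; not part of the Fugue API.
    """
    for key in required:
        if key not in partition_by:
            raise ValueError(
                f"required partition key {key} is not in {partition_by}"
            )


# No partitioning at all: the rule fails.
try:
    check_partitionby_has(["a"], [])
except ValueError as e:
    print(e)

# Extra keys and key order do not matter for a "has" rule.
check_partitionby_has(["a"], ["a"])
check_partitionby_has(["a"], ["b", "a"])
```

Under this reading, a "has" rule only asks that the required keys are a subset of the partition keys, which is why both successful calls in the example above pass.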
You can also specify multiple rules. The following requires the partition keys to contain a, and the presort to be exactly b asc (b is equivalent to b asc):
from typing import List, Dict, Any
# partitionby_has: a
# presort_is: b
# schema: a:int,ct:int
def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]
try:
    transform(df, get_count2)
except Exception as e:
    print(e)
transform(df, get_count2, partition={"by":"a", "presort": "b asc"})
required partition key a is not in PartitionSpec(num='0', by=[], presort='')
  | a | ct |
---|---|---|
0 | 0 | 1 |
1 | 1 | 1 |
2 | 2 | 1 |
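To make the "b is equivalent to b asc" point concrete, here is a minimal sketch of how a presort expression could be normalized before comparison. `parse_presort` and `presort_is` are hypothetical helpers, not Fugue functions, and the parsing is deliberately simplified (plain column names, optional asc/desc).

```python
from typing import List, Tuple


def parse_presort(expr: str) -> List[Tuple[str, bool]]:
    """Turn "b, c desc" into [("b", True), ("c", False)]; True means ascending.

    Hypothetical helper for illustration; not part of the Fugue API.
    """
    result: List[Tuple[str, bool]] = []
    for part in expr.split(","):
        tokens = part.split()
        if not tokens:
            continue  # skip empty segments such as a trailing comma
        if len(tokens) > 2:
            raise ValueError(f"invalid presort segment: {part!r}")
        # A missing direction defaults to ascending.
        direction = tokens[1].lower() if len(tokens) == 2 else "asc"
        if direction not in ("asc", "desc"):
            raise ValueError(f"invalid sort direction: {tokens[1]!r}")
        result.append((tokens[0], direction == "asc"))
    return result


def presort_is(required: str, actual: str) -> bool:
    """Exact match: same columns, same directions, same order."""
    return parse_presort(required) == parse_presort(actual)


print(presort_is("b", "b asc"))     # True
print(presort_is("b", "b desc"))    # False
print(presort_is("b", "b asc, a"))  # False
```

Because an "is" rule is an exact match, an extra presort column or a different direction makes the check fail in this sketch.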
Supported Validations#
The following validations are supported. Compile-time validations happen when you construct the FugueWorkflow, while runtime validations happen during execution. Compile-time validations are very useful for quickly identifying logical issues. Runtime validations may take longer to trigger, but they are still useful. At the Fugue level, we are trying to move runtime validations to compile time as much as we can.
Rule | Description | Compile Time | Order Matters | Examples |
---|---|---|---|---|
partitionby_has | assert the input dataframe is prepartitioned, and the partition keys contain these values | Yes | No | |
partitionby_is | assert the input dataframe is prepartitioned, and the partition keys are exactly these values | Yes | Yes | |
presort_has | assert the input dataframe is prepartitioned and presorted, and the presort keys contain these values | Yes | No | |
presort_is | assert the input dataframe is prepartitioned and presorted, and the presort keys are exactly these values | Yes | Yes | |
schema_has | assert input dataframe schema has certain keys or key type pairs | No | No | |
schema_is | assert input dataframe schema is exactly this value (the value must be a schema expression) | No | Yes | |
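The difference between the has and is variants, and why order matters only for the latter, can be sketched for the two schema rules. This is an illustration under simplifying assumptions, not Fugue's implementation: `parse_schema`, `schema_has` and `schema_is` are hypothetical helpers, and the parser only handles flat expressions such as a:int,ct:int (no nested types).

```python
from typing import Dict


def parse_schema(expr: str) -> Dict[str, str]:
    """Parse "a:int,ct:int" into an ordered {name: type} mapping.

    Hypothetical helper; handles simple, non-nested types only.
    """
    pairs = (item.split(":", 1) for item in expr.split(",") if item.strip())
    return {name.strip(): tp.strip() for name, tp in pairs}


def schema_has(required: str, actual: str) -> bool:
    """Each required item is a key ("a") or a key:type pair ("a:int").

    Order is ignored; extra columns in the actual schema are allowed.
    """
    schema = parse_schema(actual)
    for item in required.split(","):
        name, _, tp = item.strip().partition(":")
        name, tp = name.strip(), tp.strip()
        if name not in schema or (tp and schema[name] != tp):
            return False
    return True


def schema_is(required: str, actual: str) -> bool:
    """Exact match of names, types and column order."""
    return list(parse_schema(required).items()) == list(parse_schema(actual).items())


actual = "a:int,ct:int"
print(schema_has("ct,a", actual))          # True: order is ignored
print(schema_has("ct:str", actual))        # False: type mismatch
print(schema_is("ct:int,a:int", actual))   # False: order matters
print(schema_is("a:int,ct:int", actual))   # True
```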
Extensions Compatibility#
Extension Type | Supported | Not Supported |
---|---|---|
Transformer | | None |
CoTransformer | None | |
OutputTransformer | | None |
OutputCoTransformer | None | |
Creator | N/A | N/A |
Processor | | None |
Outputter | | None |
How To Add Validations#
How you add validations depends on how you write your extension: by comment, by decorator, or by interface. Feature-wise, the three approaches are equivalent.
By Comment#
from typing import List, Dict, Any
# schema: a:int,ct:int
def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]
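Comment hints are plain key: value lines placed above the function, as in the earlier examples that use partitionby_has and presort_is. The sketch below shows one way such lines could be collected into a rules dictionary. `read_hints` is a hypothetical helper for illustration only; how Fugue actually reads these comments is not shown here and may differ.

```python
from typing import Dict, List


def read_hints(source_lines: List[str]) -> Dict[str, str]:
    """Collect "# key: value" lines that appear before a function definition.

    Hypothetical helper for illustration; not part of the Fugue API.
    """
    hints: Dict[str, str] = {}
    for line in source_lines:
        text = line.strip()
        if text.startswith("def "):
            break  # only comments above the def are treated as hints
        if text.startswith("#") and ":" in text:
            # Split on the first colon only, so "a:int,ct:int" stays intact.
            key, value = text[1:].split(":", 1)
            hints[key.strip()] = value.strip()
    return hints


code = [
    "# partitionby_has: a",
    "# presort_is: b",
    "# schema: a:int,ct:int",
    "def get_count2(df):",
    "    return [[df[0]['a'], len(df)]]",
]
print(read_hints(code))
# {'partitionby_has': 'a', 'presort_is': 'b', 'schema': 'a:int,ct:int'}
```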
By Decorator#
import pandas as pd
from typing import List, Dict, Any
from fugue import processor, transformer
# Rules given as a list and as an expression string
@transformer(schema="*", partitionby_has=["a","d"], presort_is="b, c desc")
def example1(df:pd.DataFrame) -> pd.DataFrame:
    return df

# Rules given as a comma-separated string and as a list of keys and (key, ascending) tuples
@transformer(schema="*", partitionby_has="a,d", presort_is=["b",("c",False)])
def example2(df:pd.DataFrame) -> pd.DataFrame:
    return df

# Comment hints combined with the decorator
# partitionby_has: a
# presort_is: b
@transformer(schema="*")
def example3(df:pd.DataFrame) -> pd.DataFrame:
    return df

@processor(partitionby_has=["a","d"], presort_is="b, c desc")
def example4(df:pd.DataFrame) -> pd.DataFrame:
    return df
By Interface#
In every extension, you can override the validation_rules property:
from fugue import Transformer

class T(Transformer):
    @property
    def validation_rules(self):
        return {
            "partitionby_has": ["a"]
        }

    def get_output_schema(self, df):
        return df.schema

    def transform(self, df):
        return df