{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Extension Input Data Validation\n",
"\n",
"Have questions? Chat with us on Github or Slack:\n",
"\n",
"[![Homepage](https://img.shields.io/badge/fugue-source--code-red?logo=github)](https://github.com/fugue-project/fugue)\n",
"[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)\n",
"\n",
"When using extensions in Fugue, you may add input data validation logic inside your code. However, there is standard way to add your validation logic. Here is a simple example:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Dict, Any\n",
"\n",
"# partitionby_has: a\n",
"# schema: a:int,ct:int\n",
"def get_count(df:List[Dict[str,Any]]) -> List[List[Any]]:\n",
" return [[df[0][\"a\"],len(df)]]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The code inside the `try` block will fail, because of the hint `partitionby_has: a` requires the input DataFrame to be prepartitioned by at least column `a`."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"required partition key a is not in PartitionSpec(num='0', by=[], presort='')\n"
]
},
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" a | \n",
" ct | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 0 | \n",
" 1 | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" 1 | \n",
"
\n",
" \n",
" 2 | \n",
" 2 | \n",
" 1 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" a ct\n",
"0 0 1\n",
"1 1 1\n",
"2 2 1"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"from fugue import transform\n",
"\n",
"df = pd.DataFrame({\"a\": [0,1,2], \"b\": [1,1,2]})\n",
"try:\n",
" transform(df, get_count)\n",
"except Exception as e:\n",
" print(e)\n",
"\n",
"transform(df, get_count, partition={\"by\": \"a\"})\n",
"transform(df, get_count, partition={\"by\": [\"b\",\"a\"]}) # [\"a\"] is a subset of [\"b\", \"a\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also have multiple rules, the following requires partition keys to contain `a`, and presort to be exactly `b asc` (`b == b asc`)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Dict, Any\n",
"\n",
"# partitionby_has: a\n",
"# presort_is: b\n",
"# schema: a:int,ct:int\n",
"def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:\n",
" return [[df[0][\"a\"],len(df)]]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"required partition key a is not in PartitionSpec(num='0', by=[], presort='')\n"
]
},
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" a | \n",
" ct | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 0 | \n",
" 1 | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" 1 | \n",
"
\n",
" \n",
" 2 | \n",
" 2 | \n",
" 1 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" a ct\n",
"0 0 1\n",
"1 1 1\n",
"2 2 1"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"try:\n",
" transform(df, get_count2)\n",
"except Exception as e:\n",
" print(e)\n",
"\n",
"transform(df, get_count2, partition={\"by\":\"a\", \"presort\": \"b asc\"})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Supported Validations\n",
"\n",
"The following are all supported validations. **Compile time validations** will happen when you construct the [FugueWorkflow](/dag.ipynb) while **runtime validations** happen during execution. Compile time validations are very useful to quickly identify logical issues. Runtime validations may take longer time to happen but they are still useful.On Fugue level, we are trying to move runtime validations to compile time as much as we can.\n",
"\n",
" Rule | Description | Compile Time | Order Matters | Examples\n",
":---|:---|:---|:---|:---\n",
"**partitionby_has** | assert the input dataframe is prepartitioned, and the partition keys contain these values | Yes | No | `partitionby_has: a,b` means the partition keys must contain `a` and `b` columns\n",
"**partitionby_is** | assert the input dataframe is prepartitioned, and the partition keys are exactly these values | Yes | Yes | `partitionby_is: a,b` means the partition keys must contain and only contain `a` and `b` columns\n",
"**presort_has** | assert the input dataframe is prepartitioned and [presorted](./partition.ipynb#Presort), and the presort keys contain these values | Yes | No | `presort_has: a,b desc` means the presort contains `a asc` and `b desc` (`a == a asc`)\n",
"**presort_is** | assert the input dataframe is prepartitioned and [presorted](./partition.ipynb#Presort), and the presort keys are exactly these values | Yes | Yes | `presort_is: a,b desc` means the presort is exactly `a asc, b desc`\n",
"**schema_has** | assert input dataframe schema has certain keys or key type pairs | No | No | `schema_has: a,b:str` means input dataframe schema contains column `a` regardless of type, and `b` of type string, order doesn't matter. So `b:str,a:int` is valid, `b:int,a:int` is invalid because of `b` type, and `b:str` is invalid because `a` is not in the schema\n",
"**schema_is** | assert input dataframe schema is exactly this value (the value must be a [schema expression](./schema_dataframes.ipynb#Schema)) | No | Yes | `schema_is: a:int,b:str`, then `b:str,a:int` is invalid because of order, `a:str,b:str` is invalid because of `a` type\n",
"\n",
"\n",
"## Extensions Compatibility\n",
"\n",
"Extension Type | Supported | Not Supported\n",
":---|:---|:---\n",
"Transformer | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None\n",
"CoTransformer | None | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is`\n",
"OutputTransformer | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None\n",
"OutputCoTransformer | None | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is`\n",
"Creator | N/A | N/A\n",
"Processor | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None\n",
"Outputter | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None\n",
"\n",
"\n",
"## How To Add Validations\n",
"\n",
"It depends on how you write your extension, by comment, by decorator or by interface, feature wise, they are equivalent.\n",
"\n",
"## By Comment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Dict, Any\n",
"\n",
"# schema: a:int,ct:int\n",
"def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:\n",
" return [[df[0][\"a\"],len(df)]]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## By Decorator"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from typing import List, Dict, Any\n",
"from fugue import processor, transformer\n",
"\n",
"@transformer(schema=\"*\", partitionby_has=[\"a\",\"d\"], presort_is=\"b, c desc\")\n",
"def example1(df:pd.DataFrame) -> pd.DataFrame:\n",
" return df\n",
"\n",
"@transformer(schema=\"*\", partitionby_has=\"a,d\", presort_is=[\"b\",(\"c\",False)])\n",
"def example2(df:pd.DataFrame) -> pd.DataFrame:\n",
" return df\n",
"\n",
"# partitionby_has: a\n",
"# presort_is: b\n",
"@transformer(schema=\"*\")\n",
"def example3(df:pd.DataFrame) -> pd.DataFrame:\n",
" return df\n",
"\n",
"@processor(partitionby_has=[\"a\",\"d\"], presort_is=\"b, c desc\")\n",
"def example4(df:pd.DataFrame) -> pd.DataFrame:\n",
" return df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## By Interface\n",
"\n",
"In every extension, you can override `validation_rules`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from fugue import Transformer\n",
"\n",
"class T(Transformer):\n",
" @property\n",
" def validation_rules(self):\n",
" return {\n",
" \"partitionby_has\": [\"a\"]\n",
" }\n",
"\n",
" def get_output_schema(self, df):\n",
" return df.schema\n",
" \n",
" def transform(self, df):\n",
" return df"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.8.13 ('fugue')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
},
"vscode": {
"interpreter": {
"hash": "9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9"
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}