{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Schema\n", "\n", "Have questions? Chat with us on Github or Slack:\n", "\n", "[![Homepage](https://img.shields.io/badge/fugue-source--code-red?logo=github)](https://github.com/fugue-project/fugue)\n", "[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)\n", "\n", "In the previous section, we saw how type hints are used in Fugue to provide more flexibility to express operations in different ways. In this section, we'll talk about schema and why it's an important requirement for distributed computing." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Explicit Schema\n", "\n", "The first thing to recognize is that explicit schema is pretty much a requirement for distributed computing. Frameworks like Spark and Dask support schema inference, but it can be expensive and error-prone. For more information, check the [best practices section](../resources/best_practices/explicit_schema.ipynb) about it.\n", "\n", "This is why Fugue requires schema at the framework level. In all cases, removing the need for schema inference will speed up code, which translates to meaningful cost reductions when dealing with big data. Also, some of the transformations we perform can be black boxes to the underlying execution engine. Frameworks such as Spark, Dask, and Ray will need to know the output schema to handle custom transformations." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fugue's Schema\n", "\n", "Again, Fugue does not invent schema; it uses [pyarrow schema](https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html). Fugue just provides a special string syntax to represent it: each column is written as `name:type`, and columns are separated by commas.\n", "\n", "The Fugue project has a utility called `triad`, which contains the `Schema` class. In practice, you will just need to interact with the string representation."
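, "\n", "For reference, the same schema can be written with pyarrow directly. A minimal sketch (assuming Fugue's `int` maps to pyarrow's 32-bit integer type and `str` to its string type):\n", "\n", "```python\n", "import pyarrow as pa\n", "\n", "# pyarrow equivalent of the Fugue schema expression \"a:int,b:str\"\n", "pa_schema = pa.schema([pa.field(\"a\", pa.int32()), pa.field(\"b\", pa.string())])\n", "```"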
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from triad.collections.schema import Schema\n", "\n", "s = Schema(\"a:int, b:str\")\n", "s == \"a:int,b:str\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Schema Expressions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For this section, we create a DataFrame that will be used throughout the examples below:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "df = pd.DataFrame({\"a\": [1,2,3], \"b\": [1,2,3], \"c\": [1,2,3]})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Adding a column**\n", "\n", "When using `transform()`, the `*` in a schema expression means all existing columns. From there we can add new columns by appending `\",col:type\"`." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a b c new_col\n", "0 1 1 1 2\n", "1 2 2 2 3\n", "2 3 3 3 4" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from fugue import transform \n", "\n", "def add_col(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " Function that creates a column with a value of column a + 1.\n", " \"\"\"\n", " return df.assign(new_col=df[\"a\"] + 1)\n", "\n", "transform(\n", " df=df, \n", " using=add_col, \n", " schema=\"*,new_col:int\"\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Entirely new schema**\n", "\n", "There is no need to use the `*` operation. We can just specify all columns." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " x\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def new_df(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " Function that creates a new DataFrame.\n", " \"\"\"\n", " return pd.DataFrame({\"x\": [1,2,3]})\n", "\n", "transform(\n", " df=df, \n", " using=new_df, \n", " schema=\"x:int\"\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Dropping Columns**\n", "\n", "To drop a column, append `-col` to the `*` with no `\",\"` in between (for example, `\"*-b\"`)." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a c\n", "0 1 1\n", "1 2 2\n", "2 3 3" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def drop_col(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " A function used to drop a column labelled 'b'.\n", " \"\"\"\n", " return df.drop(\"b\", axis=1)\n", "\n", "transform(\n", " df=df, \n", " using=drop_col, \n", " schema=\"*-b\"\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Altering Types**\n", "\n", "If a column remains but its type is altered, use `+col:type`." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a b c\n", "0 1a 1 1\n", "1 2a 2 2\n", "2 3a 3 3" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def alter_col(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " Function that changes column a to string.\n", " \"\"\"\n", " return df.assign(a=df[\"a\"].astype(\"str\")+\"a\")\n", "\n", "transform(\n", " df=df, \n", " using=alter_col, \n", " schema=\"*+a:str\"\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Drop if Present**\n", "\n", "Use `~col` to drop a column from the result if it is present; if the column is absent, no error is raised." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a c\n", "0 1 1\n", "1 2 2\n", "2 3 3" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def no_op(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " A function that returns its input.\n", " \"\"\"\n", " return df\n", "\n", "transform(\n", " df=df, \n", " using=no_op, \n", " schema=\"*~b\"\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Schema Result Mismatch**\n", "\n", "If the `transform()` output has columns not in the defined schema, they will not be returned.\n", "\n", "If the `transform()` output has a type inconsistent with the defined schema, it will be coerced." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a\n", "0 1.0\n", "1 2.0\n", "2 3.0" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def no_op(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " A function that returns its input.\n", " \"\"\"\n", " return df\n", "\n", "transform(\n", " df=df, \n", " using=no_op, \n", " schema=\"a:float\"\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Why Schema is Explicit\n", "\n", "To recap, schema maps column names to their corresponding data types. Schema is explicit in Fugue for a few reasons:\n", "\n", "1. It allows quick validation that the computation job contains all of the necessary columns.\n", "\n", "2. It guarantees that operations are performed as expected across all machines. As a DataFrame can be split across multiple machines, each machine only sees the data it holds. A strong schema prevents an operation intended for, say, an integer column from unexpectedly having to deal with string values.\n", "\n", "3. Inferring schema is an expensive and error-prone operation. To infer the schema, distributed computing frameworks have to go through at least one partition of data to figure out the possible schema. If the inferred schema is inconsistent across partitions, the result will be wrong.\n", "\n", "4. Engines such as Spark and Dask require the output schema up front to execute custom transformations." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Defining Schema\n", "\n", "There are a few ways to define the schema. Fugue has no preference; some of the approaches may be friendlier depending on the situation. 
Previously, we were passing the schema at runtime with something like:\n", "\n", "```python\n", "transform(\n", " df=df, \n", " using=func,\n", " schema=\"*, new_col:int\"\n", " )\n", "```\n", "\n", "We can also define the schema ahead of time, as part of the function declaration.\n", "\n", "### Schema Hint\n", "\n", "In the above examples, we supplied the schema as an argument to the `transform()` function. The schema can also be supplied on the function as a comment, similar to the partition validation described earlier in this chapter. Fugue reads and enforces this comment. If done this way, the schema should not be supplied to the `transform()` function.\n", "\n", "This is the least invasive approach to the code and is not even dependent on Fugue. If a user chooses to move away from Fugue, these are still helpful comments that can remain in the code. " ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# schema: a:int\n", "def no_op(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " A function that returns its input.\n", " \"\"\"\n", " return df\n", "\n", "transform(\n", " df=df, \n", " using=no_op\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Decorator\n", "\n", "One of the limitations of the schema hint is that linters often complain if there is a very long schema (past 70 or 80 characters). In that situation, users can import a long schema string into their script and pass it to the `transformer` decorator. This also makes it more explicit that the function is being wrapped into a Fugue transformer.\n", "\n", "Using the decorator is a bit more invasive, but it can be used when the schema is very long. There is no functional difference, but it is generally discouraged because it injects a Fugue dependency." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from fugue import transformer\n", "\n", "@transformer(\"a:int\")\n", "def no_op(df: pd.DataFrame) -> pd.DataFrame:\n", " \"\"\"\n", " A function that returns its input.\n", " \"\"\"\n", " return df\n", "\n", "transform(\n", " df=df, \n", " using=no_op\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusion\n", "\n", "In this section, we have shown the different ways to define the schema for an operation. We also discussed why it's important to be explicit about the schema when working with distributed computing. In the next section, we take a look at the different partitioning strategies Fugue provides to control how data is distributed across the cluster." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.8.13 ('fugue')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" }, "vscode": { "interpreter": { "hash": "9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9" } } }, "nbformat": 4, "nbformat_minor": 2 }