{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Hinting\n",
"\n",
"Have questions? Chat with us on Github or Slack:\n",
"\n",
"[](https://github.com/fugue-project/fugue)\n",
"[](http://slack.fugue.ai)\n",
"\n",
"In the previous section, we saw how the `transform()` function was used to bring a Python function to Spark, Dask, or Ray. In this section, we take a deeper look at the `transform()` function and what types of inputs and outputs it can handle."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the last section, the function we brought to Spark looked like this:\n",
"\n",
"```\n",
"def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:\n",
" return df.assign(predicted=model.predict(df))\n",
"```\n",
"\n",
"It had an input of type `pd.DataFrame` and an output of type `pd.DataFrame`. Fugue applies these type annotations and converts the input to the type specified. The output annotation is also used to convert back to the engine DataFrame (Pandas, Spark, Dask or Ray). In this section, we will see some other examples of type annotations that Fugue can handle.\n",
"\n",
"For those more familiar with distributed computing, the conversion is done for each partition. This will be explained more in later sections, but the important concept is that Fugue **does not** convert the whole Spark or Dask DataFrame to Pandas to perform the operation. **The conversion happens on a portion of the distributed DataFrame.**\n",
"\n",
"The conversion overhead has been [benchmarked](../resources/best_practices/fugue_spark_benchmark.ipynb) and found to be negligible, especially for big data."
]
},
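{
"cell_type": "markdown",
"metadata": {},
"source": [
"The per-partition idea can be sketched in plain Python, with no Fugue or Spark involved. This is only an illustration under simplifying assumptions, not Fugue's implementation: the helpers `split_into_partitions` and `apply_per_partition`, the `double_value` function, and the sample rows are all hypothetical.\n",
"\n",
"```python\n",
"from typing import Any, Callable, Dict, Iterable, Iterator, List\n",
"\n",
"Row = Dict[str, Any]\n",
"\n",
"\n",
"def split_into_partitions(rows: List[Row], size: int) -> List[List[Row]]:\n",
"    # Hypothetical helper: cut the rows into chunks of at most `size` rows\n",
"    return [rows[i:i + size] for i in range(0, len(rows), size)]\n",
"\n",
"\n",
"def apply_per_partition(\n",
"    partitions: Iterable[List[Row]],\n",
"    func: Callable[[List[Row]], List[Row]],\n",
") -> Iterator[Row]:\n",
"    # Hypothetical helper: call `func` on one partition at a time\n",
"    for part in partitions:\n",
"        yield from func(part)\n",
"\n",
"\n",
"def double_value(part: List[Row]) -> List[Row]:\n",
"    # The user function only ever sees a single, small partition\n",
"    return [{**row, 'doubled': row['value'] * 2} for row in part]\n",
"\n",
"\n",
"rows = [{'value': v} for v in range(5)]\n",
"partitions = split_into_partitions(rows, size=2)\n",
"result = list(apply_per_partition(partitions, double_value))\n",
"\n",
"print(len(partitions))\n",
"print(result)\n",
"```\n",
"\n",
"The function never receives the full dataset, only one chunk at a time, which is the property that lets the same logic scale out on a distributed engine."
]
},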
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sample Problem and `transform()`\n",
"\n",
"In this sample problem, we are interested in getting the first three digits of the `phone` column and populating a new column called `location` by using a dictionary that maps the values. We start by preparing the sample data and defining the mapping."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"_area_code_map = {\"217\": \"Champaign, IL\", \"407\": \"Orlando, FL\", \"510\": \"Fremont, CA\"}\n",
"\n",
"data = pd.DataFrame({\"phone\": [\"(217)-123-4567\", \"(217)-234-5678\", \"(407)-123-4567\", \n",
" \"(407)-234-5678\", \"(510)-123-4567\"]})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we'll perform the operation in Pandas. It's very simple because of the `.map()` method in Pandas. We then test the function."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" phone | \n",
" location | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" (217)-123-4567 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
" 1 | \n",
" (217)-234-5678 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
" 2 | \n",
" (407)-123-4567 | \n",
" Orlando, FL | \n",
"
\n",
" \n",
" 3 | \n",
" (407)-234-5678 | \n",
" Orlando, FL | \n",
"
\n",
" \n",
" 4 | \n",
" (510)-123-4567 | \n",
" Fremont, CA | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" phone location\n",
"0 (217)-123-4567 Champaign, IL\n",
"1 (217)-234-5678 Champaign, IL\n",
"2 (407)-123-4567 Orlando, FL\n",
"3 (407)-234-5678 Orlando, FL\n",
"4 (510)-123-4567 Fremont, CA"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def map_phone_to_location(df: pd.DataFrame) -> pd.DataFrame:\n",
" df[\"location\"] = df[\"phone\"].str.slice(1,4).map(_area_code_map)\n",
" return df\n",
"\n",
"map_phone_to_location(data.copy())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Similar to the function in the previous section, this function can be used in Spark on big data with Fugue's `transform()` function. For now, we will leave the engine blank to use the default Pandas-based engine. "
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" phone | \n",
" location | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" (217)-123-4567 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
" 1 | \n",
" (217)-234-5678 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" phone location\n",
"0 (217)-123-4567 Champaign, IL\n",
"1 (217)-234-5678 Champaign, IL"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from fugue import transform\n",
"\n",
"transform(data.copy(),\n",
" map_phone_to_location,\n",
" schema=\"*, location:str\").head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Role of Type Hints\n",
"\n",
"**The type hints are used by Fugue used to convert each partition of the distributed DataFrame**. Below is a diagram that illustrates this process.\n",
"\n",
"\n",
"\n",
"Conversion is needed because Spark partitions **are not** Pandas DataFrames. By converting them, we can apply any Python or Pandas code that we are used to. The point here is that we decompose the big data into several, more manageable, small data problems. The `pd.DataFrame` type is the most straightforward one, but distributed computing often goes beyond the semantics of Pandas. For example, what if we want to use Spark to process a list of image files? More generally, how do we use the distributed computing frameworks to perform general jobs?"
]
},
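{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the role of annotations more concrete, here is a minimal sketch of how any Python framework can read a function's type hints at runtime using the standard library's `typing.get_type_hints`. It is not a description of Fugue's internals, and `add_location` is a hypothetical stand-in for a user-defined function.\n",
"\n",
"```python\n",
"from typing import Any, Dict, Iterable, List, get_type_hints\n",
"\n",
"\n",
"def add_location(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:\n",
"    # Hypothetical stand-in for a user-defined function\n",
"    for row in df:\n",
"        yield row\n",
"\n",
"\n",
"# Maps each parameter name (and 'return') to its annotation\n",
"hints = get_type_hints(add_location)\n",
"input_hint = hints['df']\n",
"output_hint = hints['return']\n",
"\n",
"print(input_hint)\n",
"print(output_hint)\n",
"```\n",
"\n",
"Once the annotations are known, a framework can decide which format to hand to the function and which format to expect back."
]
},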
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Different Input and Output Annotations\n",
"\n",
"The `map_phone_to_location()` logic can actually be expressed in native Python. Below are three valid implementations of the same function. The `map_phone_to_location3()` below is less practical, but it's just to demo the varying types that Fugue can take. `List[List]` will be useful in some use cases where you want to perform row-wise operations on DataFrames.\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Dict, Any, Iterable\n",
"\n",
"def map_phone_to_location2(df: List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:\n",
" for row in df:\n",
" row[\"location\"] = _area_code_map[row[\"phone\"][1:4]]\n",
" yield row\n",
"\n",
"def map_phone_to_location3(df: List[List[Any]]) -> List[List[Any]]:\n",
" for row in df:\n",
" row.append(_area_code_map[row[0][1:4]])\n",
" return df\n",
"\n",
"def map_phone_to_location4(df: List[List[Any]]) -> pd.DataFrame:\n",
" for row in df:\n",
" row.append(_area_code_map[row[0][1:4]])\n",
" df = pd.DataFrame.from_records(df, columns=[\"phone\", \"location\"])\n",
" return df\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `map_phone_to_location4()` shows that the input and output types can differ. We can test these functions by passing some input."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"map_phone_to_location2([{\"phone\": \"(217)-123-4567\"}, {\"phone\": \"(217)-234-5678\"}])"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[['(217)-123-4567', 'Champaign, IL'], ['(217)-234-5678', 'Champaign, IL']]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"map_phone_to_location3([[\"(217)-123-4567\"], [\"(217)-234-5678\"]])"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" phone | \n",
" location | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" (217)-123-4567 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
" 1 | \n",
" (217)-234-5678 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" phone location\n",
"0 (217)-123-4567 Champaign, IL\n",
"1 (217)-234-5678 Champaign, IL"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"map_phone_to_location4([[\"(217)-123-4567\"], [\"(217)-234-5678\"]])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Even if these functions are not meant to operate on Pandas, Spark, Dask, or Ray DataFrames, Fugue `transform()` will handle the conversion for us so we don't need to make any changes to them to use them. We test these functions on the default Pandas-based engine first. "
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" phone | \n",
" location | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" (217)-123-4567 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
" 1 | \n",
" (217)-234-5678 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" phone location\n",
"0 (217)-123-4567 Champaign, IL\n",
"1 (217)-234-5678 Champaign, IL"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"transform(data.copy(),\n",
" map_phone_to_location2,\n",
" schema=\"*, location:str\").head(2)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" phone | \n",
" location | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" (217)-123-4567 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
" 1 | \n",
" (217)-234-5678 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" phone location\n",
"0 (217)-123-4567 Champaign, IL\n",
"1 (217)-234-5678 Champaign, IL"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"transform(data.copy(),\n",
" map_phone_to_location3,\n",
" schema=\"*, location:str\").head(2)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" phone | \n",
" location | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" (217)-123-4567 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
" 1 | \n",
" (217)-234-5678 | \n",
" Champaign, IL | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" phone location\n",
"0 (217)-123-4567 Champaign, IL\n",
"1 (217)-234-5678 Champaign, IL"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"transform(data.copy(),\n",
" map_phone_to_location4,\n",
" schema=\"*, location:str\").head(2)"
]
},
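{
"cell_type": "markdown",
"metadata": {},
"source": [
"The two row-oriented shapes used above, `List[Dict[str, Any]]` and `List[List[Any]]`, carry the same data and differ only in whether column names travel with each row. The sketch below converts between them in plain Python. It is an illustration only, not how Fugue performs the conversion internally, and the helpers `dicts_to_lists` and `lists_to_dicts` are hypothetical.\n",
"\n",
"```python\n",
"from typing import Any, Dict, List\n",
"\n",
"\n",
"def dicts_to_lists(rows: List[Dict[str, Any]], columns: List[str]) -> List[List[Any]]:\n",
"    # Keep only the values, ordered by the given column names\n",
"    return [[row[col] for col in columns] for row in rows]\n",
"\n",
"\n",
"def lists_to_dicts(rows: List[List[Any]], columns: List[str]) -> List[Dict[str, Any]]:\n",
"    # Pair each value with its column name\n",
"    return [dict(zip(columns, row)) for row in rows]\n",
"\n",
"\n",
"columns = ['phone', 'location']\n",
"as_dicts = [{'phone': '(217)-123-4567', 'location': 'Champaign, IL'}]\n",
"as_lists = dicts_to_lists(as_dicts, columns)\n",
"\n",
"print(as_lists)\n",
"print(lists_to_dicts(as_lists, columns) == as_dicts)\n",
"```\n",
"\n",
"With the list-of-lists shape, the column order has to be known separately, which is why `map_phone_to_location3()` refers to the phone number by position (`row[0]`)."
]
},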
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Bring the Functions to Distrbited Compute\n",
"\n",
"Because they work on the Pandas-based engine, we can also use the functions on Spark, Dask, and Ray. Any of the `map_to_phone_location()` variations can be used below."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+-------------+\n",
"| phone| location|\n",
"+--------------+-------------+\n",
"|(217)-123-4567|Champaign, IL|\n",
"|(217)-234-5678|Champaign, IL|\n",
"+--------------+-------------+\n",
"only showing top 2 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"spark_df = transform(data.copy(),\n",
" map_phone_to_location2, # Iterable[List] implementation\n",
" schema=\"*, location:str\",\n",
" engine=spark)\n",
"\n",
"spark_df.show(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Valid input and output types\n",
"\n",
"When using the `transform()` function, Fugue is converting the function into a `Transformer` object under the hood. The full list valid input and output annotations can be found in the [Transformer extension docs](../extensions/transformer.ipynb)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conclusion\n",
"\n",
"In this section, we have shown how the `transform()` function can adapt to user's code by accepting multiple input and output type annotations. This allows users to express their logic in whatever is best for the given problem. Fugue then uses the specified annotations and takes care of bringing these functions to Spark, Dask, or Ray.\n",
"\n",
"In the next section, we'll cover the schema requirement when using the `transform()`. We'll cover both by it's necessary and how Fugue simplifies this for users."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
},
"vscode": {
"interpreter": {
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}