{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fugue API in 10 minutes\n",
"\n",
"Have questions? Chat with us on GitHub or Slack:\n",
"\n",
"[![Homepage](https://img.shields.io/badge/fugue-source--code-red?logo=github)](https://github.com/fugue-project/fugue)\n",
"[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)\n",
"\n",
"This is a short introduction to the [Fugue API](https://github.com/fugue-project/fugue) geared towards new users. The Fugue project aims to make big data effortless by accelerating iteration speed and providing a simpler interface for users to utilize distributed computing engines.\n",
"\n",
"This tutorial covers the Python interface only. For SQL, check the [FugueSQL in 10 minutes section](ten_minutes_sql.ipynb).\n",
"\n",
"Fugue's intended audience includes, but is not limited to:\n",
"1. Data scientists who need to bring business logic written in Python or Pandas to bigger datasets\n",
"2. Data practitioners looking to parallelize existing code with distributed computing\n",
"3. Data teams that want to reduce the maintenance and testing of Spark/Dask/Ray code"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup\n",
"\n",
"For this tutorial, we first need to run through some quick setup to instantiate a Spark session for later use. "
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.getOrCreate()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Bringing a function to Spark, Dask, or Ray\n",
"\n",
"The simplest way to scale Pandas-based code to Spark, Dask, or Ray is with the `transform()` function. With this minimal wrapper, we can bring existing Pandas and Python code to distributed execution with minimal refactoring. The `transform()` function also provides quality-of-life enhancements that eliminate boilerplate code.\n",
"\n",
"Let's quickly demonstrate how this concept is applied. In the code snippets below, we will train a model using scikit-learn and Pandas, and then perform predictions with this model in parallel on top of Spark through Fugue."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from sklearn.linear_model import LinearRegression\n",
"\n",
"X = pd.DataFrame({\"x_1\": [1, 1, 2, 2], \"x_2\":[1, 2, 2, 3]})\n",
"y = np.dot(X, np.array([1, 2])) + 3\n",
"reg = LinearRegression().fit(X, y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After training our model, we wrap it in a `predict()` function. Note that this function is still written in Pandas, making it easy to test on the `input_df` that we create. Wrapping our model in `predict()` will allow us to bridge execution to Spark, Dask, or Ray."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" x_1 x_2 predicted\n",
"0 3 3 12.0\n",
"1 4 3 13.0\n",
"2 6 6 21.0\n",
"3 6 6 21.0"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# define our predict function\n",
"def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:\n",
" \"\"\"\n",
" Function to predict results using a pre-built model\n",
" \"\"\"\n",
" return df.assign(predicted=model.predict(df))\n",
"\n",
"# create test data\n",
"input_df = pd.DataFrame({\"x_1\": [3, 4, 6, 6], \"x_2\":[3, 3, 6, 6]})\n",
"\n",
"# test the predict function\n",
"predict(input_df, reg)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now this is where it starts to get interesting: let's bring the same code defined above to Spark using Fugue's `transform()`. We take our DataFrame and apply the `predict()` function to it using the Spark, Dask, or Ray engine. The `transform()` function parameters will be explained in detail later on, but for now, notice that we made no modifications to the `predict()` function in order to switch execution from Pandas to Spark. All we have to do is pass in the `SparkSession` as the engine."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pyspark.sql.dataframe.DataFrame'>\n",
"+---+---+---------+\n",
"|x_1|x_2|predicted|\n",
"+---+---+---------+\n",
"| 3| 3| 12.0|\n",
"| 4| 3| 13.0|\n",
"| 6| 6| 21.0|\n",
"| 6| 6| 21.0|\n",
"+---+---+---------+\n",
"\n"
]
}
],
"source": [
"# import Fugue\n",
"from fugue import transform\n",
"\n",
"# use Fugue transform to switch execution to spark\n",
"result = transform(\n",
" df=input_df,\n",
" using=predict,\n",
" schema=\"*,predicted:double\",\n",
" params=dict(model=reg),\n",
" engine=spark\n",
")\n",
"\n",
"# result is a Spark DataFrame\n",
"print(type(result))\n",
"result.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `transform()` function provides much more flexibility for users than what we have just described above. This is just a simple use case designed to give you a flavor of what Fugue has to offer. For this example, the `transform()` function took in the following arguments:\n",
"\n",
"* df - input DataFrame (can be a Pandas, Spark, Dask, or Ray DataFrame)\n",
"* using - a Python function with valid input and output types\n",
"* schema - output schema of the operation\n",
"* params - a dictionary of parameters to pass in the function\n",
"* engine - the execution engine to run the operation on (Pandas, Spark, Dask, or Ray)\n",
"\n",
"We will delve into these arguments in more detail later on, including an explanation of the roles that type hints and schema play. For now, the most important argument to discuss is the engine."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Execution Engines\n",
"\n",
"In the example above, we supplied the `spark` variable as the engine so the `predict()` function will be applied on the `input_df` using Spark.\n",
"\n",
"To provide flexibility, a Pandas DataFrame input will automatically be converted to the `engine`'s DataFrame class before the operation is applied. The inputs are converted in the examples below.\n",
"\n",
"```python\n",
"transform(df, fn, ..., engine=spark_session) # output is Spark DataFrame\n",
"transform(df, fn, ..., engine=dask_client) # output is Dask DataFrame\n",
"transform(df, fn, ..., engine=\"ray\") # output is Ray Dataset\n",
"```\n",
"\n",
"We can also use the `\"dask\"` or `\"spark\"` strings to spin up a Dask Client or SparkSession. \n",
"\n",
"As of Fugue 0.8.0, if no `engine` is supplied to `transform()`, the engine associated with the input DataFrame is used by default.\n",
"\n",
"```python\n",
"transform(df, fn, ...) # runs on Pandas\n",
"transform(spark_df, fn, ...) # runs on Spark\n",
"transform(dask_df, fn, ...) # runs on Dask\n",
"transform(ray_df, fn, ...) # runs on Ray\n",
"```\n"
]
},
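{
"cell_type": "markdown",
"metadata": {},
"source": [
"Conceptually, the inference rule can be pictured as dispatch on the input DataFrame's type. The sketch below only illustrates the idea in plain Python; `infer_engine` is a name we introduce here, not part of Fugue's API:\n",
"\n",
"```python\n",
"import pandas as pd\n",
"\n",
"def infer_engine(df) -> str:\n",
"    # hypothetical illustration: map the DataFrame's type to an engine name\n",
"    if isinstance(df, pd.DataFrame):\n",
"        return \"pandas\"\n",
"    # a Spark/Dask/Ray DataFrame would map to its own backend\n",
"    return type(df).__module__.split(\".\")[0]\n",
"\n",
"infer_engine(pd.DataFrame({\"a\": [1]}))  # \"pandas\"\n",
"```"
]
},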
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Returning a Local DataFrame\n",
"\n",
"While Fugue can convert Pandas DataFrames to Spark, Dask, or Ray DataFrames, it will not convert a distributed DataFrame back to Pandas unless explicitly told to with `as_local=True`. \n",
"\n",
"It sometimes makes more sense to save the output data as a Parquet file after transformations. Returning a local Pandas DataFrame after `transform()` is only recommended for smaller data because it can overload the driver node of the cluster."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pandas.core.frame.DataFrame'>\n"
]
},
{
"data": {
"text/plain": [
" x_1 x_2 predicted\n",
"0 3 3 12.0\n",
"1 4 3 13.0\n",
"2 6 6 21.0\n",
"3 6 6 21.0"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# use as_local=True to return a Pandas DataFrame\n",
"local_result = transform(\n",
" df=input_df,\n",
" using=predict,\n",
" schema=\"*,predicted:double\",\n",
" params=dict(model=reg),\n",
" engine=spark,\n",
" as_local=True\n",
")\n",
"\n",
"print(type(local_result))\n",
"local_result.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Type Hint Conversion\n",
"\n",
"In the previous section, we demonstrated how Fugue facilitates distributed execution by passing a Pandas-based function with valid input and output types to the Spark, Dask, or Ray engines. The `predict()` function we defined above accepted a `pd.DataFrame` as input and returned a `pd.DataFrame` as output. Fugue uses these type annotations as a guide to convert the distributed partitions before the function is applied. For those less experienced with Spark, this can be thought of as Spark partitions being converted to multiple Pandas DataFrames.\n",
"\n",
"In practice, not all data problems fit Pandas semantics, so Fugue can also handle other DataFrame-like input and output types. Take the following function that sums up each row and appends the result as a new column. We add a special condition: if the value in the summed column is 10 or greater, we drop the row."
]
},
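{
"cell_type": "markdown",
"metadata": {},
"source": [
"To build intuition for what this conversion looks like, the sketch below shows one plausible mapping in plain Pandas (an illustration of the concept, not Fugue's actual internals): a partition held as a `pd.DataFrame` can be viewed as a `List[Dict[str,Any]]` and converted back.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"\n",
"partition = pd.DataFrame({\"a\": [1, 2], \"b\": [3, 4]})\n",
"\n",
"# DataFrame -> List[Dict[str,Any]] (one dict per row)\n",
"records = partition.to_dict(\"records\")\n",
"\n",
"# List[Dict[str,Any]] -> DataFrame\n",
"restored = pd.DataFrame(records)\n",
"```"
]
},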
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"df = pd.DataFrame({\"a\": [1,2,3,4], \"b\": [1,2,3,4], \"c\": [1,2,3,4]})\n",
"\n",
"def add_row(df: pd.DataFrame) -> pd.DataFrame:\n",
" \"\"\"\n",
" A function that sums each row in a dataframe and drops the row\n",
"    if the sum is 10 or greater.\n",
" \"\"\"\n",
" df = df.assign(total=df.sum(axis=1))\n",
" df = df.loc[df[\"total\"] < 10]\n",
" return df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This can be run using `transform()` on all engines. In the example below, we don't pass an engine, so it runs on Pandas."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" a b c total\n",
"0 1 1 1 3\n",
"1 2 2 2 6\n",
"2 3 3 3 9"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"transform(\n",
" df=df, \n",
" using=add_row, \n",
" schema=\"*,total:int\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This same logic can be represented in multiple ways, and Fugue will be able to handle these due to the type annotation."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Iterable, Any, Dict\n",
"\n",
"def add_row2(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:\n",
" result = []\n",
" for row in df:\n",
" row[\"total\"] = row[\"a\"] + row[\"b\"] + row[\"c\"]\n",
" if row[\"total\"] < 10:\n",
" result.append(row)\n",
" return result\n",
"\n",
"def add_row3(df: List[List[Any]]) -> Iterable[List[Any]]:\n",
" for row in df:\n",
" row.append(sum(row))\n",
" if row[-1] < 10:\n",
" yield row"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The input type annotation tells Fugue what to convert the input data to before the function is applied whereas the output type annotation informs Fugue how to convert it back to a Pandas, Spark, Dask, or Ray DataFrame. Notice that these functions are not even dependent on Pandas and can be tested easily. For example:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[{'a': 1, 'b': 2, 'c': 3, 'total': 6}]\n",
"[[1, 2, 3, 6]]\n"
]
}
],
"source": [
"print(add_row2([{\"a\": 1, \"b\": 2, \"c\": 3}]))\n",
"print(list(add_row3([[1,2,3]])))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is one of the core offerings of Fugue. **Testing code that uses Spark, Dask or Ray is hard because of the dependency on the hardware.** Even if running the tests locally, iteration speed is significantly slower than using Pandas. This setup allows developers to unit test Python or Pandas code and bring it to the distributed setting when ready.\n",
"\n",
"These definitions are compatible with `transform()` across all execution engines. For example, we can use `add_row2` with the Spark engine."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+---+---+-----+\n",
"| a| b| c|total|\n",
"+---+---+---+-----+\n",
"| 1| 1| 1| 3|\n",
"| 2| 2| 2| 6|\n",
"| 3| 3| 3| 9|\n",
"+---+---+---+-----+\n",
"\n"
]
}
],
"source": [
"transform(\n",
" df=df, \n",
" using=add_row2, \n",
" schema=\"*,total:int\", \n",
" engine=spark\n",
" ).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The full list of acceptable input and output types for `transform()` can be found in the [Transformer section](../extensions/transformer.ipynb)."
]
},
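{
"cell_type": "markdown",
"metadata": {},
"source": [
"As one more illustration (defer to the linked section for the authoritative list), output annotations like `Iterable[pd.DataFrame]` can also be used, and such a function remains testable locally without importing Fugue. Note `split_batches` is just an example name we introduce here:\n",
"\n",
"```python\n",
"import pandas as pd\n",
"from typing import Iterable\n",
"\n",
"def split_batches(df: pd.DataFrame) -> Iterable[pd.DataFrame]:\n",
"    # yield each row as its own single-row DataFrame\n",
"    for i in range(len(df)):\n",
"        yield df.iloc[[i]]\n",
"\n",
"parts = list(split_batches(pd.DataFrame({\"a\": [1, 2]})))\n",
"```"
]
},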
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Schema\n",
"\n",
"We have seen a couple of `transform()` calls by now, and each of them had the `schema` passed in. The schema is a requirement for Spark and heavily recommended for Dask and Ray. When data lives across multiple machines, schema inference can be computationally expensive, and data processing can become inconsistent without an explicit schema. \n",
"\n",
"Fugue enforces best practices so that code can run effectively at scale. Here we see how to use Fugue's schema representation, which is minimal compared to Spark's. "
]
},
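{
"cell_type": "markdown",
"metadata": {},
"source": [
"A Fugue schema expression is just a comma-separated list of `name:type` pairs. As a plain-Python illustration of its shape (this is not Fugue's actual parser):\n",
"\n",
"```python\n",
"schema = \"a:long,b:long,c:long\"\n",
"fields = [tuple(field.split(\":\")) for field in schema.split(\",\")]\n",
"# fields == [(\"a\", \"long\"), (\"b\", \"long\"), (\"c\", \"long\")]\n",
"```"
]
},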
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For this section, we create a DataFrame that will be used throughout the examples below:"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"df = pd.DataFrame({\"a\": [1,2,3], \"b\": [1,2,3], \"c\": [1,2,3]})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Adding a column**\n",
"\n",
"When using `transform()`, the `*` in a schema expression means all existing columns. From there, we can add new columns by appending `\",column_name:type\"`. "
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" a b c new_col\n",
"0 1 1 1 2\n",
"1 2 2 2 3\n",
"2 3 3 3 4"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def add_col(df: pd.DataFrame) -> pd.DataFrame:\n",
" return df.assign(new_col=df[\"a\"] + 1)\n",
"\n",
"transform(\n",
" df=df, \n",
" using=add_col, \n",
" schema=\"*,new_col:int\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Entirely new schema**\n",
"\n",
"There is no need to use the `*` operation. We can just specify all columns."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" x y\n",
"0 1 a\n",
"1 2 b\n",
"2 3 c"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def new_df(df: pd.DataFrame) -> pd.DataFrame:\n",
" return pd.DataFrame({\"x\": [1,2,3], \"y\": [\"a\",\"b\",\"c\"]})\n",
"\n",
"transform(\n",
" df=df, \n",
" using=new_df, \n",
" schema=\"x:int,y:str\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are more schema operations available. For a deeper look, check the [Schema](../beginner/schema.ipynb) section."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Partitioning\n",
"\n",
"The type hint conversion we saw earlier is not applied at the DataFrame level but rather at the partition level. If no partitioning is supplied, the default engine partitions are used. To get a better sense of partitions, look at the following data."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" col1 col2\n",
"0 a 1\n",
"1 a 2\n",
"2 a 3\n",
"3 b 4\n",
"4 b 5"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame({\"col1\": [\"a\",\"a\",\"a\",\"b\",\"b\",\"b\"], \n",
" \"col2\": [1,2,3,4,5,6]})\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we create a function that gets the min and max of each group. By the time this function runs, the data will already be split so that there is one group per partition. Note the output is a `List[Dict[str,Any]]`, but we take advantage of Fugue to handle the conversion."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"def min_max(df:pd.DataFrame) -> List[Dict[str,Any]]:\n",
" return [{\"group\": df.iloc[0][\"col1\"], \n",
" \"max\": df['col2'].max(), \n",
" \"min\": df['col2'].min()}]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can then pass the partitioning strategy to the `transform()` function. In this example, we use Dask as the engine. "
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" group max min\n",
"0 a 3 1\n",
"1 b 6 4"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"transform(df, \n",
" min_max, \n",
" schema=\"group:str,max:int,min:int\", \n",
" engine=\"dask\",\n",
" partition={\"by\":\"col1\"},\n",
" as_local=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"On Pandas, the `partition-transform` semantic is close to a `groupby-apply`. The difference is that the `partition-transform` paradigm also extends to distributed computing, where we control the physical location of the data. Again, the expression above will also work on Spark, Dask, and Ray by supplying the engine."
]
},
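{
"cell_type": "markdown",
"metadata": {},
"source": [
"For comparison, this sketch reproduces the `min_max` result with a plain Pandas `groupby` (no Fugue involved):\n",
"\n",
"```python\n",
"import pandas as pd\n",
"\n",
"df = pd.DataFrame({\"col1\": [\"a\",\"a\",\"a\",\"b\",\"b\",\"b\"],\n",
"                   \"col2\": [1,2,3,4,5,6]})\n",
"\n",
"res = (df.groupby(\"col1\")[\"col2\"]\n",
"         .agg([\"max\", \"min\"])\n",
"         .reset_index()\n",
"         .rename(columns={\"col1\": \"group\"}))\n",
"```"
]
},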
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Presort\n",
"\n",
"During the partition operation, we can specify a `presort` so that the data comes in sorted before the function is applied. For example, we can get the top 2 rows of each group using the function below. **This is needed because distributed engines do not guarantee order is preserved when data is partitioned**. "
]
},
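{
"cell_type": "markdown",
"metadata": {},
"source": [
"For intuition, the same \"top two per group\" result can be sketched in plain Pandas by sorting before grouping (a local comparison only; it does not control data movement the way `presort` does on a distributed engine):\n",
"\n",
"```python\n",
"import pandas as pd\n",
"\n",
"df = pd.DataFrame({\"col1\": [\"a\",\"a\",\"a\",\"b\",\"b\",\"b\"],\n",
"                   \"col2\": [1,2,3,4,5,6]})\n",
"\n",
"top2 = (df.sort_values(\"col2\", ascending=False)\n",
"          .groupby(\"col1\")\n",
"          .head(2)\n",
"          .sort_values([\"col1\", \"col2\"], ascending=[True, False])\n",
"          .reset_index(drop=True))\n",
"```"
]
},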
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" col1 col2\n",
"0 a 3\n",
"1 a 2\n",
"2 b 6\n",
"3 b 5"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def top_two(df:List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:\n",
"    # take the first two rows of each presorted partition\n",
"    for row in df[:2]:\n",
"        yield row\n",
"\n",
"transform(\n",
" df=df, \n",
" using=top_two, \n",
" schema=\"*\", \n",
" partition={\"by\":\"col1\", \"presort\": \"col2 desc\"}\n",
" ) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading and Saving Files\n",
"\n",
"The `transform()` function is the most minimal function of Fugue, allowing users to distribute one step in their workflow. However, it will not be enough to express end-to-end data workflows that are agnostic to the execution engine. For example, users may still have code that looks like this:\n",
"\n",
"```python\n",
"result = transform(df, fn, engine=spark)\n",
"result.write.parquet(\"/tmp/out.parquet\")\n",
"```\n",
"\n",
"This still has a dependency on Spark. To support end-to-end workflows, Fugue has other functions compatible with any backend. Here, we look at loading and saving. First we create an example file:"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"df = pd.DataFrame({\"a\": [1,2,3], \"b\": [1,2,3], \"c\": [1,2,3]})\n",
"df.to_parquet(\"/tmp/df.parquet\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then we use the Fugue API to call engine-agnostic functions `load` and `save`. The `fa.transform()` function seen below is the same as the `transform()` function used earlier."
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"import fugue.api as fa\n",
"\n",
"def add_col(df:pd.DataFrame) -> pd.DataFrame:\n",
" return df.assign(sum=df['a']+df['b']+df['c'])\n",
"\n",
"df = fa.load(\"/tmp/df.parquet\", engine=spark)\n",
"out = fa.transform(df, add_col, schema=\"*,sum:int\", engine=spark)\n",
"fa.save(out, \"/tmp/out.parquet\", engine=spark)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This gives us an end-to-end workflow where we can pass in the `engine` as a variable. If the `engine` is not passed or is `None`, all the operations will run on Pandas."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Engine Context\n",
"\n",
"The load, transform, and save operations above all use `spark` as the engine. Instead of writing out the engine multiple times, it is more convenient to specify the execution engine once using the `engine_context`. "
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"with fa.engine_context(spark):\n",
" df = fa.load(\"/tmp/df.parquet\")\n",
"    out = fa.transform(df, add_col, schema=\"*,sum:int\")\n",
"    fa.save(out, \"/tmp/out.parquet\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now all of these operations will use the Spark execution engine. Similar to how we specified the execution engine earlier, passing nothing will use Pandas. Dask and Ray can also be used in the `engine_context`. This allows users to split their logic into functional groups based on the execution engine, enabling workflows that elegantly combine, for example, Pandas and Spark.\n",
"\n",
"The engine specified in the `engine_context` is a default, meaning that the default can be overridden inside the context if we pass it in a Fugue API function. For example:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with fa.engine_context(spark):\n",
" df = fa.load(\"/tmp/df.parquet\", engine=\"pandas\") # run this step on Pandas\n",
"    out = fa.transform(df, add_col, schema=\"*,sum:int\")\n",
" fa.save(out, \"/tmp/out.parquet\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conclusion\n",
"\n",
"The Fugue `transform()` function is the simplest interface for Fugue. It handles the execution of one function across Pandas, Spark, Dask, and Ray. Most users can easily adopt this minimal wrapper to parallelize existing code that they have already written. It is also minimally invasive, and a lot of users just use `transform()` for a single step they want to distribute.\n",
"\n",
"For end-to-end workflows that are engine-agnostic, see the [Fugue API tutorial](../beginner/engine_context.ipynb). There are many more engine-agnostic functions available for users to create end-to-end workflows that can be run on Pandas, Spark, Dask, or Ray just by changing the engine.\n",
"\n",
"To see other functions Fugue has, check the [top-level API docs](https://fugue.readthedocs.io/en/latest/top_api.html).\n",
"\n",
"For any questions, feel free to reach out on [Slack](http://slack.fugue.ai)."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
},
"vscode": {
"interpreter": {
"hash": "9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}