{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Saving and Loading\n",
"\n",
"Have questions? Chat with us on Github or Slack:\n",
"\n",
"[![Homepage](https://img.shields.io/badge/fugue-source--code-red?logo=github)](https://github.com/fugue-project/fugue)\n",
"[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)\n",
"\n",
"So far, we've only covered modifying data with the `transform()` function. We constructed or loaded DataFrames with Pandas and then applied the transformation with a distributed computing engine. This setup becomes a bottleneck for large files because everything is loaded at once on the driver node. On the other hand, loading a DataFrame directly with Spark, Dask, or Ray locks the code into that framework.\n",
"\n",
"To build end-to-end workflows that run on any backend, Fugue exposes two main ways to load and save data. The first is the `transform()` function; the second is the `load()` and `save()` functions of the Fugue API.\n",
"\n",
"## transform() using file path\n",
"\n",
"The `transform()` function can take a file path instead of a DataFrame and will load the data before applying the transformation. The specified engine loads the file directly. First, we create an example file:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from fugue import transform\n",
"\n",
"df = pd.DataFrame({\"a\": [1,2]})\n",
"df.to_parquet(\"/tmp/f.parquet\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we use the Dask engine to load in the data and apply the `dummy()` function."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" a\n",
"0 1\n",
"1 2"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def dummy(df:pd.DataFrame) -> pd.DataFrame:\n",
" return df\n",
"\n",
"res = transform(\"/tmp/f.parquet\", dummy, schema=\"*\", engine=\"dask\")\n",
"res.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To save the results, the `transform()` function can also take in a `save_path` argument. By default, it will return the path where it was saved, which is helpful for consecutive `transform()` calls."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'/tmp/f_out.parquet'"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"transform(\"/tmp/f.parquet\", dummy, schema=\"*\", engine=\"dask\", save_path=\"/tmp/f_out.parquet\")"
]
},
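{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because `transform()` returns the save path by default, consecutive calls can be chained by feeding the returned path into the next call. A minimal sketch, assuming the `dummy()` function defined above; the `/tmp/f_step*.parquet` file names are illustrative choices for this example, not part of the Fugue API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Chain two transform() calls: the returned save path becomes the next input.\n",
"# The /tmp/f_step*.parquet paths are example names chosen for this sketch.\n",
"step1 = transform(\"/tmp/f.parquet\", dummy, schema=\"*\", engine=\"dask\", save_path=\"/tmp/f_step1.parquet\")\n",
"step2 = transform(step1, dummy, schema=\"*\", engine=\"dask\", save_path=\"/tmp/f_step2.parquet\")\n",
"step2"
]
},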
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## load() and save()\n",
"\n",
"The Fugue API also has `load()` and `save()` functions that are compatible with any engine. They can read and write `parquet`, `csv`, and `json` files. Using `parquet` when possible is best practice because it stores schema information and does not require additional keywords to parse. These functions can be used independently, similar to the `transform()` function."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"import fugue.api as fa\n",
"\n",
"df = fa.load(\"/tmp/f.parquet\", engine=\"dask\")\n",
"res = fa.transform(df, dummy, schema=\"*\", engine=\"dask\")\n",
"fa.save(res, \"/tmp/f_out.parquet\", engine=\"dask\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using these functions gives finer control over loading and saving than the `transform()` function's built-in handling. Note that the `fa.transform()` in the cell above is exactly the same `transform()` function covered in earlier sections."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" col1\n",
"0 1\n",
"1 2\n",
"2 3"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame({\"col1\": [1,2,3], \"col2\": [1,2,3]})\n",
"\n",
"fa.save(df, '/tmp/data.parquet', mode='overwrite')\n",
"fa.save(df, '/tmp/data.csv', mode='overwrite', header=True)\n",
"df2 = fa.load('/tmp/data.parquet')\n",
"df3 = fa.load(\"/tmp/data.csv\", header=True, columns=\"col1:int\")\n",
"df3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `columns` argument of `load()` takes a Fugue schema expression and limits the columns loaded."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"In this section, we learned how to save and load DataFrames in an engine-agnostic way. Both methods work across all execution engines. Some code snippets repeated `engine=\"dask\"` several times, which is redundant and tedious to type out. In practice, we can define the engine once using the `engine_context()` covered in the next section."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.8.13 ('fugue')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
},
"orig_nbformat": 2,
"vscode": {
"interpreter": {
"hash": "9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}