{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Saving and Loading\n", "\n", "Have questions? Chat with us on GitHub or Slack:\n", "\n", "[![Homepage](https://img.shields.io/badge/fugue-source--code-red?logo=github)](https://github.com/fugue-project/fugue)\n", "[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)\n", "\n", "So far, we've only covered modifying data with the `transform()` function. We constructed or loaded DataFrames with Pandas and then applied the transformation with a distributed computing engine. This setup will become a bottleneck for large files since we are loading everything at once on the driver node. On the other hand, loading a DataFrame using Spark, Dask, or Ray locks the code in to those frameworks.\n", "\n", "To make end-to-end workflows compatible with all backends, Fugue exposes two main ways to load and save data. The first is the `transform()` function. The second is the `load()` and `save()` functions of the Fugue API.\n", "\n", "## transform() using file path\n", "\n", "The `transform()` function can take in a file path instead of a DataFrame to load in data before performing the transformation. The engine specified will be used to directly load the file. First, we make an example file:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from fugue import transform\n", "\n", "df = pd.DataFrame({\"a\": [1,2]})\n", "df.to_parquet(\"/tmp/f.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we use the Dask engine to load in the data and apply the `dummy()` function." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
  a
0 1
1 2
\n", "
" ], "text/plain": [ " a\n", "0 1\n", "1 2" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def dummy(df: pd.DataFrame) -> pd.DataFrame:\n", "    return df\n", "\n", "res = transform(\"/tmp/f.parquet\", dummy, schema=\"*\", engine=\"dask\")\n", "res.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To save the results, the `transform()` function can also take in a `save_path` argument. By default, it will return the path where it was saved, which is helpful for consecutive `transform()` calls." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'/tmp/f_out.parquet'" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "transform(\"/tmp/f.parquet\", dummy, schema=\"*\", engine=\"dask\", save_path=\"/tmp/f_out.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## load() and save()\n", "\n", "The Fugue API also has `load()` and `save()` functions that are compatible with any engine. These can load and save `parquet`, `csv`, and `json` files. Using `parquet` when possible is best practice because the format stores schema information and does not require additional keywords to parse. These functions can be used independently, just like the `transform()` function." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "import fugue.api as fa\n", "\n", "df = fa.load(\"/tmp/f.parquet\", engine=\"dask\")\n", "res = fa.transform(df, dummy, schema=\"*\", engine=\"dask\")\n", "fa.save(res, \"/tmp/f_out.parquet\", engine=\"dask\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using these functions gives more control over loading and saving than the `transform()` function's built-in loading and saving capabilities. 
Note that the `fa.transform()` in the cell above is exactly the same as the `transform()` function covered in earlier sections." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
  col1
0    1
1    2
2    3
\n", "
" ], "text/plain": [ " col1\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame({\"col1\": [1,2,3], \"col2\": [1,2,3]})\n", "\n", "fa.save(df, \"/tmp/data.parquet\", mode=\"overwrite\")\n", "fa.save(df, \"/tmp/data.csv\", mode=\"overwrite\", header=True)\n", "df2 = fa.load(\"/tmp/data.parquet\")\n", "df3 = fa.load(\"/tmp/data.csv\", header=True, columns=\"col1:int\")\n", "df3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `columns` argument of `load()` takes a Fugue schema expression and limits the columns loaded." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "\n", "In this section, we learned how to save and load DataFrames in an engine-agnostic way. Both approaches presented in this section work across all execution engines. Some code snippets repeated `engine=\"dask\"` several times, which is redundant and tedious to type out. In practice, we can define the engine once with `engine_context()`, which we'll cover in the next section." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.8.13 ('fugue')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" }, "orig_nbformat": 2, "vscode": { "interpreter": { "hash": "9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9" } } }, "nbformat": 4, "nbformat_minor": 2 }