Saving and Loading

So far, we’ve only covered modifying data with the transform() function. We constructed or loaded DataFrames with Pandas and then applied the transformation with a distributed computing engine. This setup becomes a bottleneck for large files because everything is loaded at once on the driver node. On the other hand, loading a DataFrame directly with Spark, Dask, or Ray locks the code into that framework.

To build end-to-end workflows that are compatible with all backends, Fugue exposes two main ways to load and save data. The first is the transform() function. The second is the pair of load() and save() functions in the Fugue API.

transform() using a file path

The transform() function can take in a file path instead of a DataFrame to load in data before performing the transformation. The engine specified will be used to directly load the file. First, we make an example file:

import pandas as pd
from fugue import transform

# Write a small example DataFrame to a Parquet file
df = pd.DataFrame({"a": [1,2]})
df.to_parquet("/tmp/f.parquet")

Now we use the Dask engine to load in the data and apply the dummy() function.

# A no-op transformation: returns the input unchanged
def dummy(df: pd.DataFrame) -> pd.DataFrame:
    return df

res = transform("/tmp/f.parquet", dummy, schema="*", engine="dask")
res.compute()
   a
0  1
1  2

To save the results, transform() can also take a save_path argument. When save_path is set, transform() by default returns the path where the output was saved, which is helpful for chaining consecutive transform() calls.

transform("/tmp/f.parquet", dummy, schema="*", engine="dask", save_path="/tmp/f_out.parquet")
'/tmp/f_out.parquet'

load() and save()

The Fugue API also has load() and save() functions that are compatible with any engine. They can load Parquet, CSV, and JSON files. Using Parquet when possible is best practice because the file stores schema information, so no additional keywords are needed to parse it. Like transform(), these functions can be used on their own.

import fugue.api as fa

df = fa.load("/tmp/f.parquet", engine="dask")
res = fa.transform(df, dummy, schema="*", engine="dask")
fa.save(res, "/tmp/f_out.parquet", engine="dask")

These functions give more control over loading and saving than the file path and save_path options of transform(). Note that the fa.transform() in the code above is exactly the same as the transform() function covered in earlier sections.

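df = pd.DataFrame({"col1": [1,2,3], "col2": [1,2,3]})

# Parquet keeps the schema; for CSV, header=True writes the column names
fa.save(df, '/tmp/data.parquet', mode='overwrite')
fa.save(df, '/tmp/data.csv', mode='overwrite', header=True)

# Parquet needs no extra keywords; the CSV load reads the header row and keeps only col1 as int
df2 = fa.load('/tmp/data.parquet')
df3 = fa.load("/tmp/data.csv", header=True, columns="col1:int")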
df3
   col1
0     1
1     2
2     3

The columns argument of load() takes a Fugue schema expression that limits which columns are loaded and, as with "col1:int" above, sets their types.

Summary

In this section, we learned how to save and load DataFrames in an engine-agnostic way. Both methods presented here work across all execution engines. Some code snippets repeated engine="dask" multiple times, which is redundant and tedious to type. In practice, we can define the engine once with engine_context(), which we’ll cover in the next section.