Type Flexibility

In the previous section, we saw how the transform() function was used to bring a Python function to Spark or Dask. In this section, we take a deeper look at the transform() function and what types of inputs and outputs it can handle.

In the last section, the function we brought to Spark looked like this:

def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    return df.assign(predicted=model.predict(df))

It had an input of type pd.DataFrame and an output of type pd.DataFrame. Fugue reads these type annotations and converts the input to the specified type. The output annotation is then used to convert the result back to the engine's DataFrame (pandas, Spark or Dask). In this section, we will see other type annotations that Fugue can handle.
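Python keeps type annotations available at runtime, which is what makes annotation-driven conversion possible. The sketch below only illustrates reading them with the standard library's typing.get_type_hints() and inspect.signature(); describe_annotations, frame_to_frame and rows_to_rows are hypothetical names, and this is not how Fugue itself is implemented.

```python
import inspect
from typing import Any, Dict, Iterable, List, get_type_hints

import pandas as pd


def describe_annotations(func) -> Dict[str, Any]:
    """Return the first parameter's annotation and the return annotation (hypothetical helper)."""
    hints = get_type_hints(func)
    first_param = next(iter(inspect.signature(func).parameters))
    return {"input": hints[first_param], "output": hints["return"]}


def frame_to_frame(df: pd.DataFrame) -> pd.DataFrame:
    return df


def rows_to_rows(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    yield from df


print(describe_annotations(frame_to_frame))
print(describe_annotations(rows_to_rows))
```

A framework can branch on these annotation objects to decide how to convert data before and after calling the function.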

For those more familiar with distributed computing, the conversion is done for each partition. This will be explained more in later sections, but the important concept is that Fugue does not convert the whole Spark or Dask DataFrame to pandas to perform the operation. The conversion overhead has been benchmarked and found to be negligible, especially for big data.
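To make the per-partition idea concrete, the pandas-only sketch below splits a DataFrame into row-wise chunks, applies a function to each chunk independently, and concatenates the results. apply_per_partition and add_double are hypothetical helpers; real engines such as Spark and Dask choose the partitioning themselves and process partitions in parallel across workers, which this sequential loop does not attempt.

```python
import pandas as pd


def add_double(df: pd.DataFrame) -> pd.DataFrame:
    # Any pandas-in, pandas-out function could be used here.
    return df.assign(doubled=df["x"] * 2)


def apply_per_partition(df: pd.DataFrame, func, num_partitions: int) -> pd.DataFrame:
    """Apply func to each chunk of df separately (hypothetical helper)."""
    if df.empty:
        return func(df)
    size = max(1, -(-len(df) // num_partitions))  # ceiling division so no rows are dropped
    chunks = [df.iloc[start:start + size] for start in range(0, len(df), size)]
    # func only ever sees one chunk at a time, never the whole frame.
    return pd.concat([func(chunk) for chunk in chunks], ignore_index=True)


numbers = pd.DataFrame({"x": range(10)})
result = apply_per_partition(numbers, add_double, num_partitions=3)
print(result)
```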

Sample Problem and transform()

In this sample problem, we want to take the first three digits of the phone column (the area code) and use a dictionary to map them to a new column called location. We start by preparing the sample data and defining the mapping.

import pandas as pd

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}

data = pd.DataFrame({"phone": ["(217)-123-4567", "(217)-234-5678", "(407)-123-4567", 
                               "(407)-234-5678", "(510)-123-4567"]})

First, we'll perform the operation in pandas, where the Series.map() method makes it simple. We then test the function.

def map_phone_to_location(df: pd.DataFrame) -> pd.DataFrame:
    df["location"] = df["phone"].str.slice(1,4).map(_area_code_map)
    return df

map_phone_to_location(data.copy())
            phone       location
0  (217)-123-4567  Champaign, IL
1  (217)-234-5678  Champaign, IL
2  (407)-123-4567    Orlando, FL
3  (407)-234-5678    Orlando, FL
4  (510)-123-4567    Fremont, CA
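One detail of Series.map() is worth knowing here: when it is given a dictionary, values with no matching key become NaN instead of raising an error. The snippet below checks this with a made-up 312 area code that is not in the mapping.

```python
import pandas as pd

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}

# "(312)-123-4567" is a hypothetical number whose area code is missing from the mapping.
phones = pd.Series(["(217)-123-4567", "(312)-123-4567"])
locations = phones.str.slice(1, 4).map(_area_code_map)
print(locations)
```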

Like the function in the previous section, this one can be used in Spark on big data with Fugue's transform() function. For now, we omit the engine argument to use the default pandas-based NativeExecutionEngine.

from fugue import transform

transform(data.copy(),
          map_phone_to_location,
          schema="*, location:str").head(2)
            phone       location
0  (217)-123-4567  Champaign, IL
1  (217)-234-5678  Champaign, IL
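In the schema string "*, location:str", the "*" stands for all of the input columns and location:str declares one new string column. The toy parser below, expand_schema, is hypothetical and only handles this simple comma-separated name:type form; Fugue's own schema handling supports more complex expressions.

```python
from typing import Dict, List, Tuple


def expand_schema(hint: str, input_schema: Dict[str, str]) -> List[Tuple[str, str]]:
    """Expand '*' in a simple 'name:type' schema hint (hypothetical, simplified)."""
    columns: List[Tuple[str, str]] = []
    for part in hint.split(","):
        part = part.strip()
        if part == "*":
            # '*' keeps every input column, in order, with its existing type
            columns.extend(input_schema.items())
        else:
            name, col_type = part.split(":")
            columns.append((name.strip(), col_type.strip()))
    return columns


print(expand_schema("*, location:str", {"phone": "str"}))
```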

Different Input and Output Annotations

The map_phone_to_location() logic can also be expressed in native Python. Below are three valid implementations of the same function. map_phone_to_location3() is less practical; it is included only to demonstrate the range of types Fugue accepts. List[List[Any]] is useful when you want to perform row-wise operations on DataFrames.

from typing import List, Dict, Any, Iterable

def map_phone_to_location2(df: List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    for row in df:
        row["location"] = _area_code_map[row["phone"][1:4]]
        yield row

def map_phone_to_location3(df: List[List[Any]]) -> List[List[Any]]:
    for row in df:
        row.append(_area_code_map[row[0][1:4]])
    return df

def map_phone_to_location4(df: List[List[Any]]) -> pd.DataFrame:
    for row in df:
        row.append(_area_code_map[row[0][1:4]])
    df = pd.DataFrame.from_records(df, columns=["phone", "location"])
    return df

Note that map_phone_to_location4() shows that the input and output types do not need to match: it takes a List[List[Any]] and returns a pd.DataFrame. We can test these functions directly by passing plain Python inputs.

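# map_phone_to_location2() is a generator, so wrap it in list() to see the rows
list(map_phone_to_location2([{"phone": "(217)-123-4567"}, {"phone": "(217)-234-5678"}]))
[{'phone': '(217)-123-4567', 'location': 'Champaign, IL'}, {'phone': '(217)-234-5678', 'location': 'Champaign, IL'}]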
map_phone_to_location3([["(217)-123-4567"], ["(217)-234-5678"]])
[['(217)-123-4567', 'Champaign, IL'], ['(217)-234-5678', 'Champaign, IL']]
map_phone_to_location4([["(217)-123-4567"], ["(217)-234-5678"]])
            phone       location
0  (217)-123-4567  Champaign, IL
1  (217)-234-5678  Champaign, IL
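Unlike the pandas version, these pure-Python variants index the dictionary with [], so an area code missing from _area_code_map raises a KeyError. If the NaN-like behavior of .map() is preferred, dict.get() returns None instead. The variant below, map_phone_to_location2_safe, is a hypothetical adjustment for illustration and not part of the original example.

```python
from typing import Any, Dict, Iterable, List

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}


def map_phone_to_location2_safe(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    for row in df:
        # .get() yields None for unknown area codes instead of raising KeyError
        row["location"] = _area_code_map.get(row["phone"][1:4])
        yield row


# "(312)-123-4567" is a made-up number with an unmapped area code.
rows = list(map_phone_to_location2_safe([{"phone": "(217)-123-4567"}, {"phone": "(312)-123-4567"}]))
print(rows)
```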

Even though these functions do not operate on pandas DataFrames, Fugue's transform() handles the conversion for us, so we don't need to change them to run them on pandas, Spark or Dask. We test these functions on the default pandas-based NativeExecutionEngine first.

transform(data.copy(),
          map_phone_to_location2,
          schema="*, location:str").head(2)
            phone       location
0  (217)-123-4567  Champaign, IL
1  (217)-234-5678  Champaign, IL
transform(data.copy(),
          map_phone_to_location3,
          schema="*, location:str").head(2)
            phone       location
0  (217)-123-4567  Champaign, IL
1  (217)-234-5678  Champaign, IL
transform(data.copy(),
          map_phone_to_location4,
          schema="*, location:str").head(2)
            phone       location
0  (217)-123-4567  Champaign, IL
1  (217)-234-5678  Champaign, IL
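For comparison, the plumbing that transform() takes care of can be approximated by hand on pandas: turn the DataFrame into records or row lists, call the function, and rebuild a DataFrame from the result. The sketch below does this with the standard pandas calls to_dict("records"), .values.tolist() and the pd.DataFrame constructor. It assumes a single small pandas DataFrame and is a simplified stand-in, not Fugue's actual conversion code.

```python
from typing import Any, Dict, Iterable, List

import pandas as pd

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}


def map_phone_to_location2(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    for row in df:
        row["location"] = _area_code_map[row["phone"][1:4]]
        yield row


def map_phone_to_location3(df: List[List[Any]]) -> List[List[Any]]:
    for row in df:
        row.append(_area_code_map[row[0][1:4]])
    return df


data = pd.DataFrame({"phone": ["(217)-123-4567", "(407)-123-4567"]})
out_columns = ["phone", "location"]  # what "*, location:str" expands to for this input

# List[Dict] in, Iterable[Dict] out: records -> function -> DataFrame
records_result = pd.DataFrame(list(map_phone_to_location2(data.to_dict("records"))), columns=out_columns)

# List[List] in, List[List] out: row lists -> function -> DataFrame
lists_result = pd.DataFrame(map_phone_to_location3(data.values.tolist()), columns=out_columns)

print(records_result)
print(lists_result)
```

Doing this by hand for every input type, and again for Spark or Dask, is exactly the boilerplate the type annotations let Fugue handle.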

Bring the Functions to Distributed Compute

Because they work on the NativeExecutionEngine, we can also use the functions on the SparkExecutionEngine. Any of the map_phone_to_location() variations can be used below.

import fugue_spark

spark_df = transform(data.copy(),
                     map_phone_to_location2,    # List[Dict] -> Iterable[Dict] implementation
                     schema="*, location:str",
                     engine="spark")

spark_df.show(2)
+--------------+-------------+
|         phone|     location|
+--------------+-------------+
|(217)-123-4567|Champaign, IL|
|(217)-234-5678|Champaign, IL|
+--------------+-------------+
only showing top 2 rows

Valid Input and Output Types

When you use the transform() function, Fugue converts the function into a Transformer object under the hood. The full list of valid input and output annotations can be found in the Transformer extension docs.

Conclusion

In this section, we have shown how the transform() function adapts to users' code by accepting multiple input and output type annotations. This lets users express their logic in whatever form best suits the problem. Fugue then uses the specified annotations to bring these functions to Spark or Dask.

In the next section, we’ll cover how the transform() function handles partitions, a very important concept in distributed computing.