Type Hinting#


In the previous section, we saw how the transform() function was used to bring a Python function to Spark, Dask, or Ray. In this section, we take a deeper look at the transform() function and what types of inputs and outputs it can handle.

In the last section, the function we brought to Spark looked like this:

def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    return df.assign(predicted=model.predict(df))

It had an input of type pd.DataFrame and an output of type pd.DataFrame. Fugue reads these type annotations and converts the input to the specified type. The output annotation is then used to convert the result back to the engine DataFrame (Pandas, Spark, Dask, or Ray). In this section, we will see some other examples of type annotations that Fugue can handle.

For those more familiar with distributed computing, the conversion is done for each partition. This will be explained more in later sections, but the important concept is that Fugue does not convert the whole Spark or Dask DataFrame to Pandas to perform the operation. The conversion happens on a portion of the distributed DataFrame.

The conversion overhead has been benchmarked and found to be negligible, especially for big data.

Sample Problem and transform()#

In this sample problem, we are interested in getting the first three digits of the phone column and populating a new column called location by using a dictionary that maps the values. We start by preparing the sample data and defining the mapping.

import pandas as pd

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}

data = pd.DataFrame({"phone": ["(217)-123-4567", "(217)-234-5678", "(407)-123-4567", 
                               "(407)-234-5678", "(510)-123-4567"]})

First, we’ll perform the operation in Pandas. It’s very simple because of the .map() method in Pandas. We then test the function.

def map_phone_to_location(df: pd.DataFrame) -> pd.DataFrame:
    df["location"] = df["phone"].str.slice(1,4).map(_area_code_map)
    return df

map_phone_to_location(data.copy())
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL
2 (407)-123-4567 Orlando, FL
3 (407)-234-5678 Orlando, FL
4 (510)-123-4567 Fremont, CA

Similar to the function in the previous section, this function can be used in Spark on big data with Fugue’s transform() function. For now, we will leave the engine blank to use the default Pandas-based engine.

from fugue import transform

transform(data.copy(),
          map_phone_to_location,
          schema="*, location:str").head(2)
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL

Role of Type Hints#

Fugue uses the type hints to convert each partition of the distributed DataFrame to the annotated input type, apply the function, and then convert the output back.

Conversion is needed because Spark partitions are not Pandas DataFrames. By converting them, we can apply any Python or Pandas code that we are used to. The point here is that we decompose the big data into several, more manageable, small data problems. The pd.DataFrame type is the most straightforward one, but distributed computing often goes beyond the semantics of Pandas. For example, what if we want to use Spark to process a list of image files? More generally, how do we use the distributed computing frameworks to perform general jobs?
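To make the per-partition idea concrete, here is a rough sketch of the concept (not Fugue's actual internals; `run_per_partition` and the manual two-chunk split are made up for illustration):

```python
from typing import Callable, Iterable
import pandas as pd

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}

def map_phone_to_location(df: pd.DataFrame) -> pd.DataFrame:
    df["location"] = df["phone"].str.slice(1, 4).map(_area_code_map)
    return df

def run_per_partition(partitions: Iterable[pd.DataFrame],
                      func: Callable[[pd.DataFrame], pd.DataFrame]) -> pd.DataFrame:
    # Each partition is already a small Pandas DataFrame here, so the user
    # function runs unchanged on each piece; a real engine would hold the
    # pieces on different workers and collect the results.
    return pd.concat([func(p) for p in partitions], ignore_index=True)

data = pd.DataFrame({"phone": ["(217)-123-4567", "(407)-123-4567", "(510)-123-4567"]})

# Pretend the engine split the data into two partitions
chunks = [data.iloc[:2].copy(), data.iloc[2:].copy()]
result = run_per_partition(chunks, map_phone_to_location)
```

The whole DataFrame is never converted at once; only each small chunk is, which is why the overhead stays negligible.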

Different Input and Output Annotations#

The map_phone_to_location() logic can also be expressed in native Python. Below are three valid implementations of the same function. The map_phone_to_location3() version is less practical, but it demonstrates the variety of types Fugue can accept. List[List] is useful when you want to perform row-wise operations on DataFrames.

from typing import List, Dict, Any, Iterable

def map_phone_to_location2(df: List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    for row in df:
        row["location"] = _area_code_map[row["phone"][1:4]]
        yield row

def map_phone_to_location3(df: List[List[Any]]) -> List[List[Any]]:
    for row in df:
        row.append(_area_code_map[row[0][1:4]])
    return df

def map_phone_to_location4(df: List[List[Any]]) -> pd.DataFrame:
    for row in df:
        row.append(_area_code_map[row[0][1:4]])
    df = pd.DataFrame.from_records(df, columns=["phone", "location"])
    return df
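As a further illustration (this variant is not in the examples above, and `map_phone_to_location5` is a hypothetical name), an `Iterable[List[Any]]` output annotation is also accepted, which lets a row-wise function yield results lazily instead of building the whole list:

```python
from typing import Any, Iterable, List

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}

def map_phone_to_location5(df: List[List[Any]]) -> Iterable[List[Any]]:
    # Yield each row with the mapped location appended, rather than
    # mutating the input rows in place as map_phone_to_location3() does.
    for row in df:
        yield row + [_area_code_map[row[0][1:4]]]
```

Like the generator variant above, yielding rows one at a time can help keep memory usage low when partitions are large.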

Note that map_phone_to_location4() shows that the input and output types can differ. We can test these functions by passing some input.

Because map_phone_to_location2() is a generator function, we wrap the call in list() to materialize its output.

list(map_phone_to_location2([{"phone": "(217)-123-4567"}, {"phone": "(217)-234-5678"}]))
[{'phone': '(217)-123-4567', 'location': 'Champaign, IL'},
 {'phone': '(217)-234-5678', 'location': 'Champaign, IL'}]
map_phone_to_location3([["(217)-123-4567"], ["(217)-234-5678"]])
[['(217)-123-4567', 'Champaign, IL'], ['(217)-234-5678', 'Champaign, IL']]
map_phone_to_location4([["(217)-123-4567"], ["(217)-234-5678"]])
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL

Even though these functions are not written to operate on Pandas, Spark, Dask, or Ray DataFrames, Fugue's transform() handles the conversion for us, so we can use them without any changes. We test these functions on the default Pandas-based engine first.

transform(data.copy(),
          map_phone_to_location2,
          schema="*, location:str").head(2)
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL
transform(data.copy(),
          map_phone_to_location3,
          schema="*, location:str").head(2)
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL
transform(data.copy(),
          map_phone_to_location4,
          schema="*, location:str").head(2)
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL

Bring the Functions to Distributed Compute#

Because they work on the Pandas-based engine, we can also use these functions on Spark, Dask, and Ray. Any of the map_phone_to_location() variations can be used below.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark_df = transform(data.copy(),
                     map_phone_to_location2,    # List[Dict] -> Iterable[Dict] implementation
                     schema="*, location:str",
                     engine=spark)

spark_df.show(2)
+--------------+-------------+
|         phone|     location|
+--------------+-------------+
|(217)-123-4567|Champaign, IL|
|(217)-234-5678|Champaign, IL|
+--------------+-------------+
only showing top 2 rows

Valid input and output types#

When using the transform() function, Fugue converts the function into a Transformer object under the hood. The full list of valid input and output annotations can be found in the Transformer extension docs.

Conclusion#

In this section, we have shown how the transform() function can adapt to user’s code by accepting multiple input and output type annotations. This allows users to express their logic in whatever is best for the given problem. Fugue then uses the specified annotations and takes care of bringing these functions to Spark, Dask, or Ray.

In the next section, we'll cover the schema requirement when using transform(). We'll cover both why it's necessary and how Fugue simplifies this for users.