Partitioning#

In the last section, we showed how transform() can be applied to functions whose inputs are types such as List[List[Any]] or List[Dict[str,Any]] in addition to pd.DataFrame. In this section, we’ll look at a feature of transform() we have not touched on yet: the partition argument.

The partition argument allows us to control the partitioning scheme before the transform() operation is applied. Partitions dictate the physical grouping of data within a cluster.
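As a quick preview, here is a minimal sketch of the dictionary forms of partition used later in this section (the variable names are just for illustration):

# Sketches of the partition specifications used later in this section
partition_by_id = {"by": "id"}                                  # group rows by the id column
partition_by_id_sorted = {"by": "id", "presort": "value desc"}  # also sort each group by value, descending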

Simple Partitioning Example#

In the DataFrame below, we want to take the day-over-day difference of value. Because there are three different ids, we want to make sure that we don’t take differences across ids.

import pandas as pd 

data = pd.DataFrame({"date":["2021-01-01", "2021-01-02", "2021-01-03"] * 3,
                   "id": (["A"]*3 + ["B"]*3 + ["C"]*3),
                   "value": [3, 4, 2, 1, 2, 5, 3, 2, 3]})
data.head()
date id value
0 2021-01-01 A 3
1 2021-01-02 A 4
2 2021-01-03 A 2
3 2021-01-01 B 1
4 2021-01-02 B 2

Now we create a function that takes in a pd.DataFrame and outputs a pd.DataFrame. This will allow us to bring the logic to Spark and Dask as we’ve seen before.

def diff(df: pd.DataFrame) -> pd.DataFrame:
    df['diff'] = df['value'].diff()
    return df

But if we use this directly as seen below, we will notice that the diff for the first day of B is invalid because it was calculated using a value from A.

from fugue import transform
transform(data.copy(), 
          diff, 
          schema="*, diff:int").head()
date id value diff
0 2021-01-01 A 3 NaN
1 2021-01-02 A 4 1.0
2 2021-01-03 A 2 -2.0
3 2021-01-01 B 1 -1.0
4 2021-01-02 B 2 1.0

This can be solved by passing the partition argument to Fugue’s transform(). With the data partitioned by id, the first value of B correctly becomes NaN.

transform(data.copy(), 
          diff, 
          schema="*, diff:int",
          partition={"by": "id"}).head()
date id value diff
0 2021-01-01 A 3 NaN
1 2021-01-02 A 4 1.0
2 2021-01-03 A 2 -2.0
3 2021-01-01 B 1 NaN
4 2021-01-02 B 2 1.0
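For comparison, here is a minimal pandas-only sketch that produces the same per-id differences; the partition-transform above is the distributed counterpart of this grouped operation.

# Pandas-only equivalent of the partitioned diff above (for comparison)
pdf = data.copy()
pdf["diff"] = pdf.groupby("id")["value"].diff()
pdf.head()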

Default Partitions#

What happens if we don’t supply partitions when we call transform()? For Spark and Dask, default partitions are used. For some operations, such as row-wise operations, the default partitions work fine. But when groups of data need to be processed together, partitions should be specified as the grouping mechanism.
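For example, a purely row-wise function such as the hypothetical add_flag() below is safe under the default partitions, because each row is handled independently of every other row:

# A hypothetical row-wise function; the result is correct no matter how rows are partitioned
def add_flag(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(flag=df["value"] > 2)

transform(data.copy(), add_flag, schema="*, flag:bool").head()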

To see what the partitions look like, we create a count() function that simply counts the number of rows in a given partition. If we use it naively without specifying partitions, we see that the data is not grouped properly: there are many partitions containing just one row of data.

from typing import List, Dict, Any
import fugue_spark

def count(df: pd.DataFrame) -> List[Dict[str,Any]]:
    return [{"id": df.iloc[0]["id"], "count": df.shape[0]}]

transform(data.copy(),
          count,
          schema="id:str, count:int",
          engine="spark").show(5)

+---+-----+
| id|count|
+---+-----+
|  A|    1|
|  A|    1|
|  A|    1|
|  B|    1|
|  B|    1|
+---+-----+
only showing top 5 rows

But if we partition by id, then all rows for a given id will be grouped into the same partition.

transform(data.copy(),
          count,
          schema="id:str, count:int",
          engine="spark",
          partition={"by":"id"}).show()
+---+-----+
| id|count|
+---+-----+
|  A|    3|
|  B|    3|
|  C|    3|
+---+-----+
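The same partitioned count runs on Dask as well. A minimal sketch, assuming Fugue’s Dask extra is installed (the result is a Dask DataFrame, so we call compute() to collect it):

import fugue_dask  # registers the Dask engine (assumes the dask extra is installed)

transform(data.copy(),
          count,
          schema="id:str, count:int",
          engine="dask",
          partition={"by": "id"}).compute()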

Presort#

Fugue’s partition argument also takes a presort that sorts the data within each partition before the transform() function is applied. For example, we can get the row with the maximum value for each id by doing the following:

def one_row(df: pd.DataFrame) -> pd.DataFrame:
    return df.head(1)

transform(data.copy(), 
          one_row,
          schema="*",
          partition={"by":"id","presort":"value desc"})
date id value
0 2021-01-02 A 4
1 2021-01-03 B 5
2 2021-01-01 C 3

Similarly, the row with the minimum value can be taken by using value asc as the presort.
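As a quick sketch of that variant, the same one_row() function with an ascending presort keeps the minimum-value row per id:

transform(data.copy(),
          one_row,
          schema="*",
          partition={"by": "id", "presort": "value asc"})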

Partition-specific Behavior#

Fugue also makes it possible to modify the logic that is applied to each partition of data. For example, we can create a function that behaves differently for id==A than for id==B or id==C. In the function below, the data with id==A will be clipped to a minimum of 0 and a maximum of 4. The other groups will be clipped to a minimum of 1 and a maximum of 2.

def clip(df: pd.DataFrame) -> pd.DataFrame:
    # Each partition holds a single id, so the first row tells us which group we are in
    id = df.iloc[0]["id"]
    if id == "A":
        df = df.assign(value=df['value'].clip(0, 4))
    else:
        df = df.assign(value=df['value'].clip(1, 2))
    return df

Now when we call it with the transform() function, the values of rows with id B or C are clipped to the range 1 to 2.

transform(data.copy(),
          clip,
          schema="*",
          partition={"by":"id"},
          engine="spark").show()
+----------+---+-----+
|      date| id|value|
+----------+---+-----+
|2021-01-01|  A|    3|
|2021-01-02|  A|    4|
|2021-01-03|  A|    2|
|2021-01-01|  B|    1|
|2021-01-02|  B|    2|
|2021-01-03|  B|    2|
|2021-01-01|  C|    2|
|2021-01-02|  C|    2|
|2021-01-03|  C|    2|
+----------+---+-----+

Conclusion#

In this section we have shown the partition-transform semantics, which are equivalent to a pandas groupby-apply. The difference is that partition-transform scales seamlessly to Spark and Dask because it dictates the logical and physical grouping of data in distributed settings.