Partitioning#

Have questions? Chat with us on Github or Slack:

Homepage Slack Status

In the last section, we discussed how to define schema for operations in Fugue. In this case, we look at partitions, an important concept for distributed computing.

The partition argument allows us to control the partitioning scheme before the transform() operation is applied. Partitions dictate the physical grouping of data within a cluster.

Simple Partitioning Example#

In the DataFrame below, we want to take the difference of the value per day. Because there are three different ids, we want to make sure that we don’t get the difference across ids.

import pandas as pd 

data = pd.DataFrame({"date":["2021-01-01", "2021-01-02", "2021-01-03"] * 3,
                   "id": (["A"]*3 + ["B"]*3 + ["C"]*3),
                   "value": [3, 4, 2, 1, 2, 5, 3, 2, 3]})
data.head()
date id value
0 2021-01-01 A 3
1 2021-01-02 A 4
2 2021-01-03 A 2
3 2021-01-01 B 1
4 2021-01-02 B 2

Now we create a function that takes in a pd.DataFrame and outputs a pd.DataFrame. This will allow us to bring the logic to Spark and Dask as we’ve seen before.

def diff(df: pd.DataFrame) -> pd.DataFrame:
    df['diff'] = df['value'].diff()
    return df

But if we use the function directly seen below, we notice that the first row of B has a value instead of a NaN. This is wrong since the function used the value from A to calculate the difference.

from fugue import transform
transform(data.copy(), 
          diff, 
          schema="*, diff:int").head()
date id value diff
0 2021-01-01 A 3 NaN
1 2021-01-02 A 4 1.0
2 2021-01-03 A 2 -2.0
3 2021-01-01 B 1 -1.0
4 2021-01-02 B 2 1.0

This is solved by passing the partitions to Fugue’s transform(). We now see the correct output of NaN for the first value of B seen below.

transform(data.copy(), 
          diff, 
          schema="*, diff:int",
          partition={"by": "id"}).head()
date id value diff
0 2021-01-01 A 3 NaN
1 2021-01-02 A 4 1.0
2 2021-01-03 A 2 -2.0
3 2021-01-01 B 1 NaN
4 2021-01-02 B 2 1.0

Default Partitions#

What happens if we don’t supply partitions when we call transform()? For Spark and Dask, there are default partitions that are used. For some operations, row-wise operations for example, the default partitions should work. But when your groups of data need to be processed together, then partitions should be specified as the grouping mechanism.

To see what partitions look like, we create a count() function that will just count the number of elements in a given partition. If we use it naively without specifying partitions, we will see that the data is not grouped properly. There are many partitions with just one item of data in it.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from typing import List, Dict, Any

def count(df: pd.DataFrame) -> List[Dict[str,Any]]:
    return [{"id": df.iloc[0]["id"], "count": df.shape[0]}]

transform(data.copy(),
          count,
          schema="id:str, count:int",
          engine=spark).show(5)

+---+-----+
| id|count|
+---+-----+
|  A|    1|
|  A|    1|
|  A|    1|
|  B|    1|
|  B|    1|
+---+-----+
only showing top 5 rows

But if we specify the partitions by id, then each id will be grouped into the same partition.

transform(data.copy(),
          count,
          schema="id:str, count:int",
          engine=spark,
          partition={"by":"id"}).show()
+---+-----+
| id|count|
+---+-----+
|  A|    3|
|  B|    3|
|  C|    3|
+---+-----+

Presort#

Fugue’s partition also takes in a presort that will sort the data before the transform() function is applied. For example, we can get the row with the maximum value for each id by doing the following:

def one_row(df: pd.DataFrame) -> pd.DataFrame:
    return df.head(1)

transform(data.copy(), 
          one_row,
          schema="*",
          partition={"by":"id","presort":"value desc"})
date id value
0 2021-01-02 A 4
1 2021-01-03 B 5
2 2021-01-01 C 3

Similarly, the row with the minumum value can be taken by using value asc as the presort.

Partition-specific Behavior#

Fugue also makes it possible to modify the logic that is applied for each partition of data. For example, we can create a function that has a different behavior for id==A and a different behavior for id==B or id==C. In the function below, the data with id==A will be clipped with a minimum of 0 and maximum of 4. The other groups will have a minimum of 1 and maximum of 2.

def clip(df: pd.DataFrame) -> pd.DataFrame:
    id = df.iloc[0]["id"]
    if id == "A":
        df = df.assign(value = df['value'].clip(0,4))
    else:
        df = df.assign(value = df['value'].clip(1,2))
    return df

Now when we call it with the transform() function, the values of rows with id of B or C will have a range of values 1 to 2.

transform(data.copy(),
          clip,
          schema="*",
          partition={"by":"id"},
          engine=spark).show()
+----------+---+-----+
|      date| id|value|
+----------+---+-----+
|2021-01-01|  A|    3|
|2021-01-02|  A|    4|
|2021-01-03|  A|    2|
|2021-01-01|  B|    1|
|2021-01-02|  B|    2|
|2021-01-03|  B|    2|
|2021-01-01|  C|    2|
|2021-01-02|  C|    2|
|2021-01-03|  C|    2|
+----------+---+-----+

Partition Validation#

In order for the functions that we defined above to behave correctly, we required the data to be partitioned or sorted. To ensure that we have configured the data correctly before passing it to our functions, we can define a partition validation check that instructs Fugue to throw an error if partitioning or presorting have not been aptly applied. Note the comments above the function defined below which will be read and applied. Even if Fugue is not used, they serve as helpful comments.

# partitionby_has: col1
# presort_is: col2 desc
def top_two(df:List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    """
    Function that returns the top 2 rows of an iterable.
    """
    n = 0
    while n < 2:
        yield df[n]
        n = n + 1

transform(
    df=df, 
    using=top_two, 
    schema="*", 
    partition={"by":"col1", "presort": "col2 desc"}
    )
col1 col2
0 a 3
1 a 2
2 b 6
3 b 5

The above code snippet exhibits a function operating on data that has been correctly partitioned and/or presorted. Now take a look at what happens if we don’t apply these steps correctly:

try:
    transform(
        df=df, 
        using=top_two, 
        schema="*", 
        partition={"by":"col1"}
        )
except Exception as e:
    print(e)
required presort key col2 is not in presort of PartitionSpec(num='0', by=['col1'], presort='')

Partitioning Strategies (Advanced)#

Fugue also offers a number of other partitioning strategies that can be used aside from the column level strategy described above. See below for a brief summary of these strategies. Also, make sure to check out partitioning for further information.

Algo

  • even - enforces an even number of items per partition

  • rand - randomly shuffles data

  • hash - uses hashing to partition the data (similar to Spark default)

Num

  • A number of partitions can be supplied as the partitioning strategy. Note that this strategy will not work for the pandas-based engine.

These strategies can also be used with presort.

def no_op(df:List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    """
    Function that returns the first row of an iterable.
    """
    yield df[0]

# by number
transform(
    df=df, 
    using=no_op, 
    schema="*", 
    partition={"num":4}, 
    engine="dask"
    ).compute()

# by algorithm
transform(
    df=df, 
    using=no_op, 
    schema="*", 
    partition={"algo":"even"}, 
    engine="dask"
    ).compute()
col1 col2
0 a 1
0 a 2
0 a 3
0 b 4
0 b 5
0 b 6

Remember that transform() runs on each partition of data. When using the pandas-based engine, the whole DataFrame is treated as one partition. For example, if you are normalizing a column of data, it will be applied on a per partition basis.

If you need to perform an operation that requires global max/mean/min, then Fugue has a more advanced interface for that called FugueWorkflow, or you can use the native Spark/Dask operations after a transform() call.

Conclusion#

In this section we have shown the partition-transform semantics, which are equivalent to the Pandas groupby-apply. The difference is this scales to Spark, Dask, or Ray seamlessly because it dictates the logical and physical grouping of data in distributed settings.

In the next section, we take a look at the ways to define the execution engine to bring our functions to Spark, Dask, or Ray.