Partitioning#

Have questions? Chat with us on Github or Slack:

Homepage Slack Status

In the last section, we discussed how to define schema for operations in Fugue. In this case, we look at partitions, an important concept for distributed computing.

The partition argument allows us to control the partitioning scheme before the transform() operation is applied. Partitions dictate the physical grouping of data within a cluster.

Simple Partitioning Example#

In the DataFrame below, we want to take the difference of the value per day. Because there are three different ids, we want to make sure that we don’t get the difference across ids.

import pandas as pd 

data = pd.DataFrame({"date":["2021-01-01", "2021-01-02", "2021-01-03"] * 3,
                   "id": (["A"]*3 + ["B"]*3 + ["C"]*3),
                   "value": [3, 4, 2, 1, 2, 5, 3, 2, 3]})
data.head()
date id value
0 2021-01-01 A 3
1 2021-01-02 A 4
2 2021-01-03 A 2
3 2021-01-01 B 1
4 2021-01-02 B 2

Now we create a function that takes in a pd.DataFrame and outputs a pd.DataFrame. This will allow us to bring the logic to Spark and Dask as we’ve seen before.

def diff(df: pd.DataFrame) -> pd.DataFrame:
    df['diff'] = df['value'].diff()
    return df

But if we use the function directly seen below, we notice that the first row of B has a value instead of a NaN. This is wrong since the function used the value from A to calculate the difference.

from fugue import transform
transform(data.copy(), 
          diff, 
          schema="*, diff:int").head()
date id value diff
0 2021-01-01 A 3 NaN
1 2021-01-02 A 4 1.0
2 2021-01-03 A 2 -2.0
3 2021-01-01 B 1 -1.0
4 2021-01-02 B 2 1.0

This is solved by passing the partitions to Fugue’s transform(). We now see the correct output of NaN for the first value of B seen below.

transform(data.copy(), 
          diff, 
          schema="*, diff:int",
          partition={"by": "id"}).head()
date id value diff
0 2021-01-01 A 3 NaN
1 2021-01-02 A 4 1.0
2 2021-01-03 A 2 -2.0
3 2021-01-01 B 1 NaN
4 2021-01-02 B 2 1.0

Default Partitions#

What happens if we don’t supply partitions when we call transform()? For Spark and Dask, there are default partitions that are used. For some operations, row-wise operations for example, the default partitions should work. But when your groups of data need to be processed together, then partitions should be specified as the grouping mechanism.

To see what partitions look like, we create a count() function that will just count the number of elements in a given partition. If we use it naively without specifying partitions, we will see that the data is not grouped properly. There are many partitions with just one item of data in it.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from typing import List, Dict, Any

def count(df: pd.DataFrame) -> List[Dict[str,Any]]:
    return [{"id": df.iloc[0]["id"], "count": df.shape[0]}]

transform(data.copy(),
          count,
          schema="id:str, count:int",
          engine=spark).show(5)
+---+-----+
| id|count|
+---+-----+
|  A|    2|
|  A|    2|
|  B|    2|
|  C|    2|
|  C|    1|
+---+-----+

But if we specify the partitions by id, then each id will be grouped into the same partition.

transform(data.copy(),
          count,
          schema="id:str, count:int",
          engine=spark,
          partition={"by":"id"}).show()
+---+-----+
| id|count|
+---+-----+
|  B|    3|
|  C|    3|
|  A|    3|
+---+-----+

Presort#

Fugue’s partition also takes in a presort that will sort the data before the transform() function is applied. For example, we can get the row with the maximum value for each id by doing the following:

def one_row(df: pd.DataFrame) -> pd.DataFrame:
    return df.head(1)

transform(data.copy(), 
          one_row,
          schema="*",
          partition={"by":"id","presort":"value desc"})
date id value
0 2021-01-02 A 4
1 2021-01-03 B 5
2 2021-01-01 C 3

Similarly, the row with the minimum value can be taken by using value asc as the presort.

Partition-specific Behavior#

Fugue also makes it possible to modify the logic that is applied for each partition of data. For example, we can create a function that has a different behavior for id==A and a different behavior for id==B or id==C. In the function below, the data with id==A will be clipped with a minimum of 0 and maximum of 4. The other groups will have a minimum of 1 and maximum of 2.

def clip(df: pd.DataFrame) -> pd.DataFrame:
    id = df.iloc[0]["id"]
    if id == "A":
        df = df.assign(value = df['value'].clip(0,4))
    else:
        df = df.assign(value = df['value'].clip(1,2))
    return df

Now when we call it with the transform() function, the values of rows with id of B or C will have a range of values 1 to 2.

transform(data.copy(),
          clip,
          schema="*",
          partition={"by":"id"},
          engine=spark).show()
+----------+---+-----+
|      date| id|value|
+----------+---+-----+
|2021-01-01|  B|    1|
|2021-01-02|  B|    2|
|2021-01-03|  B|    2|
|2021-01-01|  C|    2|
|2021-01-02|  C|    2|
|2021-01-03|  C|    2|
|2021-01-01|  A|    3|
|2021-01-02|  A|    4|
|2021-01-03|  A|    2|
+----------+---+-----+

Partition Validation#

In order for the functions that we defined above to behave correctly, we required the data to be partitioned or sorted. To ensure that we have configured the data correctly before passing it to our functions, we can define a partition validation check that instructs Fugue to throw an error if partitioning or presorting have not been aptly applied. Note the comments above the function defined below which will be read and applied. Even if Fugue is not used, they serve as helpful comments.

from typing import Iterable

df = pd.DataFrame({"col1": ["a", "a", "a", "b", "b", "b"],
                   "col2": [1,2,3,4,5,6]})

# partitionby_has: col1
# presort_is: col2 desc
def top_two(df:List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    """
    Function that returns the top 2 rows of an iterable.
    """
    n = 0
    while n < 2:
        yield df[n]
        n = n + 1

transform(
    df=df, 
    using=top_two, 
    schema="*", 
    partition={"by":"col1", "presort": "col2 desc"}
    )
col1 col2
0 a 3
1 a 2
2 b 6
3 b 5

The above code snippet exhibits a function operating on data that has been correctly partitioned and/or presorted. Now take a look at what happens if we don’t apply these steps correctly:

try:
    transform(
        df=df, 
        using=top_two, 
        schema="*", 
        partition={"by":"col1"}
        )
except Exception as e:
    print(e)
required presort key col2 is not in presort of PartitionSpec(num='0', by=['col1'], presort='')

Partitioning Strategies (Advanced)#

Fugue also offers a number of other partitioning strategies that can be used aside from the column level strategy described above. See below for a brief summary of these strategies. Also, make sure to check out partitioning for further information.

Algo

  • even - enforces an even number of items per partition

  • rand - randomly shuffles data

  • hash - uses hashing to partition the data (similar to Spark default)

Num

  • A number of partitions can be supplied as the partitioning strategy. Note that this strategy will not work for the pandas-based engine.

These strategies can also be used with presort.

def no_op(df:List[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    """
    Function that returns the first row of an iterable.
    """
    yield df[0]

# by number
transform(
    df=df, 
    using=no_op, 
    schema="*", 
    partition={"num":4}, 
    engine="dask"
    ).compute()

# by algorithm
transform(
    df=df, 
    using=no_op, 
    schema="*", 
    partition={"algo":"even"}, 
    engine="dask"
    ).compute()
col1 col2
0 a 1
0 a 2
0 a 3
0 b 4
0 b 5
0 b 6

Remember that transform() runs on each partition of data. When using the pandas-based engine, the whole DataFrame is treated as one partition. For example, if you are normalizing a column of data, it will be applied on a per partition basis.

If you need to perform an operation that requires global max/mean/min, then Fugue has a more advanced interface for that called FugueWorkflow, or you can use the native Spark/Dask operations after a transform() call.

Operations Across Whole DataFrame#

There are some cases where we need to perform some operations on a whole dataset. Upto now, the transform() has taken an operation and applied it per partition. How can we do a min max scaling on Spark? In Fugue semantics, we call this as a Processor. Processors operate on the whole DataFrame on the Driver side, while Transformers are meant to run inside a worker and focus on local logic.

Using FugueSQL#

The simplest way is to use FugueSQL because it’s inherently scale agnostic and will work for both Spark and Pandas (along with Dask). Below is a way to do the min-max scaling across a whole column. For practitioners that don’t want to use SQL, the Python equivalent can be found below.

from fugue.api import fugue_sql

df = pd.DataFrame({"col1": ["a", "a", "a", "b", "b", "b"],
                   "col2": [1,2,3,4,5,6]})

query = """
temp = SELECT MIN(col2) AS minval, MAX(col2) AS maxval
         FROM df

SELECT col1, (col2 - minval) / (maxval-minval) AS col2_scaled
  FROM temp
 CROSS JOIN df
"""

fugue_sql(query)
col1 col2_scaled
0 a 0.0
1 a 0.2
2 a 0.4
3 b 0.6
4 b 0.8
5 b 1.0

And to run this on Spark, all we have to do is change the engine.

df = spark.createDataFrame(df)

fugue_sql(query, engine=spark).show()
+----+-----------+
|col1|col2_scaled|
+----+-----------+
|   a|        0.0|
|   a|        0.2|
|   a|        0.4|
|   b|        0.6|
|   b|        0.8|
|   b|        1.0|
+----+-----------+

Using Fugue API#

Fugue has a programmatic API to do such operations, but users tend to find it to be less intuitive. This is closer to Spark’s syntax than Pandas.

from fugue.column import col, lit, functions as f
import fugue.api as fa
from fugue import AnyDataFrame

df = pd.DataFrame({"col1": ["a", "a", "a", "b", "b", "b"],
                   "col2": [1,2,3,4,5,6]})

def scale(df: AnyDataFrame):
    minmax = fa.as_array(fa.select(df, f.min(col("col2")).alias("minval"), f.max(col("col2")).alias("maxval")))
    return fa.select(
        df,
        col("col1"),
        ((col("col2") - minmax[0][0])/ (minmax[0][1] - minmax[0][0])).alias("col2_scaled"),
    )

scale(df)
col1 col2_scaled
0 a 0.0
1 a 0.2
2 a 0.4
3 b 0.6
4 b 0.8
5 b 1.0

And to run on Spark:

sdf = spark.createDataFrame(df)

scale(sdf).show()
+----+-----------+
|col1|col2_scaled|
+----+-----------+
|   a|        0.0|
|   a|        0.2|
|   a|        0.4|
|   b|        0.6|
|   b|        0.8|
|   b|        1.0|
+----+-----------+

For a full list of column functions, see the documentation but FugueSQL is more complete than the Python API for now.

Conclusion#

In this section we have shown the partition-transform semantics, which are equivalent to the Pandas groupby-apply. The difference is this scales to Spark, Dask, or Ray seamlessly because it dictates the logical and physical grouping of data in distributed settings.

In the next section, we take a look at the ways to define the execution engine to bring our functions to Spark, Dask, or Ray.