Distributed Computing

In the previous sections, we went over how to use Fugue in the form of extensions and basic data operations, such as joins. In this section, we’ll talk about how those Fugue extensions scale to big data. These concepts are important for effectively utilizing distributed computing.

Partition and Presort

In a distributed setting, data is spread across several machines, and we often need to rearrange how it is distributed. This is because some operations need all of the related data in one place. For example, calculating the median value per group requires all of the data from the same group to be on one machine. Fugue allows users to control the partitioning scheme during execution.

We previously saw partitioning used with the transform() function. take() is another operation that can be executed per partition. It extracts the first n rows of each partition.

import fugue.api as fa
import pandas as pd

df = pd.DataFrame({'col1':[1,1,1,2,2,2], 'col2':[1,4,5,7,4,2]})
# Take 1 row from each col1 partition, after sorting each partition by col2 descending
fa.take(df, 1, presort="col2 desc", partition={"by":['col1']})
   col1  col2
0     2     7
1     1     5

The presort expression here is col2 desc, which means that each partition is sorted by col2 in descending order after partitioning. Taking one row per partition therefore returns the row with the maximum col2 value for each col1 group.
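To make the order of operations concrete, here is a conceptual sketch in plain Python of partition, then presort, then take. It is not how Fugue implements take(); the helper take_per_partition is hypothetical and only mirrors the logic on a list of (col1, col2) tuples.

```python
from collections import defaultdict

# Same values as the DataFrame above, as (col1, col2) tuples
rows = [(1, 1), (1, 4), (1, 5), (2, 7), (2, 4), (2, 2)]

def take_per_partition(rows, n):
    # Step 1: partition by col1 so related rows sit together
    partitions = defaultdict(list)
    for col1, col2 in rows:
        partitions[col1].append((col1, col2))

    result = []
    for key in partitions:
        # Step 2: presort each partition by col2, descending
        ordered = sorted(partitions[key], key=lambda r: r[1], reverse=True)
        # Step 3: take the first n rows of the sorted partition
        result.extend(ordered[:n])
    return result

top = take_per_partition(rows, 1)
```

Because the sort happens inside each partition, no global sort of the whole dataset is needed to find the per-group maximum.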

Persist and Broadcast

Persist and broadcast are two other distributed computing concepts that Fugue supports. Persist keeps a DataFrame in memory to avoid recomputation. Distributed computing frameworks often need an explicit persist() call to know which DataFrames to keep; otherwise, those DataFrames tend to be recalculated each time they are used.
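The recomputation problem can be illustrated with a small plain-Python analogy. The LazyFrame class below is a hypothetical stand-in for a lazily evaluated distributed DataFrame, not part of Fugue or any backend; it only counts how often the underlying computation runs.

```python
class LazyFrame:
    """Hypothetical stand-in for a lazily evaluated DataFrame."""

    def __init__(self, compute):
        self._compute = compute
        self._persisted = False
        self._cache = None
        self.compute_count = 0

    def persist(self):
        # Mark the result to be kept after the first computation
        self._persisted = True
        return self

    def collect(self):
        if self._persisted and self._cache is not None:
            return self._cache  # reuse the stored result
        self.compute_count += 1
        result = self._compute()
        if self._persisted:
            self._cache = result
        return result

def expensive():
    return [x * x for x in range(5)]

plain = LazyFrame(expensive)
plain.collect()
plain.collect()  # recomputed: compute_count is now 2

kept = LazyFrame(expensive).persist()
kept.collect()
kept.collect()   # served from the cache: compute_count stays at 1
```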

Broadcasting makes a small DataFrame available on all the workers of a cluster. Without broadcast(), such a DataFrame would be sent to the workers again every time an operation needs it. Broadcasting caches it on the workers instead.

# engine="spark" runs each call on Spark, so the result is a Spark DataFrame
fa.persist(df, engine="spark")
fa.broadcast(df, engine="spark")
DataFrame[col1: bigint, col2: bigint]


Fugue also supports repartitioning a distributed DataFrame. This can be used to increase the number of partitions and improve cluster utilization. In the opposite case, the overhead of having too many partitions can outweigh the benefit, and an operation can perform better with fewer partitions.

Repartitioning takes a Fugue PartitionSpec that describes how to partition the data. To see how partitions can be defined, check partitioning.

fa.repartition(df, {"num": 100}, engine="spark")
DataFrame[col1: bigint, col2: bigint]
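To see why the partition count matters, consider this conceptual sketch in plain Python. The repartition helper here is hypothetical and uses simple round-robin assignment; it is not the algorithm Fugue or Spark uses, only an illustration of spreading rows over a chosen number of partitions.

```python
def repartition(partitions, num):
    # Flatten the existing partitions, then deal rows out round-robin
    rows = [row for part in partitions for row in part]
    new_parts = [[] for _ in range(num)]
    for i, row in enumerate(rows):
        new_parts[i % num].append(row)
    return new_parts

# Three uneven partitions: one worker would do almost all of the work
skewed = [[1, 2, 3, 4, 5, 6, 7], [8], []]

# Four even partitions: the same rows, spread so four workers can share the load
balanced = repartition(skewed, 4)
sizes = [len(p) for p in balanced]
```

Going the other way, calling the helper with a smaller num merges rows into fewer, larger partitions, which corresponds to the case where per-partition overhead dominates.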