Distributed Computing#

In the previous sections, we went over how to use Fugue extensions and basic data operations such as joins. In this section, we'll look at how those Fugue extensions scale to big data. These concepts are important for using distributed computing effectively.

Partition and Presort#

In a distributed setting, our data is spread across several machines, and we often need to control how it is spread because some operations require all related rows to be in one place. For example, calculating the median value per group requires all of the data for a group to be on one machine. Fugue allows users to control the partitioning scheme during execution.
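As a minimal sketch of this idea, the transformer below computes the median of col2 for each col1 group; partitioning by col1 guarantees each call sees all rows of one group. The function name and sample data are illustrative, and the # schema: comment is Fugue's schema hint for the output:

import pandas as pd
from fugue import FugueWorkflow

# schema: col1:long,med:double
def median_per_group(df: pd.DataFrame) -> pd.DataFrame:
    # each call receives all rows of a single col1 group
    return pd.DataFrame({'col1': [df['col1'].iloc[0]], 'med': [df['col2'].median()]})

with FugueWorkflow() as dag:
    df = dag.df(pd.DataFrame({'col1':[1,1,2,2], 'col2':[3,5,2,8]}))
    df.partition(by=['col1']).transform(median_per_group).show()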

In the example below, take() is an operation that extracts n rows. Applying take on each partition extracts n rows per partition. We will have two partitions because col1 is the partition key and it has only two distinct values.

from fugue import FugueWorkflow
import pandas as pd

data = pd.DataFrame({'col1':[1,1,1,2,2,2], 'col2':[1,4,5,7,4,2]})

with FugueWorkflow() as dag:
    df = dag.df(data)
    # partition by col1, sort each partition by col2 descending,
    # then keep the first row of each partition
    df = df.partition(by=['col1'], presort="col2 desc").take(1)
    df.show()
PandasDataFrame
col1:long|col2:long
---------+---------
2        |7        
1        |5        
Total count: 2

We also used presort. The presort key here was col2 desc, meaning each partition is sorted by col2 in descending order before take runs. This makes take(1) return the row with the maximum col2 in each group. We'll go over one more example.
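For instance, flipping the presort to ascending order makes the same take(1) keep the minimum col2 in each group instead (a minimal variation reusing data from above):

with FugueWorkflow() as dag:
    df = dag.df(data)
    # ascending presort: take(1) now keeps the smallest col2 per group
    df = df.partition(by=['col1'], presort="col2 asc").take(1)
    df.show()  # expect col2=1 for group 1 and col2=2 for group 2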

Persist and Broadcast#

Persist and broadcast are two other distributed computing concepts that Fugue supports. Persist keeps a DataFrame in memory to avoid recomputation. Because distributed frameworks evaluate lazily, they often need an explicit persist() call to know which DataFrames to keep; otherwise, a DataFrame used in multiple places may be recomputed for each use.
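As a minimal sketch, a persisted DataFrame that feeds two downstream operations is materialized once instead of being rebuilt for each use:

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[1,2]], "a:long,b:long")
    df.persist()   # materialize df once
    df.show()      # first use of df
    df.show()      # second use; without persist, some backends would recompute df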

Broadcasting makes a smaller DataFrame available on all the workers of a cluster. Without broadcast(), a small DataFrame would be sent to the workers repeatedly, each time it is needed for an operation. Broadcasting caches it on each worker once.

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[1,2]],"a:long,b:long")
    df.persist()    # mark df to be materialized once and reused
    df.broadcast()  # mark df to be copied to every worker and cached there
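As a sketch of the typical broadcast pattern, a small lookup table is broadcast before joining it to a larger DataFrame. The data here is illustrative, and this assumes broadcast() returns the DataFrame and inner_join() joins on the shared column, as covered in the earlier section on joins:

with FugueWorkflow() as dag:
    large = dag.df([[0,1],[1,2]], "a:long,b:long")
    small = dag.df([[0,"x"],[1,"y"]], "a:long,c:str").broadcast()
    # each worker caches a copy of small, so the join avoids reshuffling it
    large.inner_join(small).show()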