Decoupling Logic and Execution

In the tutorials so far, we used Fugue’s transform() function to port pandas code to Spark without any rewrites. The transform() function is also flexible enough to handle functions with varying input and output types.

Decoupling logic and execution is one of the primary motivations of Fugue. This is meant to solve several problems:

  1. Users have to learn an entirely new framework to work with distributed computing problems

  2. Logic written for a small data project is not reusable for a big data project

  3. Testing becomes a heavyweight process for distributed computing, especially Spark

  4. As a consequence of 3, iteration on distributed computing problems becomes slower and more expensive

Fugue believes that code should minimize dependency on frameworks as much as possible. This provides flexibility and portability. By decoupling logic and execution, we can focus on our logic in a scale-agnostic way and then choose which execution engine to use when the time arises. In this section, we look at how to move from the transform() function to end-to-end workflows with FugueWorkflow().

Differences between pandas and Spark

To illustrate the first two main points above, we’ll use the same example that we used in the Type Flexibility section. We want to create a new column called location using the mapping dictionary.

import pandas as pd

_area_code_map = {"217": "Champaign, IL", "407": "Orlando, FL", "510": "Fremont, CA"}

data = pd.DataFrame({"phone": ["(217)-123-4567", "(217)-234-5678", "(407)-123-4567", 
                               "(407)-234-5678", "(510)-123-4567"]})
data.head()
phone
0 (217)-123-4567
1 (217)-234-5678
2 (407)-123-4567
3 (407)-234-5678
4 (510)-123-4567

First, we’ll perform the operation in pandas. It’s very simple because of the .map() method in pandas.

def map_phone_to_location(df: pd.DataFrame) -> pd.DataFrame:
    # characters 1-3 of the phone string hold the area code
    df["location"] = df["phone"].str.slice(1,4).map(_area_code_map)
    return df

map_phone_to_location(data.copy())
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL
2 (407)-123-4567 Orlando, FL
3 (407)-234-5678 Orlando, FL
4 (510)-123-4567 Fremont, CA

Next we’ll perform the same operation in Spark and see how different the syntax is.

# Setting up the Spark session
from itertools import chain

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import create_map, col, lit, substring

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(data)  # converting the previous pandas DataFrame

# build a Spark map expression from the dictionary
mapping_expr = create_map([lit(x) for x in chain(*_area_code_map.items())])

def map_phone_to_location(df: DataFrame) -> DataFrame:
    # substring is 1-indexed: characters 2-4 hold the area code
    _df = df.withColumn("location", mapping_expr[substring(col("phone"),2,3)])
    return _df

map_phone_to_location(df).show()
+--------------+-------------+
|         phone|     location|
+--------------+-------------+
|(217)-123-4567|Champaign, IL|
|(217)-234-5678|Champaign, IL|
|(407)-123-4567|  Orlando, FL|
|(407)-234-5678|  Orlando, FL|
|(510)-123-4567|  Fremont, CA|
+--------------+-------------+

Looking at the two code examples, we had to re-implement the exact same functionality with completely different syntax. This isn’t a cherry-picked example. Data practitioners often have to write two implementations of the same logic, one for each framework, especially as the logic gets more specific.

This is where Fugue comes in. Users can use the abstraction layer to write only one implementation of the function, which can then be applied to pandas, Spark, and Dask. We already saw this with the transform() function. An example snippet is below; nothing here is new.

from fugue import transform

def map_phone_to_location(df: pd.DataFrame) -> pd.DataFrame:
    df["location"] = df["phone"].str.slice(1,4).map(_area_code_map)
    return df

transform(data.copy(),
          map_phone_to_location,
          schema="*, location:str")
phone location
0 (217)-123-4567 Champaign, IL
1 (217)-234-5678 Champaign, IL
2 (407)-123-4567 Orlando, FL
3 (407)-234-5678 Orlando, FL
4 (510)-123-4567 Fremont, CA

transform versus FugueWorkflow

While the transform() function is good for running a single function across multiple execution engines, Fugue also has FugueWorkflow, which can be used to build engine-agnostic end-to-end workflows. FugueWorkflow() constructs a directed acyclic graph (DAG) whose inputs and outputs are DataFrames. The code block below runs on the pandas-based NativeExecutionEngine.

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df(data.copy())
    df = df.transform(map_phone_to_location, schema="*, location:str")
    df.show()
PandasDataFrame
phone:str                                                      |location:str             
---------------------------------------------------------------+-------------------------
(217)-123-4567                                                 |Champaign, IL            
(217)-234-5678                                                 |Champaign, IL            
(407)-123-4567                                                 |Orlando, FL              
(407)-234-5678                                                 |Orlando, FL              
(510)-123-4567                                                 |Fremont, CA              
Total count: 5

To bring it to Spark, all we need to do is pass "spark" (the SparkExecutionEngine) into FugueWorkflow, similar to how we used the transform() function in the last section. Now all the code under the with statement will run on Spark. We did not make any modifications to map_phone_to_location to bring it to Spark. The df.transform() call below converts it to a Fugue Transformer at runtime using the type annotations and schema provided. We can use the same function on Spark or Dask without modification.

import fugue_spark  # registers the Spark execution engine

with FugueWorkflow("spark") as dag:
    df = dag.df(data.copy())  # Still the original Pandas DataFrame
    df = df.transform(map_phone_to_location, schema="*, location:str")
    df.show()
SparkDataFrame
phone:str                                                      |location:str             
---------------------------------------------------------------+-------------------------
(217)-123-4567                                                 |Champaign, IL            
(217)-234-5678                                                 |Champaign, IL            
(407)-123-4567                                                 |Orlando, FL              
(407)-234-5678                                                 |Orlando, FL              
(510)-123-4567                                                 |Fremont, CA              
Total count: 5

If we had five different functions to bring to Spark with transform(), we would need to specify the SparkExecutionEngine five times. FugueWorkflow lets us specify the engine once so that the entire computation runs on pandas, Spark, or Dask. Both approaches are similar in principle: they leave the original functions decoupled from the execution environment.
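As a sketch of what this looks like with more than one step (keep_champaign is a hypothetical helper added here for illustration, not part of the tutorial), the engine is chosen once and every step inside the with block runs on it:

def keep_champaign(df: pd.DataFrame) -> pd.DataFrame:
    # filter to a single location; plain pandas, no Spark dependency
    return df[df["location"] == "Champaign, IL"]

with FugueWorkflow("spark") as dag:
    df = dag.df(data.copy())
    df = df.transform(map_phone_to_location, schema="*, location:str")
    df = df.transform(keep_champaign, schema="*")
    df.show()

Dropping the "spark" argument would run the same DAG on the default pandas-based engine, which is how the whole workflow can be tried out locally.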

Testability and Maintainability

Is the native Python or pandas implementation of map_phone_to_location() better? Is the native Spark implementation better?

Fugue’s main concern is clear, readable code. Users can write code in whatever style expresses their logic best. The computing efficiency lost by using Fugue is unlikely to be significant, especially compared to the developer efficiency gained through faster iteration and easier maintenance. In fact, Fugue code often runs faster than native Spark code written by less experienced users, because Fugue handles many of the tricks needed to use Spark effectively.

Fugue code is easy to test because the logic inside the function is portable across pandas, Spark, and Dask. We can test code without spinning up computing resources such as Spark or Dask clusters. That hardware often takes a long time to provision just for a simple test, which makes unit testing on Spark painful. With Fugue, we can test quickly with native Python or pandas and then execute on Spark when needed, so developers iterate on their data projects much faster.
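As a hedged sketch of what this buys us (the test name and expected values below are ours, not from the tutorial), the pandas implementation of map_phone_to_location can be unit tested with nothing but pandas and pytest:

import pandas as pd

def test_map_phone_to_location():
    # no Spark session needed; the logic under test is plain pandas
    test_df = pd.DataFrame({"phone": ["(217)-123-4567", "(510)-123-4567"]})
    result = map_phone_to_location(test_df)
    assert result["location"].tolist() == ["Champaign, IL", "Fremont, CA"]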

If we use a pure Python function, such as the one below, all we have to do to test it is run some values through the defined function.

from typing import List, Dict, Any

def map_phone_to_location2(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
    for row in df:
        # characters 1-3 of the phone string hold the area code
        row["location"] = _area_code_map[row["phone"][1:4]]
    return df

# Remember the input was List[Dict[str,Any]]
map_phone_to_location2([{'phone': '(407)-234-5678'}, 
                       {'phone': '(407)-234-5679'}])
[{'phone': '(407)-234-5678', 'location': 'Orlando, FL'},
 {'phone': '(407)-234-5679', 'location': 'Orlando, FL'}]

Even though the output here is a List[Dict[str,Any]], Fugue takes care of converting it back to a DataFrame.
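For example (a minimal sketch reusing the transform() call from earlier), handing map_phone_to_location2 a pandas DataFrame works because Fugue converts the input to List[Dict[str,Any]] and the output back to a DataFrame based on the type annotations:

# the input is a pandas DataFrame; the annotations drive the conversions
transform(data.copy(),
          map_phone_to_location2,
          schema="*, location:str")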

Fugue as a Mindset

Fugue is a framework, but more importantly, it is a mindset.

  1. Fugue believes that the framework should adapt to the user, not the other way around.

  2. Fugue lets users express logic in a scale-agnostic way, with the tools they prefer.

  3. Fugue values the readability and maintainability of code over deep framework-specific optimizations.

Using distributed computing is currently harder than it needs to be. However, these systems often follow similar patterns, which Fugue abstracts so that users can focus on defining their logic. We cover these concepts in the rest of the tutorials. If you’re new to distributed computing, Fugue is the perfect place to get started.

[Optional] Comparison to Modin and Koalas

Fugue gets compared a lot to Modin and Koalas. Modin is a pandas interface for execution on Dask, and Koalas is a pandas interface for execution on Spark. Fugue, Modin, and Koalas share the goal of making distributed computing easier. The main difference is that Modin and Koalas use pandas as the grammar for distributed computing, while Fugue uses native Python and SQL as the grammar (though pandas is also supported). For more information, check this page.

The clearest example of pandas not being compatible with Spark is its acceptance of mixed-type columns: a single column can hold both numeric and string values. Spark, on the other hand, is strongly typed and enforces a schema. Beyond that, pandas relies heavily on the index for operations, and that mindset does not carry over well to Spark. Order is not always guaranteed in a distributed system, maintaining a global index carries overhead, and the index is often not necessary anyway.
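A minimal sketch of the mixed-type point (the values are ours, chosen for illustration): pandas stores such a column with dtype object, while Spark has to settle on a single type for its schema:

import pandas as pd

mixed = pd.DataFrame({"col": [1, "a"]})  # pandas accepts this; dtype is object
print(mixed.dtypes)

# Spark enforces one type per column, so converting this DataFrame
# typically fails schema inference or requires an explicit schema/cast:
# spark.createDataFrame(mixed)  # raises a type error on most versions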