Prefect#

Have questions? Chat with us on GitHub or Slack.


Prefect is an open-source workflow orchestration framework designed for the modern data stack. The prefect-fugue collection allows users to prototype their code locally and then execute it on a distributed computing cluster (Spark, Dask, Ray) when it is production ready. It also lets users submit their tasks to a Databricks cluster easily, even without Fugue.

The prefect-fugue collection runs on top of the fugue-cloudprovider library, which can run on top of Databricks, Coiled, and Anyscale. For supported cloud providers, see the Cloud Providers section of the tutorials.

If you use another platform such as self-hosted Spark, Dask, or Ray, you can still take advantage of the prefect-fugue collection by passing a SparkSession, a Dask Client, or a Ray cluster address. This tutorial focuses on Spark specifically because Prefect already has Dask and Ray support in prefect-dask and prefect-ray for running tasks. Still, this may be a good approach when working with DataFrames.

Working with Fugue Blocks#

The prefect-fugue collection uses Prefect Blocks to hold connections to compute. The first step is to register a Fugue Engine block. Start by installing the collection:

pip install prefect-fugue

and then we can register the block in our workspace so it appears in the Prefect UI.

prefect block register -m prefect_fugue
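
Optionally, we can confirm the registration by listing the available block types from the CLI (just a sanity check; it is not required for the rest of this tutorial):

prefect block type ls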

Registering the collection makes the Fugue Engine block type appear in our Prefect workspace. Below is what the creation screen looks like.

(Screenshot: the Fugue Engine block creation screen in the Prefect UI)

There are four fields to fill in when creating a block.

  1. Block Name - the name that will be used to reference the block

  2. Engine Name - one of the Fugue-supported backends (spark, dask, ray, duckdb)

  3. Engine Config - configurations related to the cluster (see the example after this list)

  4. Secret Config - credentials to connect to a cluster
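
As an example for item 3, when the engine is spark, the Engine Config would typically hold Spark configuration values. The keys below are a hypothetical illustration; the right values depend on your cluster:

{
    "spark.executor.memory": "4g",
    "spark.executor.cores": "2"
}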

In this example, we create a block to use a Databricks cluster.

  1. Block Name - databricks

  2. Engine Name - spark

  3. Engine Config - None

  4. Secret Config - seen below

{
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster_id": "0612-191111-6fopaaaa"
}

More information can be found in the Databricks section of the cloudprovider tutorials.

Using a Spark Cluster Inside a Flow#

Let's start by running code on top of Databricks. databricks-connect is already installed in this environment. This section produces a lot of log output because of the monitoring provided by Prefect. It also assumes that the user has Prefect configured to point to the right workspace.

Below we have one task that takes in a SparkSession and uses it to run some Spark code. We can then use this task in a Prefect Flow with the fugue_engine context. The fugue_engine context will create an ephemeral cluster to run the code underneath, and then shut it down when finished.

from prefect import task, flow
from prefect_fugue import fugue_engine

@task
def my_spark_task(spark, n=1):
    df = spark.createDataFrame([[f"hello spark {n}"]], "a string")
    df.show()

@flow
def native_spark_flow(engine):
    with fugue_engine(engine) as engine:
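        # "engine" can be a Fugue Engine Block name, a SparkSession, or an engine string;
        # fugue_engine resolves it and exposes the underlying SparkSession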
        my_spark_task(engine.spark_session, 1)

Now we can invoke this Flow using the Fugue block as the engine.

native_spark_flow("fugue/databricks")
22:14:01.381 | INFO    | prefect.engine - Created flow run 'belligerent-potoo' for flow 'native-spark-flow'
22:14:02.167 | INFO    | Flow run 'belligerent-potoo' - Created task run 'my_spark_task-4b707fb5-0' for task 'my_spark_task'
22:14:02.169 | INFO    | Flow run 'belligerent-potoo' - Executing 'my_spark_task-4b707fb5-0' immediately...
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
22:14:03.273 | INFO    | Task run 'my_spark_task-4b707fb5-0' - Finished in state Completed()
22:14:03.361 | INFO    | Flow run 'belligerent-potoo' - Finished in state Completed('All states completed.')
+-------------+
|            a|
+-------------+
|hello spark 1|
+-------------+
[Completed(message=None, type=COMPLETED, result=None)]

Similarly, if you don’t use Databricks but have your own way to get a SparkSession, you can directly pass the SparkSession into the Flow. The fugue_engine context will be able to interpret this.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

native_spark_flow(spark)
22:14:07.803 | INFO    | prefect.engine - Created flow run 'rational-alligator' for flow 'native-spark-flow'
22:14:08.453 | INFO    | Flow run 'rational-alligator' - Created task run 'my_spark_task-4b707fb5-0' for task 'my_spark_task'
22:14:08.453 | INFO    | Flow run 'rational-alligator' - Executing 'my_spark_task-4b707fb5-0' immediately...
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
22:14:09.408 | INFO    | Task run 'my_spark_task-4b707fb5-0' - Finished in state Completed()
22:14:09.498 | INFO    | Flow run 'rational-alligator' - Finished in state Completed('All states completed.')
+-------------+
|            a|
+-------------+
|hello spark 1|
+-------------+
[Completed(message=None, type=COMPLETED, result=None)]

Running Map Jobs on Spark/Dask with 0 Spark/Dask code#

We showed how to run Spark code on top of a Spark cluster, but the strength of Fugue is decoupling logic from distributed frameworks such as Spark and Dask. This decoupling allows us to test code locally before scaling out to a cluster. In the example below, we simulate having a pandas DataFrame where each row is a job.

When testing the Flow, we can pass None as the engine so everything runs on Pandas. When ready to scale out, we can pass in our Block or SparkSession. Fugue’s transform() task will use the engine provided by the fugue_engine context.

from time import sleep
import pandas as pd
from prefect_fugue import transform

@task
def create_jobs(n) -> pd.DataFrame:
    return pd.DataFrame(dict(jobs=range(n)))

# schema: *,batch_size:str
def run_one_job(df:pd.DataFrame) -> pd.DataFrame:
    sleep(len(df)*5)
    return df.assign(batch_size=len(df))

@flow
def run_all_jobs(n, engine=None):
    jobs = create_jobs(n)
    with fugue_engine(engine):
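        # partition="per_row" makes each row of jobs its own partition (one job per worker task);
        # as_local=True brings the result back as a local pandas DataFrame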
        return transform(jobs, run_one_job, partition="per_row", as_local=True)
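
Since run_one_job is plain pandas code, it can also be sanity-checked outside of Prefect entirely using Fugue's own transform() function. This standalone call is our illustration, not part of the Flow above; the "# schema:" comment hint on run_one_job supplies the output schema:

from fugue import transform as fugue_transform

# runs locally on pandas when no engine is given
fugue_transform(pd.DataFrame({"jobs": [0]}), run_one_job)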

We can test the Flow above on a local machine without Spark. We run on one job first.

run_all_jobs(1)
22:25:46.575 | INFO    | prefect.engine - Created flow run 'hopping-curassow' for flow 'run-all-jobs'
22:25:47.401 | INFO    | Flow run 'hopping-curassow' - Created task run 'create_jobs-71a807e0-0' for task 'create_jobs'
22:25:47.402 | INFO    | Flow run 'hopping-curassow' - Executing 'create_jobs-71a807e0-0' immediately...
22:25:47.653 | INFO    | Task run 'create_jobs-71a807e0-0' - Finished in state Completed()
22:25:47.772 | INFO    | Flow run 'hopping-curassow' - Created task run 'run_one_job (transfomer) - f6d46-1e345475-0' for task 'run_one_job (transfomer) - f6d46'
22:25:47.774 | INFO    | Flow run 'hopping-curassow' - Executing 'run_one_job (transfomer) - f6d46-1e345475-0' immediately...
22:25:47.974 | WARNING | root - NativeExecutionEngine doesn't respect num_partitions ROWCOUNT
22:25:53.150 | INFO    | Task run 'run_one_job (transfomer) - f6d46-1e345475-0' - Finished in state Completed()
22:25:53.256 | INFO    | Flow run 'hopping-curassow' - Finished in state Completed()
   jobs batch_size
0     0          1

Because it succeeded, we can now attach our Fugue Databricks Block to run on Databricks. This time we run 8 jobs, and the parallelization from the Spark cluster makes the Flow execute faster.

%%time
run_all_jobs(8, "fugue/databricks") # run on databricks
22:38:07.115 | INFO    | prefect.engine - Created flow run 'observant-doberman' for flow 'run-all-jobs'
22:38:07.747 | INFO    | Flow run 'observant-doberman' - Created task run 'create_jobs-71a807e0-0' for task 'create_jobs'
22:38:07.749 | INFO    | Flow run 'observant-doberman' - Executing 'create_jobs-71a807e0-0' immediately...
22:38:07.963 | INFO    | Task run 'create_jobs-71a807e0-0' - Finished in state Completed()
22:38:08.239 | INFO    | Flow run 'observant-doberman' - Created task run 'run_one_job (transfomer) - 0f7fe-1e345475-0' for task 'run_one_job (transfomer) - 0f7fe'
22:38:08.241 | INFO    | Flow run 'observant-doberman' - Executing 'run_one_job (transfomer) - 0f7fe-1e345475-0' immediately...
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
                                                                                
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
22:38:29.844 | INFO    | Task run 'run_one_job (transfomer) - 0f7fe-1e345475-0' - Finished in state Completed()
22:38:29.929 | INFO    | Flow run 'observant-doberman' - Finished in state Completed()
CPU times: user 332 ms, sys: 56.2 ms, total: 389 ms
Wall time: 23.1 s
   jobs batch_size
0     0          1
1     1          1
2     2          1
3     3          1
4     4          1
5     5          1
6     6          1
7     7          1

There is still some overhead in sending the work to the cluster, but the wall time (about 23 seconds) is well under the roughly 40 seconds (8 jobs at 5 seconds each) the Flow would take if run sequentially.

We can also use local Dask by passing the string "dask". Alternatively, we can pass a Dask Client() or use the Fugue Engine Block with Coiled. More information can be found in the Coiled cloudprovider docs.

%%time
run_all_jobs(4, "dask")
22:39:43.381 | INFO    | prefect.engine - Created flow run 'imported-mammoth' for flow 'run-all-jobs'
22:39:43.944 | INFO    | Flow run 'imported-mammoth' - Created task run 'create_jobs-71a807e0-0' for task 'create_jobs'
22:39:43.946 | INFO    | Flow run 'imported-mammoth' - Executing 'create_jobs-71a807e0-0' immediately...
22:39:44.164 | INFO    | Task run 'create_jobs-71a807e0-0' - Finished in state Completed()
22:39:44.258 | INFO    | Flow run 'imported-mammoth' - Created task run 'run_one_job (transfomer) - 71d77-1e345475-0' for task 'run_one_job (transfomer) - 71d77'
22:39:44.260 | INFO    | Flow run 'imported-mammoth' - Executing 'run_one_job (transfomer) - 71d77-1e345475-0' immediately...
22:39:49.663 | INFO    | Task run 'run_one_job (transfomer) - 71d77-1e345475-0' - Finished in state Completed()
22:39:49.741 | INFO    | Flow run 'imported-mammoth' - Finished in state Completed()
CPU times: user 768 ms, sys: 189 ms, total: 956 ms
Wall time: 6.63 s
   jobs batch_size
0     0          1
1     1          1
2     2          1
3     3          1
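
As noted above, a dask.distributed Client can also be passed in place of the "dask" string. A minimal sketch, assuming a local Client is spun up just for illustration (a Coiled cluster would instead be handled through the Block):

from dask.distributed import Client

client = Client(n_workers=4)  # local Dask cluster
run_all_jobs(4, client)       # fugue_engine interprets the Client as the Dask execution engine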

Running SQL on Spark, Dask, and DuckDB#

Prototyping locally and then running the full job on the cluster is also possible with FugueSQL. DuckDB is a good engine for running SQL queries on flat files or pandas DataFrames. When ready, we can bring the same query to SparkSQL on the cluster. Similar to the transform() task shown above, there is also an fsql() task.

Here we can load in data and perform a query with FugueSQL.

from prefect_fugue import fsql

@task
def load_data():
    return pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet")

@flow
def run_sql(top, engine):
    data = load_data()
    with fugue_engine(engine):
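        # {{top}} is filled in from the keyword argument passed to fsql();
        # the query runs on whatever engine the surrounding fugue_engine context provides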
        fsql("""
        SELECT PULocationID, COUNT(*) AS ct FROM df
        GROUP BY 1 ORDER BY 2 DESC LIMIT {{top}}
        PRINT
        """, df=data, top=top)

To debug locally without SparkSQL, we can use DuckDB as the engine.

run_sql(2, "duckdb"); # debug/develop without spark
22:43:43.403 | INFO    | prefect.engine - Created flow run 'spectacular-mongrel' for flow 'run-sql'
22:43:44.013 | INFO    | Flow run 'spectacular-mongrel' - Created task run 'load_data-2ff00c39-0' for task 'load_data'
22:43:44.014 | INFO    | Flow run 'spectacular-mongrel' - Executing 'load_data-2ff00c39-0' immediately...
22:43:44.769 | INFO    | Task run 'load_data-2ff00c39-0' - Finished in state Completed()
22:43:44.881 | INFO    | Flow run 'spectacular-mongrel' - Created task run 'SELECT PULocationID, COUNT(*) AS ct FROM... - 12537-54db7018-0' for task 'SELECT PULocationID, COUNT(*) AS ct FROM... - 12537'
22:43:44.883 | INFO    | Flow run 'spectacular-mongrel' - Executing 'SELECT PULocationID, COUNT(*) AS ct FROM... - 12537-54db7018-0' immediately...
   PULocationID    ct
0            74  9728
1            75  8152
schema: PULocationID:long,ct:long
22:43:45.537 | INFO    | Task run 'SELECT PULocationID, COUNT(*) AS ct FROM... - 12537-54db7018-0' - Finished in state Completed()
22:43:45.832 | INFO    | Flow run 'spectacular-mongrel' - Finished in state Completed('All states completed.')

Again, to run on the cluster, we can use the Block.

run_sql(10, "fugue/databricks");
22:44:03.182 | INFO    | prefect.engine - Created flow run 'cooperative-shark' for flow 'run-sql'
22:44:03.821 | INFO    | Flow run 'cooperative-shark' - Created task run 'load_data-2ff00c39-0' for task 'load_data'
22:44:03.823 | INFO    | Flow run 'cooperative-shark' - Executing 'load_data-2ff00c39-0' immediately...
22:44:04.424 | INFO    | Task run 'load_data-2ff00c39-0' - Finished in state Completed()
22:44:04.689 | INFO    | Flow run 'cooperative-shark' - Created task run 'SELECT PULocationID, COUNT(*) AS ct FROM... - b3b18-54db7018-0' for task 'SELECT PULocationID, COUNT(*) AS ct FROM... - b3b18'
22:44:04.691 | INFO    | Flow run 'cooperative-shark' - Executing 'SELECT PULocationID, COUNT(*) AS ct FROM... - b3b18-54db7018-0' immediately...
22/08/30 22:44:11 WARN SparkServiceRPCClient: Syncing Temp Views took 6033 ms
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
   PULocationID    ct
0            74  9728
1            75  8152
2            41  4904
3            42  2693
4           166  2635
5            95  2561
6            97  2240
7             7  2068
8            43  1916
9           244  1881
schema: PULocationID:long,ct:long
22:44:16.945 | INFO    | Task run 'SELECT PULocationID, COUNT(*) AS ct FROM... - b3b18-54db7018-0' - Finished in state Completed()
22:44:17.210 | INFO    | Flow run 'cooperative-shark' - Finished in state Completed('All states completed.')

The downside of the query above is that the data loading is still tied to the pandas interface. FugueSQL has additional keywords such as LOAD and SAVE, so loading, processing, and saving can all run on DuckDB or SparkSQL. More information on FugueSQL can be found in the FugueSQL section of the tutorials.

@flow
def run_sql_full(top, engine):
    with fugue_engine(engine):
        fsql("""
        df = LOAD "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet"

        SELECT PULocationID, COUNT(*) AS ct FROM df
        GROUP BY 1 ORDER BY 2 DESC LIMIT {{top}}
        PRINT
        """, top=top)

run_sql(2, "fugue/databricks");
22:46:52.723 | INFO    | prefect.engine - Created flow run 'neat-junglefowl' for flow 'run-sql'
22:46:53.309 | INFO    | Flow run 'neat-junglefowl' - Created task run 'load_data-2ff00c39-0' for task 'load_data'
22:46:53.311 | INFO    | Flow run 'neat-junglefowl' - Executing 'load_data-2ff00c39-0' immediately...
22:46:54.001 | INFO    | Task run 'load_data-2ff00c39-0' - Finished in state Completed()
22:46:54.265 | INFO    | Flow run 'neat-junglefowl' - Created task run 'SELECT PULocationID, COUNT(*) AS ct FROM... - 133ce-54db7018-0' for task 'SELECT PULocationID, COUNT(*) AS ct FROM... - 133ce'
22:46:54.266 | INFO    | Flow run 'neat-junglefowl' - Executing 'SELECT PULocationID, COUNT(*) AS ct FROM... - 133ce-54db7018-0' immediately...
22/08/30 22:47:03 WARN SparkServiceRPCClient: Syncing Temp Views took 3078 ms
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
View job details at https://dbc-fd3c27a3-594d.cloud.databricks.com/?o=2707332449367147#/setting/clusters/0804-155157-zq2rzdv4/sparkUi
   PULocationID    ct
0            74  9728
1            75  8152
schema: PULocationID:long,ct:long
22:47:06.503 | INFO    | Task run 'SELECT PULocationID, COUNT(*) AS ct FROM... - 133ce-54db7018-0' - Finished in state Completed()
22:47:06.763 | INFO    | Flow run 'neat-junglefowl' - Finished in state Completed('All states completed.')
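
Earlier we noted that FugueSQL also has a SAVE keyword. Below is a hypothetical variant of run_sql_full that persists the result as well, so loading, processing, and saving all run on the selected engine; the output path is an assumption for illustration:

@flow
def run_sql_save(top, engine):
    with fugue_engine(engine):
        fsql("""
        df = LOAD "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet"

        SELECT PULocationID, COUNT(*) AS ct FROM df
        GROUP BY 1 ORDER BY 2 DESC LIMIT {{top}}
        SAVE OVERWRITE "/tmp/top_pickup_locations.parquet"
        """, top=top)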