Checkpoint Deep Dive#

A checkpoint in DAG is to cache the execution state of a certain step. The purpose of a checkpoint may be one or some of the following:

  • break the lineage

  • avoid duplicated execution of the previous steps

  • quickly recover the execution state and resume

Spark Persist vs Checkpoint#

In Spark, there are persist and checkpoint (different from streaming checkpoint) methods for rdd.

Spark persist is to cache data into memory (or executor local disk), it does not break the lineage in Spark execution). It can’t break the lineage because the assumption is that the storage either executor memory or local disk is not permanent. Some executor can die during execution, to retrieve that portion of data, the driver has to use the lineage to recompute and recache. This is a great and robust idea, and in most cases it works very well. However, when the lineage contains hundreds or thousands of steps, we may see stack overflow problems.

Spark checkpoint is to store data into file system, and it breaks the lineage. This is also a fine idea, because the data will not lose even the entire execution breaks. The drawback is speed and disk space usage. Practically, this is more useful for large data and expensive compute cases where the overhead of saving intermediate data into filesystem is ignorable compared to the cost of the compute itself. Another scenario is that the execution environment is not reliable, executors can die any time and the compute is expensive, in this case, breaking the lineage may significantly improve the performance and reliability.

It’s worth to mention that Spark persist is lazy, only the first usage of the rdd will trigger the persist to happen. Spark checkpoint can be lazy or eager, eager means the caching operation happens immediately when requested. As of pyspark<=3.0.1, the checkpoint seems to be lazy by default, and it forces recompute of the previous steps unless you persist and checkpoint (see this issue). So the implementation itself if problematic.

Last but not the least, Spark persist’s cache is for current execution only while Spark checkpoint’s cache can be permanent (in reliable mode), so it can be retrived cross execution. This is similar to the caching mechanism of some workflow management solution such as Airflow. However practically, without a clear definition of determinism of the steps, retrieving checkpoints cross execution may cause more problem than it can solve.

Generalizing The Concept#

It is fair to say that Spark persist and checkpoint solves some common problems encountered in distributed computing. However, it lacks a systematic design for the problems it tries to solve, so the solutions are neither consistent nor complete.

For a caching problem inside a workflow, we need to consider several properties of a checkpoint:

  • lazy vs eager: as mentioned above, lazy checkpoint happens only when the first usage happens and eager checkpoint happens immediately regardless how it will be used in later steps

  • memory vs filesystem: storing data into the current worker’s memory (or worker’s local disk) is fast, but it has to keep the linage to deal with failure, plus, it can’t cross execution. Using (global) filesystem is slower, but it can break the lineage and may be used cross execution. Also using filesystem, the workers can have more free space for compute.

  • permanent vs non-permanent: memory must be non-permanent but filesystem can be permanent or non-permanent. For the permanent case, the filesystem checkpoint is not removed after one execution. For the non-permanent case, the checkpoint files will be removed after workflow execution finished. So only permanent checkpoint can be used cross execution.

  • deterministic vs non-deterministic: determinism is for cross execution cases, so only for permanent checkpoint, determinism is meaningful. For example for a defined execution graph, in the first execution, it failed after a certain step that has a deterministic checkpoint. If we run the exactly same graph again, due to determinism, we are able to know recover the checkpoint file when running the second time, so we can reliably and immediately ‘resume’ from the last failure point. For a non-deterministic permanent checkpoint, it is to intentionally disable ‘resume’ but to let users who get the checkpoint key (like a password token) use that data after execution.

  • explicit vs implicit: for a permanent checkpoint, the checkpoint key can be explicit, for example a user specified file path, or implicit, for example a random or deterministic uuid. So explicit checkpoint must also be a deterministic checkpoint, the difference is who defines the checkpoint key.

The following chart can explain the logical relationship between the properties:

Mapping The Concept To Spark And Fugue Features#

Mapping the properties to Spark and Fugue equivalent, we can have this comparison table

Lazy

Filesystem

Permanent

Deterministic

Explicit

Spark Equivalent

Fugue Equivalent

Yes

No

No

No

No

persist()

weak_checkpoint(lazy=True)

No

No

No

No

No

persist() or weak_checkpoint(lazy=False)

Yes

Yes

No

No

No

strong_checkpoint(lazy=True)

No

Yes

No

No

No

checkpoint() or strong_checkpoint(lazy=False)

Yes

Yes

Yes

No

No

No

Yes

Yes

No

No

checkpoint

yield_as

Yes

Yes

Yes

Yes

No

deteriministic_checkpoint(lazy=True)

No

Yes

Yes

Yes

No

deteriministic_checkpoint()

Yes

Yes

Yes

Yes

Yes

No

Yes

Yes

Yes

Yes

save & read

save & load

Checkpoints in Fugue#

Note that currently, Fugue does not recommend users to use any lazy checkpoint. In Fugue, we construct the DAG first and then execute, lazy checkpoints are only useful if we don’t know what the next steps are, but when you define the DAG, the following steps are always defined before execution, so when you put a checkpoint in the dag, there is nothing uncertain, if there are no following steps, you should remove the checkpoint.

Weak Checkpoint#

Weak checkpoint is in memory checkpoint. We don’t use ‘in memory’ because it may also use executor’s local disk space, depending on the execution engine we use. persist() is an alias of weak_checkpoint(lazy=False).

Note Fugue persist is eager but Spark persist is lazy.

from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine
from time import sleep
import pandas as pd

# schema: *
def just_wait(df:pd.DataFrame) -> pd.DataFrame:
    sleep(5)
    return df

In the following code, the two shows, due to Spark lineage, will duplicae the transform step, and will take 10 sec

%%time
with FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait)
    df.show()
    df.show()

In the following code, the two shows, due to Spark lineage, will duplicae the transform step, and will take 10 sec

%%time
with FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).persist()
    df.show()
    df.show()

The following takes almost no time because it’s a lazy checkpoint that didn’t happen

%%time
with FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).weak_checkpoint(lazy=True)

The following takes 5 sec because the first show triggers the persist and the second uses the cache. But it is pointless to use a lazy checkpoint in a predefined dag like this.

%%time
with FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).weak_checkpoint(lazy=True)
    df.show()
    df.show()

Strong Checkpoint#

Strong checkpoints are in file but non-permanent checkpoints. They are removed after the DAG execution done.

Note Fugue checkpoint is non-permanent but Spark checkpoint is permanent.

To use in filesystem checkpoints, we must specify the config fugue.workflow.cache.path

%%time
engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
with FugueWorkflow(engine) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).checkpoint()
    df.show()
    df.show()
%%time
engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
with FugueWorkflow(engine) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).strong_checkpoint(lazy=True)
    df.show()
    df.show()

Deterministic Checkpoint#

Deterministic checkpoint is mainly for ‘resuming’. Expecially when in an interactive environment, you may not want to rerun the whole DAG every time you make a minor change. But note that, practically, you should avoid using deterministic checkpoint in production, also whenever you use deterministic checkpoint, ask yourself, is the dag overly complicated? Can you make it more modulized?

See this example

%%time
engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
with FugueWorkflow(engine) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).deterministic_checkpoint()
    df.show()

Now, run it again in the following, you will see it’s much faster, this is because the dag is identical, and it can resume from the deterministic checkpoint.

%%time
engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
with FugueWorkflow(engine) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).deterministic_checkpoint()
    df.show()

But what if you want to force rerunning the transform? You can provide a different namespace to renew the determinism

%%time
engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
with FugueWorkflow(engine) as dag:
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).deterministic_checkpoint(namespace="x")
    df.show()

And if you run the above code again, you will see it uses the new checkpoint again.

Note that deterministic checkpoint is only dependent on the related steps change, for example if you add another independent steps, that doesn’t change the determinism, the following code can also resume from the previous checkpoint

%%time
engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
with FugueWorkflow(engine) as dag:
    df = dag.df([[1]],"a:int")
    df.show()
    # added a new independent subworkflow above, the determinism of the following code is not changed
    df = dag.df([[0]],"a:int")
    df = df.transform(just_wait).deterministic_checkpoint(namespace="x")
    df.show()

Yield#

It is common to see you have a good separation of your logic, and they are divided into several DAGs, and you need to pass the dataframes between the DAGs. This can be done by save and load an explicit file path, but for intermediate data, you don’t always want to specify paths for them, also you want determinism to work cross DAGs. This is reason to use yield_as

%%time
dag1 = FugueWorkflow()
df = dag1.df([[0]],"a:int")
df.transform(just_wait).yield_as("df")

dag2 = FugueWorkflow()
df = dag2.df(dag1.yields["df"])
df.transform(just_wait).show()

engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
dag1.run(engine)
dag2.run(engine)

Run the following twice, you can find the second run is about 5 sec faster than the first because of deterministic_checkpoint().yield_as("df")

%%time
dag1 = FugueWorkflow()
df = dag1.df([[0]],"a:int")
df.transform(just_wait).deterministic_checkpoint().yield_as("df")

dag2 = FugueWorkflow()
df = dag2.df(dag1.yields["df"])
df.transform(just_wait).show()

engine = SparkExecutionEngine(conf={"fugue.workflow.checkpoint.path":"/tmp"})
dag1.run(engine)
dag2.run(engine)