All questions are welcome in the Slack channel.
We have previously introduced extensions in the Getting Started section. This section is a more comprehensive guide to extensions in Fugue. Extensions are user-created functions that perform operations on DataFrames. By converting these functions to the approriate extension, they can be brought into Fugue workflows.
Driver vs Worker#
To fully understand extensions, it is important to understand the basic distributed compute architecture. A compute cluster is comprised of one
driver, and multiple
worker nodes. The
driver is the machine responsible for orchestrating the
worker machines. Because the driver is responsible for communciating and distributing work, it has access to information that worker nodes don’t have. For example, it keeps track of partition information.
Because of this distinction in distributed computing, there will be code written that is specifically meant to be run on the
driver machine. These are things like loading and saving a dataframe, or maybe the logic to divide the data into different partitions. On the other hand, code that runs on
workers will be agnostic to what is happening on other workers.
Note that this means using a function to get the maximum value of a column behaves differently dependending if it is a client-side or worker-side extension. The client-side extension will give a global maximum, while the worker-side extension will give a local maximum.
In the following descriptions, note the difference between
LocalDataFrame is a dataframe that exists on a single machine. The
LocalDataFrame is an abstraction for structures like
List[List[Any]]. On the other hand, the
DataFrame is a dataframe that can exist on multiple machines. Fugue’s
DataFrame class is a abstract version of Spark or Dask DataFrames.
DataFrame can only be used on driver.
Creator: no input, single output
DataFrame, it is to produce a
DataFrameto be used by other extensions
Processor: one or multiple input
DataFrame, single output
DataFrame, it is to do certain transformation and pass to the next node
Outputter: one or multiple input
DataFrames, no input, it is to finalize the process of the input, for example save or print
CoTransformer: one or multiple
Advanced worker-side extensions
These extensions can be defined with the appropriate Python class or decorator. For example, a
transformer can be defined with the
Transformer class or by using the
@transformer decorator with a Python function. These are interfaces provided by Fugue, but they are not required to convert functions to extensions. As seen in the beginner tutorial, schema hints can be used to define extensions. For example, the following function will create a new column called
# schema: *,c:int def add_transformer(df:pd.DataFrame) -> pd.DataFrame: df['c'] = df['a'] + df['b'] return df
This schema hint comment is read by Fugue to make the
add_transformer an extension during runtime. In fact, the schema hint is not even required if the schema is provided during runtime as seen below.
from fugue import transform def add_transformer(df:pd.DataFrame) -> pd.DataFrame: df['c'] = df['a'] + df['b'] return df df = transform(df, add_transformer, schema="*, c:int")
These approaches that leave the code in native Python are called the interfaceless approach and are the easiest way to use Fugue. They are designed to be non-invasive, and at the same time, encourage more maintainable code by working around typo annotations and comments. The resulting code is not tied to Fugue, and can run independently.
Output Schema Requirement#
There is a distinction when it comes to
worker-side extensions. The driver-side extensions (Creator, Processor, Outputter) have access to the schema, so there is no need to infer or guess it. This is why Fugue does not require the schema to be specified if the output annotation is one of
pd.DataFrame for the
worker-side extensions, things need to be a bit more explicit. Normally distributed computing frameworks can infer output schema, however, it is neither reliable nor efficient. To infer the schema, it has to go through at least one partition of data and figure out the possible schema. However,
transformers can produce inconsistent schemas on different partitions. The inference can also take a long time or directly fail. So to avoid potential correctness and performance issues,
CoTransformer output schemas are required in Fugue.