Coiled#

Fugue works perfectly well with Coiled. This document assumes you already have a Coiled account set up and are familiar with its basic operations. If that is not the case, please create a Coiled account (free credits are available) and set up AWS or GCP according to their instructions.

Setup the environment#

Start from the pre-built Docker image#

The easiest way to get started is to run the pre-built Docker environment locally:

docker run -p 8888:8888 -it fugueproject/coiled:latest jupyter lab --port=8888 --ip=0.0.0.0 --no-browser --allow-root --NotebookApp.token='' --NotebookApp.password='' --NotebookApp.allow_origin='*'

You can then access the JupyterLab environment at http://localhost:8888 in your browser. Once in JupyterLab, the first step is to log in to Coiled. Start a terminal in the lab environment and type

coiled login

Or just follow the official instructions to set up your environment.

Now, you can start a new notebook. The first step is to create a software environment in Coiled. For example, you can run the following code:

import os
import coiled

# derive a valid software environment name from the Docker image name
SOFTWARE_ENV = os.environ["DOCKER_IMAGE"].replace("/", "_").replace(":", "_").replace(".", "_")
coiled.create_software_environment(name=SOFTWARE_ENV, container=os.environ["DOCKER_IMAGE"])

Most importantly, the container should be the same image you are currently running in, which ensures version consistency between your local environment and the cluster.

Start from scratch#

To start from scratch, you must make sure the Coiled cloud provider dependency is installed:

pip install fugue-cloudprovider[coiled]

Besides that, you also need to make sure your worker environment has matching package and Python versions. In particular, fugue must be installed on the worker side.
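One way (not the only one) to keep versions aligned is to pin the worker packages to the versions installed locally. The helper below is our own sketch, not part of Fugue or Coiled; the package list and environment name in the commented usage are illustrative. coiled.create_software_environment accepts a pip list of requirement strings.

from importlib.metadata import version

def pinned(packages):
    """Pin each package to the version installed in the local environment."""
    return [f"{p}=={version(p)}" for p in packages]

# Hypothetical usage: build a Coiled software environment whose package
# versions match the driver (package list and name are illustrative):
# import coiled
# coiled.create_software_environment(
#     name="fugue-env",
#     pip=pinned(["fugue", "fugue-cloudprovider", "dask", "pandas"]),
# )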

Use Fugue on Coiled#

If you already have a Coiled cluster#

You may follow the official doc, or your own approach, to create a Cluster instance; assume it is named cluster. Then cluster can be used directly as the execution engine, for example:

from fugue_sql import fsql
from fugue import transform
import pandas as pd

fsql("""
CREATE [[0]] SCHEMA a:int
PRINT
""").run(cluster)

def my_transformer(df:pd.DataFrame) -> pd.DataFrame:
    return df

transform(
    pd.DataFrame(dict(a=[0,1])),
    my_transformer,
    schema="*",
    engine=cluster
)

In this case, Fugue is not responsible for starting or stopping the Dask cluster; you must take care of that yourself.
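A simple pattern to avoid leaking clusters is to guarantee cleanup in a finally block. The helper below is our own sketch (not part of Fugue or Coiled); make_cluster would typically be something like lambda: coiled.Cluster(n_workers=2, software="my_env") (illustrative arguments), and close() is the standard way to shut down a Dask cluster object.

def run_with_cluster(make_cluster, work):
    """Create a cluster, run work(cluster), and always close the cluster,
    even if the work raises."""
    cluster = make_cluster()
    try:
        return work(cluster)
    finally:
        cluster.close()  # the cluster is shut down no matter what

With this helper, run_with_cluster(lambda: coiled.Cluster(n_workers=2), lambda c: transform(df, my_transformer, schema="*", engine=c)) would never leave a cluster running after the job finishes or fails.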

If you already have a Dask Client#

If you have already instantiated a Dask Client, the client instance can be used as the execution engine. Alternatively, you can use the string dask as the execution engine; Fugue will find the active client automatically.

Both of the following ways work:

fsql("""
CREATE [[0]] SCHEMA a:int
PRINT
""").run(client)

fsql("""
CREATE [[0]] SCHEMA a:int
PRINT
""").run("dask")

transform(
    pd.DataFrame(dict(a=[0,1])),
    my_transformer,
    schema="*",
    engine="dask"
)
Or, in a Jupyter notebook, you can use the cell magic:

%%fsql dask
CREATE [[0]] SCHEMA a:int
PRINT

Again, you are responsible for starting and stopping the Dask client; Fugue will just use it.

People may forget to close the cluster, which wastes money, so we provide a slightly better way to help you manage the resource:

from fugue_coiled import CoiledDaskClient

with CoiledDaskClient(**coiled_cluster_kwargs) as client:
    fsql("""
        CREATE [['abc']] SCHEMA a:str
        SELECT * WHERE a LIKE 'ab%'
        PRINT
    """).run(client)

In this case, CoiledDaskClient will automatically close the Coiled cluster and the Dask client at the end of the context.

If you want to connect to a running Coiled cluster#

Just set the engine to coiled:<cluster_name>. Make sure a cluster with that name is active. Fugue will not stop this cluster after execution.

fsql("""
CREATE [[0]] SCHEMA a:int
PRINT
""").run("coiled:my_cluster")

transform(
    pd.DataFrame(dict(a=[0,1])),
    my_transformer,
    schema="*",
    engine="coiled:my_cluster",
)

If you want an ephemeral Coiled cluster#

If you haven’t instantiated a Coiled cluster or a Dask client, and you only need the computing resources for a certain step, you can use Coiled as an ephemeral service.

You just need to use the string coiled as the execution engine, and provide the configs in the following format:

| Config Name | Description |
| --- | --- |
| token | The Coiled token for authentication; required if you have not logged in on your machine |
| cluster | A dictionary of parameters used to instantiate coiled.Cluster, for example dict(n_workers=2, software="my_env") |

CONF = dict(token="abc", cluster=dict(n_workers=2, software="my_env"))
fsql("""
CREATE [[0]] SCHEMA a:int
PRINT
""").run("coiled", CONF)

transform(
    pd.DataFrame(dict(a=[0,1])),
    my_transformer,
    schema="*",
    engine="coiled",
    engine_conf=CONF
)
Or, with the cell magic:

%%fsql coiled CONF
CREATE [[0]] SCHEMA a:int
PRINT