Fugue works perfectly well with Coiled. This document assumes you already have a Coiled account set up and know the basic operations on Coiled. If that is not the case, please create an account on Coiled (it comes with free credits) and set up AWS or GCP according to their instructions.
Setup the environment#
Start from the pre-built Docker image#
The easiest way to start trying it out is to run the Docker environment locally:
```bash
docker run -p 8888:8888 -it fugueproject/coiled:latest jupyter lab --port=8888 --ip=0.0.0.0 --no-browser --allow-root --NotebookApp.token='' --NotebookApp.password='' --NotebookApp.allow_origin='*'
```
You can then access the JupyterLab environment at http://localhost:8888 in your browser. Once you are in JupyterLab, the first thing to do is log in to Coiled. You may start a terminal in the lab environment and run the Coiled login command (e.g. `coiled login`).
Alternatively, just follow the official instructions to set up your environment.
Now, you can start a new notebook. The first step is to create a software environment in Coiled. For example, you can run the following code:
```python
import os

import coiled

SOFTWARE_ENV = os.environ["DOCKER_IMAGE"].replace("/", "_").replace(":", "_").replace(".", "_")
coiled.create_software_environment(
    name=SOFTWARE_ENV, container=os.environ["DOCKER_IMAGE"]
)
```
Most importantly, the container should be the same image you are currently running in, which ensures version consistency.
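The `replace` chain above just sanitizes the image name, presumably because characters such as `/`, `:` and `.` are not valid in a Coiled software environment name. A minimal standalone sketch of that transformation (the function name is ours, not part of Coiled):

```python
def to_env_name(image: str) -> str:
    # Replace the characters that a Docker image reference contains but a
    # Coiled software environment name cannot (same as the replace chain above)
    for ch in "/:.":
        image = image.replace(ch, "_")
    return image

print(to_env_name("fugueproject/coiled:latest"))  # fugueproject_coiled_latest
```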
Start from scratch#
To start from scratch, you must make sure the coiled cloud provider dependency is installed:
```bash
pip install fugue-cloudprovider[coiled]
```
Besides that, you also need to make sure your worker environment has matching packages and a matching Python version. You must also install fugue on the worker side.
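A quick way to see what the workers need to match is to print the driver's Python version and the exact versions of the key packages (a standard-library-only sketch; the package list is an example):

```python
import sys
from importlib import metadata

# The Python version the workers should match (major.minor)
py_ver = f"{sys.version_info.major}.{sys.version_info.minor}"
print("python", py_ver)

# Exact versions of the key packages, so the worker image can pin the same ones
for pkg in ["fugue", "dask", "coiled"]:
    try:
        print(pkg, metadata.version(pkg))
    except metadata.PackageNotFoundError:
        print(pkg, "not installed")
```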
Use Fugue on Coiled#
If you already have a Coiled cluster#
You may follow the official doc, or your own way, to create a `Cluster` instance. Assuming it is named `cluster`, it can be used as the execution engine directly, for example:
```python
import pandas as pd

from fugue import transform
from fugue_sql import fsql

fsql("""
CREATE [] SCHEMA a:int
PRINT
""").run(cluster)


def my_transformer(df: pd.DataFrame) -> pd.DataFrame:
    return df


transform(
    pd.DataFrame(dict(a=[0, 1])),
    my_transformer,
    schema="*",
    engine=cluster,
)
```
In this case, Fugue is not responsible for starting or closing the Dask cluster; you must take care of that yourself.
If you already have a Dask Client#
If you have already instantiated a Dask client, the `client` instance can be used as the execution engine. Alternatively, you can just use the string `dask` as the execution engine; it finds the active client automatically. The following ways are all fine:
```python
fsql("""
CREATE [] SCHEMA a:int
PRINT
""").run(client)

fsql("""
CREATE [] SCHEMA a:int
PRINT
""").run("dask")

transform(
    pd.DataFrame(dict(a=[0, 1])),
    my_transformer,
    schema="*",
    engine="dask",
)
```
```
%%fsql dask
CREATE [] SCHEMA a:int
PRINT
```
Again, you are responsible for starting and stopping the Dask client; Fugue will just use it.
People may forget to close the cluster, which wastes money, so we provide a slightly better way to help you manage the resource:
```python
from fugue_coiled import CoiledDaskClient

with CoiledDaskClient(**coiled_cluster_kwargs) as client:
    fsql("""
    CREATE [['abc']] SCHEMA a:str
    SELECT * WHERE a LIKE 'ab%'
    PRINT
    """).run(client)
```
In this case, `CoiledDaskClient` will automatically close the Coiled cluster and the Dask client at the end of the context.
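This is the standard Python context-manager pattern: `__exit__` runs even when the block raises, so the cluster cannot be leaked. A minimal sketch of the pattern (the class below is a stand-in for illustration, not the real `CoiledDaskClient`):

```python
class ManagedCluster:
    """A stand-in illustrating the cleanup guarantee of a context manager."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        # the real CoiledDaskClient would start the Coiled cluster and
        # Dask client here
        return self

    def __exit__(self, exc_type, exc, tb):
        # runs on normal exit AND on exceptions, so resources are released
        self.closed = True


with ManagedCluster() as client:
    pass  # run your Fugue workload here

print(client.closed)  # True
```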
If you want to connect to a running Coiled cluster#
Just set the engine to `coiled:<cluster_name>`. Make sure a cluster with that name is active. Fugue will not stop this cluster after execution.
```python
fsql("""
CREATE [] SCHEMA a:int
PRINT
""").run("coiled:my_cluster")

transform(
    pd.DataFrame(dict(a=[0, 1])),
    my_transformer,
    schema="*",
    engine="coiled:my_cluster",
)
```
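The engine string is just the `coiled` scheme followed by the cluster name, separated by a colon; a small illustration of the format:

```python
# Break down the "coiled:<cluster_name>" engine string (illustration only)
engine = "coiled:my_cluster"
scheme, _, cluster_name = engine.partition(":")
print(scheme)        # coiled
print(cluster_name)  # my_cluster
```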
If you want an ephemeral Coiled cluster#
If you haven’t instantiated a Coiled cluster or a Dask client, and you only want to use the computing resource at a certain step, you can use Coiled as an ephemeral service.
You just need to use the string `coiled` as the execution engine, and provide the configs in the following format:

- `token`: the Coiled token for authentication; it is required if you have not logged in on your machine
- `cluster`: a dictionary of parameters used to instantiate `coiled.Cluster`

For example:
```python
CONF = dict(token="abc", cluster=dict(n_workers=2, software="my_env"))
```
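If you build this config programmatically, a small sanity check can catch typos early. The helper below is hypothetical (not part of fugue-cloudprovider) and assumes only the two keys shown above are used:

```python
def check_coiled_conf(conf: dict) -> None:
    # Assumes the engine config only uses the "token" and "cluster" keys
    allowed = {"token", "cluster"}
    unknown = set(conf) - allowed
    if unknown:
        raise ValueError(f"unexpected config keys: {sorted(unknown)}")
    if "cluster" in conf and not isinstance(conf["cluster"], dict):
        raise TypeError("'cluster' must be a dict of coiled.Cluster parameters")


check_coiled_conf(dict(token="abc", cluster=dict(n_workers=2, software="my_env")))
print("ok")  # passes validation
```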
```python
fsql("""
CREATE [] SCHEMA a:int
PRINT
""").run("coiled", CONF)

transform(
    pd.DataFrame(dict(a=[0, 1])),
    my_transformer,
    schema="*",
    engine="coiled",
    engine_conf=CONF,
)
```
```
%%fsql coiled CONF
CREATE [] SCHEMA a:int
PRINT
```