Databricks#

Fugue works perfectly well with Databricks. This document assumes you already have the Databricks service set up and you know the basic operations on Databricks. If that is not the case, please read this blog first to get onboarded onto Databricks.

Using Fugue on Databricks Notebooks#

Please follow this blog. The only thing you need to pay attention to is that you must install fugue in the cluster's Libraries. You may add extras to the installation, but make sure you DO NOT include spark. For example, you should NOT install fugue[spark], because Databricks already provides its own PySpark.

When you are in the notebook, the first step is to set up the environment (if you want to use Fugue SQL):

from fugue_notebook import setup
setup(is_lab=True)

Then you can directly use spark as your execution engine, because it is the magic variable in Databricks representing the SparkSession:

import pandas as pd

from fugue import transform

# schema: *
def dummy(df:pd.DataFrame) -> pd.DataFrame:
    return df

tdf = pd.DataFrame(dict(a=[1,2,3,4]))

spark_df = transform(tdf, dummy, engine=spark)
spark_df.show()

You can also use the fsql magic. To use spark, just do %%fsql spark:

%%fsql spark
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT

Using Fugue On A Remote Notebook#

One important value of Fugue is to let you iterate on your ideas on a local box that is not always connected to Databricks. This can save both time and cost.

Fugue lets you use Databricks as an ephemeral Spark cluster: only when you need it, you can directly activate an existing cluster or create a new cluster from your notebook. Fugue can also automatically stop the remote cluster if it was created on the fly.

Installation#

In order to use Fugue in this mode, you must install the Databricks plugin:

pip uninstall pyspark
pip install fugue-cloudprovider[databricks]

If you also need to access AWS or GCP, you should install the corresponding extras: aws or gcp. For example:

pip uninstall pyspark
pip install fugue-cloudprovider[databricks,aws]

This will install both databricks-connect and databricks-cli. We need databricks-connect to interact with the remote SparkSession and databricks-cli to manage the clusters.
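
After installation, a quick sanity check is to confirm the packages are importable. This is just a sketch; it assumes the standard import names pyspark (installed by databricks-connect), databricks_cli and fugue_databricks:

import importlib.util

# databricks-connect ships its own pyspark, databricks-cli provides databricks_cli,
# and the Fugue plugin provides fugue_databricks
for mod in ["pyspark", "databricks_cli", "fugue_databricks"]:
    print(mod, "available:", importlib.util.find_spec(mod) is not None)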

Configuration#

Unfortunately, Databricks doesn't unify the configuration requirements of the two packages; they are initialized in different ways. Fugue unifies them to some degree. We always require users to provide host, token and cluster_id because these are the common requirements of the two packages.

For details on how to get those values, please read this section of the Databricks documentation.

You can provide them in two ways: using environment variables or config keys in python.

| Parameter | Config Key | Environment Variable | Default |
| --- | --- | --- | --- |
| Databricks Host | host | DATABRICKS_ADDRESS | REQUIRED |
| Databricks Token | token | DATABRICKS_API_TOKEN | REQUIRED |
| Org ID | org_id | DATABRICKS_ORG_ID | empty |
| Port | port | DATABRICKS_PORT | 15001 |
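
For example, a minimal sketch of providing the values through environment variables from Python (all values below are placeholders; this assumes the variables are set before Fugue reads them):

import os

# placeholders: use your own workspace URL and token
os.environ["DATABRICKS_ADDRESS"] = "https://dbc-XXXXXXXX.cloud.databricks.com"
os.environ["DATABRICKS_API_TOKEN"] = "dapiXXXXXXXXXXXXXXXX"
# optional: the defaults in the table above apply when these are not set
os.environ["DATABRICKS_PORT"] = "15001"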

Notice that the command

databricks-connect configure

will have no effect; the values configured through the command line will not be used. You don't have to run this command. The config keys have higher priority than the environment variables. For example, if you define a python dict:

conf = {
    "cluster_id": "abc",
    "token": "mytoken",
}

And if you have these environment variables:

DATABRICKS_API_TOKEN="def"
DATABRICKS_ADDRESS="https://dummy"

Then Fugue will use the following for both databricks-connect and databricks-cli:

{'cluster_id': 'abc', 'token': 'mytoken', 'host': 'https://dummy'}

Connecting to an existing cluster#

The above example shows the minimal requirements to connect to an existing cluster. An existing cluster on Databricks does not need to be running; Fugue will start the cluster if it is stopped.

The easiest way to get a remote session from a cluster id is:

from fugue_databricks import init_db_spark

conf = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster_id": "0612-191111-6fopaaaa"
}

spark_session = init_db_spark(conf)

You must make sure that fugue is installed in the Libraries of the cluster.

spark_session is just a normal SparkSession; you can use it as the execution engine as we showed before.

from fugue import transform

spark_df = transform(tdf, dummy, engine=spark_session)
spark_df.show()

Notice that when using a Fugue SQL cell, you should still use %%fsql spark, meaning it uses the current Spark session.

%%fsql spark
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT

But when using it programmatically, you should use spark_session:

from fugue_sql import fsql

fsql("""
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT
""").run(spark_session)

There is an alternative way that does not use init_db_spark: you can use db as the engine and conf as the engine config.

spark_df = transform(tdf, dummy, engine="db", engine_conf=conf)
spark_df.show()

%%fsql db conf
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT

fsql("""
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT
""").run("db", conf)

Connecting to an ephemeral cluster#

With Fugue, users can provide a cluster spec JSON; Fugue will create an ephemeral cluster, run the job and stop the cluster. In this way, you will have zero interaction with the Databricks UI.

First let’s see a config example:

spec = {
    "num_workers": 1,
    "cluster_name": "test", # Any name is fine, duplicated name is fine
    "spark_version": "10.4.x-scala2.12", # Must match databricks-connect version
    "spark_conf": {
        "spark.speculation": "true"
    },
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT",
        "zone_id": "us-west-2c",
        "spot_bid_price_percent": 100,
        "ebs_volume_count": 0
    },
    "node_type_id": "i3.xlarge",
    "driver_node_type_id": "i3.xlarge",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": { # Get AWS credentials from DB's secrect store
        "AWS_SECRET_ACCESS_KEY": "{{secrets/secret}}",
        "AWS_ACCESS_KEY_ID": "{{secrets/access}}"
    },
    "autotermination_minutes": 20, # This is another protection
    "enable_elastic_disk": False,
    "runtime_engine": "STANDARD"
}

conf = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster": spec,
    "libraries": [
        {
          "pypi": {
            "package": "fugue" # must install fugue!!
          }
        }
    ]
}

For details of cluster spec, please read this document.

For details of libraries spec, please read this document.

A tip for getting the spec is to output the JSON of one of your existing clusters created from the UI and modify it for your needs.
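
For example, a minimal sketch of adapting an exported cluster JSON, assuming you saved it to a local file (the file name and the tweaks below are only illustrative):

import json

# load the JSON exported from an existing cluster's page in the Databricks UI
# (the file name here is hypothetical)
with open("exported_cluster.json") as f:
    spec = json.load(f)

# drop fields that identify the existing cluster rather than describe a new one,
# then adjust whatever you need
spec.pop("cluster_id", None)
spec["cluster_name"] = "test"
spec["num_workers"] = 1
spec["autotermination_minutes"] = 20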

You can use init_db_spark to initialize the Spark cluster, but it's not recommended because this approach does not stop the cluster for you. Instead, you can use db_spark with the with statement:

from fugue_databricks import db_spark

with db_spark(conf) as spark_session:
    spark_df = transform(tdf, dummy, engine=spark_session)
    spark_df.show()

The following approach also guarantees that the ephemeral cluster is stopped after use:

spark_df = transform(tdf, dummy, engine="db", engine_conf=conf)
spark_df.show()

Fugue is able to identify from the config whether you are requesting an ephemeral cluster. If cluster is provided, then the cluster is ephemeral; otherwise, cluster_id must be provided. They can't both exist.
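
For example, a minimal sketch of the two mutually exclusive config shapes (the host and token are the same placeholders used earlier, and spec is the spec dict shown above):

# connect to an existing cluster: provide cluster_id
conf_existing = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster_id": "0612-191111-6fopaaaa",
}

# request an ephemeral cluster: provide cluster (a spec dict) instead, never both
conf_ephemeral = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster": spec,
    "libraries": [{"pypi": {"package": "fugue"}}],
}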

A quick summary: we have 3 ways to connect to Databricks from a remote notebook:

  1. init_db_spark is better for existing clusters. It doesn't stop any cluster. Using it to get a SparkSession is very intuitive.

  2. engine="db", engine_conf=conf is great for both existing and ephemeral clusters. It will automatically stop the ephemeral cluster for you. It will auto start an existing cluster but will not stop it automatically.

  3. with db_spark(conf) as spark: is great for both existing and ephemeral clusters. It will automatically stop the ephemeral cluster for you. It will auto start an existing cluster but will not stop it automatically.