Databricks#
Fugue works perfectly well with Databricks. This document assumes you already have the Databricks service set up and you know the basic operations on Databricks. If that is not the case, please read this blog first to onboard Databricks.
Using Fugue on Databricks Notebooks#
Please follow this blog. The only thing you need to pay attention to is that you must install fugue in Libraries. You may add extras to the installation, but make sure you DO NOT include spark. For example, you should NOT install fugue[spark] because Databricks already has its own PySpark installed.
When you are on the notebook, the first step is to set up the environment (if you want to use Fugue SQL):
from fugue_notebook import setup
setup(is_lab=True)
Then you can directly use spark as your execution engine because it is the magic variable in Databricks representing the SparkSession:
import pandas as pd

# schema: *
def dummy(df:pd.DataFrame) -> pd.DataFrame:
    return df

tdf = pd.DataFrame(dict(a=[1,2,3,4]))

from fugue import transform

# spark is the Databricks notebook's built-in SparkSession
spark_df = transform(tdf, dummy, engine=spark)
spark_df.show()
You can also use the fsql magic. To use Spark, just do %%fsql spark:
%%fsql spark
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT
Using Fugue On A Remote Notebook#
One important value of Fugue is that it lets you iterate on your ideas on a local machine that is not always connected to Databricks, which can save both time and cost.
Fugue lets you use Databricks as an ephemeral Spark cluster: only when you need it, you can activate an existing cluster or create a new one directly from your notebook. Fugue can also automatically stop the remote cluster if it was created on the fly.
Installation#
In order to use Fugue in this mode, you must install the Databricks plugins:
pip uninstall pyspark
pip install fugue-cloudprovider[databricks]
If you also need to access AWS or GCP, you should install the corresponding extras: aws or gcp. For example:
pip uninstall pyspark
pip install fugue-cloudprovider[databricks,aws]
This will install both databricks-connect and databricks-cli. We need databricks-connect to interact with the remote SparkSession and databricks-cli to manage the clusters.
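As a quick sanity check after installation, the following imports should succeed, and pyspark should resolve to the databricks-connect build rather than the open-source package. This is only a sketch, not part of the official setup.
# Sanity check (a sketch): both modules below come from the packages installed above.
import fugue_databricks  # provided by fugue-cloudprovider[databricks]
import pyspark           # should come from databricks-connect, not open-source pyspark

print(pyspark.__version__)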
Configuration#
Unfortunately, Databricks doesn't unify the requirements of the two packages; they need different ways to initialize. Fugue unifies them to some degree. We always require users to provide host, token and cluster_id because these are the common requirements for the two packages.
For details on how to get those values, please read this Databricks document section.
You can provide them in two ways: using environment variables or config keys in Python.
| Parameter | Config Key | Environment Variable | Default |
|---|---|---|---|
| Databricks Host | host | DATABRICKS_ADDRESS | REQUIRED |
| Databricks Token | token | DATABRICKS_API_TOKEN | REQUIRED |
| Org ID | org_id | DATABRICKS_ORG_ID | empty |
| Port | port | DATABRICKS_PORT | 15001 |
Notice that the command databricks-connect configure will have no effect; the values configured through the command line will not be used, so you don't have to run this command. The values of the config keys have higher priority than the environment variables. For example, if you define a python dict:
conf = {
    "cluster_id": "abc",
    "token": "mytoken",
}
And if you have these environment variables:
DATABRICKS_API_TOKEN="def"
DATABRICKS_ADDRESS="https://dummy"
Then Fugue will use the following for both databricks-connect and databricks-cli:
{'cluster_id': 'abc', 'token': 'mytoken', 'host': 'https://dummy'}
Connecting to an existing cluster#
The above example is the minimal requirement to connect to an existing cluster. An existing cluster on Databricks does not need to be running; Fugue will start the cluster if it is stopped.
The easiest way to get a remote session from a cluster id is:
from fugue_databricks import init_db_spark
conf = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster_id": "0612-191111-6fopaaaa"
}
spark_session = init_db_spark(conf)
You must make sure that fugue is installed in Libraries of the cluster.
spark_session is just a normal SparkSession; you can use it as the execution engine as we showed before.
from fugue import transform
spark_df = transform(tdf, dummy, engine=spark_session)
spark_df.show()
Notice that when using a Fugue SQL cell, you should still use %%fsql spark, which means using the current SparkSession.
%%fsql spark
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT
But when using it programmatically, you should use spark_session:
from fugue_sql import fsql
fsql("""
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT
""").run(spark_session)
There is an alternative way without using init_db_spark. You can use db as the engine, and conf as the config:
spark_df = transform(tdf, dummy, engine="db", engine_conf=conf)
spark_df.show()
%%fsql db conf
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT
fsql("""
CREATE [[0],[1]] SCHEMA a:int
TRANSFORM USING dummy
PRINT
""").run("db", conf)
Connecting to an ephemeral cluster#
With Fugue, users can provide the cluster spec JSON; Fugue will create an ephemeral cluster, run the job, and stop the cluster. In this way, you will have zero interaction with the Databricks UI.
First let’s see a config example:
spec = {
    "num_workers": 1,
    "cluster_name": "test",  # Any name is fine, a duplicated name is fine
    "spark_version": "10.4.x-scala2.12",  # Must match the databricks-connect version
    "spark_conf": {
        "spark.speculation": "true"
    },
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT",
        "zone_id": "us-west-2c",
        "spot_bid_price_percent": 100,
        "ebs_volume_count": 0
    },
    "node_type_id": "i3.xlarge",
    "driver_node_type_id": "i3.xlarge",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {  # Get AWS credentials from Databricks' secret store
        "AWS_SECRET_ACCESS_KEY": "{{secrets/secret}}",
        "AWS_ACCESS_KEY_ID": "{{secrets/access}}"
    },
    "autotermination_minutes": 20,  # This is another protection
    "enable_elastic_disk": False,
    "runtime_engine": "STANDARD"
}
conf = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster": spec,
    "libraries": [
        {
            "pypi": {
                "package": "fugue"  # must install fugue!!
            }
        }
    ]
}
For details of the cluster spec, please read this document.
For details of the libraries spec, please read this document.
A tip for getting the spec is to export the JSON of one of your existing clusters created from the UI and modify it for your needs.
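Since databricks-cli is installed together with the plugin, one way to get that JSON is through its Python API. Below is a minimal sketch reusing the host, token and cluster id from the earlier examples; the module layout and method names may differ across databricks-cli versions, so treat it as an illustration rather than the official approach.
import json

# A sketch using the databricks-cli Python API (installed with fugue-cloudprovider[databricks]);
# module paths and method names may vary across databricks-cli versions.
from databricks_cli.sdk.api_client import ApiClient
from databricks_cli.clusters.api import ClusterApi

client = ApiClient(
    host="https://dbc-38aaa459-faaf.cloud.databricks.com",
    token="dapiecaaae64a727498daaaaafe1bace968a",
)
existing = ClusterApi(client).get_cluster("0612-191111-6fopaaaa")  # a cluster created from the UI
print(json.dumps(existing, indent=2))  # copy this output and trim it into your spec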
You can use init_db_spark to initialize the Spark cluster, but it's not recommended because this approach does not stop the cluster for you. Instead, you can use db_spark with a with statement:
from fugue_databricks import db_spark

with db_spark(conf) as spark_session:
    spark_df = transform(tdf, dummy, engine=spark_session)
    spark_df.show()
The following approach will also guarantee that the ephemeral cluster is closed after use:
spark_df = transform(tdf, dummy, engine="db", engine_conf=conf)
spark_df.show()
Fugue is able to identify whether you are requesting an ephemeral cluster from the config. If cluster is provided, then the cluster is ephemeral; otherwise, cluster_id must be provided. They can't both exist.
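As a minimal sketch (the variable names are just for illustration, and the values are reused from the examples above), the two mutually exclusive shapes look like this:
# Existing cluster: referenced by "cluster_id"; Fugue auto-starts it if stopped.
existing_cluster_conf = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster_id": "0612-191111-6fopaaaa",
}

# Ephemeral cluster: described by a "cluster" spec; Fugue creates it and stops it after use.
ephemeral_cluster_conf = {
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster": spec,  # the cluster spec dict shown above
    "libraries": [{"pypi": {"package": "fugue"}}],
}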
A quick summary: we have 3 ways to connect to Databricks from a remote notebook:

- init_db_spark is better for existing clusters. It doesn't stop any cluster. Using this to get a SparkSession can be very intuitive.
- engine="db", engine_conf=conf is great for both existing and ephemeral clusters. It will automatically stop the ephemeral cluster for you. It will auto-start an existing cluster but will not stop it automatically.
- with db_spark(conf) as spark: is great for both existing and ephemeral clusters. It will automatically stop the ephemeral cluster for you. It will auto-start an existing cluster but will not stop it automatically.