FugueSQL in 10 Minutes
This is a short introduction to FugueSQL geared toward new users. FugueSQL is the SQL interface for Fugue. The Fugue project aims to make big data effortless by accelerating iteration speed and providing a simpler interface for users to utilize distributed computing engines.
This tutorial only covers the SQL interface. For Python, check the Fugue API in 10 minutes section. Note that this is just an overview of the features, not a full tutorial.
FugueSQL is meant for SQL lovers who want to use their grammar of choice on top of Pandas, Spark, and Dask. It has since been extended to support SQL backends such as DuckDB and BigQuery, which let data practitioners create more modular pieces of SQL logic and iterate faster. This page shows the FugueSQL syntax; for more specific examples on BigQuery, there is the BigQuery tutorial.
Installation
There are two things to install. The first is FugueSQL, which ships as an optional extra of the Fugue package. Install it with:
pip install fugue[sql]
FugueSQL has a notebook extension for both Jupyter Notebooks and JupyterLab. This extension provides syntax highlighting. To install the extension, use pip:
pip install fugue-jupyter
and then to register the startup script:
fugue-jupyter install startup
See this documentation for more details.
Setup
If you are using JupyterLab and followed the installation instructions above, the `%%fsql` cell magic is already registered by default. Otherwise, it can be registered with the following command, where the `is_lab` argument indicates whether you are running JupyterLab or Classic Jupyter Notebook. This `setup()` call provides both the cell magic and the syntax highlighting.
from fugue_jupyter import setup
setup()
Standard SQL Compatible
FugueSQL is meant for SQL users to work with Python DataFrames (Pandas, Spark, and Dask). FugueSQL is parsed and then executed on the underlying engine. For example, FugueSQL with Spark runs on top of SparkSQL and PySpark. We'll see non-standard SQL commands later, but for now, the important thing to note is that FugueSQL is compatible with standard SQL.
First, we define two Pandas DataFrames.
import pandas as pd
df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
df2 = pd.DataFrame({"col1": ["A", "B"], "col3": [1, 2]})
Now we can use them in a `%%fsql` cell as seen below. `PRINT` is a FugueSQL keyword that displays the first few rows of a DataFrame. Everything else besides `PRINT` is standard SQL. By default, `%%fsql` runs on Pandas.
%%fsql
SELECT df.col1, df.col2, df2.col3
FROM df
LEFT JOIN df2
ON df.col1 = df2.col1
WHERE df.col1 = "A"
PRINT
| | col1:str | col2:long | col3:long |
|---|---|---|---|
| 0 | A | 1 | 1 |
| 1 | A | 2 | 1 |
| 2 | A | 3 | 1 |
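For readers comparing with plain pandas, the same left join and filter can be sketched with pandas' `merge` (the DataFrame names mirror the ones defined above):

```python
import pandas as pd

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
df2 = pd.DataFrame({"col1": ["A", "B"], "col3": [1, 2]})

# LEFT JOIN on col1, then filter to col1 == "A"
res = df.merge(df2, on="col1", how="left")
res = res[res["col1"] == "A"].reset_index(drop=True)
print(res)
```

This produces the same three rows as the SQL above; FugueSQL simply lets you stay in SQL while the engine performs the equivalent operation.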
Using FugueSQL DataFrame in Python
FugueSQL can access DataFrames defined in Python. To make the result of a FugueSQL query available back in Python, we use the `YIELD` keyword.
df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
%%fsql
SELECT *
FROM df
YIELD DATAFRAME AS result
`YIELD` makes the variable available in Python. It will be a `FugueDataFrame`, whose `.native` attribute contains the underlying Pandas, Spark, or Dask DataFrame.
print(type(result))
print(result.native.head())
<class 'fugue.dataframe.pandas_dataframe.PandasDataFrame'>
col1 col2
0 A 1
1 A 2
2 A 3
3 B 4
4 B 5
Saving and Loading Files
In the previous cells, we relied on Python to load the files and bring them to FugueSQL. We can also use FugueSQL to `LOAD` and `SAVE` files directly. Parquet is the preferred format, but CSV and JSON are also supported; they just require some additional arguments. See the LOAD documentation for more details.
df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
df2 = pd.DataFrame({"col1": ["A", "B"], "col3": [1, 2]})
df.to_parquet("/tmp/df.parquet")
df2.to_parquet("/tmp/df2.parquet")
%%fsql
df = LOAD "/tmp/df.parquet"
df2 = LOAD "/tmp/df2.parquet"
new = SELECT df.col1, df.col2, df2.col3
FROM df
LEFT JOIN df2
ON df.col1 = df2.col1
WHERE df.col1 = "A"
SAVE OVERWRITE "/tmp/res.parquet"
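As noted above, non-Parquet formats take extra options in parentheses after the path. A sketch of a CSV load with a header row (check the LOAD documentation for the exact parameter names):

```
df = LOAD "/tmp/df.csv"(header=true)
```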
Variable Assignment
As seen in the previous cell, FugueSQL simplifies SQL syntax by removing the need for common table expressions (CTEs). CTEs are still supported, but FugueSQL users can also assign tables to variables with the `=` sign, which removes a significant amount of the boilerplate SQL practitioners have to deal with.
%%fsql
df = LOAD "/tmp/df.parquet"
max_vals = SELECT col1, MAX(col2) AS max_val
FROM df
GROUP BY col1
SELECT df.col1,
df.col2 / max_vals.max_val AS normalized
FROM df
JOIN max_vals
ON df.col1 = max_vals.col1
PRINT
| | col1:str | normalized:double |
|---|---|---|
| 0 | A | 0.333333 |
| 1 | A | 0.666667 |
| 2 | A | 1.000000 |
| 3 | B | 0.666667 |
| 4 | B | 0.833333 |
| 5 | B | 1.000000 |
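The same normalization in plain pandas takes a grouped maximum plus a merge; a sketch of the equivalent logic (variable names mirror the query above):

```python
import pandas as pd

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})

# equivalent of the max_vals step: per-group maximum of col2
max_vals = (df.groupby("col1", as_index=False)["col2"].max()
              .rename(columns={"col2": "max_val"}))

# equivalent of the join and the division
out = df.merge(max_vals, on="col1")
out["normalized"] = out["col2"] / out["max_val"]
print(out[["col1", "normalized"]])
```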
Anonymity (Optional)
The boilerplate that SQL introduces can be reduced further by using a FugueSQL feature called anonymity: if a statement has no `FROM` clause, it consumes the output of the previous statement. This way, intermediate steps don't have to be named. The example below has no `FROM` clause; tables only need to be named if they will be joined downstream.
%%fsql
LOAD "/tmp/df.parquet"
SELECT col1, MAX(col2) AS max_val
GROUP BY col1
PRINT
| | col1:str | max_val:long |
|---|---|---|
| 0 | A | 3 |
| 1 | B | 6 |
The `PRINT` keyword we used earlier actually relies on anonymity. The fully written version would look like this:
%%fsql
df = LOAD "/tmp/df.parquet"
PRINT 2 ROWS FROM df
| | col1:str | col2:long |
|---|---|---|
| 0 | A | 1 |
| 1 | A | 2 |
Invoking Python Code
In most data computing frameworks, SQL is a second-class citizen, often sandwiched between Python code. FugueSQL elevates SQL to a first-class interface that can invoke Python code. We'll show an example below; for more details about which functions can be used, see the Fugue in 10 minutes section. The functions valid for Fugue's `transform()` are the same ones valid in FugueSQL.
Using Python can often reduce the amount of code that we need to write. For example, let’s normalize the column like we did previously.
# schema: *+col2:float
def std_dev(df: pd.DataFrame) -> pd.DataFrame:
return df.assign(col2=df['col2']/df['col2'].max())
The function above is defined to handle one group of data at a time. To apply it per group, we partition the DataFrame by group using the `PREPARTITION` and `TRANSFORM` keywords of FugueSQL. The semantics are similar to a pandas `groupby-apply`.
%%fsql
df = LOAD "/tmp/df.parquet"
TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
| | col1:str | col2:float |
|---|---|---|
| 0 | A | 0.333333 |
| 1 | A | 0.666667 |
| 2 | A | 1.000000 |
| 3 | B | 0.666667 |
| 4 | B | 0.833333 |
| 5 | B | 1.000000 |
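For comparison, the `PREPARTITION BY col1 ... TRANSFORM` above is analogous to a pandas groupby-apply; a sketch redefining the same per-group function:

```python
import pandas as pd

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})

def std_dev(gdf: pd.DataFrame) -> pd.DataFrame:
    # same logic as the transformer above: divide col2 by the group's max
    return gdf.assign(col2=gdf["col2"] / gdf["col2"].max())

# apply the function to each col1 group, keeping the original row order
out = df.groupby("col1", group_keys=False).apply(std_dev)
print(out)
```

On a distributed engine, `PREPARTITION` additionally controls how the data is physically shuffled before `std_dev` runs, which plain pandas has no notion of.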
Execution Engine
The strongest feature of FugueSQL is that it can be run on any of the backend engines Fugue supports. Fugue supports Pandas, Spark, Dask, and DuckDB. For operations on a laptop or single machine, DuckDB may give significant improvements over Pandas because it has a query optimizer.
For data that is too large to process on a single machine, Spark or Dask can be used. All we need to do is specify the engine in the cell. For example, to run on DuckDB we can do:
%%fsql duckdb
df = LOAD "/tmp/df.parquet"
TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
| | col1:str | col2:float |
|---|---|---|
| 0 | A | 0.333333 |
| 1 | A | 0.666667 |
| 2 | A | 1.000000 |
| 3 | B | 0.666667 |
| 4 | B | 0.833333 |
| 5 | B | 1.000000 |
Or to run on Spark, we can specify `%%fsql spark`. If a `SparkSession` is already defined, it will be used; otherwise, a new `SparkSession` will be started.
%%fsql spark
df = LOAD "/tmp/df.parquet"
TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
| | col1:str | col2:float |
|---|---|---|
| 0 | B | 0.666667 |
| 1 | B | 0.833333 |
| 2 | B | 1.000000 |
| 3 | A | 0.333333 |
| 4 | A | 0.666667 |
| 5 | A | 1.000000 |
FugueSQL in Scripts
The `%%fsql` cell magic is meant for iteration inside Jupyter notebooks. To use FugueSQL in scripts, there is the `fugue_sql_flow` function. For example:
from fugue.api import fugue_sql_flow
fugue_sql_flow("""
LOAD "/tmp/df.parquet"
SELECT col1, MAX(col2) AS max_val
GROUP BY col1
PRINT
""").run(engine="spark");
| | col1:str | max_val:long |
|---|---|---|
| 0 | B | 6 |
| 1 | A | 3 |
If the query has only a single output, users can use `fugue_sql` instead, which simply returns the last DataFrame of the query.
from fugue.api import fugue_sql
result = fugue_sql("""
LOAD "/tmp/df.parquet"
SELECT col1, MAX(col2) AS max_val
GROUP BY col1
""", engine=None)
result.head()
| | col1 | max_val |
|---|---|---|
| 0 | A | 3 |
| 1 | B | 6 |
Distributed Computing Commands (Advanced)
One of the weak points of SQL is that it lacks the grammar to describe distributed computing operations. For example, it's common to persist DataFrames in Spark to hold them in memory so that they don't get recomputed. FugueSQL adds keywords such as `PERSIST` and `BROADCAST` to let users perform these operations without leaving SQL. In the example below, `df2` will not be recomputed on the Spark and Dask engines.
%%fsql spark
df = LOAD "/tmp/df.parquet"
df2 = SELECT *
FROM df
WHERE col2 > 2
PERSIST
SAVE df2 OVERWRITE "/tmp/df-processed.parquet"
SELECT COUNT(col2) AS cnt
FROM df2
PRINT
| | cnt:long |
|---|---|
| 0 | 4 |
Next Steps
For a more detailed guide to FugueSQL, check the FugueSQL section of the documentation.