FugueSQL in 10 Minutes

Have questions? Chat with us on GitHub or Slack.

This is a short introduction to FugueSQL geared toward new users. FugueSQL is the SQL interface for Fugue. The Fugue project aims to make big data effortless by accelerating iteration speed and providing a simpler interface for users to utilize distributed computing engines.

This tutorial only covers the SQL interface. For Python, check the Fugue API in 10 Minutes section. Note that this is just an overview of the features, not a full tutorial.

FugueSQL is meant for SQL lovers who want to use their grammar of choice on top of Pandas, Spark, and Dask.

FugueSQL has since been extended to support SQL backends such as DuckDB and BigQuery, which let data practitioners create more modular pieces of SQL logic and iterate faster. This page shows the FugueSQL syntax; for more specific examples on BigQuery, see the BigQuery tutorial.

Installation

There are two things to install. The first is FugueSQL, which ships as an extra of the Fugue package. Install it with:

pip install fugue[sql]

FugueSQL has a notebook extension for both Jupyter Notebooks and JupyterLab. This extension provides syntax highlighting. To install the extension, use pip:

pip install fugue-jupyter

and then register the startup script:

fugue-jupyter install startup

See the fugue-jupyter documentation for more details.

Setup

If you are using JupyterLab and followed the installation instructions above, the %%fsql cell magic is already registered by default. Otherwise, it can be registered with the setup() function shown below, which provides both the cell magic and the syntax highlighting.

from fugue_jupyter import setup

setup()

Standard SQL Compatible

FugueSQL is meant for SQL users to work with Python DataFrames (Pandas, Spark, and Dask). FugueSQL is parsed and then executed on the underlying engine; for example, FugueSQL on Spark runs on top of SparkSQL and PySpark. We'll see non-standard SQL commands later, but for now, the important thing to note is that FugueSQL is compatible with standard SQL.

First, we define two Pandas DataFrames.

import pandas as pd

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
df2 = pd.DataFrame({"col1": ["A", "B"], "col3": [1, 2]})

Now we can use them in a %%fsql cell as seen below. PRINT is a FugueSQL keyword to display the first few rows of the DataFrame. Everything else besides PRINT is standard SQL. By default, %%fsql will run on Pandas.

%%fsql
   SELECT df.col1, df.col2, df2.col3
     FROM df
LEFT JOIN df2
       ON df.col1 = df2.col1
    WHERE df.col1 = "A"
    PRINT
col1:str col2:long col3:long
0 A 1 1
1 A 2 1
2 A 3 1
PandasDataFrame: col1:str,col2:long,col3:long

Using FugueSQL DataFrames in Python

FugueSQL can access DataFrames defined in Python, as seen above. To make a DataFrame produced by a FugueSQL query available back in Python, we need to use the YIELD keyword.

df = pd.DataFrame({"col1":  ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
%%fsql
SELECT *
  FROM df
 YIELD DATAFRAME AS result

YIELD will make the variable available in Python. It will be a Fugue DataFrame whose .native attribute contains the underlying Pandas, Spark, or Dask DataFrame.

print(type(result))
print(result.native.head())
<class 'fugue.dataframe.pandas_dataframe.PandasDataFrame'>
  col1  col2
0    A     1
1    A     2
2    A     3
3    B     4
4    B     5
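
Besides .native, the yielded object can also be converted to Pandas regardless of which backend produced it. A minimal sketch using the result from above:

# as_pandas() converts the yielded Fugue DataFrame to a Pandas DataFrame,
# no matter which engine ran the query
pandas_df = result.as_pandas()
print(pandas_df.head())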

Saving and Loading Files

In the previous cells, we relied on Python to load the files and bring them into FugueSQL. We can instead use FugueSQL to LOAD and SAVE files directly. Parquet is the preferred format, but CSV and JSON are also supported; they just require some additional arguments. See the LOAD documentation for more details.
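
For example, CSV files usually need a header flag. A minimal sketch (assuming a /tmp/df.csv file with a header row exists; parameters go in parentheses after the path):

%%fsql
df = LOAD "/tmp/df.csv" (header=true)
PRINT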

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})
df2 = pd.DataFrame({"col1": ["A", "B"], "col3": [1, 2]})
df.to_parquet("/tmp/df.parquet")
df2.to_parquet("/tmp/df2.parquet")
%%fsql
df = LOAD "/tmp/df.parquet"
df2 = LOAD "/tmp/df2.parquet"

new = SELECT df.col1, df.col2, df2.col3
        FROM df
   LEFT JOIN df2
          ON df.col1 = df2.col1
       WHERE df.col1 = "A"

SAVE OVERWRITE "/tmp/res.parquet" 

Variable Assignment

As seen in the previous cell, FugueSQL simplifies SQL syntax by removing the need for common table expressions (CTEs). CTEs are still supported, but FugueSQL users can also assign tables to variables with the = sign, which eliminates a significant amount of the boilerplate SQL practitioners have to deal with.

%%fsql
df = LOAD "/tmp/df.parquet"

max_vals = SELECT col1, MAX(col2) AS max_val
             FROM df
         GROUP BY col1

   SELECT df.col1, 
          df.col2 / max_vals.max_val AS normalized
     FROM df
     JOIN max_vals
       ON df.col1 = max_vals.col1
    PRINT
col1:str normalized:double
0 A 0.333333
1 A 0.666667
2 A 1.000000
3 B 0.666667
4 B 0.833333
5 B 1.000000
PandasDataFrame: col1:str,normalized:double
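
For comparison, here is the same query written with a standard CTE, which FugueSQL still accepts:

%%fsql
df = LOAD "/tmp/df.parquet"

WITH max_vals AS (
    SELECT col1, MAX(col2) AS max_val
      FROM df
     GROUP BY col1
)
SELECT df.col1,
       df.col2 / max_vals.max_val AS normalized
  FROM df
  JOIN max_vals
    ON df.col1 = max_vals.col1
PRINT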

Anonymity (Optional)

The boilerplate that SQL introduces can be reduced further by using a FugueSQL feature called anonymity. If a statement has no FROM clause, it pulls the output of the previous statement, so intermediate steps don't have to be named. The example below has no FROM clause. Tables only need to be named if they will be joined downstream.

%%fsql
LOAD "/tmp/df.parquet"

SELECT col1, MAX(col2) AS max_val
 GROUP BY col1
 PRINT
col1:str max_val:long
0 A 3
1 B 6
PandasDataFrame: col1:str,max_val:long

The PRINT keyword we used earlier actually uses anonymity. The fully written version would look like this:

%%fsql
df = LOAD "/tmp/df.parquet"

PRINT 2 ROWS FROM df
col1:str col2:long
0 A 1
1 A 2
PandasDataFrame: col1:str,col2:long

Invoking Python Code

In most data computing frameworks, SQL is a second-class citizen often sandwiched between blocks of Python code. FugueSQL elevates SQL to be a first-class interface that can invoke Python code. We'll show an example below; for more details about which functions can be used, see the Fugue API in 10 Minutes section. The functions valid for Fugue's transform() are the same ones that can be used in FugueSQL.

Using Python can often reduce the amount of code that we need to write. For example, let’s normalize the column like we did previously.

# schema: *+col2:float
def std_dev(df: pd.DataFrame) -> pd.DataFrame:
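    # Scale col2 by its max within the data this function receives (one group at a time)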
    return df.assign(col2=df['col2']/df['col2'].max())

The function above is defined to handle one group of data at a time. To apply it per group, we first partition the DataFrame by group using FugueSQL's PREPARTITION and TRANSFORM keywords. The semantics are similar to a Pandas groupby-apply.

%%fsql
df = LOAD "/tmp/df.parquet"

TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
col1:str col2:float
0 A 0.333333
1 A 0.666667
2 A 1.000000
3 B 0.666667
4 B 0.833333
5 B 1.000000
PandasDataFrame: col1:str,col2:float
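
For reference, the same function works with Fugue's transform() on the Python side. A minimal sketch with the same partitioning:

from fugue import transform

# PREPARTITION BY col1 in FugueSQL maps to partition={"by": "col1"} here
normalized = transform(df, std_dev, partition={"by": "col1"})
print(normalized.head())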

Execution Engine

The strongest feature of FugueSQL is that it can run on any of the backend engines Fugue supports: Pandas, Spark, Dask, and DuckDB. For operations on a laptop or a single machine, DuckDB may give significant improvements over Pandas because it has a query optimizer.

For data that is too large to process on a single machine, Spark or Dask can be used. All we need to do is specify the engine in the cell. For example, to run on DuckDB we can do:

%%fsql duckdb
df = LOAD "/tmp/df.parquet"

TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
col1:str col2:float
0 A 0.333333
1 A 0.666667
2 A 1.000000
3 B 0.666667
4 B 0.833333
5 B 1.000000
PandasDataFrame: col1:str,col2:float

Or, to run on Spark, we can specify %%fsql spark. If a SparkSession is already defined, it will be used; otherwise, a new SparkSession will be started.

%%fsql spark
df = LOAD "/tmp/df.parquet"

TRANSFORM df PREPARTITION BY col1 USING std_dev
PRINT
col1:str col2:float
0 B 0.666667
1 B 0.833333
2 B 1.000000
3 A 0.333333
4 A 0.666667
5 A 1.000000
SparkDataFrame: col1:str,col2:float
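
To control the Spark configuration, you can create the session yourself before running the cell. A sketch (the appName is just an example):

from pyspark.sql import SparkSession

# Start (or fetch) a SparkSession; subsequent %%fsql spark cells will use it
spark = SparkSession.builder.appName("fugue-demo").getOrCreate()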

FugueSQL in Scripts

The %%fsql cell magic is meant for iteration inside Jupyter notebooks. To use FugueSQL in scripts, there is a fugue_sql_flow function. For example:

from fugue.api import fugue_sql_flow

fugue_sql_flow("""
LOAD "/tmp/df.parquet"

SELECT col1, MAX(col2) AS max_val
 GROUP BY col1
 PRINT
""").run(engine="spark");
col1:str max_val:long
0 B 6
1 A 3
SparkDataFrame: col1:str,max_val:long

If the query only has a single output, users can use fugue_sql() instead, which simply returns the last DataFrame of the query. With the default engine (None below), the query runs on Pandas and the result comes back as a Pandas DataFrame.

from fugue.api import fugue_sql 

result = fugue_sql("""
LOAD "/tmp/df.parquet"

SELECT col1, MAX(col2) AS max_val
 GROUP BY col1
""", engine=None)

result.head()
col1 max_val
0 A 3
1 B 6

Distributed Computing Commands (Advanced)

One of the weak points of SQL is that it doesn’t have the grammar to describe distributed computing operations. For example, it’s common to PERSIST DataFrames in Spark to hold them in memory so that they don’t get recomputed.

FugueSQL adds keywords such as PERSIST and BROADCAST so users can perform these operations without leaving SQL. In the example below, df2 is used twice but will not be recomputed on the Spark and Dask engines.

%%fsql spark
df = LOAD "/tmp/df.parquet"

df2 = SELECT *
        FROM df 
       WHERE col2 > 2
     PERSIST

SAVE df2 OVERWRITE "/tmp/df-processed.parquet"

SELECT COUNT(col2) AS cnt
  FROM df2
 PRINT 
cnt:long
0 4
SparkDataFrame: cnt:long
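
BROADCAST is attached the same way, at the end of a statement. A minimal sketch (assuming the /tmp/df2.parquet file saved earlier) that marks the smaller table for a broadcast join:

%%fsql spark
df = LOAD "/tmp/df.parquet"
df2 = LOAD "/tmp/df2.parquet"

-- ship the small table to all workers for the join
small = SELECT * FROM df2 BROADCAST

SELECT df.col1, df.col2, small.col3
  FROM df
  JOIN small
    ON df.col1 = small.col1
 PRINT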

Next Steps

For a more detailed guide to FugueSQL, check the FugueSQL section of the documentation.