Operators#

The previous section covered FugueSQL syntax. Along with the standard SQL operations, FugueSQL implements some additional keywords (and continues to add more). These keywords have equivalent methods in the programming interface. FugueSQL aims to make coding more fun and more English-like. Our goal is to provide an intuitive interface that is easy to read.

This is not a complete reference; it just covers the most commonly used keywords.

from fugue_notebook import setup
setup()  # registers the %%fsql cell magic

import pandas as pd

# sample data used throughout this section
data = [
    ["A", "2020-01-01", 10],
    ["A", "2020-01-02", None],
    ["A", "2020-01-03", 30],
    ["B", "2020-01-01", 20],
    ["B", "2020-01-02", None],
    ["B", "2020-01-03", 40]
]
data = pd.DataFrame(data, columns=["id", "date", "value"])

Input and Output Operations#

PRINT#

Prints a dataframe.

Usage:

PRINT [number] [ROW|ROWS] [FROM dataframe] [ROWCOUNT] [TITLE "title"]

  • dataframe - If not provided, takes the last dataframe

  • number - Number of rows

  • ROW|ROWS - No difference

  • ROWCOUNT - Displays number of rows for dataframe

  • TITLE - Title for display

Note: the ROWCOUNT operation is expensive on Spark and Dask. In distributed environments, persist the dataframe before using this operation.

%%fsql
-- PRINT example
df = CREATE [[0,"hello"],[1,"world"]] SCHEMA a:int,b:str
PRINT 2 ROWS FROM df TITLE "xyz"

xyz

a b
0 0 hello
1 1 world
schema: a:int,b:str

LOAD#

Loads a CSV, JSON, PARQUET or AVRO file as a DataFrame

Usage:

LOAD [PARQUET|CSV|JSON|AVRO] "path" (params) [COLUMNS schema|columns]

  • PARQUET|CSV|JSON|AVRO - File type to load. Required if the file has no extension

  • path - File path to load

  • params

    • infer_schema - infer the schema, true or false

    • remaining arguments are passed on to underlying execution engine loading method

  • COLUMNS - Columns to grab or schema to load it in as

SAVE (or SAVE AND USE)#

Saves a DataFrame to a CSV, JSON, PARQUET or AVRO file. SAVE AND USE also returns the dataframe so there is no need to load it back in.

Usage:

SAVE [dataframe] [PREPARTITION statement] [OVERWRITE|APPEND|TO] [SINGLE] [PARQUET|CSV|JSON|AVRO] "path" [(params)]

or

SAVE AND USE [dataframe] [PREPARTITION statement] [OVERWRITE|APPEND|TO] [SINGLE] [PARQUET|CSV|JSON|AVRO] "path" [(params)]

  • dataframe - If not provided, takes the last dataframe

  • PREPARTITION - Partitions for file

  • OVERWRITE|APPEND|TO - Choose the mode for writing the file out. TO throws an error if the file exists

  • SINGLE - One file output

  • PARQUET|CSV|JSON|AVRO - File type for the output. Required if the path has no extension

  • path - File path to write out to

  • params - Passed on to underlying execution engine saving method

%%fsql
-- SAVE and LOAD example
CREATE [[0,"1"]] SCHEMA a:int,b:str
SAVE OVERWRITE "/tmp/f.parquet"
SAVE OVERWRITE "/tmp/f.csv" (header=true)
SAVE OVERWRITE "/tmp/f.json"
SAVE OVERWRITE PARQUET "/tmp/f"

LOAD "/tmp/f.parquet" PRINT
LOAD "/tmp/f.parquet" COLUMNS a PRINT
LOAD PARQUET "/tmp/f" PRINT
LOAD "/tmp/f.csv" (header=true) PRINT
LOAD "/tmp/f.csv" (header=true) COLUMNS a:int,b:str PRINT
LOAD "/tmp/f.json" PRINT
LOAD "/tmp/f.json" COLUMNS a:int,b:str PRINT
a b
0 0 1
schema: a:int,b:str
a
0 0
schema: a:int
a b
0 0 1
schema: a:int,b:str
a b
0 0 1
schema: a:str,b:str
a b
0 0 1
schema: a:int,b:str
a b
0 0 1
schema: a:long,b:long
a b
0 0 1
schema: a:int,b:str

Partitioning#

Partitioning is an important part of distributed computing. Partitioning splits the data and spreads it across several machines; a presort determines how the data is ordered before that split. Data is arranged into logical partitions before operations are performed on each one. Partitioning is normally used in conjunction with Fugue extensions, and it is specified as a clause within other statements.

PREPARTITION#

Partitions a dataframe in preparation for a following operation

Usage:

PREPARTITION [RAND|HASH|EVEN] [number] [BY columns] [PRESORT statement]

  • RAND|HASH|EVEN - Algorithm for prepartition. See the Fugue partitioning documentation for details on each algorithm

  • number - Number of partitions

  • columns - Columns to partition on

  • statement - Presort hint. Check PRESORT syntax

PRESORT#

Usage:

PRESORT column [ASC|DESC]

Defines a presort to be performed before another operation. This clause is mainly used with PREPARTITION. Multiple column-order pairs can be given, separated by ,

  • column - Name of columns to sort on.

  • ASC|DESC - Order of sort.

The example below shows how to use PREPARTITION and PRESORT. We first define a transformer to apply them with.

# PREPARTITION and PRESORT example
import pandas as pd

# schema: *, shift:double
def shift(df: pd.DataFrame) -> pd.DataFrame:
    # each partition arrives already presorted, so shift() pulls the previous row's value
    df['shift'] = df['value'].shift()
    return df
%%fsql
-- PREPARTITION and PRESORT example
TRANSFORM data PREPARTITION BY id PRESORT date ASC USING shift
PRINT
id date value shift
0 A 2020-01-01 10.0 NaN
1 A 2020-01-02 NaN 10.0
2 A 2020-01-03 30.0 NaN
3 B 2020-01-01 20.0 NaN
4 B 2020-01-02 NaN 20.0
5 B 2020-01-03 40.0 NaN
schema: id:str,date:str,value:double,shift:double
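As the intro mentioned, these keywords have equivalents in the programming interface. For comparison, a rough pandas-only sketch of the same logic (without Fugue, so it runs on a single machine): sorting plays the role of PRESORT and groupby plays the role of PREPARTITION BY id.

```python
import pandas as pd

data = pd.DataFrame(
    [["A", "2020-01-01", 10], ["A", "2020-01-02", None], ["A", "2020-01-03", 30],
     ["B", "2020-01-01", 20], ["B", "2020-01-02", None], ["B", "2020-01-03", 40]],
    columns=["id", "date", "value"],
)

# PRESORT date ASC -> order rows within each group before applying the function
data = data.sort_values(["id", "date"]).reset_index(drop=True)
# PREPARTITION BY id ... USING shift -> shift values within each id group
data["shift"] = data.groupby("id")["value"].shift()
```

Note that on a distributed engine, PREPARTITION also controls how data physically moves between workers, which this pandas sketch cannot show.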

Column and Schema Operations#

RENAME COLUMNS#

Usage:

RENAME COLUMNS params [FROM dataframe]

  • params - Pairs of old_name:new_name separated by ,

  • dataframe - If not provided, takes the last dataframe

ALTER COLUMNS#

Changes data type of columns

Usage:

ALTER COLUMNS params [FROM dataframe]

  • params - Pairs of column:dtype separated by ,

  • dataframe - If not provided, takes the last one

DROP COLUMNS#

Drops columns from DataFrame

Usage:

DROP COLUMNS colnames [IF EXISTS] [FROM dataframe]

  • colnames - Column names separated by ,

  • IF EXISTS - Ignores columns that don't exist; without it, dropping a missing column raises an error

  • dataframe - If not provided, takes the last

%%fsql
-- RENAME COLUMNS, ALTER COLUMNS, DROP COLUMNS example
df = CREATE [[0,"1"]] SCHEMA a:int,b:str
df2 = RENAME COLUMNS a:aa, b:bb FROM df
PRINT df2
df3 = ALTER COLUMNS aa:str, bb:int FROM df2
PRINT df3
df4 = DROP COLUMNS bb, c IF EXISTS FROM df3
PRINT df4
aa bb
0 0 1
schema: aa:int,bb:str
aa bb
0 0 1
schema: aa:str,bb:int
aa
0 0
schema: aa:str
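These column operations map directly onto pandas methods in the programming interface. A sketch of the same three steps in plain pandas:

```python
import pandas as pd

df = pd.DataFrame([[0, "1"]], columns=["a", "b"])
# RENAME COLUMNS a:aa, b:bb
df2 = df.rename(columns={"a": "aa", "b": "bb"})
# ALTER COLUMNS aa:str, bb:int
df3 = df2.astype({"aa": str, "bb": int})
# DROP COLUMNS bb, c IF EXISTS -> errors="ignore" skips missing columns like c
df4 = df3.drop(columns=["bb", "c"], errors="ignore")
```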

NULL Handling#

DROP ROWS#

Drops rows containing NULL values from a DataFrame

Usage:

DROP ROWS IF ALL|ANY NULL|NULLS [ON columns] [FROM dataframe]

  • ALL|ANY - Drop the row when all values are NULL, or when any value is NULL

  • NULL|NULLS - There is no difference

  • columns - Columns to check for NULL values

  • dataframe - If not provided, takes the last

FILL#

Fills NULL values in a DataFrame

Usage:

FILL NULL|NULLS PARAMS params [FROM dataframe]

  • NULL|NULLS - There is no difference

  • params - Pairs of column_name:fill_value

  • dataframe - If not provided, takes the last dataframe

%%fsql
-- DROP ROWS and FILL example
df = CREATE [[NULL,"1"]] SCHEMA a:double,b:str
df2 = DROP ROWS IF ANY NULL ON a FROM df
PRINT df2
df3 = DROP ROWS IF ALL NULLS FROM df
PRINT df3
df4 = FILL NULLS PARAMS a:1 FROM df
PRINT df4
a b
schema: a:double,b:str
a b
0 None 1
schema: a:double,b:str
a b
0 1.0 1
schema: a:double,b:str
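In pandas terms, DROP ROWS corresponds to dropna and FILL to fillna. A rough pandas-only equivalent of the cell above:

```python
import pandas as pd
import numpy as np

df = pd.DataFrame([[np.nan, "1"]], columns=["a", "b"])
# DROP ROWS IF ANY NULL ON a -> only check column a
df2 = df.dropna(subset=["a"])
# DROP ROWS IF ALL NULLS -> drop only when every value in the row is NULL
df3 = df.dropna(how="all")
# FILL NULLS PARAMS a:1
df4 = df.fillna({"a": 1})
```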

Sampling#

SAMPLE#

Takes a sample of the DataFrame, potentially with replacement. Specify either a number of rows or a percentage of the dataframe

Usage:

SAMPLE [REPLACE] [rows ROWS | percent PERCENT] [SEED seed] [FROM dataframe]

  • REPLACE - Sample with replacement

  • rows - Integer for number of rows

  • percent - Integer or Decimal indicating percent of dataframe to be returned

  • seed - Random seed for sampling, used to reproduce the same random sampling each time

  • dataframe - If not provided, takes the last dataframe

TAKE#

TAKE returns the top N rows of a DataFrame. If used with PREPARTITION, it returns the top N rows of each partition. PRESORT can be applied before taking the top N rows.

Equivalent to Pandas head

Usage:

TAKE rows ROW|ROWS [FROM dataframe ] [PREPARTITION statement] [NULL|NULLS FIRST|LAST]

  • rows - Integer for number of rows

  • dataframe - If not provided, takes the last dataframe

  • PREPARTITION - See syntax for PREPARTITION

  • NULL|NULLS - No difference

  • FIRST|LAST - If there is a PRESORT, sort with NULLS at the top or NULLS at the bottom

%%fsql
-- SAMPLE and TAKE example
df = CREATE [[1,"1"],[2,"2"],[3,"3"],[4,"4"],[5,"5"]] SCHEMA a:double,b:str
df2 = SAMPLE 2 ROWS SEED 42 FROM df
PRINT df2
df3 = SAMPLE 40 PERCENT SEED 42 FROM df
PRINT df3
df4 = TAKE 3 ROWS FROM df
PRINT df4
df5 = TAKE 1 ROW FROM df PREPARTITION BY a   -- Returns 1 row for each partition
PRINT df5
a b
0 2.0 2
1 5.0 5
schema: a:double,b:str
a b
0 2.0 2
1 5.0 5
schema: a:double,b:str
a b
0 1.0 1
1 2.0 2
2 3.0 3
schema: a:double,b:str
a b
0 1.0 1
1 2.0 2
2 3.0 3
3 4.0 4
4 5.0 5
schema: a:double,b:str
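SAMPLE and TAKE map closely to pandas sample and head; TAKE with PREPARTITION is a groupby followed by head. A pandas-only sketch (the exact rows sampled will differ from a distributed engine's results even with the same seed):

```python
import pandas as pd

df = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0, 5.0],
                   "b": ["1", "2", "3", "4", "5"]})
# SAMPLE 2 ROWS SEED 42
df2 = df.sample(n=2, random_state=42)
# SAMPLE 40 PERCENT SEED 42
df3 = df.sample(frac=0.4, random_state=42)
# TAKE 3 ROWS
df4 = df.head(3)
# TAKE 1 ROW PREPARTITION BY a -> first row of each partition
df5 = df.groupby("a", group_keys=False).head(1)
```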

Distributed Computing Operations#

These next keywords are used for distributed environments to save repeated computation.

BROADCAST#

Copies a DataFrame (ideally a small one) to all worker nodes to prevent shuffling when joining to larger dataframes. It is used by appending the keyword to the end of any FugueSQL statement that outputs a DataFrame.

PERSIST or CHECKPOINT#

Caches a dataframe. Fugue has many types of CHECKPOINT; see the Fugue documentation on checkpoints for a deep dive on when to use each type. Like BROADCAST, it is used by appending the keyword after another FugueSQL statement that outputs a DataFrame.

%%fsql
-- BROADCAST and PERSIST example
df = CREATE [[1,"1"],[2,"2"],[3,"3"],[4,"4"],[5,"5"]] SCHEMA a:double,b:str
df2 = TAKE 2 ROWS FROM df BROADCAST
df3 = TAKE 2 ROWS FROM df PERSIST

YIELD#

By default, dataframes inside a FugueSQL cell are only available locally within that cell. YIELD makes DataFrames available in succeeding FugueSQL cells. There are two forms: YIELD DATAFRAME and YIELD FILE. YIELD DATAFRAME holds the DataFrame in memory, while YIELD FILE saves the DataFrame to a temporary file and loads it back when needed. YIELD FILE is intended for larger DataFrames.

%%fsql
-- YIELD example
yielded_df = CREATE [[1,"1"],[2,"2"],[3,"3"],[4,"4"],[5,"5"]] SCHEMA a:double,b:str
YIELD DATAFRAME AS yielded
%%fsql
-- yielded is available from the previous cell
SELECT * FROM yielded
PRINT
a b
0 1.0 1
1 2.0 2
2 3.0 3
3 4.0 4
4 5.0 5
schema: a:double,b:str