Extensions#

Extensions are Python functions that are wrapped in order to execute in the %%fsql. These are needed to implement custom logic in SQL workflows.

Creator#

Creators are functions that generate a DataFrame. The example below contains all syntax variations. Schema needs to be specified in the Python code, or in the SQL query. Pandas DataFrames have schema defined, so it does not need to be passed. The default LOAD an example of a Creator.

A common use case for Creator is reading from a different data source like MongoDB Atlas or AWS S3.

Read more about Creators

from fugue_notebook import setup
setup()
from typing import List, Any
import pandas as pd

def create1(n=1) -> pd.DataFrame:
    return pd.DataFrame([[n]],columns=["a"])

# schema: a:int
def create2(n=1) -> List[List[Any]]:
    return [[n]]

def create3(n=1) -> List[List[Any]]:
    return [[n]]
%%fsql
CREATE [[0,"hello"],[1,"world"]] SCHEMA a:int,b:str
PRINT
CREATE USING create1 
PRINT
CREATE USING create2(n=3) 
PRINT
CREATE USING create3(n=4) SCHEMA a:int PRINT
PRINT
a b
0 0 hello
1 1 world
schema: a:int,b:str
a
0 1
schema: a:long
a
0 3
schema: a:int
a
0 4
schema: a:int
a
0 4
schema: a:int

Outputter#

Outputters are functions that either write out DataFrames or display them. The default SAVE and PRINT are examples of Outputters. They do not return anything. They are invoked in SQL using the OUTPUT keyword.

PREPARTITION can be used along with Outputters to apply the logic on each partition. This is only possible if the Outputter interface is used to define the extension.

Read more about Outputters

def output(df:pd.DataFrame, n=1) -> None:
    print(n)
    print(df)
%%fsql
a=CREATE [[0]] SCHEMA a:int
OUTPUT a USING output(n=2)
OUTPUT PREPARTITION BY a USING output
2
   a
0  0
1
   a
0  0

Processor#

Processors take in multiple DataFrames and output one DataFrame. Similar to the Outputter, the SQL PROCESS keyword can be used in conjunction with PREPARTITION but only if the Processor class interface was used to define the Processor.

Read more about Processors

def concat(df1:pd.DataFrame, df2:pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1,df2]).reset_index(drop=True)
%%fsql
a = CREATE [[0,"1"]] SCHEMA a:int,b:str
b = CREATE [[1,"2"]] SCHEMA a:int,b:str
PROCESS a,b USING concat
PRINT
a b
0 0 1
1 1 2
schema: a:int,b:str

Transformer#

Transformers are the most used extension. They take one DataFrame in and output one DataFrame. This has appeared in the previous tutorials. It can be used with PREPARTITION to apply the Transformer to each parition.

Read more about Transformers

data = [
    ["A", "2020-01-01", 10],
    ["A", "2020-01-02", None],
    ["A", "2020-01-03", 30],
    ["B", "2020-01-01", 20],
    ["B", "2020-01-02", None],
    ["B", "2020-01-03", 40]
]
df = pd.DataFrame(data, columns=["id", "date", "value"])

# schema: *, shift:double
def shift(df: pd.DataFrame) -> pd.DataFrame:
    df['shift'] = df['value'].shift()
    return df
%%fsql
a = SELECT * FROM df
TRANSFORM a PREPARTITION BY id PRESORT date DESC USING shift
PRINT
TRANSFORM a USING shift    # default partition
PRINT
id date value shift
0 A 2020-01-03 30.0 NaN
1 A 2020-01-02 NaN 30.0
2 A 2020-01-01 10.0 NaN
3 B 2020-01-03 40.0 NaN
4 B 2020-01-02 NaN 40.0
5 B 2020-01-01 20.0 NaN
schema: id:str,date:str,value:double,shift:double
id date value shift
0 A 2020-01-01 10.0 NaN
1 A 2020-01-02 NaN 10.0
2 A 2020-01-03 30.0 NaN
3 B 2020-01-01 20.0 30.0
4 B 2020-01-02 NaN 20.0
5 B 2020-01-03 40.0 NaN
schema: id:str,date:str,value:double,shift:double

Spark may give inconsistent results when using TRANSFORM without using PREPARITION because the default partitions are used. Also note order is not guaranteed in a distributed environment unless explicitly specified. PREPARTITION can also be used without a PRESORT.

CoTransformer#

CoTransformers are the used on multiple DataFrames parititioned in the same way. The data is then joined together with an INNER JOIN by default, but it can be specified which join to use. In FugueSQL, TRANSFORM and ZIP are used together to apply the CoTransformer.

Read more about CoTransformers

from fugue import DataFrames

#schema: res:[str]
def to_str_with_key(dfs:DataFrames) -> List[List[Any]]:
    return [[[k+" "+x.as_array().__repr__() for k,x in dfs.items()]]]
%%fsql
df1 = CREATE [[0,1],[1,3]] SCHEMA a:int,b:int
df2 = CREATE [[0,4],[2,2]] SCHEMA a:int,c:int
df3 = CREATE [[0,2],[1,1],[1,5]] SCHEMA a:int,d:int

TRANSFORM (ZIP df1,df2,df3) USING to_str_with_key
PRINT

TRANSFORM (ZIP a=df1,b=df2,c=df3 LEFT OUTER BY a PRESORT b DESC) USING to_str_with_key
PRINT
res
0 [_0 [[0, 1]], _1 [[0, 4]], _2 [[0, 2]]]
schema: res:[str]
res
0 [a [[0, 1]], b [[0, 4]], c [[0, 2]]]
1 [a [[1, 3]], b [], c [[1, 1], [1, 5]]]
schema: res:[str]