# Extensions
Extensions are Python functions that are wrapped so that they can be executed inside `%%fsql` cells. They are needed to implement custom logic in SQL workflows.
## Creator
Creators are functions that generate a DataFrame. The example below contains all syntax variations. The schema needs to be specified either in the Python code or in the SQL query. Pandas DataFrames already carry a schema, so it does not need to be passed. The default LOAD is an example of a Creator.

A common use case for a Creator is reading from a different data source, such as MongoDB Atlas or AWS S3 (see the sketch at the end of this section).
```python
from fugue_jupyter import setup
setup()

from typing import List, Any
import pandas as pd

# returning a pd.DataFrame means no schema hint is needed
def create1(n=1) -> pd.DataFrame:
    return pd.DataFrame([[n]], columns=["a"])

# schema: a:int
def create2(n=1) -> List[List[Any]]:
    return [[n]]

# the schema for create3 will be given in the SQL query instead
def create3(n=1) -> List[List[Any]]:
    return [[n]]
```
```
%%fsql
CREATE [[0,"hello"],[1,"world"]] SCHEMA a:int,b:str
PRINT
CREATE USING create1
PRINT
CREATE USING create2(n=3)
PRINT
CREATE USING create3(n=4) SCHEMA a:int
PRINT
-- a bare PRINT displays the most recent DataFrame again
PRINT
```
| | a | b |
|---|---|---|
| 0 | 0 | hello |
| 1 | 1 | world |

| | a |
|---|---|
| 0 | 1 |

| | a |
|---|---|
| 0 | 3 |

| | a |
|---|---|
| 0 | 4 |

| | a |
|---|---|
| 0 | 4 |
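To illustrate the external data source use case mentioned above, a Creator reading from S3 could look like the sketch below. This is a hypothetical example rather than part of the original tutorial: the bucket path is made up, and reading `s3://` paths with `pd.read_parquet` assumes the optional `s3fs` dependency is installed.

```python
import pandas as pd

# Hypothetical Creator that loads a Parquet file from S3.
# Returning a pd.DataFrame means the schema travels with the data,
# so no schema hint or SCHEMA clause is needed.
def create_from_s3(path: str = "s3://my-bucket/data.parquet") -> pd.DataFrame:
    # pandas delegates "s3://" paths to s3fs when it is installed
    return pd.read_parquet(path)
```

Inside a `%%fsql` cell it would be invoked like the creators above, for example `CREATE USING create_from_s3`.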
## Outputter
Outputters are functions that either write out DataFrames or display them. The default SAVE and PRINT are examples of Outputters. They do not return anything, and they are invoked in SQL using the OUTPUT keyword.

PREPARTITION can be used along with Outputters to apply the logic to each partition. This is only possible if the Outputter class interface is used to define the extension (see the sketch after the output below).
```python
def output(df:pd.DataFrame, n=1) -> None:
    print(n)
    print(df)
```
```
%%fsql
a = CREATE [[0]] SCHEMA a:int
OUTPUT a USING output(n=2)
OUTPUT PREPARTITION BY a USING output
```
```
2
   a
0  0
1
   a
0  0
```
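For reference, a minimal sketch of the class form mentioned above is shown below, assuming Fugue's `Outputter` interface (the class name is made up). With the interface, the partitioning requested in SQL becomes visible through `self.partition_spec`.

```python
from fugue import DataFrames, Outputter

# Hypothetical class-based Outputter; unlike the function form,
# the interface exposes the PREPARTITION information
class PrintOutputter(Outputter):
    def process(self, dfs: DataFrames) -> None:
        # columns listed in PREPARTITION BY, e.g. ["a"]
        print(self.partition_spec.partition_by)
        for df in dfs.values():
            print(df.as_pandas())
```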
## Processor
Processors take in multiple DataFrames and output one DataFrame. Similar to the Outputter, the SQL PROCESS keyword can be used in conjunction with PREPARTITION, but only if the Processor class interface was used to define the Processor (a sketch follows the example below).
```python
def concat(df1:pd.DataFrame, df2:pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1,df2]).reset_index(drop=True)
```
```
%%fsql
a = CREATE [[0,"1"]] SCHEMA a:int,b:str
b = CREATE [[1,"2"]] SCHEMA a:int,b:str
PROCESS a,b USING concat
PRINT
```
| | a | b |
|---|---|---|
| 0 | 0 | 1 |
| 1 | 1 | 2 |
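A class-based equivalent of `concat` could look like the following minimal sketch, assuming Fugue's `Processor` interface and its `PandasDataFrame` wrapper (the class name is made up). Defining the extension this way is what would allow `PROCESS ... PREPARTITION`.

```python
import pandas as pd
from fugue import DataFrame, DataFrames, PandasDataFrame, Processor

# Hypothetical class-based Processor that concatenates all inputs
class Concat(Processor):
    def process(self, dfs: DataFrames) -> DataFrame:
        pdf = pd.concat(df.as_pandas() for df in dfs.values())
        return PandasDataFrame(pdf.reset_index(drop=True))
```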
## Transformer
Transformers are the most commonly used extension. They take one DataFrame in and output one DataFrame. This has appeared in the previous tutorials. A Transformer can be used with PREPARTITION to apply it to each partition.
```python
data = [
    ["A", "2020-01-01", 10],
    ["A", "2020-01-02", None],
    ["A", "2020-01-03", 30],
    ["B", "2020-01-01", 20],
    ["B", "2020-01-02", None],
    ["B", "2020-01-03", 40]
]
df = pd.DataFrame(data, columns=["id", "date", "value"])

# output is all input columns plus a new double column "shift"
# schema: *, shift:double
def shift(df: pd.DataFrame) -> pd.DataFrame:
    df['shift'] = df['value'].shift()
    return df
```
```
%%fsql
a = SELECT * FROM df
TRANSFORM a PREPARTITION BY id PRESORT date DESC USING shift
PRINT
TRANSFORM a USING shift -- default partitions
PRINT
```
| | id | date | value | shift |
|---|---|---|---|---|
| 0 | A | 2020-01-03 | 30.0 | NaN |
| 1 | A | 2020-01-02 | NaN | 30.0 |
| 2 | A | 2020-01-01 | 10.0 | NaN |
| 3 | B | 2020-01-03 | 40.0 | NaN |
| 4 | B | 2020-01-02 | NaN | 40.0 |
| 5 | B | 2020-01-01 | 20.0 | NaN |

| | id | date | value | shift |
|---|---|---|---|---|
| 0 | A | 2020-01-01 | 10.0 | NaN |
| 1 | A | 2020-01-02 | NaN | 10.0 |
| 2 | A | 2020-01-03 | 30.0 | NaN |
| 3 | B | 2020-01-01 | 20.0 | 30.0 |
| 4 | B | 2020-01-02 | NaN | 20.0 |
| 5 | B | 2020-01-03 | 40.0 | NaN |
Spark may give inconsistent results when using TRANSFORM without PREPARTITION because the default partitions are used. Also note that row order is not guaranteed in a distributed environment unless explicitly specified. PREPARTITION can also be used without a PRESORT, as in the cell below.
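For example, reusing the `shift` transformer from above without a PRESORT (a sketch; within each `id` partition the row order is then engine-dependent):

```
%%fsql
a = SELECT * FROM df
TRANSFORM a PREPARTITION BY id USING shift
PRINT
```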
## CoTransformer
CoTransformers are used on multiple DataFrames partitioned in the same way. The data is then joined together with an INNER JOIN by default, but the join type can be specified. In FugueSQL, TRANSFORM and ZIP are used together to apply the CoTransformer. Read more about CoTransformers in the dedicated tutorial.
```python
from fugue import DataFrames

# schema: res:[str]
def to_str_with_key(dfs:DataFrames) -> List[List[Any]]:
    # one output row whose single cell is a list of strings,
    # one string per zipped DataFrame, keyed by its name
    return [[[k+" "+x.as_array().__repr__() for k,x in dfs.items()]]]
```
```
%%fsql
df1 = CREATE [[0,1],[1,3]] SCHEMA a:int,b:int
df2 = CREATE [[0,4],[2,2]] SCHEMA a:int,c:int
df3 = CREATE [[0,2],[1,1],[1,5]] SCHEMA a:int,d:int
TRANSFORM (ZIP df1,df2,df3) USING to_str_with_key
PRINT
TRANSFORM (ZIP a=df1,b=df2,c=df3 LEFT OUTER BY a PRESORT b DESC) USING to_str_with_key
PRINT
```
| | res |
|---|---|
| 0 | [_0 [[0, 1]], _1 [[0, 4]], _2 [[0, 2]]] |

| | res |
|---|---|
| 0 | [a [[0, 1]], b [[0, 4]], c [[0, 2]]] |
| 1 | [a [[1, 3]], b [], c [[1, 1], [1, 5]]] |