Fugue with Ibis#
The Ibis project tries to bridge the gap between local Python and various datastore backends, including distributed systems such as Spark and Dask. The main idea is to create a pythonic interface to express SQL semantics, so that expressions are agnostic to the backend.
This design idea is closely aligned with Fugue's, but note a few key differences:
Fugue supports both pythonic APIs and SQL, and the choice should be determined by particular cases or users’ preferences. On the other hand, Ibis focuses on the pythonic expression of SQL and perfects it.
Fugue supports SQL and non-SQL semantics for data transformation. Besides SQL, another important option is Fugue Transform. Fugue transformers can wrap complicated Python/Pandas logic and apply it distributedly on dataframes. A typical example is distributed model inference: the inference part has to be done in Python, and it can be easily achieved by a transformer, but the data preparation may be done nicely by SQL or Ibis.
Fugue and Ibis are on different abstraction layers. Ibis is nice for constructing single SQL statements to accomplish single tasks. Even if a statement involves multiple tables and multiple steps, its final step is either outputting one table or inserting one table into a database. On the other hand, FugueWorkflow is for orchestrating these tasks. For example, a workflow can read a table, do the first transformation and save to a file, then do the second transformation and print. Each transformation may be done using Ibis, but the loading, saving, printing and the overall orchestration can be done by Fugue.
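The distributed-inference pattern described above can be sketched with plain Pandas. This is a minimal sketch: `predict` and its doubling "model" are hypothetical stand-ins for real inference logic, not part of the Fugue or Ibis APIs.

```python
import pandas as pd

# A transformer in Fugue terms is just a plain function: Pandas in, Pandas out.
# The "model" here is a hypothetical stand-in that doubles a feature column.
def predict(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(pred=df["a"] * 2.0)

# Locally this is an ordinary function call; wrapped as a Fugue transformer,
# the same function could be applied per partition on Spark or Dask unchanged.
result = predict(pd.DataFrame({"a": [1.0, 2.0, 3.0]}))
```

The point is that nothing in the function itself refers to a distributed framework; Fugue supplies the distribution around it.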
This is also why Ibis can be a very nice option for Fugue users to build their pipelines. Users who prefer pythonic APIs can keep all their logic in Python with the help of Ibis. Although Fugue has its own functional API similar to Ibis, the programming interface of Ibis is really elegant. It usually helps users write less but more expressive code to achieve the same thing.
Hello World#
In this example, we try to achieve this SQL semantic:
SELECT a, a+1 AS b FROM
(SELECT a FROM tb1 UNION SELECT a FROM tb2)
from ibis import BaseBackend, literal
import ibis.expr.types as ir

def ibis_func(backend: BaseBackend) -> ir.TableExpr:
    tb1 = backend.table("tb1")
    tb2 = backend.table("tb2")
    tb3 = tb1.union(tb2)
    return tb3.mutate(b=tb3.a + literal(1))
Now let’s test with the pandas backend
import ibis
import pandas as pd
con = ibis.pandas.connect({
    "tb1": pd.DataFrame([[0]], columns=["a"]),
    "tb2": pd.DataFrame([[1]], columns=["a"])
})
ibis_func(con).execute()
   | a | b
---|---|---
 0 | 0 | 1
 1 | 1 | 2
Now let’s make this a part of Fugue
from fugue import FugueWorkflow
from fugue_ibis import run_ibis
dag = FugueWorkflow()
df1 = dag.df([[0]], "a:long")
df2 = dag.df([[1]], "a:long")
df3 = run_ibis(ibis_func, tb1=df1, tb2=df2)
df3.show()
Now let’s run on Pandas
dag.run()
PandasDataFrame
a:long|b:long
------+------
0 |1
1 |2
Total count: 2
DataFrames()
Now let’s run on Dask. This assumes Dask is installed.
dag.run("dask")
DaskDataFrame
a:long|b:long
------+------
0 |1
1 |2
Total count: 2
DataFrames()
Now let’s run on DuckDB
import fugue_duckdb
dag.run("duck")
PandasDataFrame
a:long|b:long
------+------
0 |1
1 |2
Total count: 2
DataFrames()
For each execution engine, Ibis will also run on the corresponding backend.
A deeper integration#
The above approach needs a function taking in an Ibis backend and returning a TableExpr. The following is another approach that is simpler and more elegant.
from fugue_ibis import as_ibis, as_fugue
dag = FugueWorkflow()
tb1 = as_ibis(dag.df([[0]], "a:long"))
tb2 = as_ibis(dag.df([[1]], "a:long"))
tb3 = tb1.union(tb2)
df3 = as_fugue(tb3.mutate(b=tb3.a+literal(1)))
df3.show()
dag.run()
PandasDataFrame
a:long|b:long
------+------
0 |1
1 |2
Total count: 2
DataFrames()
Alternatively, you can treat as_ibis and as_fugue as class methods. This is more convenient to use, but it's a bit magical. It is achieved by adding the two methods to the corresponding classes using setattr. This patching-like design pattern is widely used by Ibis itself.
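The patching mechanism itself can be sketched generically. The class below is a toy stand-in, not Fugue's real WorkflowDataFrame, and the string it returns is just for illustration.

```python
# A generic sketch of the setattr patching pattern: attach a function to an
# existing class so it behaves like a method defined on that class.
class ToyDataFrame:
    def __init__(self, name: str):
        self.name = name

def _as_ibis(self) -> str:
    # in fugue_ibis this would return an Ibis table expression instead
    return f"ibis view of {self.name}"

# after this call, every ToyDataFrame instance has an .as_ibis() method
setattr(ToyDataFrame, "as_ibis", _as_ibis)

view = ToyDataFrame("tb1").as_ibis()
```

Importing a module that performs such `setattr` calls at import time is what makes the `import fugue_ibis` line below sufficient to enable the methods.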
import fugue_ibis # must import
dag = FugueWorkflow()
tb1 = dag.df([[0]], "a:long").as_ibis()
tb2 = dag.df([[1]], "a:long").as_ibis()
tb3 = tb1.union(tb2)
df3 = tb3.mutate(b=tb3.a+literal(1)).as_fugue()
df3.show()
dag.run()
PandasDataFrame
a:long|b:long
------+------
0 |1
1 |2
Total count: 2
DataFrames()
By importing fugue_ibis, the two methods are automatically added.
It's up to the users which way to go. The first approach (run_ibis) is the best for separating out the Ibis logic; as you can see, it is also great for unit testing. The second approach is elegant, but you will have to unit test the code together with the logic before and after the conversions. The third approach is the most intuitive, but it's a bit magical.
Z-Score#
Now, let's consider a practical example: using Fugue to compute the z-score of a dataframe column, with partitioning as an option. The reason to implement it at the Fugue level is that the computation becomes scale agnostic and framework agnostic.
from fugue import WorkflowDataFrame
from fugue_ibis import as_ibis, as_fugue
def z_score(df: WorkflowDataFrame, input_col: str, output_col: str) -> WorkflowDataFrame:
    by = df.partition_spec.partition_by
    idf = as_ibis(df)
    col = idf[input_col]
    if len(by) == 0:
        return as_fugue(idf.mutate(**{output_col: (col - col.mean()) / col.std()}))
    agg = idf.group_by(by).aggregate(mean_=col.mean(), std_=col.std())
    j = idf.inner_join(agg, by)[idf, ((idf[input_col] - agg.mean_) / agg.std_).name(output_col)]
    return as_fugue(j)
Now, generate the testing data.
import numpy as np
np.random.seed(0)
pdf = pd.DataFrame(dict(
    a=np.random.choice(["a", "b"], 100),
    b=np.random.choice(["c", "d"], 100),
    c=np.random.rand(100),
))
pdf["expected1"] = (pdf.c - pdf.c.mean())/pdf.c.std()
pdf = pdf.groupby(["a", "b"]).apply(lambda tdf: tdf.assign(expected2=(tdf.c - tdf.c.mean())/tdf.c.std())).reset_index(drop=True)
And here is the final code.
dag = FugueWorkflow()
df = z_score(dag.df(pdf), "c", "z1")
df = z_score(df.partition_by("a", "b"), "c", "z2")
df.show()
dag.run()
PandasDataFrame
a:str|b:str|c:double |expected1:double |expected2:double |z1:double |z2:double
-----+-----+-------------+------------------+------------------+-------------+------------
a |c |0.84640867247|1.144636809499835 |1.577770556350802 |1.14463680949|1.5777705563
| |11278 | | |98345 |508022
a |c |0.69947927531|0.6163095213101546|1.0316513450169476|0.61630952131|1.0316513450
| |75043 | | |01543 |169478
a |c |0.81379781970|1.0273750242983348|1.4565598691775665|1.02737502429|1.4565598691
| |24772 | | |83344 |77567
a |c |0.39650574084|-0.473119748536303|-0.094465490044129|-0.4731197485|-0.094465490
| |698464 |06 |49 |3630345 |0441295
a |c |0.58127287263|0.1912640955118427|0.5922921108698395|0.19126409551|0.5922921108
| |58587 |9 | |18424 |698396
a |c |0.29828232595|-0.826310542423581|-0.459550319315429|-0.8263105424|-0.459550319
| |603077 |5 |6 |235819 |3154297
a |c |0.57432524884|0.1662818979021328|0.5664686140318282|0.16628189790|0.5664686140
| |95788 |8 | |21325 |318284
a |c |0.43141843543|-0.347581023054070|0.0353008712338056|-0.3475810230|0.0353008712
| |397396 |96 |9 |540714 |33805695
a |c |0.43586492526|-0.331592378438154|0.0518279486404278|-0.3315923784|0.0518279486
| |56268 |5 |1 |3815487 |4042782
a |c |0.14944830465|-1.361486459469925|-1.012748793586346|-1.3614864594|-1.012748793
| |799375 |9 |2 |699263 |5863462
DataFrames()
Consistency issues#
Ibis as of 2.0.0 can behave differently on different backends. Here are some examples of common discrepancies between pandas and SQL.
# pandas drops null keys on group (by default), SQL doesn't
dag = FugueWorkflow()
df = dag.df([["a",1],[None,2]], "a:str,b:int").as_ibis()
df.groupby(["a"]).aggregate(s=df.b.sum()).as_fugue().show()
dag.run()
dag.run("duckdb")
PandasDataFrame
a:str|s:long
-----+------
a |1
Total count: 1
PandasDataFrame
a:str|s:long
-----+------
a |1
NULL |2
Total count: 2
DataFrames()
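If you need the SQL behavior while staying on plain pandas, `groupby` accepts `dropna=False` (pandas >= 1.1) to keep null keys. A pandas-only sketch:

```python
import pandas as pd

df = pd.DataFrame({"a": ["a", None], "b": [1, 2]})

# default: pandas drops the null group key, as in the first output above
default = df.groupby("a")["b"].sum()

# dropna=False keeps the NULL group, matching the SQL backends
with_nulls = df.groupby("a", dropna=False)["b"].sum()
```

Note this is a property of pandas itself, not something the Ibis pandas backend exposed as of 2.0.0.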
# pandas joins on NULLs, SQL doesn't
dag = FugueWorkflow()
df1 = dag.df([["a",1],[None,2]], "a:str,b:int").as_ibis()
df2 = dag.df([["a",1],[None,2]], "a:str,c:int").as_ibis()
df1.inner_join(df2, ["a"])[df1, df2.c].as_fugue().show()
dag.run()
dag.run("duckdb")
PandasDataFrame
a:str|b:int|c:int
-----+-----+-----
a |1 |1
NULL |2 |2
Total count: 2
PandasDataFrame
a:str|b:int|c:int
-----+-----+-----
a |1 |1
Total count: 1
DataFrames()
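Similarly for joins: pandas matches null keys to each other, while a SQL inner join drops them. Dropping null keys before merging reproduces the SQL result; a pandas-only sketch:

```python
import pandas as pd

df1 = pd.DataFrame({"a": ["a", None], "b": [1, 2]})
df2 = pd.DataFrame({"a": ["a", None], "c": [1, 2]})

# pandas merge matches the None keys to each other: 2 rows
pandas_join = df1.merge(df2, on="a")

# dropping null keys first gives the SQL inner-join result: 1 row
sql_like = df1.dropna(subset=["a"]).merge(df2.dropna(subset=["a"]), on="a")
```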
Since the Ibis integration is experimental, we rely on Ibis to achieve consistent behaviors. If you have any Ibis-specific questions, please also consider asking in the Ibis issue tracker.