Fugue with Ibis#

The Ibis project tries to bridge the gap between local Python and various datastore backends, including distributed systems such as Spark and Dask. The main idea is to create a pythonic interface for expressing SQL semantics, so that expressions are agnostic to the backend.

This design idea aligns well with Fugue’s, but please notice a few key differences:

  • Fugue supports both pythonic APIs and SQL, and the choice should be determined by the particular use case or the user’s preference. Ibis, on the other hand, focuses on the pythonic expression of SQL and perfects it.

  • Fugue supports both SQL and non-SQL semantics for data transformation. Besides SQL, another important option is Fugue Transform. Fugue transformers can wrap complicated Python/Pandas logic and apply it distributedly on dataframes. A typical example is distributed model inference: the inference itself must be done in Python and is easily achieved with a transformer, while the data preparation may be nicely expressed in SQL or Ibis (see the sketch after this list).

  • Fugue and Ibis are on different abstraction layers. Ibis is great for constructing a single SQL statement to accomplish a single task; even if it involves multiple tables and multiple steps, its final step is either outputting one table or inserting one table into a database. FugueWorkflow, on the other hand, orchestrates these tasks. For example, a workflow can read a table, do a first transformation and save to a file, then do a second transformation and print. Each transformation may be done using Ibis, while the loading, saving, printing, and overall orchestration are done by Fugue.
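
For example, wrapping per-group Pandas logic in a transformer can look like the following minimal sketch, which uses made-up data and column names with Fugue’s standalone transform function; passing an engine such as "dask" would distribute the same logic:

import pandas as pd
from fugue import transform

# per-partition pandas logic; the schema argument tells Fugue the output schema
def demean(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(d=df["c"] - df["c"].mean())

result = transform(
    pd.DataFrame(dict(g=["x", "x", "y"], c=[1.0, 3.0, 5.0])),
    demean,
    schema="*,d:double",
    partition={"by": ["g"]},  # apply demean to each group of g
)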

This is also why Ibis can be a very nice option for Fugue users building their pipelines. People who prefer pythonic APIs can keep all of their logic in Python with the help of Ibis. Although Fugue has its own functional API similar to Ibis, the programming interface of Ibis is particularly elegant; it usually helps users write less, yet more expressive, code to achieve the same thing.

Hello World#

In this example, we want to achieve the semantics of this SQL statement:

SELECT a, a+1 AS b FROM
    (SELECT a FROM tb1 UNION SELECT a FROM tb2)

from ibis import BaseBackend, literal
import ibis.expr.types as ir

def ibis_func(backend: BaseBackend) -> ir.TableExpr:
    # union the two tables, then add the column b = a + 1
    tb1 = backend.table("tb1")
    tb2 = backend.table("tb2")
    tb3 = tb1.union(tb2)
    return tb3.mutate(b=tb3.a + literal(1))

Now let’s test it with the pandas backend

import ibis
import pandas as pd

con = ibis.pandas.connect({
    "tb1": pd.DataFrame([[0]], columns=["a"]),
    "tb2": pd.DataFrame([[1]], columns=["a"])
})
ibis_func(con).execute()
   a  b
0  0  1
1  1  2

Now let’s make this a part of Fugue

from fugue import FugueWorkflow
from fugue_ibis import run_ibis

dag = FugueWorkflow()
df1 = dag.df([[0]], "a:long")
df2 = dag.df([[1]], "a:long")
df3 = run_ibis(ibis_func, tb1=df1, tb2=df2)
df3.show()

Now let’s run on Pandas

dag.run()
PandasDataFrame
a:long|b:long
------+------
0     |1     
1     |2     
Total count: 2
DataFrames()

Now let’s run on Dask. This assumes Dask is installed.

dag.run("dask")
DaskDataFrame
a:long|b:long
------+------
0     |1     
1     |2     
Total count: 2
DataFrames()

Now let’s run on DuckDB

import fugue_duckdb

dag.run("duck")
PandasDataFrame
a:long|b:long
------+------
0     |1     
1     |2     
Total count: 2
DataFrames()

For each execution engine, the Ibis logic will also run on the corresponding Ibis backend.
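
For example, the same dag should also run on Spark, with the Ibis logic executing on Ibis’s Spark backend (a sketch, assuming pyspark and the corresponding Ibis backend are installed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dag.run(spark)  # pass the SparkSession in as the execution engine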

A deeper integration#

The above approach requires a function that takes an Ibis backend and returns a TableExpr. The following is another approach that is simpler and more elegant.

from fugue_ibis import as_ibis, as_fugue

dag = FugueWorkflow()
tb1 = as_ibis(dag.df([[0]], "a:long"))
tb2 = as_ibis(dag.df([[1]], "a:long"))
tb3 = tb1.union(tb2)
df3 = as_fugue(tb3.mutate(b=tb3.a+literal(1)))
df3.show()

dag.run()
PandasDataFrame
a:long|b:long
------+------
0     |1     
1     |2     
Total count: 2
DataFrames()

Alternatively, you can use as_ibis and as_fugue as methods on the dataframe and table objects themselves. This is more convenient to use, but it’s a bit magical: the two methods are added to the corresponding classes using setattr. This patching-like design pattern is widely used by Ibis.
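
In essence, the pattern works like this (a generic illustration with made-up names, not the actual fugue_ibis code):

class Table:
    pass

def as_something(self):
    # convert self to another representation (details omitted)
    ...

# attach the function to the class after its definition, so every
# Table instance gains an as_something() method
setattr(Table, "as_something", as_something)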

import fugue_ibis  # must import

dag = FugueWorkflow()
tb1 = dag.df([[0]], "a:long").as_ibis()
tb2 = dag.df([[1]], "a:long").as_ibis()
tb3 = tb1.union(tb2)
df3 = tb3.mutate(b=tb3.a+literal(1)).as_fugue()
df3.show()

dag.run()
PandasDataFrame
a:long|b:long
------+------
0     |1     
1     |2     
Total count: 2
DataFrames()

By importing fugue_ibis, the two methods are automatically added.

It’s up to the users which way to go. The first approach (run_ibis) best separates the Ibis logic and, as you can see, it is also great for unit testing. The second approach is elegant, but you will have to unit test the code together with the logic before and after the conversions. The third approach is the most intuitive, but it’s a bit magical.
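
For example, ibis_func from the Hello World section can be unit tested in isolation against the lightweight pandas backend (a sketch; sorting guards against backend-dependent union ordering):

import ibis
import pandas as pd

def test_ibis_func():
    con = ibis.pandas.connect({
        "tb1": pd.DataFrame([[0]], columns=["a"]),
        "tb2": pd.DataFrame([[1]], columns=["a"]),
    })
    res = ibis_func(con).execute()
    assert sorted(res["b"].tolist()) == [1, 2]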

Z-Score#

Now, let’s consider a practical example. We want to use Fugue to compute the z-score of a column, (x - mean) / std, with partitioning as an option. The reason to implement it at the Fugue level is that the computation becomes both scale agnostic and framework agnostic.

from fugue import WorkflowDataFrame
from fugue_ibis import as_ibis, as_fugue

def z_score(df: WorkflowDataFrame, input_col: str, output_col: str) -> WorkflowDataFrame:
    by = df.partition_spec.partition_by  # inherit partition keys from the input
    idf = as_ibis(df)
    col = idf[input_col]
    if len(by) == 0:
        # no partitioning: z-score over the whole dataframe
        return as_fugue(idf.mutate(**{output_col: (col - col.mean()) / col.std()}))
    # partitioned: compute mean/std per group, then join them back to each row
    agg = idf.group_by(by).aggregate(mean_=col.mean(), std_=col.std())
    j = idf.inner_join(agg, by)[idf, ((idf[input_col] - agg.mean_) / agg.std_).name(output_col)]
    return as_fugue(j)

Now, let’s generate the testing data. The expected1 and expected2 columns hold the z-scores computed directly with pandas (globally, and per a, b group) so that we can verify Fugue’s results.

import numpy as np

np.random.seed(0)
pdf = pd.DataFrame(dict(
    a=np.random.choice(["a","b"], 100),
    b=np.random.choice(["c","d"], 100),
    c=np.random.rand(100),
))

pdf["expected1"] = (pdf.c - pdf.c.mean())/pdf.c.std()
pdf = pdf.groupby(["a", "b"]).apply(lambda tdf: tdf.assign(expected2=(tdf.c - tdf.c.mean())/tdf.c.std())).reset_index(drop=True)

And here is the final code.

dag = FugueWorkflow()
df = z_score(dag.df(pdf), "c", "z1")
df = z_score(df.partition_by("a", "b"), "c", "z2")
df.show()

dag.run()
PandasDataFrame
a:str|b:str|c:double           |expected1:double    |expected2:double    |z1:double           |z2:double
-----+-----+-------------------+--------------------+--------------------+--------------------+--------------------
a    |c    |0.8464086724711278 |1.144636809499835   |1.577770556350802   |1.1446368094998345  |1.5777705563508022
a    |c    |0.6994792753175043 |0.6163095213101546  |1.0316513450169476  |0.6163095213101543  |1.0316513450169478
a    |c    |0.8137978197024772 |1.0273750242983348  |1.4565598691775665  |1.0273750242983344  |1.456559869177567
a    |c    |0.39650574084698464|-0.47311974853630306|-0.09446549004412949|-0.47311974853630345|-0.0944654900441295
a    |c    |0.5812728726358587 |0.19126409551184279 |0.5922921108698395  |0.1912640955118424  |0.5922921108698396
a    |c    |0.29828232595603077|-0.8263105424235815 |-0.4595503193154296 |-0.8263105424235819 |-0.4595503193154297
a    |c    |0.5743252488495788 |0.16628189790213288 |0.5664686140318282  |0.1662818979021325  |0.5664686140318284
a    |c    |0.43141843543397396|-0.34758102305407096|0.03530087123380569 |-0.3475810230540714 |0.035300871233805695
a    |c    |0.4358649252656268 |-0.3315923784381545 |0.05182794864042781 |-0.33159237843815487|0.05182794864042782
a    |c    |0.14944830465799375|-1.3614864594699259 |-1.0127487935863462 |-1.3614864594699263 |-1.0127487935863462
DataFrames()

Consistency issues#

As of 2.0.0, Ibis can behave differently on different backends. Here are some examples of common discrepancies between the pandas and SQL backends.

# pandas drops null keys on group (by default), SQL doesn't

dag = FugueWorkflow()
df = dag.df([["a",1],[None,2]], "a:str,b:int").as_ibis()
df.groupby(["a"]).aggregate(s=df.b.sum()).as_fugue().show()

dag.run()
dag.run("duckdb")
PandasDataFrame
a:str|s:long
-----+------
a    |1     
Total count: 1

PandasDataFrame
a:str|s:long
-----+------
a    |1     
NULL |2     
Total count: 2
DataFrames()
# pandas joins on NULLs, SQL doesn't

dag = FugueWorkflow()
df1 = dag.df([["a",1],[None,2]], "a:str,b:int").as_ibis()
df2 = dag.df([["a",1],[None,2]], "a:str,c:int").as_ibis()
df1.inner_join(df2, ["a"])[df1, df2.c].as_fugue().show()

dag.run()
dag.run("duckdb")
PandasDataFrame
a:str|b:int|c:int
-----+-----+-----
a    |1    |1    
NULL |2    |2    
Total count: 2

PandasDataFrame
a:str|b:int|c:int
-----+-----+-----
a    |1    |1    
Total count: 1
DataFrames()
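
If you need consistent results across backends today, one option is to handle NULL keys explicitly before grouping or joining. Here is a minimal sketch based on the group-by example above (notnull is a standard Ibis column method):

dag = FugueWorkflow()
df = dag.df([["a",1],[None,2]], "a:str,b:int").as_ibis()
df = df[df.a.notnull()]  # drop NULL keys up front so pandas and SQL backends agree
df.groupby(["a"]).aggregate(s=df.b.sum()).as_fugue().show()
dag.run()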

Since the Ibis integration is experimental, we rely on Ibis itself to achieve consistent behavior across backends. If you have any Ibis-specific questions, please also consider asking in the Ibis issue tracker.