Joins

So far we have seen three engine-agnostic functions: save(), load(), and transform(). This section shows how to combine DataFrames through joins. Fugue supports the following join types: LEFT OUTER, RIGHT OUTER, CROSS, LEFT SEMI, LEFT ANTI, INNER, and FULL OUTER. Most of these should be familiar, so this section focuses on examples of using them in Fugue.

Join and Renaming Columns

Sometimes the two DataFrames have columns with the same name that are not join keys. In that case, use rename() to resolve the conflict first, as in the code snippet below. The how argument of join() accepts any of the join types listed above, and on takes a list of key columns. If on is omitted, Fugue infers the keys from the columns the two DataFrames have in common, but specifying them explicitly is safer.

# Hide Pandas deprecation warnings
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import fugue.api as fa

df1 = pd.DataFrame({"id": ["a","b"], "val1": [1,2]})
df2 = pd.DataFrame({"id": ["a","b"], "val1": [2,3]})

# Rename df2's val1 to val2 so it no longer clashes with df1's val1
fa.join(df1, fa.rename(df2, {"val1":"val2"}), how="left_outer", on=["id"])
id val1 val2
0 a 1 2
1 b 2 3

All functions in fugue.api accept an execution engine as a parameter, so we can also pass an engine to join(). Note that the "dask" engine executes lazily, so we have to call .compute() to get the result. Row order is not guaranteed on a distributed engine, which is why b appears before a in the output.

res = fa.join(df1, fa.rename(df2, {"val1":"val2"}), how="left_outer", on=["id"], engine="dask")
res.compute().head()
id val1 val2
0 b 2 3
0 a 1 2

We can also use engine_context(), which we learned about in the previous section.

with fa.engine_context("dask"):
    res = fa.join(df1, fa.rename(df2, {"val1":"val2"}), how="left_outer", on=["id"])
    fa.show(res)
DaskDataFrame
id:str|val1:long|val2:long
------+---------+---------
b     |2        |3        
a     |1        |2        
Total count: 2

SQL vs pandas Joins

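Joins in SQL and pandas can produce different results. The clearest example is how null keys are handled: pandas matches None with None, while in SQL a comparison between two nulls is not true, so such rows never match. In these cases, Fugue is consistent with SQL and Spark rather than pandas. Notice that after the pandas merge below, column a still contains a row with None.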

df1 = pd.DataFrame({'a': [None, "a"], 'b': [1, 2]})
df2 = pd.DataFrame({'a': [None, "a"], 'b': [1, 2]})
df1.merge(df2, how="inner", on=["a", "b"])
a b
0 None 1
1 a 2

With Fugue, the row with None is dropped because Fugue follows the SQL convention.

fa.join(df1, df2, how="inner", on=["a","b"]) # None,1 is excluded
a b
0 a 2
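If you need plain pandas to follow the SQL convention, one rough approach is to drop rows with a null in any key column before merging. The helper below, sql_style_inner_merge, is a hypothetical name used only for illustration; it is not how Fugue implements joins internally, and it only covers inner joins.

```python
import pandas as pd

df1 = pd.DataFrame({"a": [None, "a"], "b": [1, 2]})
df2 = pd.DataFrame({"a": [None, "a"], "b": [1, 2]})


def sql_style_inner_merge(left, right, on):
    """Hypothetical helper: inner merge in which null keys never match (SQL semantics)."""
    # In SQL, NULL = NULL is not true, so a row with a null key can never match.
    left = left.dropna(subset=on)
    right = right.dropna(subset=on)
    return left.merge(right, how="inner", on=on).reset_index(drop=True)


print(len(df1.merge(df2, how="inner", on=["a", "b"])))  # pandas keeps the None row: 2
print(sql_style_inner_merge(df1, df2, ["a", "b"]))      # only the ("a", 2) row
```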

Multiple Joins

More than two DataFrames can be joined in a single join() call, as long as their non-key column names do not conflict. This works across all engines.

df1 = pd.DataFrame({"a": [1], "b": [2]})
df2 = pd.DataFrame({"a": [1], "c": [3]})
df3 = pd.DataFrame({"a": [1], "d": [4]})
df4 = pd.DataFrame({"a": [1], "e": [5]})

fa.join(df1, df2, df3, df4, how="inner", on=["a"])
a b c d e
0 1 2 3 4 5

Union, Intersect, Subtract

Fugue also supports union, intersect, and subtract. union() combines two DataFrames with the same columns; by default only distinct rows are kept, and setting distinct=False keeps every row, including duplicates. intersect() returns the distinct rows that appear in both DataFrames. subtract() returns the distinct rows of the left DataFrame that do not appear in the right DataFrame. Examples are shown below.

df1 = pd.DataFrame({"a": [0,1], "b": [1,2]})
df2 = pd.DataFrame({"a": [0,0,1], "b": [1,1,2]})

fa.union(df1, df2)
a b
0 0 1
1 1 2
fa.union(df1, df2, distinct=False)
a b
0 0 1
1 1 2
2 0 1
3 0 1
4 1 2
fa.intersect(df1, df2)
a b
0 0 1
1 1 2
fa.subtract(df1, df2)
a b

Remember that join(), union(), intersect(), and subtract() all accept an engine argument to run on the chosen backend, and they also work inside engine_context().

Summary

This section covered the base operations Fugue offers for combining two or more DataFrames. If your logic is not covered by these operations, you can implement a custom Fugue extension. The transformer from the previous sections is the most commonly used Fugue extension; we'll cover the other extensions in the extensions section.