Transformations#

We already saw some Fugue API functions, including transform(), save(), load(), and show(). This section covers the other functions available under the Fugue API. Full details of each function can be found in the Fugue API documentation.

Setup#

import pandas as pd
import fugue.api as fa 
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({"a": [1,2,3], "b": ["Apple", "Banana", "Carrot"]})
df.to_parquet("/tmp/df.parquet")
sdf = spark.createDataFrame(df)

Engine-awareness#

Some of the functions presented take in an engine, while others simply operate on whatever DataFrame is passed in. This is because exposing engine conversion makes less sense for some operations. For example, drop_columns() and rename() are executed directly on whatever DataFrame is passed in.

In practice, users don't need to worry about which functions are engine-aware and which aren't, because the main interface for using them is engine_context(). For example, the code below runs on Pandas because no engine is provided. If Spark is used as the engine, Spark DataFrames will be used for all the operations.

The top-level API documentation indicates which functions are engine-aware.

with fa.engine_context():
    df = fa.load("/tmp/df.parquet")
    res = fa.drop_columns(df, ["b"])
    res = fa.rename(res, {"a": "_a"})
    fa.show(res)
PandasDataFrame
_a:long
-------
1      
2      
3      
Total count: 3

Note that some functions may be evaluated lazily, depending on the underlying execution engine. For example:

fa.drop_columns(sdf, ["a"])
DataFrame[b: string]

To evaluate these, we need to call an action such as fa.show() or fa.save().

Engine-less Functions#

Alter Columns#

Takes a Fugue schema expression and updates the column types.

fa.alter_columns(sdf, "a:float")
DataFrame[a: float, b: string]

Drop Columns#

Drops columns from a DataFrame.

fa.drop_columns(df, ["a"])
b
0 Apple
1 Banana
2 Carrot

Rename#

Takes a dictionary mapping old column names to new ones and renames the columns of the DataFrame.

fa.rename(df, {"a": "_a"})
_a b
0 1 Apple
1 2 Banana
2 3 Carrot

Select Columns#

Takes a list of columns to return.

fa.select_columns(df, ["b"])
b
0 Apple
1 Banana
2 Carrot

Engine-aware Functions#

These functions will execute using the engine specified.

Distinct#

Returns distinct rows of a DataFrame.

temp = pd.DataFrame({"a": [1,1]})
fa.show(fa.distinct(temp, engine="spark"))
SparkDataFrame
a:long
------
1     
Total count: 1

Dropna#

Drops records with NA values. This function has some additional kwargs that can be found in the documentation.

temp = pd.DataFrame({"a": [None,1]})
fa.dropna(temp, engine=None)
a
0 1.0

Fillna#

Fills null values with a specified value. This function has some additional kwargs that can be found in the documentation.

fa.fillna(temp, value=1, engine=None)
a
0 1.0
1 1.0

Sample#

Samples the DataFrame using either a number of rows (n) or a fraction of the data (frac).

fa.show(fa.sample(df, n=2, engine=spark))
SparkDataFrame
a:long|b:str
------+------
1     |Apple
2     |Banana
Total count: 2

Take#

Returns top n rows per partition. If the DataFrame is not partitioned, it takes the top n rows.

df
a b
0 1 Apple
1 2 Banana
2 3 Carrot
fa.show(fa.take(df, 2, presort="b desc", partition=None))
fa.show(fa.take(df, 1, presort="b desc", partition={"by": "a"}))   # returns 1 row for each value of a
PandasDataFrame
a:long|b:str
------+------
3     |Carrot
2     |Banana
Total count: 2

PandasDataFrame
a:long|b:str
------+------
3     |Carrot
2     |Banana
1     |Apple
Total count: 3

Raw SQL#

fa.raw_sql() allows us to run a SQL query expressed as a chain of strings and Python DataFrames. Fugue does no extra parsing or handling of the query; it is passed straight down to the execution engine's SQL layer, such as SparkSQL.

fa.raw_sql("SELECT * FROM", df)
a b
0 1 Apple
1 2 Banana
2 3 Carrot
# Using duckdb
fa.raw_sql("SELECT * FROM", df, engine="duckdb")
pyarrow.Table
a: int64
b: string
----
a: [[1,2,3]]
b: [["Apple","Banana","Carrot"]]
df2 = pd.DataFrame({"a": [1,2,3], "c": [1,2,3]})
fa.show(fa.raw_sql("SELECT df.a,b,c FROM", df, " AS df INNER JOIN", df2, " AS df2 ON df.a=df2.a", engine=spark))
SparkDataFrame
a:long|b:str |c:long
------+------+------
2     |Banana|2
3     |Carrot|3
1     |Apple |1
Total count: 3