Transformations#
We already saw some Fugue API functions, including transform(), save(), load(), and show(). This section covers the other available functions under the Fugue API. All the details of the individual functions can be found in the Fugue API documentation.
Setup#
import pandas as pd
import fugue.api as fa
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame({"a": [1,2,3], "b": ["Apple", "Banana", "Carrot"]})
df.to_parquet("/tmp/df.parquet")
sdf = spark.createDataFrame(df)
Engine-awareness#
Some of the functions presented can take in an engine, while others just operate on whatever DataFrame is passed in. This is because it makes less sense to expose conversion for some operations. For example, drop_columns() and rename() are executed directly on whatever DataFrame is passed in.
In practice though, users don't need to worry about which functions are engine-aware and which aren't, because the main interface for using them is engine_context(). For example, the code below will run on Pandas if no engine is provided. If spark is used as the engine, the Spark DataFrame will be used for all the operations. The top-level API documentation lists which functions are engine-aware.
with fa.engine_context():
df = fa.load("/tmp/df.parquet")
res = fa.drop_columns(df, ["b"])
res = fa.rename(res, {"a": "_a"})
fa.show(res)
PandasDataFrame
_a:long
-------
1
2
3
Total count: 3
Note that some functions may be evaluated lazily, depending on the underlying execution engine. For example:
fa.drop_columns(sdf, ["a"])
DataFrame[b: string]
To evaluate these, we need to call an action such as fa.show() or fa.save().
Engine-less Functions#
Alter Columns#
Takes a Fugue schema expression and updates the column types.
fa.alter_columns(sdf, "a:float")
DataFrame[a: float, b: string]
Drop Columns#
Drops columns from a DataFrame.
fa.drop_columns(df, ["a"])
|   | b      |
|---|--------|
| 0 | Apple  |
| 1 | Banana |
| 2 | Carrot |
Head#
Returns the first n rows of the DataFrame. head() is special because the result will always be a local DataFrame, even if it operates on a distributed DataFrame. For example, a Spark DataFrame input will still result in a Pandas DataFrame output.
# Notice Spark is converted
fa.head(sdf, n=2)
|   | a | b      |
|---|---|--------|
| 0 | 1 | Apple  |
| 1 | 2 | Banana |
Rename#
Takes in a dictionary mapping to rename columns of the DataFrame.
fa.rename(df, {"a": "_a"})
|   | _a | b      |
|---|----|--------|
| 0 | 1  | Apple  |
| 1 | 2  | Banana |
| 2 | 3  | Carrot |
Select Columns#
Takes a list of columns to return.
fa.select_columns(df, ["b"])
|   | b      |
|---|--------|
| 0 | Apple  |
| 1 | Banana |
| 2 | Carrot |
Engine-aware Functions#
These functions will execute using the engine specified.
Distinct#
Returns distinct rows of a DataFrame.
temp = pd.DataFrame({"a": [1,1]})
fa.show(fa.distinct(temp, engine="spark"))
SparkDataFrame
a:long
------
1
Total count: 1
Dropna#
Drops records with NA values. This function has some additional kwargs that can be found in the documentation.
temp = pd.DataFrame({"a": [None,1]})
fa.dropna(temp, engine=None)
|   | a   |
|---|-----|
| 0 | 1.0 |
Fillna#
Fills null values with a specified value. This function has some additional kwargs that can be found in the documentation.
fa.fillna(temp, value=1, engine=None)
|   | a   |
|---|-----|
| 0 | 1.0 |
| 1 | 1.0 |
Sample#
Samples the DataFrame using either number of rows or fraction of data.
fa.show(fa.sample(df, n=2, engine=spark))
SparkDataFrame
a:long|b:str
------+------
1     |Apple
2     |Banana
Total count: 2
Take#
Returns the top n rows per partition. If the DataFrame is not partitioned, it takes the top n rows of the whole DataFrame.
df
|   | a | b      |
|---|---|--------|
| 0 | 1 | Apple  |
| 1 | 2 | Banana |
| 2 | 3 | Carrot |
fa.show(fa.take(df, 2, presort="b desc", partition=None))
fa.show(fa.take(df, 1, presort="b desc", partition={"by": "a"})) # returns 1 row for each value of a
PandasDataFrame
a:long|b:str
------+------
3     |Carrot
2     |Banana
Total count: 2
PandasDataFrame
a:long|b:str
------+------
3     |Carrot
2     |Banana
1     |Apple
Total count: 3
Raw SQL#
fa.raw_sql() allows us to run a SQL query expressed as a chain of strings and Python DataFrames. Fugue does no extra parsing or handling here; the query is passed straight down to the execution engine's SQL layer, such as SparkSQL.
fa.raw_sql("SELECT * FROM", df)
|   | a | b      |
|---|---|--------|
| 0 | 1 | Apple  |
| 1 | 2 | Banana |
| 2 | 3 | Carrot |
# Using duckdb
fa.raw_sql("SELECT * FROM", df, engine="duckdb")
pyarrow.Table
a: int64
b: string
----
a: [[1,2,3]]
b: [["Apple","Banana","Carrot"]]
df2 = pd.DataFrame({"a": [1,2,3], "c": [1,2,3]})
fa.show(fa.raw_sql("SELECT df.a,b,c FROM", df, " AS df INNER JOIN", df2, " AS df2 ON df.a=df2.a", engine=spark))
SparkDataFrame
a:long|b:str |c:long
------+------+------
2     |Banana|2
3     |Carrot|3
1     |Apple |1
Total count: 3