Pivot#

To pivot a DataFrame, you need to inherit directly from the Transformer base class, because the operation requires a lot of customization: the output schema depends on runtime parameters, and the logic must be aware of the partition keys.

import pandas as pd

from fugue import DataFrame, LocalDataFrame, Transformer, Schema, ArrayDataFrame

class Pivot(Transformer):
    def get_output_schema(self, df: DataFrame):
        # Runs on the driver before execution. The pivot index columns are
        # taken from the partition keys, so the output schema depends on both
        # the partitioning and the parameters.
        self.index_cols = self.partition_spec.partition_by
        self.key_col = self.params.get_or_throw("key_col", str)
        self.value_col = self.params.get_or_throw("value_col", str)
        self.values = self.params.get_or_throw("values", list)
        self.value_type = df.schema[self.value_col].type

        sub_schema = Schema([(v, self.value_type) for v in self.values])
        return df.schema.extract(self.index_cols) + sub_schema

    def validate_on_compile(self) -> None:
        # Fail fast at compile time: the pivot index comes entirely from the
        # partition keys, so partitioning must be specified.
        assert len(self.partition_spec.partition_by) > 0

    def transform(self, df: LocalDataFrame):
        # Runs on workers, once per partition (one combination of index values).
        # self.cursor.key_value_array holds the current partition key values.
        pdf = df[[self.key_col, self.value_col]].as_pandas()
        res = self.cursor.key_value_array
        kv = dict(zip(pdf[self.key_col], pdf[self.value_col]))
        values = [kv.get(k, None) for k in self.values]
        return ArrayDataFrame([res + values], self.output_schema)

df = pd.DataFrame(dict(
    id = [1,1,2,2,2],
    id2 = [10,10,20,20,20],
    key = ["a","b","a","b","c"],
    v1 = [3,1,2,3,4],
    v2 = [30,10,20,30,40],
))
from fugue import transform

transform(
    df, Pivot,
    params=dict(key_col="key", value_col="v1", values=["a","b","c"]),
    partition={"by":"id"},
)
   id  a  b    c
0   1  3  1  NaN
1   2  2  3  4.0
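
Because the index columns are taken directly from the partition keys, a multi-column pivot index only requires partitioning by more columns. A minimal sketch (output omitted), keeping both id and id2 in the result:

transform(
    df, Pivot,
    params=dict(key_col="key", value_col="v1", values=["a","b","c"]),
    partition={"by":["id","id2"]},
)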

The same transformer can also run distributedly by supplying an execution engine. We can further wrap it as a module, a workflow-level extension that registers pivot as a method on WorkflowDataFrame:

from fugue import FugueWorkflow, WorkflowDataFrame, module

# as_method=True registers pivot as a method on WorkflowDataFrame
@module(as_method=True)
def pivot(df: WorkflowDataFrame, index, key_col, value_col, values) -> WorkflowDataFrame:
    return df.partition(by=index).transform(
        Pivot, params=dict(key_col=key_col, value_col=value_col, values=values)
    )

dag = FugueWorkflow()
dag.df(df).pivot("id", "key", "v1", ["a","b","c"]).show()

dag.run()
PandasDataFrame
id:long|a:long|b:long|c:long
-------+------+------+------
1      |3     |1     |NULL  
2      |2     |3     |4     
Total count: 2
DataFrames()
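
Since pivot is now a method on WorkflowDataFrame, switching to another value column is a one-line change. A minimal sketch (output omitted), pivoting v2 instead of v1:

dag = FugueWorkflow()
dag.df(df).pivot("id", "key", "v2", ["a","b","c"]).show()
dag.run()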
The module can also be invoked from Fugue SQL. After setting up the notebook extension:

from fugue_notebook import setup
setup()

%%fsql
SUB df USING pivot(index='id', key_col='key', value_col='v1', values=['a','b','c'])
PRINT
   id  a  b    c
0   1  3  1  NaN
1   2  2  3  4.0
schema: id:long,a:long,b:long,c:long

This example also involves a concept chain that can be difficult to understand (and that we don't expect most users to need): transformer (worker-side extension) -> processor (driver-side extension) -> module (workflow-level extension). We expect most users to solve their problems with transformers alone, without implementing any other Fugue interface, but a sophisticated system sits behind these simple scenarios. Also, if you want to use Spark as the backend, you should let Fugue use Pandas UDFs to accelerate the execution, which can be much faster.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

transform(
    df, Pivot,
    params=dict(key_col="key", value_col="v1", values=["a","b","c"]),
    partition={"by":"id"},
    engine=spark, engine_conf={"fugue.spark.use_pandas_udf":True}
)
DataFrame[id: bigint, a: bigint, b: bigint, c: bigint]
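
The call returns a native PySpark DataFrame. If the pivoted result is small, it can be collected to the driver for inspection; a minimal sketch, assuming the result fits in driver memory:

res = transform(
    df, Pivot,
    params=dict(key_col="key", value_col="v1", values=["a","b","c"]),
    partition={"by":"id"},
    engine=spark, engine_conf={"fugue.spark.use_pandas_udf":True}
)
res.toPandas()  # collect the distributed result back as a pandas DataFrame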