X-like Objects#

Fugue is flexible about how many of its built-in objects can be initialized. This tutorial walks through each of them.

Schema#

Fugue defines a compact syntax for representing a schema: columns are separated by commas, and each column is written as <name>:<type expression>.

For example: a:int,b:str or a:int,b_array:[int],c_dict:{x:int,y:str}

from fugue import Schema

print(Schema("a:int,b:str"))
print(Schema("a:int32,b_array:[int64],c_dict:{x:int,y:string}"))

# get pyarrow schema
schema = Schema(" a : int , b : str") # space is ok
print("pa schema", schema.pa_schema)

# more ways to initialize a fugue Schema
print(Schema(schema.pa_schema)) # from a pyarrow schema
print(Schema(c=str,d=int)) # pythonic way, from keyword arguments
print(Schema(dict(c=str,d=int))) # pythonic way, from a dict
print(Schema("e:str","f:str")) # columns can be passed as separate arguments
print(Schema(["e:str","f:str"], ("g",int))) # lists and tuples can be mixed; note that int in python means long in schema
print(Schema(Schema("a:int","b:str"))) # from another Schema
a:int,b:str
a:int,b_array:[long],c_dict:{x:int,y:str}
pa schema a: int32
b: string
a:int,b:str
c:str,d:long
c:str,d:long
e:str,f:str
e:str,f:str,g:long
a:int,b:str
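
To make the syntax above concrete, here is a minimal sketch of how a schema expression breaks down into name and type-expression pairs. The helper `split_schema_expr` is hypothetical and written only for illustration; it is not Fugue's parser and it does not validate type names. It only shows that commas inside `[...]` or `{...}` belong to a nested type rather than separating columns.

```python
from typing import List, Tuple


def split_schema_expr(expr: str) -> List[Tuple[str, str]]:
    """Hypothetical helper: split 'a:int,b:[int]' into (name, type expression) pairs."""
    pairs: List[Tuple[str, str]] = []
    depth = 0  # nesting level inside [...] or {...}
    start = 0  # index where the current column begins
    for i, ch in enumerate(expr + ","):  # the extra comma flushes the last column
        if ch in "[{":
            depth += 1
        elif ch in "]}":
            depth -= 1
        elif ch == "," and depth == 0:
            column = expr[start:i].strip()
            if column:
                # split on the first colon only; nested types contain more colons
                name, type_expr = column.split(":", 1)
                pairs.append((name.strip(), type_expr.strip()))
            start = i + 1
    return pairs


print(split_schema_expr("a:int,b_array:[int],c_dict:{x:int,y:str}"))
# [('a', 'int'), ('b_array', '[int]'), ('c_dict', '{x:int,y:str}')]
```

Tracking the bracket depth is what keeps `c_dict:{x:int,y:str}` together as a single column.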

Partition#

from fugue import PartitionSpec

assert PartitionSpec().empty # empty partition spec means no operation needed, it can be the default value
PartitionSpec(num=4)
PartitionSpec(algo="even",num=4,by=["a","b"],presort="c,d desc") # c,d desc == c ASC, d DESC

# num can be an expression; ROWCOUNT stands for the row count of the dataframe being partitioned
# for a df with 1000 rows, this asks for 100 even partitions of 10 rows each
PartitionSpec(algo="even",num="ROWCOUNT/10")

PartitionSpec({"num":4, "by":["a","b"]}) # from dict, using dict on `partition-like`  parameters is common
PartitionSpec('{"num":4}') # from json

a = PartitionSpec(num=4)
b = PartitionSpec(by=["a"])
c = PartitionSpec(a,b) # combine

p = PartitionSpec(num=4, by=["a"])
PartitionSpec(p, by=["a","b"], algo="even") # override
PartitionSpec(num='4', by=['a', 'b'], presort='')
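
The arithmetic behind an expression such as `ROWCOUNT/10` can be sketched in plain Python. The helper `resolve_num_partitions` below is hypothetical and is only meant to show the substitution and the resulting numbers; how Fugue itself evaluates and rounds the expression is not shown in this tutorial and may differ.

```python
import ast
import operator

# Only the four basic arithmetic operators are supported in this sketch.
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}


def _evaluate(node: ast.AST) -> float:
    """Evaluate a tiny arithmetic expression tree made of numbers and + - * /."""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_evaluate(node.left), _evaluate(node.right))
    raise ValueError("unsupported expression")


def resolve_num_partitions(num_expr: str, row_count: int) -> int:
    """Hypothetical helper: replace ROWCOUNT with the row count, then do the math."""
    expr = num_expr.replace("ROWCOUNT", str(row_count))
    return int(_evaluate(ast.parse(expr, mode="eval").body))


print(resolve_num_partitions("ROWCOUNT/10", 1000))  # 100 partitions, 10 rows each
print(resolve_num_partitions("4", 1000))  # 4, a plain number is used as-is
```

With 1000 rows, `ROWCOUNT/10` therefore means 100 partitions, which with the even algorithm gives 10 rows per partition.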

RPC#

For callbacks used by transformers, you can provide a lambda function, a native Python function, or an instance of a class implementing RPCHandler.

import pandas as pd
import fugue.api as fa
from fugue.rpc import RPCHandler

def print_columns_and_return(df:pd.DataFrame, cb:callable) -> pd.DataFrame:
    cb(str(df.columns))
    return df

def pt(x):
    print(x)

# RPCHandler
class Handler(RPCHandler):
    def __init__(self):
        super().__init__()
        
    def __call__(self, x):
        print(x)

df = pd.DataFrame([[0,0],[1,1],[0,1],[2,2]], columns=["a","b"])

# lambda
fa.transform(df, print_columns_and_return, schema="*", partition={"by": "a"}, callback = lambda x:print(x))

# function
fa.transform(df, print_columns_and_return, schema="*", partition={"by": "a"}, callback = pt)

# RPCHandler class
fa.transform(df, print_columns_and_return, schema="*", partition={"by": "a"}, callback = Handler())
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
a b
0 0 0
1 0 1
2 1 1
3 2 2
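
The output above has nine Index lines because each of the three transform calls invokes the callback once per partition, and partitioning by a yields three partitions (a = 0, 1, 2). The sketch below reproduces that counting in plain Python. It is a stand-in for illustration only: `call_per_partition` and `Recorder` are hypothetical names, nothing here uses Fugue or RPCHandler, and it does not model any communication between processes.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def call_per_partition(rows: List[Row], by: str, callback: Callable[[str], Any]) -> int:
    """Group rows by one key and invoke the callback once per group."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[by]].append(row)
    for part in groups.values():
        # send the column names of this partition to the callback
        callback(str(sorted(part[0].keys())))
    return len(groups)


class Recorder:
    """A callable object, analogous in shape to a handler class with __call__."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    def __call__(self, x: str) -> None:
        self.messages.append(x)


rows = [{"a": 0, "b": 0}, {"a": 1, "b": 1}, {"a": 0, "b": 1}, {"a": 2, "b": 2}]
recorder = Recorder()

# a lambda, a plain function and a callable object are interchangeable here
for cb in (lambda x: print(x), print, recorder):
    call_per_partition(rows, "a", cb)

print(len(recorder.messages))  # 3, one message per partition
```

A callable object is the natural choice when the callback needs to keep state, as `Recorder` does with its list of messages.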