X-like Objects
Fugue offers flexible ways to initialize many of its built-in objects. This tutorial walks through all of them.
Schema
Fugue defines a special syntax to represent a schema: column-type pairs are separated by commas, and each pair has the form <name>:<type expression>
For example: a:int,b:str or a:int,b_array:[int],c_dict:{x:int,y:str}
from fugue import Schema
print(Schema("a:int,b:str"))
print(Schema("a:int32,b_array:[int64],c_dict:{x:int,y:string}"))
# get pyarrow schema
schema = Schema(" a : int , b : str") # space is ok
print("pa schema", schema.pa_schema)
# more ways to initialize a Fugue Schema
print(Schema(schema.pa_schema)) # by pyarrow schema
print(Schema(c=str,d=int)) # pythonic way
print(Schema(dict(c=str,d=int))) # pythonic way
print(Schema("e:str","f:str")) # columns can be passed as separate arguments
print(Schema(["e:str","f:str"], ("g",int))) # lists and tuples can be mixed; note that int in Python means long in the schema
print(Schema(Schema("a:int","b:str"))) # from another Schema
a:int,b:str
a:int,b_array:[long],c_dict:{x:int,y:str}
pa schema a: int32
b: string
a:int,b:str
c:str,d:long
c:str,d:long
e:str,f:str
e:str,f:str,g:long
a:int,b:str
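To make the expression syntax above concrete, here is a minimal pure-Python sketch that splits a schema expression into name and type-expression pairs. It only illustrates the syntax and is not how Fugue parses schemas; the helper name split_schema_expression is hypothetical, and type expressions are returned as raw strings without validation.

```python
from typing import List, Tuple


def split_schema_expression(expr: str) -> List[Tuple[str, str]]:
    """Split "a:int,b:[int]" into [("a", "int"), ("b", "[int]")].

    Hypothetical helper for illustration; it is not part of Fugue.
    Commas inside [...] or {...} belong to a nested type, so only
    commas at bracket depth 0 separate columns.
    """
    items = []
    depth = 0
    start = 0
    for i, ch in enumerate(expr):
        if ch in "[{":
            depth += 1
        elif ch in "]}":
            depth -= 1
        elif ch == "," and depth == 0:
            items.append(expr[start:i])
            start = i + 1
    items.append(expr[start:])

    pairs = []
    for item in items:
        # The first colon separates the column name from its type expression.
        name, sep, type_expr = item.partition(":")
        name, type_expr = name.strip(), type_expr.strip()
        if not sep or not name or not type_expr:
            raise ValueError(f"invalid column-type pair: {item!r}")
        pairs.append((name, type_expr))
    return pairs


print(split_schema_expression("a:int,b:str"))
print(split_schema_expression("a:int,b_array:[int],c_dict:{x:int,y:str}"))
```

The sketch strips surrounding spaces, mirroring the " a : int , b : str" example above, and keeps nested type expressions intact instead of interpreting them.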
Partition
from fugue import PartitionSpec
assert PartitionSpec().empty # an empty partition spec means no operation is needed; it can serve as the default value
PartitionSpec(num=4)
PartitionSpec(algo="even",num=4,by=["a","b"],presort="c,d desc") # c,d desc == c ASC, d DESC
# num accepts an expression; ROWCOUNT stands for the row count of the dataframe being partitioned
# for a df with 1000 rows, this evenly splits it into 100 partitions of 10 rows each
PartitionSpec(algo="even",num="ROWCOUNT/10")
PartitionSpec({"num":4, "by":["a","b"]}) # from dict; passing a dict to `partition-like` parameters is common
PartitionSpec('{"num":4}') # from json
a = PartitionSpec(num=4)
b = PartitionSpec(by=["a"])
c = PartitionSpec(a,b) # combine
p = PartitionSpec(num=4, by=["a"])
PartitionSpec(p, by=["a","b"], algo="even") # override
PartitionSpec(num='4', by=['a', 'b'], presort='')
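The two string forms used above, the num expression and the presort expression, can be pictured with a small pure-Python sketch. This is not Fugue's implementation: resolve_num and parse_presort are hypothetical helpers, and truncating the result of the num expression to an integer is an assumption made here for illustration.

```python
import ast
import operator
from typing import List, Tuple

# Arithmetic operators allowed in this sketch (an assumption, not Fugue's rule set).
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}


def resolve_num(expr: str, rowcount: int) -> int:
    """Evaluate an expression such as "ROWCOUNT/10" for a given row count."""
    tree = ast.parse(expr.replace("ROWCOUNT", str(rowcount)), mode="eval")

    def evaluate(node):
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](evaluate(node.left), evaluate(node.right))
        raise ValueError(f"unsupported expression: {expr!r}")

    # Truncation to int is an assumption of this sketch.
    return int(evaluate(tree.body))


def parse_presort(expr: str) -> List[Tuple[str, bool]]:
    """Turn "c,d desc" into [("c", True), ("d", False)]; True means ascending."""
    result = []
    for item in expr.split(","):
        parts = item.split()
        if not parts:
            continue  # an empty presort string yields no sort keys
        if len(parts) == 1:
            result.append((parts[0], True))  # ascending when no direction is given
        elif len(parts) == 2 and parts[1].lower() in ("asc", "desc"):
            result.append((parts[0], parts[1].lower() == "asc"))
        else:
            raise ValueError(f"invalid presort item: {item!r}")
    return result


print(resolve_num("ROWCOUNT/10", 1000))  # 100 partitions, i.e. 10 rows each
print(parse_presort("c,d desc"))  # c ascending, d descending
```

Walking the expression with ast instead of calling eval keeps the sketch limited to plain arithmetic on numbers.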
RPC
For callbacks used by transformers, you can provide a lambda function, a native Python function, or an instance of a class implementing RPCHandler
import pandas as pd
import fugue.api as fa
from fugue.rpc import RPCHandler
def print_columns_and_return(df:pd.DataFrame, cb:callable) -> pd.DataFrame:
    cb(str(df.columns))
    return df

def pt(x):
    print(x)

# RPCHandler
class Handler(RPCHandler):
    def __init__(self):
        super().__init__()

    def __call__(self, x):
        print(x)
df = pd.DataFrame([[0,0],[1,1],[0,1],[2,2]], columns=["a","b"])
# lambda
fa.transform(df, print_columns_and_return, schema="*", partition={"by": "a"}, callback = lambda x:print(x))
# function
fa.transform(df, print_columns_and_return, schema="*", partition={"by": "a"}, callback = pt)
# RPCHandler class
fa.transform(df, print_columns_and_return, schema="*", partition={"by": "a"}, callback = Handler())
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
Index(['a', 'b'], dtype='object')
| | a | b |
|---|---|---|
| 0 | 0 | 0 |
| 1 | 0 | 1 |
| 2 | 1 | 1 |
| 3 | 2 | 2 |
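The three callback forms above are interchangeable from the transformer's point of view because each one is simply a callable taking one argument. The following plain-Python sketch shows that idea without Fugue or pandas; notify_columns and CollectingHandler are hypothetical stand-ins, and CollectingHandler deliberately does not subclass RPCHandler, so it says nothing about how Fugue routes the call.

```python
from typing import Any, Callable, List


def notify_columns(columns: List[str], cb: Callable[[str], Any]) -> None:
    """Stand-in for a transformer body: report the column names via the callback."""
    cb(str(columns))


def pt(x):
    print(x)


class CollectingHandler:
    """Callable object that stores what it receives instead of printing it."""

    def __init__(self):
        self.messages = []

    def __call__(self, x):
        self.messages.append(x)


handler = CollectingHandler()

# A lambda, a named function, and a callable instance are used the same way.
for cb in (lambda x: print(x), pt, handler):
    notify_columns(["a", "b"], cb)

print(handler.messages)
```

A callable object is handy when the callback needs to keep state, such as the collected messages here, which a bare lambda cannot do as directly.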