X-like Objects#
In Fugue, many built-in objects can be initialized flexibly. This tutorial covers all of them.
Schema#
Fugue uses a special string syntax to represent a schema: column definitions are separated by commas, and each one is a <name>:<type expression> pair.
For example: a:int,b:str or a:int,b_array:[int],c_dict:{x:int,y:str}
from fugue import Schema
print(Schema("a:int,b:str"))
print(Schema("a:int32,b_array:[int64],c_dict:{x:int,y:string}"))
# get pyarrow schema
schema = Schema(" a : int , b : str") # space is ok
print("pa schema", schema.pa_schema)
# more ways to initialize Fugue Schema
print(Schema(schema.pa_schema)) # by pyarrow schema
print(Schema(c=str,d=int)) # pythonic way
print(Schema(dict(c=str,d=int))) # pythonic way
print(Schema("e:str","f:str")) # you can separate
print(Schema(["e:str","f:str"], ("g",int))) # you can separate, notice int in python means long in schema
print(Schema(Schema("a:int","b:str"))) # you can separate
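A constructed Schema also behaves like an ordered dict of pyarrow fields keyed by column name. The sketch below is a minimal illustration; these accessors come from the underlying triad Schema class, so verify them against your installed version.
from fugue import Schema

schema = Schema("a:int,b:str")
print(schema.names)       # column names, in order
assert "a" in schema      # membership check by column name
print(schema["a"].type)   # the pyarrow type of column a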
Parameters#
ParamDict is not as flexible: just like the Python dict constructor, it only accepts a dict or a list of tuples. A ParamDict itself is a Python dict.
from triad.collections import ParamDict
print(ParamDict())
print(ParamDict(dict(a=1,b="d")))
print(ParamDict([("a",1),("b","d")]))
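Because ParamDict is a Python dict, plain dict access works everywhere. triad also provides typed getters such as get (with a default) and get_or_throw (with an expected type); the following is a minimal sketch assuming those helpers, so double-check them against your triad version.
from triad.collections import ParamDict

p = ParamDict(dict(a=1,b="d"))
assert p["a"] == 1               # plain dict access
print(p.get("x","default"))      # falls back to the default when the key is missing
print(p.get_or_throw("b",str))   # returns "d"; raises if the value is not compatible with str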
DataFrame#
Normally, you should create a dataframe through an ExecutionEngine or a FugueWorkflow. In general, all execution engines and workflows accept lists/iterables of Python arrays, pandas DataFrames, and Fugue DataFrames.
from fugue import ExecutionEngine, FugueWorkflow, NativeExecutionEngine, PandasDataFrame
from fugue_dask import DaskExecutionEngine
import pandas as pd
def construct_df_by_execution_engine(eng:ExecutionEngine):
    eng.to_df([[0]], "a:int", {"x":1}).show(title="from array")
    df = PandasDataFrame([[0]], "a:int")
    eng.to_df(df).show(title="from fugue dataframe")
    eng.to_df(df.as_pandas()).show(title="from pandas dataframe")
construct_df_by_execution_engine(NativeExecutionEngine())
construct_df_by_execution_engine(DaskExecutionEngine()) # notice the dataframe types change
print("-----------------------------------")
def construct_df_by_workflow(eng:ExecutionEngine):
    with FugueWorkflow(eng) as dag:
        dag.df([[0]], "a:int", {"x":1}).show(title="from array")
        df = PandasDataFrame([[0]], "a:int")
        dag.df(df).show(title="from fugue dataframe")
        dag.df(df.as_pandas()).show(title="from pandas dataframe")
construct_df_by_workflow(NativeExecutionEngine())
construct_df_by_workflow(DaskExecutionEngine()) # notice the dataframe types change
DataFrames#
DataFrames is a type representing a collection of Fugue DataFrames. It can be dict-like, where each dataframe has a name, or list-like. It is also an extensively used data structure in the framework.
from fugue import DataFrames, ArrayDataFrame, ArrowDataFrame
df1 = ArrayDataFrame([[0]],"a:int")
df2 = ArrowDataFrame([[1]],"b:int")
dfs = DataFrames(df1, df2) # list-like
assert not dfs.has_key
assert df1 is dfs[0]
assert df2 is dfs[1]
# how to get values as an array in list-like DataFrames
print(list(dfs.values()))
dfs = DataFrames(x=df1, y=df2) # dict-like
assert dfs.has_key
assert df1 is dfs["x"]
assert df2 is dfs["y"]
assert isinstance(dfs, dict) # dfs itself is dict, so you know how to iterate
dfs = DataFrames(dict(x=df1,y=df2)) # another equal way to init dict-like
df3 = ArrowDataFrame([[1]],"a:int")
dfs1 = DataFrames(dict(x=df1,y=df2))
dfs2 = DataFrames(dfs1, z=df3) # DataFrames are immutable, but you can update in this way
dfs2 = DataFrames(dict(x=df1,y=df2), z=df3)
dfs3 = DataFrames(df1,df2)
dfs4 = DataFrames(dfs3, df3) # DataFrames are immutable, but you can update in this way
dfs4 = DataFrames([df1,df2], df3)
DataFrames in Programming Interface#
In the Fugue programming interface, DataFrames is used extensively, and it is very flexible.
from fugue import FugueWorkflow, ArrayDataFrame, DataFrames
from fugue_dask import DaskExecutionEngine
def show(dfs:DataFrames, title=""):
    if title!="":
        print(title)
    for k,v in dfs.items():
        v.show(title=k)
df1 = ArrayDataFrame([[0]],"a:int")
df2 = ArrayDataFrame([[1]],"b:int")
with FugueWorkflow(DaskExecutionEngine) as dag:
    # inside output, it constructs DataFrames(df1, df2) and then converts both to WorkflowDataFrames
    # you can see it's a list-like DataFrames, but due to the engine, they become DaskDataFrame
    # all these conversions are automatic
    dag.output(df1,df2,using=show, params={"title":"*args from raw dataframes"})
    dag.output([df1,df2],using=show, params={"title":"list from raw dataframes"})
    # dict-like input must be passed in as dicts
    dag.output(dict(x=df1,y=df2),dict(z=df1),using=show, params={"title":"dict from raw dataframes"})
    cdf1=dag.df(df1) # you can also convert the dataframes to WorkflowDataFrame explicitly (recommended)
    dag.output(cdf1,df2,using=show, params={"title":"mixed"}) # you can mix them, although it is not recommended
Partition#
A PartitionSpec describes how to partition a dataframe: which columns to partition by, how many partitions to create, which algorithm to use, and how to presort within each partition.
from fugue import PartitionSpec
assert PartitionSpec().empty # empty partition spec means no operation needed, it can be the default value
PartitionSpec(num=4)
PartitionSpec(algo="even",num=4,by=["a","b"],presort="c,d desc") # c,d desc == c ASC, d DESC
# you can use an expression for num; ROWCOUNT refers to the row count of the dataframe being partitioned
# if a df has 1000 rows, ROWCOUNT/10 creates 100 partitions, i.e. 10 rows per partition
PartitionSpec(algo="even",num="ROWCOUNT/10")
PartitionSpec({"num":4, "by":["a","b"]}) # from dict, using dict on `partition-like` parameters is common
PartitionSpec('{"num":4}') # from json
a = PartitionSpec(num=4)
b = PartitionSpec(by=["a"])
c = PartitionSpec(a,b) # combine
p = PartitionSpec(num=4, by=["a"])
PartitionSpec(p, by=["a","b"], algo="even") # override
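A PartitionSpec is rarely used alone; normally it configures an operation such as transform. The sketch below mirrors the workflow style used elsewhere in this tutorial; top1 is a hypothetical transformer that keeps the first row of each partition, which, after the presort, is the row with the largest b.
import pandas as pd
from fugue import FugueWorkflow

# schema: *
def top1(df:pd.DataFrame) -> pd.DataFrame:
    return df.head(1)

dag = FugueWorkflow()
df = dag.df([[0,1],[0,2],[1,3]],"a:int,b:int")
# the partition call takes the same arguments as PartitionSpec(by=["a"], presort="b desc")
df.partition(by=["a"], presort="b desc").transform(top1).show()
dag.run()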
RPC#
For callbacks you define for transformers, you can provide a lambda function, a native Python function, or an instance implementing RPCHandler.
import pandas as pd
from fugue import FugueWorkflow
from fugue.rpc import RPCHandler
# schema: *
def print_describe_and_return(df:pd.DataFrame, cb:callable) -> pd.DataFrame:
    cb(str(df.describe()))
    return df
dag = FugueWorkflow()
df = dag.df([[0,0],[1,1],[0,1],[2,2]],"a:int,b:int")
# lambda
df.partition(by=["a"]).transform(print_describe_and_return, callback = lambda x:print(x)).show()
# function
def pt(x):
    print(x)
df.partition(by=["a"]).transform(print_describe_and_return, callback = pt).show()
# RPCHandler
class Handler(RPCHandler):
    def __init__(self):
        super().__init__()

    def __call__(self, x):
        print(x)
df.partition(by=["a"]).transform(print_describe_and_return, callback = Handler()).show()
dag.run()
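One advantage of an RPCHandler over a plain function is that the instance keeps state on the driver, so it can accumulate whatever the workers send back. The Collect class below is a hypothetical sketch, not part of Fugue; it assumes callbacks are delivered to the handler instance on the driver, which holds for the local engine.
import pandas as pd
from fugue import FugueWorkflow
from fugue.rpc import RPCHandler

# schema: *
def print_describe_and_return(df:pd.DataFrame, cb:callable) -> pd.DataFrame:
    cb(str(df.describe()))
    return df

class Collect(RPCHandler):
    def __init__(self):
        super().__init__()
        self.results = []  # accumulated on the driver

    def __call__(self, x):
        self.results.append(x)

handler = Collect()
dag = FugueWorkflow()
df = dag.df([[0,0],[1,1],[0,1],[2,2]],"a:int,b:int")
df.partition(by=["a"]).transform(print_describe_and_return, callback=handler).show()
dag.run()
print(len(handler.results))  # one callback per partition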