X-like Objects#

In Fugue, it’s flexibile to initialize many built-in objects. This is a tutorial for all of them.

Schema#

Fugue creates a special syntax to represent schema: Separated by ,, each column type pair is <name>:<type expression>

For example: a:int,b:str or a:int,b_array:[int],c_dict:{x:int,y:str}

from fugue import Schema

print(Schema("a:int,b:str"))
print(Schema("a:int32,b_array:[int64],c_dict:{x:int,y:string}"))

# get pyarrow schema
schema = Schema(" a : int , b : str") # space is ok
print("pa schema", schema.pa_schema)

# more ways to initialized fugue Schema
print(Schema(schema.pa_schema)) # by pyarrow schema
print(Schema(c=str,d=int)) # pythonic way
print(Schema(dict(c=str,d=int))) # pythonic way
print(Schema("e:str","f:str")) # you can separate
print(Schema(["e:str","f:str"], ("g",int))) # you can separate, notice int in python means long in schema
print(Schema(Schema("a:int","b:str"))) # you can separate

Parameters#

ParamDict is not that flexible, it can only accept dict or list of tuples just like python dict. ParamDict itself is a python dict.

from triad.collections import ParamDict

print(ParamDict())
print(ParamDict(dict(a=1,b="d")))
print(ParamDict([("a",1),("b","d")]))

DataFrame#

Normally, you should create a dataframe from ExecutionEngine or FugueWorkflow. In general, all execution engines and workflows support list/iterable of python arrays and pandas or Fugue dataframes.

from fugue import ExecutionEngine, FugueWorkflow, NativeExecutionEngine, PandasDataFrame
from fugue_dask import DaskExecutionEngine
import pandas as pd

def construct_df_by_execution_engine(eng:ExecutionEngine):
    eng.to_df([[0]], "a:int", {"x":1}).show(title="from array")
    df = PandasDataFrame([[0]], "a:int")
    eng.to_df(df).show(title="from fugue dataframe")
    eng.to_df(df.as_pandas()).show(title="from pandas dataframe")
    
construct_df_by_execution_engine(NativeExecutionEngine())
construct_df_by_execution_engine(DaskExecutionEngine())  # notice the dataframe types change

print("-----------------------------------")

def construct_df_by_workflow(eng:ExecutionEngine):
    with FugueWorkflow(eng) as dag:
        dag.df([[0]], "a:int", {"x":1}).show(title="from array")
        df = PandasDataFrame([[0]], "a:int")
        dag.df(df).show(title="from fugue dataframe")
        dag.df(df.as_pandas()).show(title="from pandas dataframe")
        
construct_df_by_workflow(NativeExecutionEngine())
construct_df_by_workflow(DaskExecutionEngine())  # notice the dataframe types change   

DataFrames#

DataFrames is a type, it represents a collection of Fugue DataFrames. It can be dict-like where each dataframe has a name, or list-like. It is also an extensively used data structure in the framework

from fugue import DataFrames, ArrayDataFrame, ArrowDataFrame

df1 = ArrayDataFrame([[0]],"a:int")
df2 = ArrowDataFrame([[1]],"b:int")

dfs = DataFrames(df1, df2) # list-like
assert not dfs.has_key
assert df1 is dfs[0]
assert df2 is dfs[1]
# how to get values as an array in list-like DataFrames
print(list(dfs.values()))

dfs = DataFrames(x=df1, y=df2) # dict-like
assert dfs.has_key
assert df1 is dfs["x"]
assert df2 is dfs["y"]
assert isinstance(dfs, dict) # dfs itself is dict, so you know how to iterate

dfs = DataFrames(dict(x=df1,y=df2)) # another equal way to init dict-like

df3 = ArrowDataFrame([[1]],"a:int")
dfs1 = DataFrames(dict(x=df1,y=df2))
dfs2 = DataFrames(dfs1, z=df3)  # DataFrames are immutable, but you can update in this way
dfs2 = DataFrames(dict(x=df1,y=df2), z=df3)

dfs3 = DataFrames(df1,df2)
dfs4 = DataFrames(dfs3, df3)  # DataFrames are immutable, but you can update in this way
dfs4 = DataFrames([df1,df2], df3) 

DataFrames in Programming Interface#

In Fugue programming interface, it’s common to use DataFrames, it’s also very flexible

from fugue import FugueWorkflow, ArrayDataFrame, DataFrames
from fugue_dask import DaskExecutionEngine

def show(dfs:DataFrames, title=""):
    if title!="":
        print(title)
    for k,v in dfs.items():
        v.show(title=k)

df1 = ArrayDataFrame([[0]],"a:int")
df2 = ArrayDataFrame([[1]],"b:int")

with FugueWorkflow(DaskExecutionEngine) as dag:
    # inside output, it constructs DataFrames(df1, df2) and then convert both to WorkflowDataFrames
    # you can see it's a list like dataframes, but due to the engine, they become DaskDataFrame
    # all these conversions are automatic
    dag.output(df1,df2,using=show, params={"title":"*args from raw dataframes"})
    dag.output([df1,df2],using=show, params={"title":"list from raw dataframes"})
    # dict-like input must be passed in as dicts
    dag.output(dict(x=df1,y=df2),dict(z=df1),using=show, params={"title":"dict from raw dataframes"})
    
    cdf1=dag.df(df1)  # you can also convert the dataframes to WorkflowDataFrame explicitly (recommended)
    dag.output(cdf1,df2,using=show, params={"title":"mixed"})  # and you can mix them, although not recommended   

Partition#

from fugue import PartitionSpec

assert PartitionSpec().empty # empty partition spec means no operation needed, it can be the default value
PartitionSpec(num=4)
PartitionSpec(algo="even",num=4,by=["a","b"],presort="c,d desc") # c,d desc == c ASC, d DESC

# you can use expression in num, ROWCOUNT can be used to indicate using the row count of the dataframe to operate on
# if a df has 1000 rows, this means I want to even partition it to 10 rows per partition
PartitionSpec(algo="even",num="ROWCOUNT/10")

PartitionSpec({"num":4, "by":["a","b"]}) # from dict, using dict on `partition-like`  parameters is common
PartitionSpec('{"num":4}') # from json

a = PartitionSpec(num=4)
b = PartitionSpec(by=["a"])
c = PartitionSpec(a,b) # combine

p = PartitionSpec(num=4, by=["a"])
PartitionSpec(p, by=["a","b"], algo="even") # override

RPC#

For callbacks you defined for transformers, you can provide a lambda function, a native python function, or an instance implementing RPCHandler

import pandas as pd
from fugue import FugueWorkflow
from fugue.rpc import RPCHandler

# schema: *
def print_describe_and_return(df:pd.DataFrame, cb:callable) -> pd.DataFrame:
    cb(str(df.describe()))
    return df

dag = FugueWorkflow()
df = dag.df([[0,0],[1,1],[0,1],[2,2]],"a:int,b:int")

# lambda
df.partition(by=["a"]).transform(print_describe_and_return, callback = lambda x:print(x)).show()

# function
def pt(x):
    print(x)

df.partition(by=["a"]).transform(print_describe_and_return, callback = pt).show()

# RPCHandler
class Handler(RPCHandler):
    def __init__(self):
        super().__init__()
        
    def __call__(self, x):
        print(x)
        
df.partition(by=["a"]).transform(print_describe_and_return, callback = Handler()).show()

dag.run()