Data Type, Schema & DataFrames#

Data Types#

Fugue does not have its own data types. Instead, it uses a subset of the data types from Apache Arrow, and most pyarrow data types are supported. To see the complete list of supported types, you can read this and its source code.

Non-nested Types#

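The non-nested types are listed below. Several Fugue expressions can map to the same pyarrow type. A row marked as primary (is_primary) is the expression Fugue uses when converting a pyarrow type back to a Fugue expression; the other rows are accepted aliases. If you are interested, the list is generated by this Fugue code.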

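| Primary | Fugue Expression | PyArrow |
| --- | --- | --- |
| YES | bytes | binary |
| . | binary | binary |
| YES | bool | bool |
| . | boolean | bool |
| YES | date | date32[day] |
| YES | double | double |
| . | float64 | double |
| YES | float | float |
| . | float32 | float |
| YES | float16 | halffloat |
| YES | short | int16 |
| . | int16 | int16 |
| YES | int | int32 |
| . | int32 | int32 |
| YES | long | int64 |
| . | int64 | int64 |
| YES | byte | int8 |
| . | int8 | int8 |
| YES | null | null |
| YES | str | string |
| . | string | string |
| YES | datetime | timestamp[us] |
| YES | ushort | uint16 |
| . | uint16 | uint16 |
| YES | uint | uint32 |
| . | uint32 | uint32 |
| YES | ulong | uint64 |
| . | uint64 | uint64 |
| YES | ubyte | uint8 |
| . | uint8 | uint8 |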
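One way to read the Primary column is as a two-step lookup: every accepted expression resolves to a pyarrow type, and every pyarrow type resolves back to exactly one primary expression. The pure-Python sketch below illustrates that idea with a few rows copied from the list. It is not how Fugue implements the conversion; `ALIASES`, `PRIMARY` and `to_primary` are hypothetical names introduced only for this illustration.

```python
# Hypothetical illustration, not Fugue's implementation.
# Accepted Fugue expressions mapped to the pyarrow type name they denote
# (a subset of the rows in the list above).
ALIASES = {
    "int": "int32", "int32": "int32",
    "long": "int64", "int64": "int64",
    "str": "string", "string": "string",
    "bool": "bool", "boolean": "bool",
    "double": "double", "float64": "double",
    "datetime": "timestamp[us]",
}

# Each pyarrow type name mapped to its primary Fugue expression.
PRIMARY = {
    "int32": "int",
    "int64": "long",
    "string": "str",
    "bool": "bool",
    "double": "double",
    "timestamp[us]": "datetime",
}


def to_primary(expr: str) -> str:
    """Return the primary Fugue expression for any accepted alias."""
    return PRIMARY[ALIASES[expr.strip()]]


print(to_primary("int64"))    # long
print(to_primary("boolean"))  # bool
```

Under this reading, an alias and its primary expression describe the same pyarrow type, which is why either spelling can be used when writing a type expression.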

Nested Types#

pa.ListType and pa.StructType are supported. For a list type, the type expression is [<element type>]. For a struct type, it is a JSON-like expression. For example, {a:int,b:[str]} means the value is a dict whose key a is an int and whose key b is a list of strings.

Note that this is only a way to express pyarrow data types; it does not invent new types.

Schema#

Again, Fugue does not invent its own schema; it uses the pyarrow schema. Fugue does, however, define a compact syntax to represent a schema: a comma-separated list of column-type pairs, each written as <name>:<type expression>

For example: a:int,b:str or a:int,b_array:[int],c_dict:{x:int,y:str}
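To make the syntax concrete, here is a minimal pure-Python sketch that splits a schema expression into (name, type expression) pairs. The only subtlety is that commas inside `[...]` or `{...}` belong to a nested type and must not split columns. This is not Fugue's parser; `split_schema` is a hypothetical helper, and it assumes the expression is well formed.

```python
from typing import List, Tuple


def split_schema(expr: str) -> List[Tuple[str, str]]:
    """Split 'name:type,...' at top-level commas, keeping nested types intact."""
    pairs: List[Tuple[str, str]] = []
    depth = 0   # current nesting level inside [...] or {...}
    start = 0   # index where the current column expression begins
    # A trailing comma is appended so the last column is flushed as well.
    for i, ch in enumerate(expr + ","):
        if ch in "[{":
            depth += 1
        elif ch in "]}":
            depth -= 1
        elif ch == "," and depth == 0:
            # The first ':' separates the column name from its type expression.
            name, _, type_expr = expr[start:i].partition(":")
            pairs.append((name.strip(), type_expr.strip()))
            start = i + 1
    return pairs


print(split_schema("a:int,b_array:[int],c_dict:{x:int,y:str}"))
# [('a', 'int'), ('b_array', '[int]'), ('c_dict', '{x:int,y:str}')]
```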

Now let’s see some examples using the API:

from fugue import Schema

print(Schema("a:int,b:str"))
print(Schema("a:int32,b_array:[int64],c_dict:{x:int,y:string}"))

# get pyarrow schema
schema = Schema(" a : int , b : str") # space is ok
print("pa schema", schema.pa_schema)

# more ways to initialize a fugue Schema
print(Schema(schema.pa_schema)) # by pyarrow schema
print(Schema(c=str,d=int)) # pythonic way
print(Schema("e:str","f:str")) # you can pass the columns as separate expressions

# Compare schema with string
assert Schema("a: int, b: int64") == "a:int,b:long"

# Operations
print(Schema("a:int")+Schema("b:str"))
print(Schema("a:int")+"b:str")
print(Schema("a:int,c:int,d:int") - ["c"]) # for '-' all cols must exist
print(Schema("a:int,c:int,d:int").exclude(["c","x"])) # exclude means exclude if exists
print(Schema("a:int,c:int,d:int").extract(["d","a"]))

Schema is very flexible. For the full API reference, please read this.

DataFrame#

All Fugue operations work on DataFrames. There are no concepts such as RDDs or arbitrary objects (they are not core concepts that Fugue wants to unify, but you can still use them easily in this framework; see the other tutorials). DataFrame is an abstract concept: it is a dataset with a schema, where schema is as defined above.

The motivation behind the Fugue DataFrame is significantly different from that of projects such as Dask, Modin or Koalas. The Fugue DataFrame is not meant to become another pandas-like DataFrame, and Fugue is NOT going to use the Pandas language to unify data processing. That being said, Pandas and Pandas-like dataframes are still widely used and well supported in this framework, because they are an important component of data science.

LocalDataFrame#

These are the built-in local dataframes of Fugue:

  • ArrayDataFrame: the underlying data is an array

  • IterableDataFrame: the underlying data is an iterable. This is an unbounded dataframe, which is extremely useful for processing datasets of unknown size: it minimizes memory usage, and you can exit from the iteration at any time.

  • PandasDataFrame: adapter to pandas DataFrame

  • ArrowDataFrame: adapter to arrow Table

You can convert between any of them. Every DataFrame, local or not, can be converted to a local dataframe or to a local bounded dataframe.

Other DataFrames#

These are the built-in non-local dataframes. To use them, you need to pip install the corresponding extras:

  • SparkDataFrame: wrapper of Spark DataFrame

  • DaskDataFrame: wrapper of Dask DataFrame

Initialization & Conversion#

Learning how to initialize local dataframes matters most, because in most cases you only deal with local dataframes when using Fugue.

from fugue import ArrayDataFrame, ArrowDataFrame, IterableDataFrame, PandasDataFrame

data = [[0,"a"],[1,"b"]]
schema = "x:int,y:str"

# The most basic initialization is the same
ArrayDataFrame(data,schema).show()
ArrowDataFrame(data,schema).show()
IterableDataFrame(data,schema).show()
PandasDataFrame(data,schema).show()

# common methods as_array, as_array_iterable, as_dict_iterable
print(ArrowDataFrame(data,schema).as_array())
print(ArrowDataFrame(data,schema).as_array_iterable()) # iterator object
# type safe is very useful
df = ArrayDataFrame(data, "x:str,y:str")
assert isinstance(df.as_array()[0][0], int) # as_array or as_array_iterable by default returns raw data
assert isinstance(df.as_array(type_safe=True)[0][0], str) # turn on type safe to return the data according to schema


# as_pandas is the common interface for all DataFrames
pdf = ArrayDataFrame(data,schema).as_pandas()
print(pdf)
PandasDataFrame(pdf).show() # convert pd.DataFrame to PandasDataFrame

# as_arrow is the common interface for all DataFrames
adf = ArrayDataFrame(data,schema).as_arrow()
print(adf)
ArrowDataFrame(adf).show() # convert arrow table to ArrowDataFrame

# access the native data structure using .native, it applies for all built in DataFrames
print(ArrayDataFrame(data,schema).native)
print(ArrowDataFrame(data,schema).native)
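In the block above, as_array returns the raw values by default, while type_safe=True returns values converted according to the schema. The pure-Python sketch below illustrates the idea of such a conversion for a flat schema. It is not Fugue's implementation; `CASTERS` and `cast_rows` are hypothetical names, and only a few type expressions are covered.

```python
from typing import Any, Callable, Dict, List

# Hypothetical casters for a few Fugue type expressions; illustration only.
CASTERS: Dict[str, Callable[[Any], Any]] = {
    "int": int,
    "long": int,
    "str": str,
    "double": float,
}


def cast_rows(rows: List[List[Any]], schema: str) -> List[List[Any]]:
    """Cast each value to the type its column declares in a flat schema."""
    # Assumes a flat 'name:type,...' expression without nested types.
    types = [col.split(":")[1].strip() for col in schema.split(",")]
    return [
        # None is kept as-is so that missing values stay missing.
        [None if v is None else CASTERS[t](v) for v, t in zip(row, types)]
        for row in rows
    ]


raw = [[0, "a"], [1, "b"]]
print(cast_rows(raw, "x:str,y:str"))  # [['0', 'a'], ['1', 'b']]
```

The raw list is left untouched, so the caller can choose between the cheap raw view and the converted one.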

IterableDataFrame is special because it is not bounded. One important feature of the Fugue IterableDataFrame is that it is empty-aware: at any point you can check whether the dataframe is empty and peek at the current first row without affecting the iteration. If you are interested in the details, read this.

from fugue import IterableDataFrame

data = [[0,"a"],[1,"b"]]
schema = "x:int,y:str"

# The most basic initialization is the same
df = IterableDataFrame(data,schema)
assert not df.empty
df.show()
assert df.empty # because show consumes all elements of the iterable

# it is ok to convert to other types, but they can work only once
print(IterableDataFrame(data,schema).as_array())
print(IterableDataFrame(data,schema).as_pandas())
print(IterableDataFrame(data,schema).as_arrow())

# common way to use
df = IterableDataFrame(data,schema)
for row in df.as_dict_iterable():
    print(row)

    
from fugue.dataframe.utils import to_local_df, to_local_bounded_df

df = IterableDataFrame(data,schema)
assert to_local_df(df) is df # because it is already local dataframe
assert to_local_bounded_df(df) is not df # because it is not bounded
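The empty-aware behavior described above can be approximated by buffering a single item ahead of the consumer. The following pure-Python sketch shows that general technique. It is not Fugue's implementation; `EmptyAwareIterable` and its `peek` method are hypothetical names used only for illustration.

```python
from typing import Any, Iterable, Iterator

_END = object()  # sentinel marking an exhausted iterator


class EmptyAwareIterable:
    """Hypothetical sketch: pre-fetch one item so emptiness can be checked."""

    def __init__(self, it: Iterable[Any]):
        self._it = iter(it)
        self._head = next(self._it, _END)  # buffer the first item

    @property
    def empty(self) -> bool:
        # True once no buffered item remains.
        return self._head is _END

    def peek(self) -> Any:
        """Return the current first item without consuming it."""
        if self.empty:
            raise ValueError("iterable is empty")
        return self._head

    def __iter__(self) -> Iterator[Any]:
        while not self.empty:
            # Hand out the buffered item and pre-fetch the next one.
            item, self._head = self._head, next(self._it, _END)
            yield item


rows = EmptyAwareIterable([[0, "a"], [1, "b"]])
assert not rows.empty
print(rows.peek())  # [0, 'a']
print(list(rows))   # [[0, 'a'], [1, 'b']]
assert rows.empty   # everything has been consumed
```

Only one item is held in memory at a time, so the check stays cheap even when the underlying source is very large or unbounded.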

Non-local dataframes can be converted to local dataframes, but how they are initialized depends on the specific execution engine. Here we only use DaskDataFrame as an example:

from fugue_dask import DaskDataFrame, DaskExecutionEngine

data = [[0,"a"],[1,"b"]]
schema = "x:int,y:str"

engine = DaskExecutionEngine()
df = engine.to_df(data, schema) # engine.to_df is the best way to create engine-dependent dataframes
assert isinstance(df, DaskDataFrame)

df.as_local().show() # it converts to a local DataFrame, for Dask, it becomes a PandasDataFrame
print(df.native) # access the dask dataframe


from fugue.dataframe.utils import to_local_df, to_local_bounded_df

df = engine.to_df(data, schema)
to_local_df(df).show()
to_local_bounded_df(df).show() # this is stronger, it prevents using IterableDataFrame

Commonly, you only tell Fugue that you want to create a dataframe and give it the raw data source; Fugue, with a given execution engine, does the job for you. In your own logic, you mostly care about two abstract types in your functions: DataFrame and LocalDataFrame. They can be seen in the Extensions section of the tutorials.

DataFrames#

DataFrames is a type that represents a collection of Fugue DataFrames. It can be dict-like, where each dataframe has a name, or list-like. It is also an extensively used data structure in the framework.

from fugue import DataFrames, ArrayDataFrame, ArrowDataFrame

df1 = ArrayDataFrame([[0]],"a:int")
df2 = ArrowDataFrame([[1]],"b:int")

dfs = DataFrames(df1, df2) # list-like
assert not dfs.has_key
assert df1 is dfs[0]
assert df2 is dfs[1]
# how to get values as an array in list-like DataFrames
print(list(dfs.values()))

dfs = DataFrames(x=df1, y=df2) # dict-like
assert dfs.has_key
assert df1 is dfs["x"]
assert df2 is dfs["y"]
assert isinstance(dfs, dict) # dfs itself is dict, so you know how to iterate

dfs = DataFrames(dict(x=df1,y=df2)) # an equivalent way to initialize a dict-like DataFrames