Data Type, Schema & DataFrames
Data Types#
Fugue does not have its own data types. Instead, it uses a subset of data types from Apache Arrow. Most of the pyarrow data types are supported. To see the complete supported list, you can read this and its source code.
Non-nested Types#
For non-nested types, here is the list. `is_primary` means that when converting from a pyarrow type to a Fugue expression, the primary one is used. If you are interested, the list is generated by this Fugue code
| Primary | Fugue Expression | PyArrow |
|---|---|---|
| YES | bytes | binary |
| . | binary | binary |
| YES | bool | bool |
| . | boolean | bool |
| YES | date | date32[day] |
| YES | double | double |
| . | float64 | double |
| YES | float | float |
| . | float32 | float |
| YES | float16 | halffloat |
| YES | short | int16 |
| . | int16 | int16 |
| YES | int | int32 |
| . | int32 | int32 |
| YES | long | int64 |
| . | int64 | int64 |
| YES | byte | int8 |
| . | int8 | int8 |
| YES | null | null |
| YES | str | string |
| . | string | string |
| YES | datetime | timestamp[us] |
| YES | ushort | uint16 |
| . | uint16 | uint16 |
| YES | uint | uint32 |
| . | uint32 | uint32 |
| YES | ulong | uint64 |
| . | uint64 | uint64 |
| YES | ubyte | uint8 |
| . | uint8 | uint8 |
Nested Types#
`pa.ListType` and `pa.StructType` are supported. For list types, the type expression is `[<element type>]`; for struct types, it is a JSON-like expression, for example `{a:int,b:[str]}`, meaning the data is a dict with key `a` as an int and `b` as a list of strings.

Notice this is just a way to express pyarrow data types; it does not invent new types.
Schema#
Again, Fugue does not invent schema; it uses pyarrow schema. But Fugue creates a special syntax to represent it: columns are separated by `,`, and each column/type pair is `<name>:<type expression>`. For example: `a:int,b:str` or `a:int,b_array:[int],c_dict:{x:int,y:str}`.
Now let’s see some examples using the API:
```python
from fugue import Schema

print(Schema("a:int,b:str"))
print(Schema("a:int32,b_array:[int64],c_dict:{x:int,y:string}"))

# get the pyarrow schema
schema = Schema(" a : int , b : str")  # spaces are ok
print("pa schema", schema.pa_schema)

# more ways to initialize a Fugue Schema
print(Schema(schema.pa_schema))  # from a pyarrow schema
print(Schema(c=str, d=int))  # pythonic way
print(Schema("e:str", "f:str"))  # you can separate the columns into multiple strings

# compare a schema with a string
assert Schema("a: int, b: int64") == "a:int,b:long"

# operations
print(Schema("a:int") + Schema("b:str"))
print(Schema("a:int") + "b:str")
print(Schema("a:int,c:int,d:int") - ["c"])  # for '-' all listed columns must exist
print(Schema("a:int,c:int,d:int").exclude(["c", "x"]))  # exclude means exclude if exists
print(Schema("a:int,c:int,d:int").extract(["d", "a"]))
```
`Schema` is very flexible; for the full API reference, please read this
DataFrame#
All Fugue operations are on DataFrames; there is no concept such as RDD or arbitrary object (they are not core concepts Fugue wants to unify, but you can still use them easily in this framework, see other tutorials). `DataFrame` is an abstract concept: it is a schemed dataset, and schema has been defined above.

The motivation of Fugue DataFrame is significantly different from ideas such as Dask, Modin or Koalas. Fugue DataFrame is not meant to become another pandas-like DataFrame, and Fugue is NOT going to use the pandas language to unify data processing. That being said, pandas and pandas-like dataframes are still widely used and well supported in this framework, because they are an important component of data science.
LocalDataFrame#
These are the built-in local dataframes of Fugue:

- ArrayDataFrame: the underlying data is an array
- IterableDataFrame: the underlying data is an iterable; this is an unbounded dataframe, which is extremely useful for processing datasets of unknown size because it minimizes memory usage and lets you exit the iteration at any time
- PandasDataFrame: adapter to a pandas DataFrame
- ArrowDataFrame: adapter to an arrow Table

You can convert between them. All DataFrames can be converted to local dataframes, or to local bounded dataframes.
Other DataFrames#
These are the built-in non-local dataframes. To use them, you need to pip install the extras

- SparkDataFrame: wrapper of a Spark DataFrame
- DaskDataFrame: wrapper of a Dask DataFrame
Initialization & Conversion#
It's more important to learn how to initialize local dataframes because, when using Fugue, in most cases you only deal with local dataframes.
```python
from fugue import ArrayDataFrame, ArrowDataFrame, IterableDataFrame, PandasDataFrame

data = [[0, "a"], [1, "b"]]
schema = "x:int,y:str"

# the most basic initialization is the same for all of them
ArrayDataFrame(data, schema).show()
ArrowDataFrame(data, schema).show()
IterableDataFrame(data, schema).show()
PandasDataFrame(data, schema).show()

# common methods: as_array, as_array_iterable, as_dict_iterable
print(ArrowDataFrame(data, schema).as_array())
print(ArrowDataFrame(data, schema).as_array_iterable())  # iterator object

# type safety is very useful
df = ArrayDataFrame(data, "x:str,y:str")
assert isinstance(df.as_array()[0][0], int)  # as_array and as_array_iterable return raw data by default
assert isinstance(df.as_array(type_safe=True)[0][0], str)  # turn on type_safe to return the data according to the schema

# as_pandas is a common interface for all DataFrames
pdf = ArrayDataFrame(data, schema).as_pandas()
print(pdf)
PandasDataFrame(pdf).show()  # convert a pd.DataFrame to a PandasDataFrame

# as_arrow is a common interface for all DataFrames
adf = ArrayDataFrame(data, schema).as_arrow()
print(adf)
ArrowDataFrame(adf).show()  # convert an arrow Table to an ArrowDataFrame

# access the native data structure using .native; this applies to all built-in DataFrames
print(ArrayDataFrame(data, schema).native)
print(ArrowDataFrame(data, schema).native)
```
`IterableDataFrame` is special because it is not bounded. But one important feature of Fugue `IterableDataFrame` is that it is empty-aware: at any point you can check whether the dataframe is empty and peek at the current first row without affecting the iteration. If you are interested in the details, read this
```python
from fugue import IterableDataFrame
from fugue.dataframe.utils import to_local_df, to_local_bounded_df

data = [[0, "a"], [1, "b"]]
schema = "x:int,y:str"

df = IterableDataFrame(data, schema)
assert not df.empty
df.show()
assert df.empty  # because show consumed all elements of the iterable

# it is ok to convert to other types, but each conversion works only once
print(IterableDataFrame(data, schema).as_array())
print(IterableDataFrame(data, schema).as_pandas())
print(IterableDataFrame(data, schema).as_arrow())

# a common way to use it
df = IterableDataFrame(data, schema)
for row in df.as_dict_iterable():
    print(row)

df = IterableDataFrame(data, schema)
assert to_local_df(df) is df  # because it is already a local dataframe
assert to_local_bounded_df(df) is not df  # because it is not bounded
```
For non-local dataframes, you can convert them to local dataframes, but the initialization depends on specific execution engines. Here we only use DaskDataFrame as an example.
```python
from fugue_dask import DaskDataFrame, DaskExecutionEngine
from fugue.dataframe.utils import to_local_df, to_local_bounded_df

data = [[0, "a"], [1, "b"]]
schema = "x:int,y:str"

engine = DaskExecutionEngine()
df = engine.to_df(data, schema)  # engine.to_df is the best way to generate engine-dependent dataframes
assert isinstance(df, DaskDataFrame)
df.as_local().show()  # converts to a local DataFrame; for Dask, it becomes a PandasDataFrame
print(df.native)  # access the underlying dask dataframe

df = engine.to_df(data, schema)
to_local_df(df).show()
to_local_bounded_df(df).show()  # this is stronger, it prevents using IterableDataFrame
```
Commonly, you only tell Fugue: I want to create a dataframe, and here is the raw data source; Fugue with a certain execution engine will do the job for you. In your own logic, you mostly care about two abstract types in your functions: `DataFrame` and `LocalDataFrame`. They can be seen in the Extensions section of the tutorials.
DataFrames#
`DataFrames` is a type that represents a collection of Fugue DataFrames. It can be dict-like, where each dataframe has a name, or list-like. It is also an extensively used data structure in the framework.
```python
from fugue import DataFrames, ArrayDataFrame, ArrowDataFrame

df1 = ArrayDataFrame([[0]], "a:int")
df2 = ArrowDataFrame([[1]], "b:int")

dfs = DataFrames(df1, df2)  # list-like
assert not dfs.has_key
assert df1 is dfs[0]
assert df2 is dfs[1]
# how to get the values as an array in a list-like DataFrames
print(list(dfs.values()))

dfs = DataFrames(x=df1, y=df2)  # dict-like
assert dfs.has_key
assert df1 is dfs["x"]
assert df2 is dfs["y"]
assert isinstance(dfs, dict)  # dfs itself is a dict, so you know how to iterate it

dfs = DataFrames(dict(x=df1, y=df2))  # another equivalent way to initialize a dict-like DataFrames
```