Fugue with Dask-sql
Fugue with Dask-sql#
Pandas and Spark already have solutions that allow users to execute SQL code to describe computation workflows. Dask, on the other hand, does not have a standard SQL interface yet.
FugueSQL provides this feature with the DaskExecutionEngine, but users should also be aware that dask-sql is a relatively new project and has a majority of SQL keywords implemented already. Additionally, it is also faster than FugueSql on average. However, there are still some features under development. Most notably, the SQL
WINDOW is not yet implemented.
We are collaborating to have our solutions converge to create the de facto SQL interface for Dask. In the meantime, we have unified our solutions by allowing
FugueSQL to use dask-sql as an execution engine. The dask_sql project has added a
DaskSQLExecutionEngine into their code to let us import it and pass it into
FugueSQLWorkflow. Note this is a different engine from
FugueSQLWorkflow usage is nearly identical to the
fsql function we saw previously. The main difference is that it takes in a SQL engine as seen in the example below.
This example below shows that when the SQL query cannot be executed in
dask-sql, it will use the
FugueSQL. We are able to use the
dask-sql. We can also use the
TRANSFORM and PREPARTITION even if these are
dask-sql together can provide a more powerful solution. This allows us to use both solutions to get the best of both worlds in terms of speed and operation completeness. All we need to do is pass the
NOTE: In order for the code below to run,
dask-sql needs to be installed.
from dask_sql.integrations.fugue import DaskSQLExecutionEngine from fugue_sql import FugueSQLWorkflow import pandas as pd data = [ ["A", "2020-01-01", 10], ["A", "2020-01-02", 20], ["A", "2020-01-03", 30], ["B", "2020-01-01", 20], ["B", "2020-01-02", 30], ["B", "2020-01-03", 40] ] schema = "id:str,date:date,value:int" # schema: *, cumsum:int def cumsum(df: pd.DataFrame) -> pd.DataFrame: df["cumsum"] = df['value'].cumsum() return df # Run the DAG on the DaskSQLExecutionEngine by dask-sql with FugueSQLWorkflow(DaskSQLExecutionEngine) as dag: df = dag.df(data, schema) dag(""" SELECT * FROM df TRANSFORM PREPARTITION BY id PRESORT date ASC USING cumsum TAKE 5 ROWS PRINT """)
When a SQL keywords don’t exist in
dask-sql, it will default to the
Fugue DaskExecutionEngine. However, when the keyword is registered by
dask-sql it will use their implementation.
OVER PARITION is registered but still being developed, which will cause errors. One workaround is to use
Fugue's TRANSFORM and PREPARTITION like above to avoid using
OVER PARTITION for now.