{
"cells": [
{
"cell_type": "markdown",
"id": "fdfcf286",
"metadata": {},
"source": [
"# Nixtla\n",
"\n",
"Have questions? Chat with us on Github or Slack:\n",
"\n",
"[](https://github.com/fugue-project/fugue)\n",
"[](http://slack.fugue.ai)\n",
"\n",
"[Fugue](https://github.com/fugue-project/fugue) is a low-code unified interface for different computing frameworks such as Spark, Dask and Pandas. Nixtla is an open-source project focused on state-of-the-art time series forecasting. They have a couple of libraries such as [StatsForecast](https://github.com/Nixtla/statsforecast) for statistical models, [NeuralForecast](https://github.com/Nixtla/neuralforecast) for deep learning, and [HierarchicalForecast](https://github.com/Nixtla/hierarchichalforecast) for forecast aggregations across different levels of hierarchies. These are production-ready time series libraries focused on different modeling techniques.\n",
"\n",
"## Setup\n",
"\n",
"When dealing with large time series data, users normally have to deal with thousands of logically independent time series (think of telemetry of different users or different product sales). In this case, we can train one big model over all of the series, or we can create one model for each series. Both are valid approaches since the bigger model will pick up trends across the population, while training thousands of models may fit individual series data better.\n",
"\n",
"*Note: to pick up both the micro and macro trends of the time series population in one model, check the Nixtla HierarchicalForecast library, but this is also more computationally expensive and trickier to scale.*\n",
"\n",
"This article will deal with the scenario where we train a couple of models (AutoARIMA or ETS) per univariate time series. For this setup, we group the full data by time series, and then train each model for each group. The image below illustrates this. The input DataFrame can either be a Pandas, Spark or Dask DataFrame.\n",
"\n",
"\n",
"\n",
"## StatsForecast First Look\n",
"\n",
"Let's start with the most standard example, this is how to use the StatsForecast package."
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "398b0e09",
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ds | \n",
" AutoARIMA | \n",
"
\n",
" \n",
" unique_id | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 2000-03-28 | \n",
" 1.626143 | \n",
"
\n",
" \n",
" 0 | \n",
" 2000-03-29 | \n",
" 1.287569 | \n",
"
\n",
" \n",
" 0 | \n",
" 2000-03-30 | \n",
" 1.019489 | \n",
"
\n",
" \n",
" 0 | \n",
" 2000-03-31 | \n",
" 0.807224 | \n",
"
\n",
" \n",
" 0 | \n",
" 2000-04-01 | \n",
" 0.639155 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ds AutoARIMA\n",
"unique_id \n",
"0 2000-03-28 1.626143\n",
"0 2000-03-29 1.287569\n",
"0 2000-03-30 1.019489\n",
"0 2000-03-31 0.807224\n",
"0 2000-04-01 0.639155"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"from statsforecast.utils import generate_series\n",
"from statsforecast.models import AutoARIMA\n",
"from statsforecast.core import StatsForecast\n",
"\n",
"series = generate_series(n_series=100, seed=1)\n",
"\n",
"model = StatsForecast(df=series,\n",
" models=[AutoARIMA()], \n",
" freq='D', \n",
" n_jobs=-1)\n",
"\n",
"forecasts = model.forecast(7)\n",
"forecasts.head()"
]
},
{
"cell_type": "markdown",
"id": "f9ddfaaf",
"metadata": {},
"source": [
"## Bringing it to Fugue\n",
"\n",
"We can bring StatsForecast to Fugue using the general `transform()` function which takes in a general function and distributed it on top of the [execution engine](../../beginner/execution_engine.ipynb). Below is an example of how to wrap the code presented above into a function. The `set_index()` call is needed because Nixtla assumes the presence of an index. However, Spark DataFrames don't have an index, so we need to set it inside the function. Similarly, we need to call `reset_index()` at the end of the function.\n",
"\n",
"The other important thing is that we need to set `n_jobs=1` because Nixtla can try to parallelize on a single machine. Using two-stage parallelism (on the Spark level and Nixtla level for example) can often lead to resource contention and bottlenecks in processing. Fugue parallelizes across the partitions defined in the `transform()` call. \n",
"\n",
"The code below will run on the same data above."
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "e8b4a0ba",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+----------+----------+\n",
"|unique_id| ds| AutoARIMA|\n",
"+---------+----------+----------+\n",
"| 0|2000-03-28| 1.6261432|\n",
"| 0|2000-03-29| 1.2875694|\n",
"| 0|2000-03-30| 1.0194888|\n",
"| 0|2000-03-31| 0.8072244|\n",
"| 0|2000-04-01|0.63915485|\n",
"+---------+----------+----------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"from fugue import transform\n",
"\n",
"def forecast_series(df: pd.DataFrame, models) -> pd.DataFrame:\n",
" tdf = df.set_index(\"unique_id\")\n",
" model = StatsForecast(df=tdf, models=models, freq='D', n_jobs=1)\n",
" return model.forecast(7).reset_index()\n",
"\n",
"transform(series.reset_index(),\n",
" forecast_series,\n",
" params=dict(models=[AutoARIMA()]),\n",
" schema=\"unique_id:int, ds:date, AutoARIMA:float\",\n",
" partition={\"by\": \"unique_id\"},\n",
" engine=\"spark\"\n",
" ).show(5)"
]
},
{
"cell_type": "markdown",
"id": "24d33f67",
"metadata": {},
"source": [
"Running with the Fugue `transform()` with Spark may be slower for small data. This is because there will be an overhead to distribute and spinning up Spark."
]
},
{
"cell_type": "markdown",
"id": "2a4c5df4",
"metadata": {},
"source": [
"## Forecast function\n",
"\n",
"To simplify the user experience for using StatsForecast on top of Spark, Dask, and Ray, a `FugueBackend` and `forecast()` function were added to the statsforecast library. Users can pass in a DataFrame or file path as the first argument. The advantage of using the file path is that Fugue can use the backend to load the file as well. For example, if the backend is using Spark, we can load the file distributedly using Spark.\n",
"\n",
"Again, `FugueBackend` can take in any [execution engine](../../beginner/execution_engine.ipynb). This is just an example where we pass in the `SparkSession` directly but we can also pass in a Ray or Dask client. We can also pass the string as seen above."
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "8f8f01af",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ds | \n",
" unique_id | \n",
" AutoARIMA | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 2000-03-27 18:00:00 | \n",
" 0 | \n",
" 1.626143 | \n",
"
\n",
" \n",
" 1 | \n",
" 2000-03-28 18:00:00 | \n",
" 0 | \n",
" 1.287569 | \n",
"
\n",
" \n",
" 2 | \n",
" 2000-03-29 18:00:00 | \n",
" 0 | \n",
" 1.019489 | \n",
"
\n",
" \n",
" 3 | \n",
" 2000-03-30 18:00:00 | \n",
" 0 | \n",
" 0.807224 | \n",
"
\n",
" \n",
" 4 | \n",
" 2000-03-31 18:00:00 | \n",
" 0 | \n",
" 0.639155 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ds unique_id AutoARIMA\n",
"0 2000-03-27 18:00:00 0 1.626143\n",
"1 2000-03-28 18:00:00 0 1.287569\n",
"2 2000-03-29 18:00:00 0 1.019489\n",
"3 2000-03-30 18:00:00 0 0.807224\n",
"4 2000-03-31 18:00:00 0 0.639155"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from statsforecast.distributed.utils import forecast\n",
"from statsforecast.distributed.fugue import FugueBackend\n",
"from statsforecast.models import AutoARIMA\n",
"\n",
"series.to_parquet(\"/tmp/100.parquet\")\n",
"\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"backend = FugueBackend(spark, {\"fugue.spark.use_pandas_udf\":True})\n",
"\n",
"forecast(\"/tmp/100.parquet\", \n",
" [AutoARIMA()], \n",
" freq=\"D\", \n",
" h=7, \n",
" parallel=backend).toPandas().head()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.8.13 ('fugue')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
},
"vscode": {
"interpreter": {
"hash": "9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}