{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Execution Engine\n", "\n", "Have questions? Chat with us on Github or Slack:\n", "\n", "[![Homepage](https://img.shields.io/badge/fugue-source--code-red?logo=github)](https://github.com/fugue-project/fugue)\n", "[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)\n", "\n", "We have already seen that the `transform()` function takes in an `engine`. There are three main ways to define the engine for Fugue. Remember that using an execution engine requires that library to be installed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "\n", "We can test all of the engines using this dummy setup. Note that the schema is already defined with a schema hint." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from fugue import transform \n", "\n", "df = pd.DataFrame({\"col1\": [1,2,3,4], \"col2\": [1,2,3,4]})\n", "\n", "# schema: *, col3:int\n", "def add_cols(df:pd.DataFrame) -> pd.DataFrame:\n", " return df.assign(col3 = df['col1'] + df['col2'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Passing a String\n", "\n", "This is the easiest to use. 
Normally, this means that the engine will be spun up locally, using all cores of the machine.\n", "\n", "Fugue can take in the following strings: `\"spark\"`, `\"dask\"`, and `\"ray\"`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Spark**" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+----+----+\n", "|col1|col2|col3|\n", "+----+----+----+\n", "| 1| 1| 2|\n", "| 2| 2| 4|\n", "| 3| 3| 6|\n", "| 4| 4| 8|\n", "+----+----+----+\n", "\n" ] } ], "source": [ "spark_df = transform(df, add_cols, engine=\"spark\")\n", "spark_df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Dask**" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
col1col2col3
0112
1224
0336
1448
\n", "
" ], "text/plain": [ " col1 col2 col3\n", "0 1 1 2\n", "1 2 2 4\n", "0 3 3 6\n", "1 4 4 8" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dask_df = transform(df, add_cols, engine=\"dask\")\n", "dask_df.compute().head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Ray**" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2023-01-02 17:51:25,398\tINFO worker.py:1509 -- Started a local Ray instance. View the dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265 \u001b[39m\u001b[22m\n", "Repartition: 25%|██▌ | 4/16 [00:00<00:01, 6.16it/s]\n", "Map_Batches: 100%|██████████| 16/16 [00:01<00:00, 10.87it/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'col1': 1, 'col2': 1, 'col3': 2}\n", "{'col1': 2, 'col2': 2, 'col3': 4}\n", "{'col1': 3, 'col2': 3, 'col3': 6}\n", "{'col1': 4, 'col2': 4, 'col3': 8}\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "ray_df = transform(df, add_cols, engine=\"ray\")\n", "ray_df.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Passing the Client or Session\n", "\n", "Fugue will also know how to interpret the engine if a Client or Session is passed." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Spark**" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+----+----+\n", "|col1|col2|col3|\n", "+----+----+----+\n", "| 1| 1| 2|\n", "| 2| 2| 4|\n", "| 3| 3| 6|\n", "| 4| 4| 8|\n", "+----+----+----+\n", "\n" ] } ], "source": [ "from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.getOrCreate()\n", "\n", "spark_df = transform(df, add_cols, engine=spark)\n", "spark_df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Dask**" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
col1col2col3
0112
1224
2336
3448
\n", "
" ], "text/plain": [ " col1 col2 col3\n", "0 1 1 2\n", "1 2 2 4\n", "2 3 3 6\n", "3 4 4 8" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from distributed import Client\n", "dask_client = Client()\n", "\n", "dask_df = transform(df, add_cols, engine=dask_client)\n", "dask_df.compute().head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Ray**\n", "\n", "This one is a bit different because Ray code doesn't invoke the Client." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2023-01-02 17:52:30,226\tINFO worker.py:1351 -- Calling ray.init() again after it has already been called.\n", "Repartition: 25%|██▌ | 4/16 [00:00<00:00, 418.10it/s]\n", "Map_Batches: 100%|██████████| 16/16 [00:00<00:00, 166.52it/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'col1': 1, 'col2': 1, 'col3': 2}\n", "{'col1': 2, 'col2': 2, 'col3': 4}\n", "{'col1': 3, 'col2': 3, 'col3': 6}\n", "{'col1': 4, 'col2': 4, 'col3': 8}\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "import ray\n", "ray.init(ignore_reinit_error=True)\n", "\n", "ray_df = transform(df, add_cols, engine=\"ray\")\n", "ray_df.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Passing a Cluster Address\n", "\n", "Fugue also has utilities to interact with clusters directly. 
For these, you need to be authenticated with the corresponding service, such as Databricks, Coiled, or Anyscale.\n", "\n", "More complete documentation can be found in the [cloudprovider section](../integrations/cloudproviders/index.md).\n", "\n", "Some examples:\n", "\n", "```python\n", "# Databricks\n", "transform(df, add_cols, engine=\"db\", engine_conf=conf)\n", "\n", "# Coiled\n", "transform(df, add_cols, engine=\"coiled:my_cluster\")\n", "\n", "# Anyscale\n", "transform(df, add_cols, engine=\"anyscale://project/cluster-1\")\n", "```\n", "\n", "These naturally require more configuration, so see the Fugue cloudprovider documentation linked above for more details." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Engine Conf\n", "\n", "As seen in the Databricks example above, we can also pass in a configuration to our execution engine. One of the most common settings is `fugue.spark.use_pandas_udf`, which tells the Spark engine to use Pandas UDFs:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+----+----+\n", "|col1|col2|col3|\n", "+----+----+----+\n", "| 1| 1| 2|\n", "| 2| 2| 4|\n", "+----+----+----+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "spark_df = transform(df,\n", "                     add_cols,\n", "                     engine=spark,\n", "                     engine_conf={\"fugue.spark.use_pandas_udf\": True})\n", "spark_df.show(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For a full list of configurations, check [this page](../advanced/useful_config.ipynb)." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.8.13 ('fugue')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" }, "metadata": { "interpreter": { "hash": "f7f9294720e464cd08733c6cd5cfe1a4599977fa03668bc63f2dfd97f1a61807" } }, "vscode": { "interpreter": { "hash": 
"9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9" } } }, "nbformat": 4, "nbformat_minor": 2 }