{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Natural Language Processing\n",
"\n",
"[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)\n",
"[![Homepage](https://img.shields.io/badge/fugue-source--code-red?logo=github)](https://github.com/fugue-project/fugue)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This example shows how Fugue can be used to parallelize the pre-processing of text records because Natural Language Processing modelling tasks. The data comes from Kaggle's [Toxic Comment Classification Challenge](https://www.kaggle.com/competitions/jigsaw-toxic-comment-classification-challenge). A mirror has been created on S3 to download and run easily. \n",
"\n",
"The original preprocessing code was taken from [this notebook](https://www.kaggle.com/code/zsofislosar/toxicity-data-prep)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading the Data\n",
"\n",
"We download the data from S3 using `boto3`. It is publicly available as a zip so we'll need to extract it. No need to digest this code. The point is just to create an end-to-end reproducible example."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"from botocore import UNSIGNED\n",
"from botocore.client import Config\n",
"import zipfile"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"s3 = boto3.client('s3', region_name='us-east-2', config=Config(signature_version=UNSIGNED))\n",
"s3.download_file('fugue-demo', 'jigsaw-toxic-comment-classification-challenge.zip', '/tmp/toxic-comments.zip')\n",
"\n",
"with zipfile.ZipFile(\"/tmp/toxic-comments.zip\",\"r\") as zip_ref:\n",
" zip_ref.extractall(\"/tmp/toxic-comments-unzipped\")\n",
" \n",
"with zipfile.ZipFile(\"/tmp/toxic-comments-unzipped/train.csv.zip\",\"r\") as zip_ref:\n",
" zip_ref.extractall(\"/tmp/toxic-comments-unzipped\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initial Look at Data\n",
"\n",
"The important column below is the `comment_text` column. This contains the text that we are determining is toxic or not. The columns to the right are labels for prediction later."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" comment_text | \n",
" toxic | \n",
" severe_toxic | \n",
" obscene | \n",
" threat | \n",
" insult | \n",
" identity_hate | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 0000997932d777bf | \n",
" Explanation\\nWhy the edits made under my usern... | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
"
\n",
" \n",
" 1 | \n",
" 000103f0d9cfb60f | \n",
" D'aww! He matches this background colour I'm s... | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
"
\n",
" \n",
" 2 | \n",
" 000113f07ec002fd | \n",
" Hey man, I'm really not trying to edit war. It... | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
"
\n",
" \n",
" 3 | \n",
" 0001b41b1c6bb37e | \n",
" \"\\nMore\\nI can't make any real suggestions on ... | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
"
\n",
" \n",
" 4 | \n",
" 0001d958c54c6e35 | \n",
" You, sir, are my hero. Any chance you remember... | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id comment_text toxic \\\n",
"0 0000997932d777bf Explanation\\nWhy the edits made under my usern... 0 \n",
"1 000103f0d9cfb60f D'aww! He matches this background colour I'm s... 0 \n",
"2 000113f07ec002fd Hey man, I'm really not trying to edit war. It... 0 \n",
"3 0001b41b1c6bb37e \"\\nMore\\nI can't make any real suggestions on ... 0 \n",
"4 0001d958c54c6e35 You, sir, are my hero. Any chance you remember... 0 \n",
"\n",
" severe_toxic obscene threat insult identity_hate \n",
"0 0 0 0 0 0 \n",
"1 0 0 0 0 0 \n",
"2 0 0 0 0 0 \n",
"3 0 0 0 0 0 \n",
"4 0 0 0 0 0 "
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"df = pd.read_csv(\"/tmp/toxic-comments-unzipped/train.csv\")\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Preprocessing Text\n",
"\n",
"The text column contains a lot of characters and words that we don't need for language models. For example, there are website links, HTML tags, and non-ASCII characters. We want to remove these. for each record. This function is defined below. This operates on a string, and is a common setup when dealing with text data. Again, the goal here to how to parallelize this function rather than the logic in it, so no need to fully understand what this function is doing."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"def preprocess_text(text, lower_case=True, clean_text=True):\n",
" \n",
" if lower_case:\n",
" text = text.lower()\n",
" \n",
" # Remove website links\n",
" template = re.compile(r'https?://\\S+|www\\.\\S+') \n",
" text = template.sub(r'', text)\n",
" \n",
" # Remove HTML tags\n",
" template = re.compile(r'<[^>]*>') \n",
" text = template.sub(r'', text)\n",
" \n",
" # Remove none ascii characters\n",
" template = re.compile(r'[^\\x00-\\x7E]+') \n",
" text = template.sub(r'', text)\n",
" \n",
" # Replace none printable characters\n",
" template = re.compile(r'[\\x00-\\x0F]+') \n",
" text = template.sub(r' ', text)\n",
" \n",
" if clean_text:\n",
" # Remove special characters\n",
" text = re.sub(\"'s\", '', text)\n",
" template = re.compile('[\"#$%&\\'()\\*\\+-/:;<=>@\\[\\]\\\\\\\\^_`{|}~]') \n",
" text = template.sub(r' ', text)\n",
" # Replace multiple punctuation \n",
" text = re.sub('[.!?]{2,}', '.', text)\n",
" text = re.sub(',+', ',', text) \n",
" # Remove numbers\n",
" text = re.sub('\\d+', ' ', text) \n",
" \n",
" # Remove extra spaces\n",
" text = re.sub('\\s+', ' ', text)\n",
" \n",
" # Remove spaces at the beginning and at the end of string\n",
" text = text.strip() \n",
"\n",
" return text"
]
},
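{
"cell_type": "markdown",
"metadata": {},
"source": [
"The function above compiles its patterns on every call. Python's `re` module caches compiled patterns, so this is rarely a bottleneck, but hoisting the patterns to module level makes the individual cleaning steps easier to test in isolation. The cell below is a minimal sketch of that idea and not the original author's code: `LINK_PATTERN`, `TAG_PATTERN`, and `strip_links_and_tags` are hypothetical names, and only the first two cleaning steps are reproduced."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"\n",
"# Hypothetical module-level patterns, compiled once and reused on every call\n",
"LINK_PATTERN = re.compile(r\"https?://\\S+|www\\.\\S+\")\n",
"TAG_PATTERN = re.compile(r\"<[^>]*>\")\n",
"\n",
"def strip_links_and_tags(text: str) -> str:\n",
"    # Remove website links and HTML tags, then collapse runs of whitespace\n",
"    text = LINK_PATTERN.sub(\"\", text)\n",
"    text = TAG_PATTERN.sub(\"\", text)\n",
"    return re.sub(r\"\\s+\", \" \", text).strip()\n",
"\n",
"strip_links_and_tags(\"See <b>this</b> page: https://example.com now\")"
]
},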
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Wrapping Logic\n",
"\n",
"Now that we have the logic defined for one record, all we have to do is wrap it in a function. This function is defined in native Python, but we can apply it on Pandas, Spark, Dask, and Ray DataFrames with Fugue. The type annotations are used to apply a conversion to the partition data when this function is applied. For more information, see [this tutorial](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html#type-hint-conversion)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Dict, Any, Iterable\n",
"\n",
"def helper(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:\n",
" for row in df:\n",
" row[\"text\"] = preprocess_text(row[\"comment_text\"])\n",
" return df\n",
"\n"
]
},
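{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the wrapper is plain Python, it can be checked on a small list of dictionaries before any engine is involved. The cell below is a self-contained sketch of such a check: `toy_preprocess` is a hypothetical stand-in for `preprocess_text`, and `toy_helper` only mirrors the shape of `helper`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import Any, Dict, List\n",
"\n",
"def toy_preprocess(text: str) -> str:\n",
"    # Hypothetical stand-in for preprocess_text: lower-case and collapse whitespace\n",
"    return \" \".join(text.lower().split())\n",
"\n",
"def toy_helper(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:\n",
"    # Same shape as helper: add a \"text\" key to every row and return the rows\n",
"    for row in rows:\n",
"        row[\"text\"] = toy_preprocess(row[\"comment_text\"])\n",
"    return rows\n",
"\n",
"toy_helper([{\"id\": \"a\", \"comment_text\": \"Hello\\nWorld\"}])"
]
},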
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fugue Transform\n",
"\n",
"Now we can run it on the original DataFrame using `transform()`. For more information on this function, check [Fugue in 10 minutes](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html#bringing-a-function-to-spark-or-dask)."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" comment_text | \n",
" text | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" Explanation\\nWhy the edits made under my usern... | \n",
" explanation why the edits made under my userna... | \n",
"
\n",
" \n",
" 1 | \n",
" D'aww! He matches this background colour I'm s... | \n",
" d aww! he matches this background colour i m s... | \n",
"
\n",
" \n",
" 2 | \n",
" Hey man, I'm really not trying to edit war. It... | \n",
" hey man i m really not trying to edit war it j... | \n",
"
\n",
" \n",
" 3 | \n",
" \"\\nMore\\nI can't make any real suggestions on ... | \n",
" more i can t make any real suggestions on impr... | \n",
"
\n",
" \n",
" 4 | \n",
" You, sir, are my hero. Any chance you remember... | \n",
" you sir are my hero any chance you remember wh... | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" comment_text \\\n",
"0 Explanation\\nWhy the edits made under my usern... \n",
"1 D'aww! He matches this background colour I'm s... \n",
"2 Hey man, I'm really not trying to edit war. It... \n",
"3 \"\\nMore\\nI can't make any real suggestions on ... \n",
"4 You, sir, are my hero. Any chance you remember... \n",
"\n",
" text \n",
"0 explanation why the edits made under my userna... \n",
"1 d aww! he matches this background colour i m s... \n",
"2 hey man i m really not trying to edit war it j... \n",
"3 more i can t make any real suggestions on impr... \n",
"4 you sir are my hero any chance you remember wh... "
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from fugue import transform\n",
"transform(df, helper, schema=\"*,text:str\").head(5)[[\"comment_text\", \"text\"]]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running with a Distributed Backend\n",
"\n",
"To run the preprocessing on either Spark, Dask, or Ray, all we need to do is supply it an an engine to the `transform()` call. In this case, we spin up local Dask to parallelize across all four cores of the machine."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" comment_text | \n",
" text | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" Explanation\\nWhy the edits made under my usern... | \n",
" explanation why the edits made under my userna... | \n",
"
\n",
" \n",
" 1 | \n",
" D'aww! He matches this background colour I'm s... | \n",
" d aww! he matches this background colour i m s... | \n",
"
\n",
" \n",
" 2 | \n",
" Hey man, I'm really not trying to edit war. It... | \n",
" hey man i m really not trying to edit war it j... | \n",
"
\n",
" \n",
" 3 | \n",
" \"\\nMore\\nI can't make any real suggestions on ... | \n",
" more i can t make any real suggestions on impr... | \n",
"
\n",
" \n",
" 4 | \n",
" You, sir, are my hero. Any chance you remember... | \n",
" you sir are my hero any chance you remember wh... | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" comment_text \\\n",
"0 Explanation\\nWhy the edits made under my usern... \n",
"1 D'aww! He matches this background colour I'm s... \n",
"2 Hey man, I'm really not trying to edit war. It... \n",
"3 \"\\nMore\\nI can't make any real suggestions on ... \n",
"4 You, sir, are my hero. Any chance you remember... \n",
"\n",
" text \n",
"0 explanation why the edits made under my userna... \n",
"1 d aww! he matches this background colour i m s... \n",
"2 hey man i m really not trying to edit war it j... \n",
"3 more i can t make any real suggestions on impr... \n",
"4 you sir are my hero any chance you remember wh... "
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf = transform(df, helper, schema=\"*,text:str\", engine=\"dask\")\n",
"ddf = ddf.compute()[[\"comment_text\", \"text\"]]\n",
"ddf.head(5)"
]
},
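{
"cell_type": "markdown",
"metadata": {},
"source": [
"Conceptually, a distributed engine splits the DataFrame into partitions and applies the function to each partition independently. The standard-library cell below imitates that idea with a thread pool. It is only an analogy for intuition, not how Fugue or Dask are implemented, and `chunk`, `add_length`, and `run_partitioned` are hypothetical names."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from concurrent.futures import ThreadPoolExecutor\n",
"from typing import Any, Dict, List\n",
"\n",
"Rows = List[Dict[str, Any]]\n",
"\n",
"def chunk(rows: Rows, size: int) -> List[Rows]:\n",
"    # Split the rows into consecutive partitions of at most `size` rows\n",
"    return [rows[i:i + size] for i in range(0, len(rows), size)]\n",
"\n",
"def add_length(rows: Rows) -> Rows:\n",
"    # Hypothetical per-partition function: add the length of each comment\n",
"    return [{**row, \"n_chars\": len(row[\"comment_text\"])} for row in rows]\n",
"\n",
"def run_partitioned(rows: Rows, size: int, workers: int = 4) -> Rows:\n",
"    # Apply the function to every partition in parallel, then concatenate in order\n",
"    with ThreadPoolExecutor(max_workers=workers) as pool:\n",
"        parts = list(pool.map(add_length, chunk(rows, size)))\n",
"    return [row for part in parts for row in part]\n",
"\n",
"data = [{\"comment_text\": \"x\" * n} for n in range(5)]\n",
"run_partitioned(data, size=2)"
]
},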
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed NLP Inference\n",
"\n",
"In the next example, let's take an existing pipeline that does both the tokenizing and inference together. [HuggingFace](https://huggingface.co/) has sentiment analysis pipelines that we can use easily. We can see a simple usage below. "
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"No model was supplied, defaulted to distilbert-base-uncased-finetuned-sst-2-english and revision af0f99b (https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english).\n",
"Using a pipeline without specifying a model name and revision in production is not recommended.\n"
]
},
{
"data": {
"text/plain": [
"[{'label': 'POSITIVE', 'score': 0.9998656511306763},\n",
" {'label': 'NEGATIVE', 'score': 0.9991129040718079}]"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from transformers import pipeline\n",
"sentiment_pipeline = pipeline(\"sentiment-analysis\")\n",
"data = [\"I love you\", \"I hate you\"]\n",
"sentiment_pipeline(data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Again, we just need to create a helper function. One of the strengths of Fugue is that we just need to specify the output type, and Fugue will handle the conversion. The `sentiment_pipeline` returns a `List[Dict[str,Any]]` so we specify that as the output annotation."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Dict, Any, Iterable\n",
"\n",
"def pred_helper(df: pd.DataFrame) -> List[Dict[str,Any]]:\n",
" return sentiment_pipeline(df['text'].values.tolist())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we can parallelize this across Dask using the preprocessed DataFrame from earlier, which will contain the `\"text\"` column. Here, we just run this on a subset of the records because the `sentiment_pipeline` we used has a maximum word count that a lot of the records go over. In practice, you would need to truncate those records, but here we just demo the parallelization using Dask."
]
},
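{
"cell_type": "markdown",
"metadata": {},
"source": [
"One rough way to keep records short is to cut each one before inference. The cell below truncates by whitespace-separated words, which is only a proxy: the model's limit applies to tokenizer tokens rather than words, so a word budget has to be chosen conservatively. `truncate_words` and the default budget of 200 words are assumptions made for illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def truncate_words(text: str, max_words: int = 200) -> str:\n",
"    # Keep at most `max_words` whitespace-separated words (hypothetical budget)\n",
"    words = text.split()\n",
"    return \" \".join(words[:max_words])\n",
"\n",
"truncate_words(\"one two three four five\", max_words=3)"
]
},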
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" label | \n",
" score | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" NEGATIVE | \n",
" 0.998304 | \n",
"
\n",
" \n",
" 1 | \n",
" NEGATIVE | \n",
" 0.999272 | \n",
"
\n",
" \n",
" 0 | \n",
" NEGATIVE | \n",
" 0.988193 | \n",
"
\n",
" \n",
" 1 | \n",
" NEGATIVE | \n",
" 0.997527 | \n",
"
\n",
" \n",
" 0 | \n",
" NEGATIVE | \n",
" 0.998266 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" label score\n",
"0 NEGATIVE 0.998304\n",
"1 NEGATIVE 0.999272\n",
"0 NEGATIVE 0.988193\n",
"1 NEGATIVE 0.997527\n",
"0 NEGATIVE 0.998266"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"transform(ddf[90:110], pred_helper, schema=\"label:str,score:float\", engine=\"dask\").compute().head()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.8.13 ('fugue')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "9fcd6e71927f6b3e5f4fa4280b4e8e6a66aa8d4365bb61cf7ef4017620fc09b9"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}