Natural Language Processing#


This example shows how Fugue can be used to parallelize the preprocessing of text records for Natural Language Processing modeling tasks. The data comes from Kaggle’s Toxic Comment Classification Challenge. A mirror has been created on S3 so the example can be downloaded and run easily.

The original preprocessing code was taken from this notebook.

Loading the Data#

We download the data from S3 using boto3. It is publicly available as a zip file, so we’ll need to extract it. There is no need to digest this code; the point is just to create an end-to-end reproducible example.

import boto3
from botocore import UNSIGNED
from botocore.client import Config
import zipfile

# Download the public zip anonymously (no AWS credentials needed)
s3 = boto3.client('s3', region_name='us-east-2', config=Config(signature_version=UNSIGNED))
s3.download_file('fugue-demo', 'jigsaw-toxic-comment-classification-challenge.zip', '/tmp/toxic-comments.zip')

# The archive contains further zipped files, so we extract twice
with zipfile.ZipFile("/tmp/toxic-comments.zip", "r") as zip_ref:
    zip_ref.extractall("/tmp/toxic-comments-unzipped")

with zipfile.ZipFile("/tmp/toxic-comments-unzipped/train.csv.zip", "r") as zip_ref:
    zip_ref.extractall("/tmp/toxic-comments-unzipped")
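
If you want to confirm the download and extraction worked, the files can be listed (the path matches the one used above):

import os

# List the extracted files to confirm the extraction succeeded
print(os.listdir("/tmp/toxic-comments-unzipped"))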

Initial Look at Data#

The important column below is the comment_text column. It contains the text that we will determine is toxic or not. The columns to its right are the labels we will predict later.

import pandas as pd
df = pd.read_csv("/tmp/toxic-comments-unzipped/train.csv")
df.head()
id comment_text toxic severe_toxic obscene threat insult identity_hate
0 0000997932d777bf Explanation\nWhy the edits made under my usern... 0 0 0 0 0 0
1 000103f0d9cfb60f D'aww! He matches this background colour I'm s... 0 0 0 0 0 0
2 000113f07ec002fd Hey man, I'm really not trying to edit war. It... 0 0 0 0 0 0
3 0001b41b1c6bb37e "\nMore\nI can't make any real suggestions on ... 0 0 0 0 0 0
4 0001d958c54c6e35 You, sir, are my hero. Any chance you remember... 0 0 0 0 0 0
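
Before preprocessing, it can also help to glance at the label balance. A quick optional check, using the label columns shown above:

# Fraction of comments flagged with each label
label_cols = ["toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate"]
df[label_cols].mean()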

Preprocessing Text#

The comment_text column contains a lot of characters and words that we don’t need for language models, such as website links, HTML tags, and non-ASCII characters. We want to remove these for each record, so we define a cleaning function below. It operates on a single string, which is a common setup when dealing with text data. Again, the goal here is to show how to parallelize this function rather than the logic inside it, so there is no need to fully understand what the function is doing.

import re

def preprocess_text(text, lower_case=True, clean_text=True):

    if lower_case:
        text = text.lower()

    # Remove website links
    template = re.compile(r'https?://\S+|www\.\S+')
    text = template.sub(r'', text)

    # Remove HTML tags
    template = re.compile(r'<[^>]*>')
    text = template.sub(r'', text)

    # Remove non-ASCII characters
    template = re.compile(r'[^\x00-\x7E]+')
    text = template.sub(r'', text)

    # Replace non-printable characters with a space
    template = re.compile(r'[\x00-\x0F]+')
    text = template.sub(r' ', text)

    if clean_text:
        # Remove special characters
        text = re.sub(r"'s", '', text)
        template = re.compile(r'["#$%&\'()\*\+-/:;<=>@\[\]\\^_`{|}~]')
        text = template.sub(r' ', text)
        # Collapse repeated punctuation
        text = re.sub(r'[.!?]{2,}', '.', text)
        text = re.sub(r',+', ',', text)
        # Remove numbers
        text = re.sub(r'\d+', ' ', text)

    # Collapse extra whitespace
    text = re.sub(r'\s+', ' ', text)

    # Remove spaces at the beginning and end of the string
    text = text.strip()

    return text
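
To get a feel for what the function does, we can try it on a made-up string (the output shown in the comment is approximate):

sample = "Check out https://example.com <b>NOW</b>!!! It's 100% great, right?"
preprocess_text(sample)
# roughly: 'check out now. it great right?'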

Wrapping Logic#

Now that we have the logic defined for one record, all we have to do is wrap it in a helper function that applies it to each record of a partition. This function is defined in native Python, but with Fugue we can apply it to Pandas, Spark, Dask, and Ray DataFrames. The type annotations are used to convert the partition data when the function is applied. For more information, see this tutorial.

from typing import List, Dict, Any, Iterable

def helper(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
    for row in df:
        row["text"] = preprocess_text(row["comment_text"])
    return df
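
Because the helper is native Python, it can be tested without Fugue or any DataFrame at all, for example on a one-record input (output in the comment is approximate):

# The helper runs on plain Python objects, so it is easy to unit test
helper([{"comment_text": "Visit https://example.com NOW!!!"}])
# roughly: [{'comment_text': 'Visit https://example.com NOW!!!', 'text': 'visit now.'}]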

Fugue Transform#

Now we can run it on the original DataFrame using transform(). For more information on this function, check Fugue in 10 minutes.

from fugue import transform
transform(df, helper, schema="*,text:str").head(5)[["comment_text", "text"]]
comment_text text
0 Explanation\nWhy the edits made under my usern... explanation why the edits made under my userna...
1 D'aww! He matches this background colour I'm s... d aww! he matches this background colour i m s...
2 Hey man, I'm really not trying to edit war. It... hey man i m really not trying to edit war it j...
3 "\nMore\nI can't make any real suggestions on ... more i can t make any real suggestions on impr...
4 You, sir, are my hero. Any chance you remember... you sir are my hero any chance you remember wh...
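
The annotations are also flexible. Fugue accepts other equivalent forms such as Iterable[Dict[str,Any]], which is why we imported Iterable above. A sketch of an equivalent generator-based helper (the helper_iter name is just for illustration); yielding rows one at a time can avoid holding a whole converted partition in memory at once:

def helper_iter(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    # Yield each cleaned row instead of building a list
    for row in df:
        row["text"] = preprocess_text(row["comment_text"])
        yield row

transform(df, helper_iter, schema="*,text:str").head(2)[["comment_text", "text"]]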

Running with a Distributed Backend#

To run the preprocessing on Spark, Dask, or Ray, all we need to do is supply an engine to the transform() call. In this case, we spin up a local Dask cluster to parallelize across all four cores of the machine.

ddf = transform(df, helper, schema="*,text:str", engine="dask")
ddf = ddf.compute()[["comment_text", "text"]]
ddf.head(5)
comment_text text
0 Explanation\nWhy the edits made under my usern... explanation why the edits made under my userna...
1 D'aww! He matches this background colour I'm s... d aww! he matches this background colour i m s...
2 Hey man, I'm really not trying to edit war. It... hey man i m really not trying to edit war it j...
3 "\nMore\nI can't make any real suggestions on ... more i can t make any real suggestions on impr...
4 You, sir, are my hero. Any chance you remember... you sir are my hero any chance you remember wh...
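
Swapping in another backend works the same way. For example, a SparkSession can be passed directly as the engine (a sketch, assuming pyspark is installed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The same helper, unchanged, now runs on Spark
sdf = transform(df, helper, schema="*,text:str", engine=spark)
sdf.show(5)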

Distributed NLP Inference#

In the next example, let’s take an existing pipeline that does both the tokenizing and the inference together. HuggingFace has sentiment analysis pipelines that we can use easily. A simple usage is shown below.

from transformers import pipeline
sentiment_pipeline = pipeline("sentiment-analysis")
data = ["I love you", "I hate you"]
sentiment_pipeline(data)
No model was supplied, defaulted to distilbert-base-uncased-finetuned-sst-2-english and revision af0f99b (https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.
[{'label': 'POSITIVE', 'score': 0.9998656511306763},
 {'label': 'NEGATIVE', 'score': 0.9991129040718079}]
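
The warning above suggests pinning the model in production. For reproducibility (and to silence the warning), we can pass the model and revision it names explicitly:

# Pin the model and revision named in the warning above
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    revision="af0f99b",
)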

Again, we just need to create a helper function. One of the strengths of Fugue is that we only need to specify the output type, and Fugue will handle the conversion. The sentiment_pipeline returns a List[Dict[str,Any]], so we specify that as the output annotation.

from typing import List, Dict, Any, Iterable

def pred_helper(df: pd.DataFrame) -> List[Dict[str,Any]]:
    return sentiment_pipeline(df['text'].values.tolist())

Now we can parallelize this across Dask using the preprocessed DataFrame from earlier, which contains the "text" column. Here, we run it on just a subset of the records because the sentiment_pipeline we used has a maximum input length that many of the records exceed. In practice, you would need to truncate those records (a sketch of this follows the output below); here we just demonstrate the parallelization using Dask.

transform(ddf[90:110], pred_helper, schema="label:str,score:float", engine="dask").compute().head()
label score
0 NEGATIVE 0.998304
1 NEGATIVE 0.999272
0 NEGATIVE 0.988193
1 NEGATIVE 0.997527
0 NEGATIVE 0.998266
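
As mentioned above, over-long records would need truncation in practice. One way to handle this (a sketch, assuming a transformers version where the pipeline call accepts tokenizer keyword arguments) is to let the pipeline truncate inputs to the model's maximum length, so the helper can run over the full DataFrame:

def pred_helper_truncated(df: pd.DataFrame) -> List[Dict[str, Any]]:
    # truncation=True clips inputs to the model's maximum length
    # instead of erroring on long comments
    return sentiment_pipeline(df["text"].values.tolist(), truncation=True)

transform(ddf, pred_helper_truncated, schema="label:str,score:float", engine="dask").compute().head()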