Stock Sentiment Analysis (Preprocessing)#

In this example, we look at this Kaggle dataset and borrow ideas from this notebook.

Our goal is to show how Fugue can be used in the preprocessing step of this NLP problem. Compared with using pandas for such analysis, Fugue is slightly more complicated, but the advantages are:

  • Every step, every function is intuitive and easy to understand

  • The Fugue version is platform and scale agnostic. It can run on any ExecutionEngine and can handle very large datasets that can’t fit on one machine.

Install Dependencies#

!pip install nltk
import nltk
nltk.download('wordnet')
nltk.download('stopwords')
!pip install spacy
!python -m spacy download en_core_web_sm

You must restart the kernel after installation.

Explore the data#

We load the data, print it, and do some basic analytics.

from fugue import FugueWorkflow as Dag

with Dag() as dag:
    df = dag.load("../../data/stock_sentiment_data.csv", header=True)
    dag.select("Sentiment, COUNT(*) AS ct FROM", df, "GROUP BY Sentiment").show()

Clean Up#

This is based on my personal preference and is just for demo purposes. I want to:

  • Make all column names lowercase

  • Add a unique and deterministic id to each row

  • Convert sentiment to bool because it has only two values
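To see what "unique and deterministic" means for the id, here is a stdlib-only sketch using `uuid.uuid5` (a hash-based, repeatable uuid); the actual code below uses `triad.utils.hash.to_uuid`, which behaves similarly but is not the same implementation:

```python
import uuid

def deterministic_id(text: str) -> str:
    # uuid5 hashes its input, so the same text always yields the same id
    # (unlike uuid4, which is random); keep only the last segment for brevity
    return str(uuid.uuid5(uuid.NAMESPACE_OID, text)).split("-")[-1]

# stable across calls and across runs; different texts get different ids
assert deterministic_id("stocks are up") == deterministic_id("stocks are up")
assert deterministic_id("stocks are up") != deterministic_id("stocks are down")
```

A deterministic id means re-running the pipeline on the same data produces the same ids, which makes joins and incremental runs reproducible.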

Here I am using a transformer to do it, and I can write the transformer in pure native Python. Read this to learn more details.

from triad.utils.hash import to_uuid
from typing import Iterable, Dict, Any, List

# schema: id:str,sentiment:bool,text:str
def preprocess(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    for row in df:
        yield dict(id=to_uuid(row["Text"]).split("-")[-1],
                   sentiment = str(row["Sentiment"])=="1",
                   text = row["Text"])
with Dag() as dag:
    df = dag.load("../../data/stock_sentiment_data.csv", header=True)
    df.transform(preprocess).show()


I want to convert the raw text to tokens, so I am going to lowercase the text, remove punctuation, and lemmatize. These will all be done inside a transformer.

I feel Iterable as input and output is the most intuitive and convenient, so I simply write it the way I like. I also write an additional transformer to convert all data to word-sentiment pairs to get some statistics. This is often seen in Spark hello world examples.
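The punctuation-removal step in the transformer below relies on `str.maketrans` with `string.punctuation`; here is that step in isolation (the sample sentence is made up):

```python
import string

# maps every punctuation character to None, i.e. deletes it
translator = str.maketrans('', '', string.punctuation)

text = "Stocks are up, way up!"
print(text.lower().translate(translator).split(" "))
# ['stocks', 'are', 'up', 'way', 'up']
```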

from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
import string

# schema: id:str,sentiment:bool,words:[str]
def tokenize(df:Iterable[Dict[str,Any]]) -> Iterable[List[Any]]:
    lem = WordNetLemmatizer()
    stop = set(stopwords.words('english'))
    translator = str.maketrans('', '', string.punctuation)
    for row in df:
        words = row["text"].lower().translate(translator).split(" ")
        words = [lem.lemmatize(w) for w in words if w not in stop and w!=""]
        yield [row["id"], row["sentiment"], words]

# schema: word:str, sentiment:bool
def to_single(df:Iterable[Dict[str,Any]]) -> Iterable[List[Any]]:
    for row in df:
        sentiment = row["sentiment"]
        for w in row["words"]:
            yield [w,sentiment]

with Dag() as dag:
    df = dag.load("../../data/stock_sentiment_data.csv", header=True)
    tk = df.transform(preprocess).transform(tokenize)
    words = tk.transform(to_single)
    dag.select("word, sentiment, COUNT(*) AS ct FROM", words, "GROUP BY word, sentiment ORDER BY ct DESC LIMIT 10").show()

Entity Detection#

Entity linking can generate very powerful features, so we want to do it for each sentence, and spacy is a popular package for this.

Entity linking can be very expensive, so running it distributedly is how we deal with large datasets. Here, Fugue helps you separate the concerns:

  • A transformer handles a partition of data on a single machine, so scalability and throughput are not the concerns of a transformer

  • How a created transformer runs is associated with scalability and throughput concerns. But again, Fugue is very abstract: you can just tell the system you want to apply the transformation and let the system optimize the distribution. You can also take full control of the distribution, but here we won’t go into too much detail; let’s just focus on WHAT instead of HOW

Another thing to point out is that Fugue avoids the semantics of map and only uses mapPartitions. entity_linking in the following code is a perfect example of why. spacy.load is expensive, but it’s called only once per partition and reused for all items in that partition. For cases without expensive initialization, this approach is neither more complicated nor slower than map. So there is no good reason to use map directly.
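The difference is easy to see in plain Python: with a per-partition function, the expensive setup runs once per partition instead of once per row. The `ExpensiveModel` class here is a made-up stand-in for something like `spacy.load`:

```python
from typing import Any, Dict, Iterable

INIT_CALLS = 0  # counts how many times the expensive setup runs

class ExpensiveModel:
    """Stand-in for a costly initialization such as spacy.load."""
    def __init__(self):
        global INIT_CALLS
        INIT_CALLS += 1
    def predict(self, text: str) -> int:
        return len(text)

def per_partition(rows: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    model = ExpensiveModel()  # initialized once for the whole partition
    for row in rows:
        row["score"] = model.predict(row["text"])
        yield row

partition = [{"text": "buy"}, {"text": "sell"}, {"text": "hold"}]
out = list(per_partition(partition))
print(INIT_CALLS)  # 1, not 3
```

A map-style API would force the initialization into the per-row function (or into awkward global state); mapPartitions makes the once-per-partition pattern natural.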

import spacy
import json

# schema: id:str,sentiment:bool,entities:str
def entity_linking(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    nlp = spacy.load('en_core_web_sm')
    for row in df:
        doc = nlp(row["text"])
        row["entities"] = json.dumps({str(ent).lower():str(ent.label_) for ent in doc.ents})
        yield row

# schema: name:str,label:str,sentiment:bool
def to_single_entities(df:Iterable[Dict[str,Any]]) -> Iterable[List[Any]]:
    for row in df:
        for k,v in json.loads(row["entities"]).items():
            yield [k,v,row["sentiment"]]
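Because these transformers are plain Python generators, you can sanity-check them locally without Fugue or spacy. The definition is repeated here so the snippet is self-contained, and the entities JSON is a fabricated example of what `entity_linking` might emit:

```python
import json
from typing import Any, Dict, Iterable, List

# schema: name:str,label:str,sentiment:bool
def to_single_entities(df: Iterable[Dict[str, Any]]) -> Iterable[List[Any]]:
    for row in df:
        for k, v in json.loads(row["entities"]).items():
            yield [k, v, row["sentiment"]]

# made-up sample row, shaped like the output of entity_linking
row = {"sentiment": True,
       "entities": json.dumps({"apple": "ORG", "monday": "DATE"})}
print(list(to_single_entities([row])))
# [['apple', 'ORG', True], ['monday', 'DATE', True]]
```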

with Dag() as dag:
    df = dag.load("../../data/stock_sentiment_data.csv", header=True)
    df = dag.select("* FROM", df, "LIMIT 100")
    pre = df.transform(preprocess).transform(entity_linking)
    entities = pre.transform(to_single_entities)
    dag.select("name, label, sentiment, COUNT(*) AS ct FROM", entities, "GROUP BY name, label, sentiment ORDER BY ct DESC LIMIT 10").show()

Bring It to Spark!#

Now it seems both tokenization and entity detection are working well on small data locally. We are going to combine them together. To make it truly scalable, we will use Spark as the execution engine.

Pay attention to a few things:

  • Auto persist and parallel run are enabled. So even if you write your logic in this simplest way, it will auto persist df and result because they are used multiple times. Also, tokenization and entity linking will run in parallel (if there are enough executors)

  • This logic can run on any execution engine; you may create an end-to-end test on small data using NativeExecutionEngine

  • The transformers it uses have no dependency on Fugue.

This step may be slow on binder. If possible, try it with larger data on a real Spark cluster.

from fugue_spark import SparkExecutionEngine
from pyspark.sql import SparkSession

conf = {"fugue.workflow.concurrency": 10,
        "fugue.workflow.auto_persist": True}
session = SparkSession.builder.getOrCreate()

with Dag(SparkExecutionEngine(session, conf=conf)) as dag:
    df = dag.load("../../data/stock_sentiment_data.csv", escape='"', header=True).transform(preprocess)
    tokens = df.transform(tokenize)
    entities = df.transform(entity_linking)
    result = df.inner_join(tokens,entities)
    result.save("/tmp/stock_sentiment.parquet")

Last, let’s show an alternative way to describe the end-to-end logic: Fugue SQL. It also adds steps to print some stats from tokens and entities. In this example, tokens and entities will also be auto persisted because they are used multiple times.

from fugue_sql import FugueSQLWorkflow

with FugueSQLWorkflow(SparkExecutionEngine(session, conf=conf)) as dag:
    dag("""
    LOAD "../../data/stock_sentiment_data.csv"(header=true,escape='"')
    df = TRANSFORM USING preprocess
    tokens = TRANSFORM df USING tokenize
    entities = TRANSFORM df USING entity_linking
    result =
        SELECT df.*, words, entities
        FROM df
        INNER JOIN tokens ON df.id = tokens.id
        INNER JOIN entities ON df.id = entities.id
    SAVE OVERWRITE "/tmp/stock_sentiment.parquet"

    SELECT word, sentiment, COUNT(*) AS ct
        FROM (TRANSFORM tokens USING to_single)
        GROUP BY word, sentiment
        ORDER BY ct DESC LIMIT 10
    PRINT TITLE "tokens"
    SELECT name, label, sentiment, COUNT(*) AS ct
        FROM (TRANSFORM entities USING to_single_entities)
        GROUP BY name, label, sentiment
        ORDER BY ct DESC LIMIT 10
    PRINT TITLE "entities"
    """)

Fugue is not only a framework, it’s also a way of thinking: keep your code as native as possible, and keep it loosely coupled with any particular computing framework, including Fugue itself.