Stock Sentiment Analysis (Preprocessing)#
In this example, we look at this Kaggle dataset and borrow ideas from this notebook.
Our goal is to show how Fugue can be used in the preprocessing step of this NLP problem. Compared with doing the same analysis in pandas, Fugue is slightly more involved, but the advantages are:
Every step, every function is intuitive and easy to understand
The Fugue version is platform and scale agnostic. It can run on any ExecutionEngine and can handle very large datasets that can't fit on a single machine (see the sketch below).
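To make the second point concrete, here is a minimal sketch (not part of the original notebook; the double function is a made-up illustration, and the commented-out spark argument assumes a SparkSession) showing that the same transformer runs locally or distributedly just by changing the engine argument:
import pandas as pd
import fugue.api as fa

# schema: n:long
def double(df: pd.DataFrame) -> pd.DataFrame:
    df["n"] = df["n"] * 2  # any plain pandas logic
    return df

df = pd.DataFrame({"n": [1, 2, 3]})
fa.transform(df, double)                  # runs on the local pandas-based engine
# fa.transform(df, double, engine=spark)  # same call with a SparkSession -> runs on Spark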
Install Dependencies#
!pip install nltk
import nltk
nltk.download('wordnet')
nltk.download('stopwords')
!pip install spacy
!python -m spacy download en_core_web_sm
✔ Download and installation successful
You can now load the model via spacy.load('en_core_web_sm')
You must restart the kernel after installation.
Explore the data#
We load the data, print it, and do some basic analytics.
import fugue.api as fa

with fa.engine_context():
    df = fa.load("../../../data/stock_sentiment_data.csv", header=True)
    fa.show(df)
    # count rows per sentiment label
    fa.show(fa.raw_sql("SELECT Sentiment, COUNT(*) AS ct FROM", df, "GROUP BY Sentiment"))
PandasDataFrame
Text:str |Sentiment:str
--------------------------------------------------------------------------------------+-------------
Kickers on my watchlist XIDE TIT SOQ PNK CPW BPZ AJ trade method 1 or method 2, see p|1
rev posts |
user: AAP MOVIE. 55% return for the FEA/GEED indicator just 15 trades for the year. A|1
WESOME. |
user I'd be afraid to short AMZN - they are looking like a near-monopoly in eBooks and|1
infrastructure-as-a-service |
MNTA Over 12.00 |1
OI Over 21.37 |1
PGNX Over 3.04 |1
AAP - user if so then the current downtrend will break. Otherwise just a short-term co|-1
rrection in med-term downtrend. |
Monday's relative weakness. NYX WIN TIE TAP ICE INT BMC AON C CHK BIIB |-1
GOOG - ower trend line channel test & volume support. |1
AAP will watch tomorrow for ONG entry. |1
PandasDataFrame
Sentiment:str|ct:long
-------------+-------
-1 |2106
1 |3685
Total count: 2
Clean Up#
Here, we will:
Make all column names lowercase
Add a unique and deterministic id to each row
Convert sentiment to bool, since it has only two values
We use a transformer to do this, and it can be written in pure native Python. Read this to learn more details. The id comes from hashing the text, so it is stable across runs; a quick check follows.
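A minimal check (not part of the original notebook) that to_uuid is a deterministic hash, so re-running the pipeline produces the same ids for the same text:
from triad.utils.hash import to_uuid

# the same input always hashes to the same uuid, so generated ids are reproducible
assert to_uuid("MNTA Over 12.00") == to_uuid("MNTA Over 12.00")
assert to_uuid("MNTA Over 12.00") != to_uuid("OI Over 21.37")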
from triad.utils.hash import to_uuid
from typing import Iterable, Dict, Any, List

# schema: id:str,sentiment:bool,text:str
def preprocess(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    for row in df:
        # hash the text into a short deterministic id, lowercase the column
        # names, and convert the sentiment label to a bool
        yield dict(id=to_uuid(row["Text"]).split("-")[-1],
                   sentiment=str(row["Sentiment"])=="1",
                   text=row["Text"])

with fa.engine_context():
    df = fa.load("../../../data/stock_sentiment_data.csv", header=True)
    fa.show(fa.transform(df, preprocess))
PandasDataFrame
id:str |sentiment:bool|text:str
---------+--------------+-----------------------------------------------------------------------
4102bd2ab|True |Kickers on my watchlist XIDE TIT SOQ PNK CPW BPZ AJ trade method 1 or
986 | |method 2, see prev posts
c0b462af2|True |user: AAP MOVIE. 55% return for the FEA/GEED indicator just 15 trades f
eb6 | |or the year. AWESOME.
30464f4d4|True |user I'd be afraid to short AMZN - they are looking like a near-monopol
31a | |y in eBooks and infrastructure-as-a-service
9e7926685|True |MNTA Over 12.00
732 | |
473e64d5b|True |OI Over 21.37
792 | |
541c485a0|True |PGNX Over 3.04
00b | |
4c6d3d0d2|False |AAP - user if so then the current downtrend will break. Otherwise just
13f | |a short-term correction in med-term downtrend.
116c35bfc|False |Monday's relative weakness. NYX WIN TIE TAP ICE INT BMC AON C CHK BIIB
f78 | |
5f8bc8dec|True |GOOG - ower trend line channel test & volume support.
421 | |
e8ee0aade|True |AAP will watch tomorrow for ONG entry.
6f5 | |
Tokenize#
We will convert the raw text to tokens: lowercase the text, remove punctuation, and lemmatize each word. These will all be done inside a transformer.
For this case, Iterable as both input and output is the most intuitive and convenient choice. We will also write an additional transformer that converts each row to word-sentiment pairs so we can compute some statistics. This pattern is often seen in Spark hello-world examples.
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
import string

# schema: id:str,sentiment:bool,words:[str]
def tokenize(df:Iterable[Dict[str,Any]]) -> Iterable[List[Any]]:
    lem = WordNetLemmatizer()
    stop = set(stopwords.words('english'))
    # translation table that strips all punctuation
    translator = str.maketrans('', '', string.punctuation)
    for row in df:
        words = row["text"].lower().translate(translator).split(" ")
        # drop stopwords and empty strings, lemmatize the rest
        words = [lem.lemmatize(w) for w in words if w not in stop and w!=""]
        yield [row["id"], row["sentiment"], words]

# schema: word:str, sentiment:bool
def to_single(df:Iterable[Dict[str,Any]]) -> Iterable[List[Any]]:
    # explode each row into (word, sentiment) pairs
    for row in df:
        sentiment = row["sentiment"]
        for w in row["words"]:
            yield [w, sentiment]
with fa.engine_context():
    df = fa.load("../../../data/stock_sentiment_data.csv", header=True)
    tk = fa.transform(fa.transform(df, preprocess), tokenize)
    fa.show(tk, 5)
    words = fa.transform(tk, to_single)
    fa.show(fa.raw_sql("word, sentiment, COUNT(*) AS ct FROM", words, "GROUP BY word, sentiment ORDER BY ct DESC LIMIT 10"), 5)
PandasDataFrame
id:str |sentiment:bool|words:[str]
---------+--------------+------------------------------------------------------------------------
4102bd2ab|True |['kicker', 'watchlist', 'xide', 'tit', 'soq', 'pnk', 'cpw', 'bpz', 'aj',
986 | | 'trade', 'method', '1', 'method', '2', 'see', 'prev', 'post']
c0b462af2|True |['user', 'aap', 'movie', '55', 'return', 'feageed', 'indicator', '15', '
eb6 | |trade', 'year', 'awesome']
30464f4d4|True |['user', 'id', 'afraid', 'short', 'amzn', 'looking', 'like', 'nearmonopo
31a | |ly', 'ebooks', 'infrastructureasaservice']
9e7926685|True |['mnta', '1200']
732 | |
473e64d5b|True |['oi', '2137']
792 | |
PandasDataFrame
word:str|sentiment:bool|ct:long
--------+--------------+-------
aap |True |513
user |True |441
aap |False |407
short |False |362
today |True |253
Entity Detection#
Entity linking can generate very powerful features, so we want to do it for each sentence. spaCy is a popular package for this.
Entity linking can also be very expensive, so running it distributedly is how we deal with large datasets. Fugue helps you separate the concerns:
A transformer handles one partition of data on a single machine, so scalability and throughput are not the concerns of a transformer.
How a transformer is run is where scalability and throughput come in. But again, Fugue is very abstract: you can simply tell the system to apply the transformation and let it optimize the distribution. You can also take full control of the distribution, but we won't go into the details here; let's focus on WHAT instead of HOW.
Another thing to point out is that Fugue avoids the semantics of map and only uses mapPartitions. The entity_linking transformer in the code below is a perfect example of why: spacy.load is expensive, but it is called only once per partition and then reused for every item in that partition. When there is no expensive initialization, this approach is neither more complicated nor slower than map, so there is no good reason to use map directly. First, a small sketch of the difference:
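A minimal sketch (not from the original notebook; expensive_init is a made-up stand-in for something like spacy.load) contrasting the two semantics:
import time

def expensive_init():
    time.sleep(1)        # stand-in for a heavy setup such as loading a model
    return str.upper     # the "model" we get back

# map semantics: the initialization cost is paid once PER ROW
def per_row(row):
    model = expensive_init()
    return model(row)

# mapPartitions semantics (what a Fugue transformer gets):
# the initialization cost is paid once PER PARTITION and reused
def per_partition(rows):
    model = expensive_init()
    for row in rows:
        yield model(row)

# 3 rows: per_row pays ~3s of setup, per_partition pays ~1s
rows = ["aapl", "goog", "amzn"]
print([per_row(r) for r in rows])
print(list(per_partition(rows)))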
import spacy
import json

# schema: id:str,sentiment:bool,entities:str
def entity_linking(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    # expensive initialization, done only once per partition
    nlp = spacy.load('en_core_web_sm')
    for row in df:
        doc = nlp(row["text"])
        # store detected entities as a JSON string of {entity: label}
        row["entities"] = json.dumps({str(ent).lower():str(ent.label_) for ent in doc.ents})
        yield row

# schema: name:str,label:str,sentiment:bool
def to_single_entities(df:Iterable[Dict[str,Any]]) -> Iterable[List[Any]]:
    # explode each row into (entity, label, sentiment) triples
    for row in df:
        for k,v in json.loads(row["entities"]).items():
            yield [k,v,row["sentiment"]]
with fa.engine_context():
    df = fa.load("../../../data/stock_sentiment_data.csv", header=True)
    # entity linking is slow, so sample 100 rows for this local run
    df = fa.raw_sql("* FROM", df, " LIMIT 100")
    pre = fa.transform(fa.transform(df, preprocess), entity_linking)
    fa.show(pre, 5)
    entities = fa.transform(pre, to_single_entities)
    fa.show(fa.raw_sql("name, label, sentiment, COUNT(*) AS ct FROM", entities, "GROUP BY name, label, sentiment ORDER BY ct DESC LIMIT 10"), 5)
PandasDataFrame
id:str |sentiment:bool|entities:str
----------+--------------+---------------------------------------------------------------------
4102bd2ab9|True |{"pnk cpw bpz": "PRODUCT", "1": "CARDINAL", "2": "CARDINAL"}
86 | |
c0b462af2e|True |{"aap movie": "ORG", "55%": "PERCENT", "fea/geed": "ORG", "15": "CARD
b6 | |INAL", "the year": "DATE"}
30464f4d43|True |{"ebooks": "ORG"}
1a | |
9e79266857|True |{"12.00": "CARDINAL"}
32 | |
473e64d5b7|True |{"21.37": "CARDINAL"}
92 | |
PandasDataFrame
name:str|label:str|sentiment:bool|ct:long
--------+---------+--------------+-------
aap |ORG |True |9
today |DATE |True |6
bac |ORG |True |4
goog |ORG |True |4
aap |ORG |False |3
Bring It to Spark!#
Now that both tokenization and entity detection work well on small data locally, we are going to combine them. To make the pipeline truly scalable, we will use Spark as the execution engine.
Pay attention to a few things:
This logic can run on any execution engine; you can create an end-to-end test on small data using the pandas-based NativeExecutionEngine (see the sketch after this list).
The transformers it uses have no dependency on Fugue.
This step may be slow on Binder; if possible, try it with larger data on a real Spark cluster.
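A minimal sketch (not part of the original notebook) of such an end-to-end test, assuming the preprocess and tokenize transformers defined above; with no engine argument, engine_context falls back to local pandas-based execution:
with fa.engine_context():  # no engine -> local pandas-based execution
    small = fa.raw_sql(
        "* FROM",
        fa.load("../../../data/stock_sentiment_data.csv", header=True),
        "LIMIT 10",
    )
    out = fa.as_pandas(fa.transform(fa.transform(small, preprocess), tokenize))
    # the pipeline should preserve the row count and produce the expected columns
    assert len(out) == 10
    assert list(out.columns) == ["id", "sentiment", "words"]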
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

with fa.engine_context(spark):
    df = fa.load("../../../data/stock_sentiment_data.csv", escape='"', header=True)
    pre = fa.transform(df, preprocess)
    tokens = fa.transform(pre, tokenize)
    entities = fa.transform(pre, entity_linking)
    result = fa.inner_join(tokens, entities)
    fa.show(result, 5)
    fa.save(result, "/tmp/stock_sentiment.parquet")
SparkDataFrame
id:str |sentiment:bool|words:[str] |entities:str
-------+--------------+------------------------------+--------------------------------
4102bd2|True |['kicker', 'watchlist', 'xide'|{"pnk cpw bpz": "PRODUCT", "1":
ab986 | |, 'tit', 'soq', 'pnk', 'cpw', |"CARDINAL", "2": "CARDINAL"}
| |'bpz', 'aj', 'trade', 'method'|
| |, '1', 'method', '2', 'see', '|
| |prev', 'post'] |
c0b462a|True |['user', 'aap', 'movie', '55',|{"aap movie": "ORG", "55%": "PER
f2eb6 | | 'return', 'feageed', 'indicat|CENT", "fea/geed": "ORG", "15":
| |or', '15', 'trade', 'year', 'a|"CARDINAL", "the year": "DATE"}
| |wesome'] |
30464f4|True |['user', 'id', 'afraid', 'shor|{"ebooks": "ORG"}
d431a | |t', 'amzn', 'looking', 'like',|
| | 'nearmonopoly', 'ebooks', 'in|
| |frastructureasaservice'] |
9e79266|True |['mnta', '1200'] |{"12.00": "CARDINAL"}
85732 | | |
473e64d|True |['oi', '2137'] |{"21.37": "CARDINAL"}
5b792 | | |
Last, let's show an alternative way to describe the end-to-end logic: Fugue SQL. It also adds steps to print some stats from tokens and entities. In this example, tokens and entities will be auto-persisted because each is used multiple times.
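If the %%fsql cell magic is not already registered in your Jupyter environment (an assumption about your setup; skip this if the magic works), it can typically be enabled with:
from fugue_notebook import setup

setup()  # registers the %%fsql cell magic in this notebook session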
%%fsql spark
LOAD "../../../data/stock_sentiment_data.csv"(header=true,escape='"')
df = TRANSFORM USING preprocess
tokens = TRANSFORM df USING tokenize
entities = TRANSFORM df USING entity_linking
result =
SELECT df.*, words, entities
FROM df
INNER JOIN tokens ON df.id = tokens.id
INNER JOIN entities ON df.id = entities.id
PRINT 5 ROWS
SAVE OVERWRITE "/tmp/stock_sentiment.parquet"
SELECT word, sentiment, COUNT(*) AS ct
FROM (TRANSFORM tokens USING to_single)
GROUP BY word, sentiment
ORDER BY ct DESC LIMIT 10
PRINT TITLE "tokens"
SELECT name, label, sentiment, COUNT(*) AS ct
FROM (TRANSFORM entities USING to_single_entities)
GROUP BY name, label, sentiment
ORDER BY ct DESC LIMIT 10
PRINT TITLE "entities"
SparkDataFrame
id:str|sentiment:bool|text:str |words:[str] |entities:str
------+--------------+-------------------+-------------------+-----------------------
4102bd|True |Kickers on my watch|['kicker', 'watchli|{"pnk cpw bpz": "PRODUC
2ab986| |list XIDE TIT SOQ P|st', 'xide', 'tit',|T", "1": "CARDINAL", "2
| |NK CPW BPZ AJ trad| 'soq', 'pnk', 'cpw|": "CARDINAL"}
| |e method 1 or metho|', 'bpz', 'aj', 'tr|
| |d 2, see prev posts|ade', 'method', '1'|
| | |, 'method', '2', 's|
| | |ee', 'prev', 'post'|
| | |] |
c0b462|True |user: AAP MOVIE. 55|['user', 'aap', 'mo|{"aap movie": "ORG", "5
af2eb6| |% return for the FE|vie', '55', 'return|5%": "PERCENT", "fea/ge
| |A/GEED indicator ju|', 'feageed', 'indi|ed": "ORG", "15": "CARD
| |st 15 trades for th|cator', '15', 'trad|INAL", "the year": "DAT
| |e year. AWESOME. |e', 'year', 'awesom|E"}
| | |e'] |
30464f|True |user I'd be afraid |['user', 'id', 'afr|{"ebooks": "ORG"}
4d431a| |to short AMZN - the|aid', 'short', 'amz|
| |y are looking like |n', 'looking', 'lik|
| |a near-monopoly in |e', 'nearmonopoly',|
| |eBooks and infrastr| 'ebooks', 'infrast|
| |ucture-as-a-service|ructureasaservice']|
9e7926|True |MNTA Over 12.00 |['mnta', '1200'] |{"12.00": "CARDINAL"}
685732| | | |
473e64|True |OI Over 21.37 |['oi', '2137'] |{"21.37": "CARDINAL"}
d5b792| | | |
tokens
SparkDataFrame
word:str|sentiment:bool|ct:long
--------+--------------+-------
aap |True |513
user |True |441
aap |False |407
short |False |362
today |True |253
volume |True |241
stock |True |241
day |True |223
long |True |210
user |False |202
entities
SparkDataFrame
name:str |label:str|sentiment:bool|ct:long
-------------------------------------------------------------------+---------+--------------+-------
aap |ORG |True |429
aap |ORG |False |351
today |DATE |True |234
bac |ORG |True |110
goog |ORG |True |91
today |DATE |False |78
goog |ORG |False |55
tomorrow |DATE |True |45
yesterday |DATE |True |40
one |CARDINAL |True |39
Fugue is not only a framework, it's also a way of thinking: keep your code as native as possible, loosely coupled with any particular computing framework, including Fugue itself. A quick way to see this is that the transformers above are plain Python and can be tested directly, as shown below.
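For instance, since none of the functions above import Fugue, a minimal sketch (not part of the original notebook) of calling one of them as plain Python in a unit test:
# preprocess is just a generator over dicts; no Fugue needed to test it
rows = [{"Text": "MNTA Over 12.00", "Sentiment": "1"}]
out = list(preprocess(rows))
assert out[0]["sentiment"] is True
assert out[0]["text"] == "MNTA Over 12.00"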