5 changes: 5 additions & 0 deletions .gitignore
@@ -134,3 +134,8 @@ dmypy.json

# Pyre type checker
.pyre/

backend/twitter_keys.py

# Credentials
backend/creds.json
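
For reference, both ignored files hold nothing but credentials. A minimal sketch of backend/twitter_keys.py, assuming only the four names that backend.py imports (the values are placeholders, not real keys):

# backend/twitter_keys.py -- git-ignored credential holder; values are placeholders
tw_access_key = "YOUR_CONSUMER_KEY"            # consumer (API) key
tw_secret_key = "YOUR_CONSUMER_SECRET"         # consumer (API) secret
tw_access_token = "YOUR_ACCESS_TOKEN"
tw_access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"

backend/creds.json likewise needs exactly the five fields that stream_reddit.py reads: client_id, client_secret, password, user_agent and username.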
100 changes: 100 additions & 0 deletions backend/backend.py
@@ -0,0 +1,100 @@
import re
import time

import tweepy

from stock_metadata import company_and_ticks
from twitter_keys import *

class TweetStreamListener(tweepy.StreamListener):

    def __init__(self):
        super().__init__()
        self.backoff_timeout = 1
        self.query_string = list(company_and_ticks.keys())
        #self.query_string.extend(list(company_and_ticks.values()))
        #self.query_string.remove("V")

    def on_status(self, status):

        # reset the exponential back-off timeout on a successful status
        self.backoff_timeout = 1

        # build and emit the tweet payload
        tweet = self.construct_tweet(status)
        if tweet:
            print(tweet)

    def on_error(self, status_code):

        # exponential back-off on rate-limit (420) errors
        if status_code == 420:
            time.sleep(self.backoff_timeout)
            self.backoff_timeout *= 2
            return True
        else:
            print("Error {0} occurred".format(status_code))
            return False

def construct_tweet(self, status):
try:
tweet_text = ""
if hasattr(status, 'retweeted_status') and hasattr(status.retweeted_status, 'extended_tweet'):
tweet_text = status.retweeted_status.extended_tweet['full_text']
elif hasattr(status, 'full_text'):
tweet_text = status.full_text
elif hasattr(status, 'extended_tweet'):
tweet_text = status.extended_tweet['full_text']
elif hasattr(status, 'quoted_status'):
if hasattr(status.quoted_status, 'extended_tweet'):
tweet_text = status.quoted_status.extended_tweet['full_text']
else:
tweet_text = status.quoted_status.text
else:
tweet_text = status.text
            tweet_data = dict()
            for q_string in self.query_string:
                if q_string.lower() in tweet_text.lower():
                    tweet_data = {
                        "text": TweetStreamListener.sanitize_text(tweet_text),
                        "tic": company_and_ticks[q_string],
                        "date": status.created_at
                    }
                    break
            return tweet_data
        except Exception as e:
            print("Exception occurred while parsing status object:", e)

@staticmethod
    def sanitize_text(tweet):
        # strip newlines and quotes, then remove URLs
        tweet = tweet.replace('\n', '').replace('"', '').replace('\'', '')
        return re.sub(r"http\S+", "", tweet)

class TwitterStreamer:

def __init__(self):
self.twitter_api = None
self.__get_twitter_connection()
self.listener = TweetStreamListener()
self.tweet_stream = tweepy.Stream(auth=self.twitter_api.auth, listener=self.listener, tweet_mode='extended')

def __get_twitter_connection(self):
try:
auth = tweepy.OAuthHandler(tw_access_key, tw_secret_key)
auth.set_access_token(tw_access_token, tw_access_token_secret)
self.twitter_api = tweepy.API(auth, wait_on_rate_limit=True)
except Exception as e:
print("Exception occurred : {0}".format(e))

def start_tweet_streaming(self):
# start stream to listen to company tweets
self.tweet_stream.filter(track=self.listener.query_string, languages=['en'])

if __name__=="__main__":

#init twitter connection
twitter_streamer = TwitterStreamer()
twitter_streamer.start_tweet_streaming()
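
When a status matches one of the tracked company names, on_status prints the dict built by construct_tweet. An illustrative example of the emitted shape (all values made up):

{'text': 'Apple reports record quarterly earnings', 'tic': 'AAPL', 'date': datetime.datetime(2021, 1, 27, 21, 30, 5)}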
32 changes: 32 additions & 0 deletions backend/companies.json
@@ -0,0 +1,32 @@
{
"MMM": "3M",
"AXP": "American Express",
"AMGN": "Amgen",
"AAPL": "Apple",
"BA": "Boeing",
"CAT": "Caterpillar",
"CVX": "Chevron",
"CSCO": "Cisco",
"KO": "Coca-Cola",
"DOW": "Dow",
"GS": "Goldman Sachs",
"HD": "Home Depot",
"HON": "Honeywell",
"IBM": "IBM",
"INTC": "Intel",
"JNJ": "Johnson & Johnson",
"JPM": "JPMorgan",
"MCD": "McDonald's",
"MRK": "Merck",
"MSFT": "Microsoft",
"NKE": "Nike",
"PG": "Procter & Gamble",
"CRM": "Salesforce",
"TRV": "The Travelers Companies",
"UNH": "UnitedHealth",
"VZ": "Verizon",
"V": "Visa",
"WBA": "Walgreens Boots Alliance",
"WMT": "Walmart",
"DIS": "Walt Disney"
}
33 changes: 33 additions & 0 deletions backend/stock_metadata.py
@@ -0,0 +1,33 @@
company_and_ticks = {
"3M Company": "MMM",
"American Express": "AXP",
"Amgen": "AMGN",
"Apple": "AAPL",
"Boeing": "BA",
"Caterpillar": "CAT",
"Chevron":"CVX",
"Cisco": "CSCO",
"Coca-Cola": "KO",
"Dow": "DOW",
"Goldman Sachs": "GS",
"Home Depot": "HD",
"Honeywell": "HON",
"IBM": "IBM",
"Intel": "INTC",
"Johnson & Johnson": "JNJ",
"JPMorgan": "JPM",
"McDonald": "MCD",
"Merck": "MRK",
"Microsoft": "MSFT",
"Nike": "NKE",
"Proctor & Gamble": "PG",
"Salesforce": "CRM",
"The Travelers Companies": "TRV",
"UnitedHealth": "UNH",
"Verizon": "VZ",
"Visa": "V",
"Walgreens Boots Alliance": "WBA",
"Walmart": "WMT",
"Disney": "DIS"
}

112 changes: 112 additions & 0 deletions backend/stream_reddit.py
@@ -0,0 +1,112 @@
import argparse
import json
import math
import praw
import threading
import time

from kafka import KafkaProducer

redditClient = None

class CommentsFetcher(threading.Thread):

    def __init__(self, subreddit, companies, exit_on_fail=False, producer=None, topic=None):
        threading.Thread.__init__(self)
        self.name = 'fetch_comments_{0}'.format(subreddit)
        self.die = False
        self.companies = companies
        self.exit_on_fail = exit_on_fail
        self.producer = producer
        self.topic = topic
        # praw builds Subreddit objects lazily, so no network call happens here
        self.sr_obj = redditClient.subreddit(subreddit)

    def run(self):
        while not self.die:
            try:
                self.fetchComments()
            except Exception as e:
                if self.exit_on_fail:
                    raise
                else:
                    print("Thread {0}: error {1} occurred while streaming comments, continuing".format(self.name, e))

    def join(self):
        # signal the run loop to stop, then wait for the thread to finish
        self.die = True
        super().join()

    def fetchComments(self):
        for comment in self.sr_obj.stream.comments(skip_existing=True, pause_after=5):
            # with pause_after set, the praw stream yields None whenever it pauses
            if comment is None:
                continue
            comment_text = comment.body.casefold()
            # tickers match case-sensitively, company names case-insensitively
            for ticker in self.companies:
                casefolded_company = self.companies[ticker].casefold()
                if ('{0} '.format(ticker) in comment.body or
                        ' {0}'.format(ticker) in comment.body or
                        '{0} '.format(casefolded_company) in comment_text or
                        ' {0}'.format(casefolded_company) in comment_text):
                    # timestamp in epoch milliseconds
                    comment_obj = { "ticker": ticker, "text": comment.body, "timestamp": math.ceil(time.time_ns()/1000000) }
                    self.output(comment_obj)
                    break

def output(self, comment):
if self.producer is None:
print(comment)
else:
if self.topic is None:
raise ValueError("topic not supplied")
key = "{0}_{1}".format(comment["ticker"],comment["timestamp"])
try:
key_bytes = bytes(key, encoding='utf-8')
                value = json.dumps(comment)
value_bytes = bytes(value, encoding='utf-8')
self.producer.send(self.topic, key=key_bytes, value=value_bytes)
except Exception as e:
print("Error {0} occurred while publishing message with key {1}".format(e, key))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Stream reddit comments to stdout or a Kafka topic')
    parser.add_argument('-t', '--topic', metavar='<topic_name>', help='Kafka topic name')
    parser.add_argument('-H', '--host', metavar='<hostname_port>', default='localhost:9092', help='Hostname:port of bootstrap server')
    args = parser.parse_args()

    with open("creds.json", "r") as f:
        creds = json.load(f)
    redditClient = praw.Reddit(client_id=creds['client_id'],
                               client_secret=creds['client_secret'],
                               password=creds['password'],
                               user_agent=creds['user_agent'],
                               username=creds['username'])

    with open("subreddits", "r") as f:
        subreddits = [sr.strip() for sr in f.read().split(',')]
    with open("companies.json", "r") as f:
        companies = json.load(f)

    producer = None
    if args.topic is not None:
        producer = KafkaProducer(bootstrap_servers=[args.host], api_version=(0, 10))

    # start a fetch thread for every subreddit
    fetch_threads = []
    for sr in subreddits:
        # pass producer and topic by keyword so they don't bind to exit_on_fail
        th = CommentsFetcher(sr, companies, producer=producer, topic=args.topic)
        th.start()
        fetch_threads.append(th)

try:
while True:
time.sleep(2)
except KeyboardInterrupt:
for th in fetch_threads:
th.join()


"""

This module is responsible for

Streaming comments

Stream comments from reddit and write to specified source (stdout or kafka)

"""
1 change: 1 addition & 0 deletions backend/subreddits
@@ -0,0 +1 @@
wallstreetbets,SecurityAnalysis,Finance,Options,Investing,Stocks,StockMarket
3 changes: 2 additions & 1 deletion config/config.py
@@ -18,7 +18,8 @@
# data
#TRAINING_DATA_FILE = "data/ETF_SPY_2009_2020.csv"
TRAINING_DATA_FILE = "data/dow_30_2009_2020.csv"

# Dow 30 stock tickers
stock_tickers=['MMM','AXP','AMGN','AAPL','BA','CAT','CVX','CSCO','KO','DIS','DOW','GS','HD','HON','IBM','INTC','JNJ','JPM','MCD','MRK','MSFT','NKE','PG','CRM','TRV','UNH','VZ','V','WBA','WMT']
now = datetime.datetime.now()
TRAINED_MODEL_DIR = f"trained_models/{now}"
os.makedirs(TRAINED_MODEL_DIR)
File renamed without changes.