From e452d62f9a03ccf6ec71611d5b2eadcff2eaf71d Mon Sep 17 00:00:00 2001 From: Parker-Robinson Date: Thu, 12 Jan 2017 07:35:51 -0800 Subject: [PATCH] stream-handler application --- .gitignore | 3 + README.md | 51 +++++++++++++-- dynamo-tables/create_tweet_table.py | 34 ++++++++++ dynamo-tables/create_users_table.py | 25 ++++++++ requirements.txt | 3 + .../handlers/apply_user_filter_handler.py | 23 +++++++ .../handlers/record_user_handler.py | 41 ++++++++++++ .../handlers/write_to_console_handler.py | 17 +++++ .../handlers/write_to_dynamo_handler.py | 32 ++++++++++ stream-handler/main.py | 49 ++++++++++++++ stream-handler/tweet_type.py | 50 +++++++++++++++ stream-handler/twitter_client.py | 63 ++++++++++++++++++ stream-handler/twitter_stream_listener.py | 64 +++++++++++++++++++ 13 files changed, 451 insertions(+), 4 deletions(-) create mode 100644 dynamo-tables/create_tweet_table.py create mode 100644 dynamo-tables/create_users_table.py create mode 100644 stream-handler/handlers/apply_user_filter_handler.py create mode 100644 stream-handler/handlers/record_user_handler.py create mode 100644 stream-handler/handlers/write_to_console_handler.py create mode 100644 stream-handler/handlers/write_to_dynamo_handler.py create mode 100644 stream-handler/main.py create mode 100644 stream-handler/tweet_type.py create mode 100644 stream-handler/twitter_client.py create mode 100644 stream-handler/twitter_stream_listener.py diff --git a/.gitignore b/.gitignore index 72364f9..6d7b21e 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,6 @@ ENV/ # Rope project settings .ropeproject + +# Jetbrains settings +.idea/ \ No newline at end of file diff --git a/README.md b/README.md index 1b1bd93..ace4042 100644 --- a/README.md +++ b/README.md @@ -11,10 +11,10 @@ Uses https://python-twitter.readthedocs.io/en/latest/ ## presterity/twitter-integration development 1. go to preferred workspace directory -2. virtualenv venvs/presterity -3. source venvs/presterity/bin/activate -4. 
git clone git@github.com:/presterity/twitter-integration -5. pip install -r twitter-integration/requirements.txt +2. `virtualenv -p python3 venvs/presterity` +3. `source venvs/presterity/bin/activate` +4. `git clone git@github.com:/presterity/twitter-integration` +5. `pip install -r twitter-integration/requirements.txt` ## Using the Twitter APIs @@ -22,5 +22,48 @@ Uses https://python-twitter.readthedocs.io/en/latest/ 1. Follow the instructions at https://python-twitter.readthedocs.io/en/latest/getting_started.html to register a Twitter app. 2. Under "Keys and Access Tokens," generate Consumer Key and Secret 3. Copy your consumer key/secret and access key/secret into your own version of the config.py file +4. Execute the following command from the root of the repo (to avoid accidentally checking in your changes to the file) `git update-index --assume-unchanged ./config.py` +## Running stream-listener + +1. CD to root of repo +2. `python stream-listener/twitter-stream-listen.py --handle @presterity --verbose` + +Notes about the above command: + +1. Run `python stream-listener/twitter-stream-listen.py -h` for a help page +2. `python` - If you didn't use the `-p python3` option when creating the virtualenv workspace, you need to specify `python3` instead of `python`. Otherwise, you will get errors immediately after running. +3. If you don't start the command from the root of the repo, you'll need to specify `--config {path/to/config}`. If your error message is related to accessing `config.py`, this is likely your problem. +4. Pick a different handle than @presterity to listen to something more active. Specify multiple handles as follows: `--handle @presterity --handle @SenWarren --handle @PramiliaJayapal --handle @BarackObama` ('@' is optional. Leave it off if you want.) +5. I recommend `--verbose` the first time you run it because otherwise you might think the application is hanging (when it's just waiting for someone being watched to tweet). 
For your own sanity, remove this flag once you know it works. + +## Using the AWS APIs + +1. Log in to the AWS Console (You can create an account with your Amazon information if you haven't already. A lot of things are free for the first year. Setting up the Dynamo databases like I do below costs approximately $5 per month if you are not still on the free level) +2. In the IAM section, create a user with programmatic access and AmazonDynamoDBFullAccess permissions +3. Create a file in ~/.aws/config and paste in your access key and secret access key in the following format + +``` +[default] +aws_access_key_id = xxxxxxxxxxxxxxxxxxxxxx +aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +``` + +4. You're all set up. All AWS API calls will find this file automatically and grant permissions accordingly +5. If you have any difficulty, reach out on Slack to kyle.parker.robinson + +## Creating DynamoDB Tables + +The application in stream-handler requires access to two DynamoDB tables. Run the scripts in ./dynamo-tables/ to create them + +`python dynamo-tables/create_tweet_table.py` +`python dynamo-tables/create_users_table.py` + +If your AWS account is not on the free tier, these tables will accrue charges of around $5 per month for both as long as they exist. You can delete them and recreate them when needed to save costs. 
+ +## Running stream-handler + +`python3 stream-handler/main.py --handle @BarackObama` + +Using BarackObama for testing has been useful because there is a steady stream of people tweeting to him diff --git a/dynamo-tables/create_tweet_table.py b/dynamo-tables/create_tweet_table.py new file mode 100644 index 0000000..dcfd2bd --- /dev/null +++ b/dynamo-tables/create_tweet_table.py @@ -0,0 +1,34 @@ +import boto3 + + +dynamodb = boto3.resource('dynamodb', region_name='us-west-2') + +table = dynamodb.create_table( + TableName='Tweets', + KeySchema=[ + { + 'AttributeName': 'user', + 'KeyType': 'HASH' # Partition key + }, + { + 'AttributeName': 'id', + 'KeyType': 'RANGE' # Sort key + } + ], + AttributeDefinitions=[ + { + 'AttributeName': 'user', + 'AttributeType': 'S' + }, + { + 'AttributeName': 'id', + 'AttributeType': 'S' + } + ], + ProvisionedThroughput={ + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } +) + +print("Table status:", table.table_status) \ No newline at end of file diff --git a/dynamo-tables/create_users_table.py b/dynamo-tables/create_users_table.py new file mode 100644 index 0000000..1144b77 --- /dev/null +++ b/dynamo-tables/create_users_table.py @@ -0,0 +1,25 @@ +import boto3 + +dynamodb = boto3.resource('dynamodb', region_name='us-west-2') + +table = dynamodb.create_table( + TableName='Users', + KeySchema=[ + { + 'AttributeName': 'id', + 'KeyType': 'HASH' # Partition key + } + ], + AttributeDefinitions=[ + { + 'AttributeName': 'id', + 'AttributeType': 'S' + } + ], + ProvisionedThroughput={ + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } +) + +print("Table status:", table.table_status) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3ea6c72..23ffd48 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,5 @@ python-twitter typing + +# Amazon AWS SDK +boto3 \ No newline at end of file diff --git a/stream-handler/handlers/apply_user_filter_handler.py 
b/stream-handler/handlers/apply_user_filter_handler.py new file mode 100644 index 0000000..b1744d1 --- /dev/null +++ b/stream-handler/handlers/apply_user_filter_handler.py @@ -0,0 +1,23 @@ +import logging + + +class ApplyUserFilterHandler: + + def __init__(self): + self.log = logging.getLogger() + + def handle(self, tweet: dict): + if tweet.get('presterity_action') is None: + tweet['presterity_action'] = {} + + user_flags = tweet.get('presterity_user_flags') + if user_flags is None: + return + + if 'user_blocked' in user_flags: + self.log.info('Blocked user detected: %s', tweet['user'].get('screen_name')) + tweet['presterity_action']['hidden'] = 'blocked_user' + if 'user_promoted' in user_flags: + self.log.info('Promoted user detected: %s', tweet['user'].get('screen_name')) + tweet['presterity_action']['promoted'] = 'promoted_user' + diff --git a/stream-handler/handlers/record_user_handler.py b/stream-handler/handlers/record_user_handler.py new file mode 100644 index 0000000..027ecc1 --- /dev/null +++ b/stream-handler/handlers/record_user_handler.py @@ -0,0 +1,41 @@ +import boto3 +from tweet_type import TweetType +import logging + + +class RecordUserHandler: + def __init__(self): + _dynamodb = boto3.resource('dynamodb', region_name='us-west-2') + self.table = _dynamodb.Table('Users') + self.log = logging.getLogger() + + def handle(self, tweet: dict): + user = tweet.get('user') + + response = self.table.update_item( + Key={ + 'id': user.get('id_str') + }, + UpdateExpression= + 'set ' + 'screen_name = :s, ' + 'profile_image_url = :p, ' + 'created_at = :c', + ExpressionAttributeValues={ + ':s': user.get('screen_name'), + ':p': user.get('profile_image_url'), + ':c': self.__parse_datetime(user.get('created_at')) + }, + ReturnValues='ALL_NEW' + ) + + # Take advantage of the fact that dynamo returns the record's attributes + # Removes need to read record from database in ApplyUserFilterHandler + record = response['Attributes'] + user_flags = record['user_flags'] or {} + 
tweet['presterity_user_flags'] = user_flags + + @staticmethod + def __parse_datetime(datetime): + datetime = TweetType.translate_datetime(datetime) + return datetime.strftime('%Y-%m-%d %H:%M:%S %Z') \ No newline at end of file diff --git a/stream-handler/handlers/write_to_console_handler.py b/stream-handler/handlers/write_to_console_handler.py new file mode 100644 index 0000000..c8a0611 --- /dev/null +++ b/stream-handler/handlers/write_to_console_handler.py @@ -0,0 +1,17 @@ +from tweet_type import TweetType + + +class WriteToconsoleHandler: + + @classmethod + def handle(cls, tweet: dict): + if not tweet: + print('-- None -- ') + elif TweetType.is_timeout(tweet): + print('-- Timeout --') + elif TweetType.is_heartbeat_timeout(tweet): + print('-- Heartbeat Timeout --') + elif TweetType.is_hangup(tweet): + print('-- Hangup --') + else: + print(TweetType.get_text(tweet)) diff --git a/stream-handler/handlers/write_to_dynamo_handler.py b/stream-handler/handlers/write_to_dynamo_handler.py new file mode 100644 index 0000000..9b61dc2 --- /dev/null +++ b/stream-handler/handlers/write_to_dynamo_handler.py @@ -0,0 +1,32 @@ +import boto3 +from tweet_type import TweetType + + +class WriteToDynamoHandler: + def __init__(self): + _dynamodb = boto3.resource('dynamodb', region_name='us-west-2') + self.table = _dynamodb.Table('Tweets') + + def handle(self, tweet: dict): + user = tweet['user'] + self.table.put_item( + Item={ + 'user': user['id_str'], + 'id': tweet['id_str'], + 'text': tweet['text'], + 'created_at': self.__parse_datetime(tweet['created_at']), + 'in_reply_to_screen_name': tweet.get('in_reply_to_screen_name'), + 'in_reply_to_status_id': tweet.get('in_reply_to_status_id_str'), + 'user_info': { + 'screen_name': user.get('screen_name'), + 'profile_image_url': user.get('profile_image_url'), + 'presterity_user_flags': tweet.get('presterity_user_flags') + }, + 'presterity_actions': tweet.get('presterity_actions') + } + ) + + @staticmethod + def __parse_datetime(datetime): + 
_datetime = TweetType.translate_datetime(datetime) + return _datetime.strftime('%Y-%m-%d %H:%M:%S %Z') diff --git a/stream-handler/main.py b/stream-handler/main.py new file mode 100644 index 0000000..22f7815 --- /dev/null +++ b/stream-handler/main.py @@ -0,0 +1,49 @@ +import logging +import argparse +from twitter_client import TwitterClient +from twitter_stream_listener import TwitterStreamListener + +from handlers.write_to_dynamo_handler import WriteToDynamoHandler +from handlers.record_user_handler import RecordUserHandler +from handlers.apply_user_filter_handler import ApplyUserFilterHandler + + +def build_parser() -> argparse.ArgumentParser: + """Construct argument parser for script. + + :return: ArgumentParser + """ + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--handle', action='append', + help='Twitter handle to listen for; may appear multiple times') + parser.add_argument('--id', action='append', + help='Twitter user id to listen for; may appear multiple times') + parser.add_argument('--verbose', '-v', action='store_true', help='log level DEBUG') + return parser + +if __name__ != '__main__': + raise Exception('main.py can only be run as main') + +parser = build_parser() +args = parser.parse_args() + +# Log at WARN overall; DEBUG is very verbose for python-twitter code +logging.basicConfig(level=logging.WARN) +log_level = logging.INFO +if args.verbose: + log_level = logging.DEBUG +log = logging.getLogger() +log.setLevel(log_level) + +client = TwitterClient() +listener = TwitterStreamListener(client.get_stream(args.handle)) + +listener.register_handler(RecordUserHandler()) +listener.register_handler(ApplyUserFilterHandler()) +listener.register_handler(WriteToDynamoHandler()) + +listener.start() + +input('Press Enter to quit') + +listener.stop() \ No newline at end of file diff --git a/stream-handler/tweet_type.py b/stream-handler/tweet_type.py new file mode 100644 index 0000000..7f3429e --- /dev/null +++ 
b/stream-handler/tweet_type.py @@ -0,0 +1,50 @@ +""" +Utility class to help understand data returned from Twitter API. +""" + +from typing import Optional +from datetime import datetime + + +class TweetType(object): + Timeout = {'timeout': True} + Hangup = {'hangup': True} + HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True} + + @classmethod + def is_hangup(cls, tweet_dict: dict) -> bool: + """Return True if provided dict looks like any type of hangup. + + Note that a heartbeat timeout is a kind of hangup. + """ + return tweet_dict and tweet_dict is cls.Hangup + + @classmethod + def is_heartbeat_timeout(cls, tweet_dict: dict) -> bool: + """Return True if provided dict looks like heartbeat timeout.""" + return tweet_dict and tweet_dict is cls.HeartbeatTimeout + + @classmethod + def is_timeout(cls, tweet_dict: dict) -> bool: + """Return True if provided dict looks like timeout.""" + return tweet_dict and tweet_dict is cls.Timeout + + @classmethod + def is_hangup_or_timeout(cls, tweet_dict: dict) -> bool: + return cls.is_hangup(tweet_dict) \ + or cls.is_heartbeat_timeout(tweet_dict) \ + or cls.is_timeout(tweet_dict) + + @classmethod + def get_text(cls, tweet_dict: dict) -> Optional[str]: + """Return text or None.""" + text = None + if tweet_dict: + text = tweet_dict.get('text') + # Convert empty string to None for consistency + return text or None + + @staticmethod + def translate_datetime(tweet_datetime: str): + _format = '%a %b %d %H:%M:%S %z %Y' + return datetime.strptime(tweet_datetime, _format) \ No newline at end of file diff --git a/stream-handler/twitter_client.py b/stream-handler/twitter_client.py new file mode 100644 index 0000000..36a556f --- /dev/null +++ b/stream-handler/twitter_client.py @@ -0,0 +1,63 @@ +import logging +import twitter +from typing import List + +DEFAULT_CONFIG_FILE = './config.py' + + +class TwitterClient: + + def __init__(self, config=None): + self.log = logging.getLogger() + if config is None: + config = 
DEFAULT_CONFIG_FILE + + self.log.info("Loading config.py from %s", config) + _config = self.__load_config(config) + self.log.info("Config loaded") + + self.log.info("Initializing client") + try: + self.client = self.__init_client(_config) + self.log.info("Twitter account verified") + except twitter.TwitterError as ex: + self.log.error("Error initializing client: %s", ex) + + @staticmethod + def __load_config(cfg_filename: str) -> dict: + """"Load our API credentials.""" + config = {} + exec(open(cfg_filename).read(), config) + return config + + @staticmethod + def __init_client(config: dict) -> twitter.Api: + _client = twitter.Api( + consumer_key=config['consumer_key'], + consumer_secret=config['consumer_secret'], + access_token_key=config['access_key'], + access_token_secret=config['access_secret']) + + _client.VerifyCredentials(include_entities=False, skip_status=True) + + return _client + + def get_stream(self, handles: List[str]): + _user_ids = self.__handles_to_ids(handles) + _query_args = { + 'languages': ['en'], + 'follow': [str(uid) for uid in _user_ids] + } + return self.client.GetStreamFilter(**_query_args) + + def __handles_to_ids(self, handles: List[str]) -> List[int]: + num_handles = len(handles) + if num_handles > 100: + raise ValueError("Cannot lookup ids for more than 100 twitter handles.") + handles = [hndl[1:] if hndl.startswith('@') else hndl for hndl in handles] + self.log.info("Getting ids for handles: %s", handles) + user_objs = self.client.UsersLookup(screen_name=handles) + ids = [user_obj.id for user_obj in user_objs] + self.log.info("Ids: %s", ids) + return ids + diff --git a/stream-handler/twitter_stream_listener.py b/stream-handler/twitter_stream_listener.py new file mode 100644 index 0000000..dbce6c7 --- /dev/null +++ b/stream-handler/twitter_stream_listener.py @@ -0,0 +1,64 @@ +from threading import Thread +import logging +from tweet_type import TweetType + + +class TwitterStreamListener: + + def __init__(self, stream): + self.stream = 
stream + self.listener_thread = None + self.handlers = [] + self.status = 'Stopped' + self.stop_listener = False + self.log = logging.getLogger() + + def register_handler(self, handler): + self.handlers.append(handler) + + def get_status(self): + return self.status + + def start(self): + if self.status != 'Stopped': + raise ValueError('Status=Stopped is required to Start. Instead Status=%s', self.status) + if self.listener_thread is not None: + raise ValueError('It appears that a listener is already running') + if len(self.handlers) < 1: + raise ValueError('No handlers are registered.') + + self.log.info('Starting listener') + self.listener_thread = Thread(target=self.__listen__) + self.listener_thread.start() + self.log.info('Started') + self.status = 'Running' + + def stop(self): + if self.status != 'Running': + raise ValueError('Status=Running is required to Stop. Instead Status=%s', self.status) + if self.listener_thread is None: + raise ValueError('It appears that there is no listener running') + + self.log.info('Stopping listener') + self.status = 'Stopping' + self.stop_listener = True + self.listener_thread.join() + self.listener_thread = None + self.status = 'Stopped' + self.log.info('Listener stopped') + + def __listen__(self): + for tweet in self.stream: + if TweetType.is_hangup_or_timeout(tweet): + continue + + for handler in self.handlers: + try: + handler.handle(tweet) + except Exception as e: + self.log.error('Failure running handler %s on tweet %s. Exception message:%n%s', + handler.__class__.__name__, tweet, e) + + if self.stop_listener: + self.log.info('Stop detected in thread') + break