diff --git a/.gitignore b/.gitignore
index bf0d076b..326440d2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ __pycache__/
 
 # C extensions
 *.so
+.telegramToken
 
 # ignore .pem file
 *.pem
diff --git a/.telegramToken b/.telegramToken
new file mode 100644
index 00000000..b77b1f81
--- /dev/null
+++ b/.telegramToken
@@ -0,0 +1 @@
+5379937883:AAF9kGAOMUPKzDrPsZbVrp6exqVeud6QWn8
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 6f6d50e4..515297f8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,9 @@
 FROM python:3.8.12-slim-buster
-
 # YOUR COMMANDS HERE
 # ....
 # ....
-
-CMD ["python3", "bot.py"]
\ No newline at end of file
+RUN pip install --upgrade pip
+RUN pip install awscli
+COPY . .
+RUN pip install -r requirements.txt
+CMD ["python3", "bot.py"]
\ No newline at end of file
diff --git a/PR.Jenkinsfile b/PR.Jenkinsfile
new file mode 100644
index 00000000..ea455cb0
--- /dev/null
+++ b/PR.Jenkinsfile
@@ -0,0 +1,16 @@
+pipeline {
+    agent any
+
+    stages {
+        stage('Unittest') {
+            steps {
+                echo "testing"
+            }
+        }
+        stage('Functional test') {
+            steps {
+                echo "testing"
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/bot.py b/bot.py
index 7e311287..8706c2bf 100644
--- a/bot.py
+++ b/bot.py
@@ -1,6 +1,10 @@
+import json
+import threading
+import botocore
 from telegram.ext import Updater, MessageHandler, Filters
-from utils import search_download_youtube_video
 from loguru import logger
+import boto3
+from utils import calc_backlog_per_instance
 
 
 class Bot:
@@ -26,10 +30,13 @@ def send_video(self, update, context, file_path):
         """Sends video to a chat"""
         context.bot.send_video(chat_id=update.message.chat_id, video=open(file_path, 'rb'), supports_streaming=True)
 
-    def send_text(self, update, text, quote=False):
+    def send_text(self, update, text, chat_id=None, quote=False):
         """Sends text to a chat"""
-        # retry https://github.com/python-telegram-bot/python-telegram-bot/issues/1124
-        update.message.reply_text(text, quote=quote)
+        if chat_id:
+            self.updater.bot.send_message(chat_id, text=text)
+        else:
+            # retry https://github.com/python-telegram-bot/python-telegram-bot/issues/1124
+            update.message.reply_text(text, quote=quote)
 
 
 class QuoteBot(Bot):
@@ -42,14 +49,50 @@ def _message_handler(self, update, context):
         self.send_text(update, f'Your original message: {update.message.text}', quote=to_quote)
 
 
-class YoutubeBot(Bot):
-    pass
+class YoutubeObjectDetectBot(Bot):
+    def __init__(self, token):
+        super().__init__(token)
+        threading.Thread(
+            target=calc_backlog_per_instance,
+            args=(workers_queue, asg, config.get("autoscaling_group_name"))
+        ).start()
+
+    def _message_handler(self, update, context):
+        try:
+            chat_id = str(update.effective_message.chat_id)
+            response = workers_queue.send_message(
+                MessageBody=update.message.text,
+                MessageAttributes={
+                    'chat_id': {'StringValue': chat_id, 'DataType': 'String'}
+                }
+            )
+            logger.info(f'msg {response.get("MessageId")} has been sent to queue')
+            self.send_text(update, 'Your message is being processed...', chat_id=chat_id)
+
+        except botocore.exceptions.ClientError as error:
+            logger.error(error)
+            self.send_text(update, 'Something went wrong, please try again...')
 
 
 if __name__ == '__main__':
     with open('.telegramToken') as f:
         _token = f.read()
 
-    my_bot = Bot(_token)
+    with open('config.json') as f:
+        config = json.load(f)
+
+    sqs = boto3.resource('sqs', region_name=config.get('aws_region'))
+    workers_queue = sqs.get_queue_by_name(QueueName=config.get('bot_to_worker_queue_name'))
+    asg = boto3.client('autoscaling', region_name=config.get('aws_region'))
+    asgw = boto3.client('autoscaling')
+    with open('config2.json') as s:
+        config2 = json.load(s)
+    asgw.put_scaling_policy(
+        AutoScalingGroupName='danishain-polybot--aws-ex1',
+        PolicyName='10sqs-target-tracking-scaling-policy',
+        PolicyType='TargetTrackingScaling',
+        TargetTrackingConfiguration=config2
+    )
+
+    my_bot = YoutubeObjectDetectBot(_token)
     my_bot.start()
-
diff --git a/config.json b/config.json
new file mode 100644
index 00000000..d90050db
--- /dev/null
+++ b/config.json
@@ -0,0 +1,5 @@
+{
+  "aws_region": "eu-north-1",
+  "bot_to_worker_queue_name": "danishain-sqs-aws-ex1",
+  "autoscaling_group_name": "danishain-polybot--aws-ex1"
+}
\ No newline at end of file
diff --git a/config2.json b/config2.json
new file mode 100644
index 00000000..31d82358
--- /dev/null
+++ b/config2.json
@@ -0,0 +1,9 @@
+{
+  "TargetValue": 10,
+  "CustomizedMetricSpecification": {
+    "MetricName": "BacklogPerInstance_danishain",
+    "Namespace": "danishaintarget10",
+    "Statistic": "Average",
+    "Unit": "None"
+  }
+}
\ No newline at end of file
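Note on the scaling-policy wiring above: bot.py builds a regional `asg` client but then attaches the policy through a second, region-less `asgw` client and hardcodes the group name that config.json already holds. A minimal sketch of the same `put_scaling_policy` call driven entirely by the two config files (the helper name `attach_backlog_scaling_policy` is illustrative, not part of this PR):

    import json
    import boto3


    def attach_backlog_scaling_policy(config_path='config.json', policy_path='config2.json'):
        # Load the region/ASG settings and the target-tracking spec shipped with this PR.
        with open(config_path) as f:
            config = json.load(f)
        with open(policy_path) as f:
            target_tracking = json.load(f)

        # Single autoscaling client with an explicit region, reused for the policy call.
        asg = boto3.client('autoscaling', region_name=config.get('aws_region'))
        asg.put_scaling_policy(
            AutoScalingGroupName=config.get('autoscaling_group_name'),
            PolicyName='10sqs-target-tracking-scaling-policy',
            PolicyType='TargetTrackingScaling',
            TargetTrackingConfiguration=target_tracking,
        )

This keeps one client with an explicit region and avoids duplicating the group name outside config.json.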
diff --git a/requirements.txt b/requirements.txt
index 28cc6ee3..6a5e8bfe 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,5 @@
 python-telegram-bot>=13.11
-youtube-dl>=2021.12.17
-loguru
\ No newline at end of file
+yt-dlp>=2022.6.29
+loguru~=0.6.0
+botocore~=1.27.46
+boto3~=1.24.46
diff --git a/tests/test_autoscaling_metric.py b/tests/test_autoscaling_metric.py
new file mode 100644
index 00000000..ca57848d
--- /dev/null
+++ b/tests/test_autoscaling_metric.py
@@ -0,0 +1,22 @@
+import unittest2 as unittest
+from unittest.mock import Mock
+from utils import calc_backlog_per_instance
+
+
+class TestBacklogPerInstanceMetric(unittest.TestCase):
+    def setUp(self):
+        self.sqs_queue_client = Mock()
+        self.asg_client = Mock()
+
+    def test_no_worker_full_queue(self):
+        self.sqs_queue_client.attributes = {
+            'ApproximateNumberOfMessages': '100'
+        }
+
+        self.asg_client.describe_auto_scaling_groups = Mock(return_value={
+            'AutoScalingGroups': [{
+                'DesiredCapacity': 0
+            }]
+        })
+
+        self.assertEqual(calc_backlog_per_instance(self.sqs_queue_client, self.asg_client, None), 99)
diff --git a/utils.py b/utils.py
index 0dcbc25c..5684d5b5 100644
--- a/utils.py
+++ b/utils.py
@@ -1,4 +1,8 @@
-from youtube_dl import YoutubeDL
+import time
+import json
+import boto3
+from yt_dlp import YoutubeDL
+from loguru import logger
 
 
 def search_download_youtube_video(video_name, num_results=1):
@@ -13,3 +17,34 @@ def search_download_youtube_video(video_name, num_results=1):
     return [ydl.prepare_filename(video) for video in videos]
 
 
+def calc_backlog_per_instance(sqs_queue_client, asg_client, asg_group_name):
+    while True:
+        msgs_in_queue = int(sqs_queue_client.attributes.get('ApproximateNumberOfMessages'))
+        asg_size = asg_client.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_group_name])['AutoScalingGroups'][0]['DesiredCapacity']
+        print(msgs_in_queue)
+        if msgs_in_queue == 0:
+            backlog_per_instance = 0
+        elif asg_size == 0:
+            backlog_per_instance = 99
+        else:
+            backlog_per_instance = msgs_in_queue / asg_size
+
+        logger.info(f'backlog per instance: {backlog_per_instance}')
+
+        # send the backlog_per_instance metric to CloudWatch
+        cloudwatch = boto3.client('cloudwatch', region_name='eu-north-1')
+
+        cloudwatch.put_metric_data(
+            Namespace='danishaintarget10',
+            MetricData=[
+                {
+                    'MetricName': 'BacklogPerInstance_danishain',
+                    'Unit': 'None',
+                    'Value': backlog_per_instance
+                }
+            ]
+        )
+
+        time.sleep(60)
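Note on the test above: `calc_backlog_per_instance` runs a `while True` loop that sleeps for 60 seconds after publishing the metric, so the assertion in tests/test_autoscaling_metric.py never receives a return value to compare against 99. One way to keep the test meaningful is to factor the per-iteration computation into a small helper that both the loop and the test can call; a sketch under that assumption (the helper name `backlog_per_instance` is not part of this PR):

    def backlog_per_instance(sqs_queue_client, asg_client, asg_group_name):
        # Single iteration of the metric calculation, extracted so it can be
        # unit-tested without entering the infinite publish loop.
        msgs_in_queue = int(sqs_queue_client.attributes.get('ApproximateNumberOfMessages'))
        asg_size = asg_client.describe_auto_scaling_groups(
            AutoScalingGroupNames=[asg_group_name]
        )['AutoScalingGroups'][0]['DesiredCapacity']
        if msgs_in_queue == 0:
            return 0
        if asg_size == 0:
            return 99
        return msgs_in_queue / asg_size

`calc_backlog_per_instance` could then call this helper once per minute and push the result to CloudWatch, while the test asserts against the helper directly with its mocked queue and ASG clients.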
diff --git a/worker.py b/worker.py
new file mode 100644
index 00000000..831ab847
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,59 @@
+import json
+import time
+import boto3
+import botocore
+from loguru import logger
+from utils import search_download_youtube_video
+from botocore.exceptions import ClientError
+
+
+def process_msg(msg):
+    ytb = search_download_youtube_video(msg)
+    if len(ytb) == 0:
+        return ('No videos by this name', msg)
+    else:
+        print('video is ', ytb[0])
+        print('msg is ', msg)
+        # upload the downloaded video to the S3 bucket
+        s3 = boto3.client("s3")
+        s3.upload_file(
+            Filename=ytb[0],
+            Bucket="danishain-polybot-aws-ex1",
+            Key=msg,
+        )
+
+
+def main():
+    while True:
+        try:
+            messages = queue.receive_messages(
+                MessageAttributeNames=['All'],
+                MaxNumberOfMessages=1,
+                WaitTimeSeconds=10
+            )
+            for msg in messages:
+                logger.info(f'processing message {msg}')
+                process_msg(msg.body)
+
+                # delete the message from the queue after it was handled
+                response = queue.delete_messages(Entries=[{
+                    'Id': msg.message_id,
+                    'ReceiptHandle': msg.receipt_handle
+                }])
+                if 'Successful' in response:
+                    logger.info(f'msg {msg} has been handled successfully')
+
+        except botocore.exceptions.ClientError as err:
+            logger.exception(f"Couldn't receive messages {err}")
+            time.sleep(10)
+
+
+if __name__ == '__main__':
+    with open('config.json') as f:
+        config = json.load(f)
+
+    sqs = boto3.resource('sqs', region_name=config.get('aws_region'))
+    queue = sqs.get_queue_by_name(QueueName=config.get('bot_to_worker_queue_name'))
+
+    main()
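Note on the worker above: the bot attaches the originating `chat_id` as an SQS message attribute, but worker.py never reads it, so nothing flows back to the user once the upload finishes. A sketch of one possible follow-up step, under the assumption that replying with a presigned link to the uploaded object is the desired behaviour (the helper name `result_link_for` and the reuse of the hardcoded bucket are illustrative, not part of this PR):

    import boto3


    def result_link_for(msg, bucket='danishain-polybot-aws-ex1', expires=3600):
        # The bot stores the chat id as the 'chat_id' message attribute; it is only
        # visible here because receive_messages asks for MessageAttributeNames=['All'].
        chat_id = (msg.message_attributes or {}).get('chat_id', {}).get('StringValue')

        # process_msg uploads the video under the message body as the object key,
        # so a presigned GET for that key yields a time-limited download link.
        s3 = boto3.client('s3')
        url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket, 'Key': msg.body},
            ExpiresIn=expires,
        )
        return chat_id, url

How the link actually reaches the chat (a results queue back to the bot, or a Telegram client in the worker) is left open here; the sketch only shows where the `chat_id` attribute and the S3 key are available inside the polling loop.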