Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ services:
volumes:
- ./docker_volume:/app/docker_volume
profiles: ['production', 'testing']
environment:
- ENVIRONMENT=${ENVIRONMENT:-development}
- SENTRY_DSN=${SENTRY_DSN:-}
mongo_twitter:
network_mode: host
restart: unless-stopped
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ redis~=3.5.3
pytest
black
tweepy~=4.5.0
pymongo~=4.0.2
pymongo~=4.0.2
sentry-sdk
17 changes: 16 additions & 1 deletion src/QueueProcessor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from datetime import datetime, timezone
from time import sleep
from typing import List
Expand All @@ -9,15 +10,19 @@
from pydantic import ValidationError
from rsmq.consumer import RedisSMQConsumer
from rsmq import RedisSMQ
from sentry_sdk.integrations.redis import RedisIntegration
import sentry_sdk

from ServiceConfig import ServiceConfig
from data.Task import Task
from data.TweetData import TweetData
from data.TweetMessage import TweetMessage


class UserSuspended(Exception):
pass


class QueueProcessor:

TIME_BETWEEN_QUERIES_IN_MINUTES = 4
Expand Down Expand Up @@ -98,7 +103,7 @@ def get_tweepy_query_params(self, task):
if task.params.query[0] == "@":
tweepy_client = tweepy.Client(self.service_config.twitter_bearer_token)
user = tweepy_client.get_user(username=task.params.query[1:])
if user.errors and 'detail' in user.errors[0] and 'suspended' in user.errors[0]['detail']:
if user.errors and "detail" in user.errors[0] and "suspended" in user.errors[0]["detail"]:
raise UserSuspended

tweepy_params["id"] = user.data["id"]
Expand Down Expand Up @@ -169,5 +174,15 @@ def get_sanitized_query(query):


if __name__ == "__main__":
try:
sentry_sdk.init(
os.environ.get("SENTRY_DSN"),
traces_sample_rate=0.1,
environment=os.environ.get("ENVIRONMENT", "development"),
integrations=[RedisIntegration()],
)
except Exception:
pass

redis_tasks_processor = QueueProcessor()
redis_tasks_processor.subscribe_to_extractions_tasks_queue()
2 changes: 1 addition & 1 deletion src/data/Params.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@

class Params(BaseModel):
query: str
tweets_languages: List[str] = list()
tweets_languages: List[str] = list()