diff --git a/configs/dev.yaml b/configs/dev.yaml index 72ef4a5..8b14fd7 100644 --- a/configs/dev.yaml +++ b/configs/dev.yaml @@ -2,6 +2,14 @@ DATABASE_URL: "postgresql://esclient_devcomm:oBFG3H!7XYSJCSNK@pg4.sweb.ru:5433/e HOST: "0.0.0.0" PORT: "7002" +KAFKA_BROKERS: "localhost:9092" +KAFKA_REQUEST_TOPIC: "moderation-request" +KAFKA_RESULT_TOPIC: "moderation-result" +KAFKA_CONSUMER_GROUP_ID: "moderation-consumer-group" +KAFKA_MAX_RETRIES: "3" +KAFKA_RETRY_BACKOFF_MS: "1000" +KAFKA_ENABLE_SSL: false + LOG_LEVEL: "INFO" LOG_FORMAT: "%(asctime)s %(levelname)-8s [%(name)s] %(message)s" LOG_DATEFMT: "%Y-%m-%d %H:%M:%S" diff --git a/justfile b/justfile index 59ae2d8..242ec00 100644 --- a/justfile +++ b/justfile @@ -4,12 +4,17 @@ set dotenv-load := true COMMON_JUST_URL := 'https://raw.githubusercontent.com/esclient/tools/refs/heads/main/python/common.just' LOAD_ENVS_URL := 'https://raw.githubusercontent.com/esclient/tools/refs/heads/main/load_envs.sh' -PROTO_TAG := 'v0.0.17' -PROTO_NAME := 'comment.proto' -TMP_DIR := '.proto' +COMMENT_PROTO_TAG := 'v0.0.17' +COMMENT_PROTO_NAME := 'comment.proto' +COMMENT_TMP_DIR := '.proto' SOURCE := 'commentservice' OUT_DIR := 'src/' + SOURCE + '/grpc' +MODERATION_PROTO_TAG := 'v0.1.3' +MODERATION_PROTO_NAME := 'moderation.proto' +MODERATION_TMP_DIR := '.proto' +SERVICE_NAME := 'moderation' + MKDIR_TOOLS := 'mkdir -p tools' FETCH_COMMON_JUST := 'curl -fsSL ' + COMMON_JUST_URL + ' -o tools/common.just' diff --git a/migrations/20251114232657_migrations_create_comments_table.sql b/migrations/20251114232657_migrations_create_comments_table.sql new file mode 100644 index 0000000..34f14fc --- /dev/null +++ b/migrations/20251114232657_migrations_create_comments_table.sql @@ -0,0 +1,21 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS comments ( + id BIGSERIAL PRIMARY KEY, + mod_id BIGINT NOT NULL, + author_id BIGINT NOT NULL, + text TEXT NOT NULL, + status VARCHAR(50) DEFAULT 'pending_moderation', + created_at TIMESTAMP 
WITH TIME ZONE DEFAULT NOW(), + edited_at TIMESTAMP WITH TIME ZONE +); + +CREATE INDEX idx_comments_mod_id ON comments(mod_id); +CREATE INDEX idx_comments_author_id ON comments(author_id); +CREATE INDEX idx_comments_status ON comments(status); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS comments CASCADE; +-- +goose StatementEnd \ No newline at end of file diff --git a/pdm.lock b/pdm.lock index 1abebf0..29a650f 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:e3b96de7c4efceb61703b10262da72cbc0a735cbb324a8fe866247d8c513a297" +content_hash = "sha256:57eeea1d5e7bf9ee1ad0acf79609855f60ee838ccb6d83752803a21ae3fcd7fd" [[metadata.targets]] requires_python = "~=3.13" @@ -113,6 +113,30 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "confluent-kafka" +version = "2.12.2" +requires_python = ">=3.8" +summary = "Confluent's Python client for Apache Kafka" +groups = ["default"] +files = [ + {file = "confluent_kafka-2.12.2-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:e888667c607741af5e4b36014d215c4ad2f214646e3da777505e4cf383ac5375"}, + {file = "confluent_kafka-2.12.2-cp313-cp313-macosx_13_0_x86_64.whl", hash = "sha256:adc98ecfbb2a41a234c72043c0ca46c941d5da61900d998f14f29a30baa2e688"}, + {file = "confluent_kafka-2.12.2-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:d0abde08fc133cfe6667226472518c6afbb80e083090c441c4ae4cddcd8ed921"}, + {file = "confluent_kafka-2.12.2-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:b3065064a86b4494c8c94eff9968845461918a2bc89e5a800a2920f722ed2cb1"}, + {file = "confluent_kafka-2.12.2-cp313-cp313-win_amd64.whl", hash = "sha256:26b2291694a300b7ff00b46eda835a06b124b4878527d32277d42ca39ee95dd9"}, + {file = "confluent_kafka-2.12.2-cp314-cp314-macosx_13_0_arm64.whl", hash = 
"sha256:0101be4b6037ad5a49f71c749bfd9f24e82607774f5fb4424c4dee6bf39a302d"}, + {file = "confluent_kafka-2.12.2-cp314-cp314-macosx_13_0_x86_64.whl", hash = "sha256:27cc33a0c47f167db81b4f46d9e1c59582d9bfd8b3c21129a2ee400f5c93844e"}, + {file = "confluent_kafka-2.12.2-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:38d23cf3f428451fc14c18aa53f5f3f1a37c7d89c44bfaf2862b3d6a5068e45c"}, + {file = "confluent_kafka-2.12.2-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:eed1b0e540204c52d0ab40d621c371f52044a788b542f6e28a7756fd8f7a1029"}, + {file = "confluent_kafka-2.12.2-cp314-cp314-win_amd64.whl", hash = "sha256:ef411221bfdaffae944156826965b9a08777a5dff66d765a23108f7d6774706f"}, + {file = "confluent_kafka-2.12.2-cp314-cp314t-macosx_13_0_arm64.whl", hash = "sha256:d04f69f6c269ccf6ec1a2ec327edf977a06e790f631ede18511093c1fe598fef"}, + {file = "confluent_kafka-2.12.2-cp314-cp314t-macosx_13_0_x86_64.whl", hash = "sha256:01a0429cac8fe38db49ebb9bda335b0c77f14455da72ccf351d49a51c1bd00a5"}, + {file = "confluent_kafka-2.12.2-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:82ec5302cf7c9ea06d556ed8e8ea8422d2a60035b607d64579ca63663276fe9b"}, + {file = "confluent_kafka-2.12.2-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:de9dece5803e6b58d8c101cbceb90fa55ca96e0a5f40f10483a4c8f5f4027a69"}, + {file = "confluent_kafka-2.12.2.tar.gz", hash = "sha256:5a50bfcd24f9dcf34b986f837f80126a71087364d44fcb8b45e8e74080fb6e98"}, +] + [[package]] name = "coverage" version = "7.11.0" diff --git a/pyproject.toml b/pyproject.toml index f367c1b..cc316b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -125,6 +125,7 @@ dependencies = [ "psycopg2-binary==2.9.10", "watchfiles==1.1.0", "asyncpg>=0.30.0", + "confluent-kafka>=2.12.2", ] requires-python = ">=3.13" readme = "README.md" diff --git a/src/commentservice/grpc/moderation_pb2.py b/src/commentservice/grpc/moderation_pb2.py new file mode 100644 index 0000000..8860587 --- /dev/null +++ 
b/src/commentservice/grpc/moderation_pb2.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: moderation.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'moderation.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10moderation.proto\x12\nmoderation\"W\n\x15ModerateObjectRequest\x12\n\n\x02id\x18\x01 \x01(\x03\x12$\n\x04type\x18\x02 \x01(\x0e\x32\x16.moderation.ObjectType\x12\x0c\n\x04text\x18\x03 \x01(\t\")\n\x16ModerateObjectResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08*\x83\x01\n\nObjectType\x12\x1b\n\x17OBJECT_TYPE_UNSPECIFIED\x10\x00\x12\x1f\n\x1bOBJECT_TYPE_MOD_DESCRIPTION\x10\x01\x12\x1c\n\x18OBJECT_TYPE_COMMENT_TEXT\x10\x02\x12\x19\n\x15OBJECT_TYPE_USER_NAME\x10\x03\x32l\n\x11ModerationService\x12W\n\x0eModerateObject\x12!.moderation.ModerateObjectRequest\x1a\".moderation.ModerateObjectResponseb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'moderation_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_OBJECTTYPE']._serialized_start=165 + _globals['_OBJECTTYPE']._serialized_end=296 + _globals['_MODERATEOBJECTREQUEST']._serialized_start=32 + _globals['_MODERATEOBJECTREQUEST']._serialized_end=119 + _globals['_MODERATEOBJECTRESPONSE']._serialized_start=121 + 
_globals['_MODERATEOBJECTRESPONSE']._serialized_end=162 + _globals['_MODERATIONSERVICE']._serialized_start=298 + _globals['_MODERATIONSERVICE']._serialized_end=406 +# @@protoc_insertion_point(module_scope) diff --git a/src/commentservice/grpc/moderation_pb2.pyi b/src/commentservice/grpc/moderation_pb2.pyi new file mode 100644 index 0000000..107cea2 --- /dev/null +++ b/src/commentservice/grpc/moderation_pb2.pyi @@ -0,0 +1,33 @@ +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class ObjectType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + OBJECT_TYPE_UNSPECIFIED: _ClassVar[ObjectType] + OBJECT_TYPE_MOD_DESCRIPTION: _ClassVar[ObjectType] + OBJECT_TYPE_COMMENT_TEXT: _ClassVar[ObjectType] + OBJECT_TYPE_USER_NAME: _ClassVar[ObjectType] +OBJECT_TYPE_UNSPECIFIED: ObjectType +OBJECT_TYPE_MOD_DESCRIPTION: ObjectType +OBJECT_TYPE_COMMENT_TEXT: ObjectType +OBJECT_TYPE_USER_NAME: ObjectType + +class ModerateObjectRequest(_message.Message): + __slots__ = ("id", "type", "text") + ID_FIELD_NUMBER: _ClassVar[int] + TYPE_FIELD_NUMBER: _ClassVar[int] + TEXT_FIELD_NUMBER: _ClassVar[int] + id: int + type: ObjectType + text: str + def __init__(self, id: _Optional[int] = ..., type: _Optional[_Union[ObjectType, str]] = ..., text: _Optional[str] = ...) -> None: ... + +class ModerateObjectResponse(_message.Message): + __slots__ = ("success",) + SUCCESS_FIELD_NUMBER: _ClassVar[int] + success: bool + def __init__(self, success: bool = ...) -> None: ... 
diff --git a/src/commentservice/grpc/moderation_pb2_grpc.py b/src/commentservice/grpc/moderation_pb2_grpc.py new file mode 100644 index 0000000..ed857d0 --- /dev/null +++ b/src/commentservice/grpc/moderation_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from . import moderation_pb2 as moderation__pb2 + +GRPC_GENERATED_VERSION = '1.75.1' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in moderation_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class ModerationServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.ModerateObject = channel.unary_unary( + '/moderation.ModerationService/ModerateObject', + request_serializer=moderation__pb2.ModerateObjectRequest.SerializeToString, + response_deserializer=moderation__pb2.ModerateObjectResponse.FromString, + _registered_method=True) + + +class ModerationServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def ModerateObject(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ModerationServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'ModerateObject': grpc.unary_unary_rpc_method_handler( + servicer.ModerateObject, + request_deserializer=moderation__pb2.ModerateObjectRequest.FromString, + response_serializer=moderation__pb2.ModerateObjectResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'moderation.ModerationService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('moderation.ModerationService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. 
+class ModerationService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def ModerateObject(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/moderation.ModerationService/ModerateObject', + moderation__pb2.ModerateObjectRequest.SerializeToString, + moderation__pb2.ModerateObjectResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/src/commentservice/kafka/config.py b/src/commentservice/kafka/config.py new file mode 100644 index 0000000..12caa70 --- /dev/null +++ b/src/commentservice/kafka/config.py @@ -0,0 +1,38 @@ +import os + +class KafkaConfig: + + def __init__(self): + self.brokers = os.getenv('KAFKA_BROKERS', 'localhost:9092') + self.request_topic = os.getenv('KAFKA_REQUEST_TOPIC', 'moderation-request') + self.result_topic = os.getenv('KAFKA_RESULT_TOPIC', 'moderation-result') + self.consumer_group_id = os.getenv('KAFKA_CONSUMER_GROUP_ID', 'comment-service-group') + self.max_retries = int(os.getenv('KAFKA_MAX_RETRIES', '3')) + self.retry_backoff_ms = int(os.getenv('KAFKA_RETRY_BACKOFF_MS', '1000')) + self.enable_ssl = os.getenv('KAFKA_ENABLE_SSL', 'false').lower() == 'true' + + def get_producer_config(self): + + config = { + 'bootstrap.servers': self.brokers, + 'client.id': 'comment_service_producer', + 'acks': 'all', + 'retries': self.max_retries, + 'retry.backoff.ms': self.retry_backoff_ms, + 'enable.idempotence': True, + 'compression.type': 'none' + } + return config + + def get_consumer_config(self): + + config = { + 'bootstrap.servers': self.brokers, + 'group.id': self.consumer_group_id, + 'auto.offset.reset': 'earliest', + 'enable.auto.commit': True, + 'auto.commit.interval.ms': 1000, + 
'session.timeout.ms': 30000, + 'heartbeat.interval.ms': 10000 + } + return config \ No newline at end of file diff --git a/src/commentservice/kafka/consumer.py b/src/commentservice/kafka/consumer.py new file mode 100644 index 0000000..0869afe --- /dev/null +++ b/src/commentservice/kafka/consumer.py @@ -0,0 +1,105 @@ +from confluent_kafka import Consumer, KafkaException, KafkaError +import logging +import threading +from .config import KafkaConfig +from commentservice.grpc import moderation_pb2 + +logger = logging.getLogger(__name__) + +class ModerationResponseConsumer: + + def __init__(self, config: KafkaConfig, callback): + + self.config = config + self.callback = callback + self.consumer = Consumer(config.get_consumer_config()) + self.topic = config.result_topic + self.running = False + self.consumer_thread = None + + self.consumer.subscribe([self.topic]) + logger.info(f"ModerationResponseConsumer subscribed to topic: {self.topic}") + + def start(self): + + if self.running: + logger.warning("Consumer already running") + return + + self.running = True + self.consumer_thread = threading.Thread(target=self._consume_loop, daemon = True) + self.consumer_thread.start() + logger.info("ModerationResponseConsumer started") + + def stop(self): + + if not self.running: + return + + self.running = False + + if self.consumer_thread and self.consumer_thread.is_alive(): + self.consumer_thread.join(timeout=5.0) + + self.consumer.close() + logger.info("ModerationResponseConsumer stopped") + + def _consume_loop(self): + + logger.info("Consumer loop started") + + while self.running: + try: + msg = self.consumer.poll(timeout=1.0) + + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + logger.debug(f"Reached end of partition for topic {msg.topic()}") + elif msg.error().code() == KafkaError._TIMED_OUT: + logger.debug("Timeout") + else: + logger.error(f"Consumer error: {msg.error()}") + continue + + self._process_message(msg) + + except 
KafkaException as e: + logger.error(f"Kafka exception in consume loop: {e}") + except Exception as e: + logger.error(f"Unexpected error in consume loop: {e}") + + logger.info("Consumer loop ended") + + def _process_message(self, msg): + + try: + request_id = 0 + if msg.key() is not None: + try: + key_str = msg.key().decode('utf-8') + request_id = int(key_str) + except (ValueError) as e: + logger.error(f"Failed to parse message key to request ID: {e}") + return + + response = moderation_pb2.ModerateObjectResponse() + response.ParseFromString(msg.value()) + + logger.info( + f"Received moderation response: request_id={request_id}, " + f"success={response.success}" + ) + + if self.callback: + try: + self.callback(response, request_id) + except Exception as e: + logger.error(f"Error in callback: {e}") + + except Exception as e: + logger.error(f"Failed to process message: {e}") + + def is_running(self) -> bool: + return self.running \ No newline at end of file diff --git a/src/commentservice/kafka/moderaiton_service.py b/src/commentservice/kafka/moderaiton_service.py new file mode 100644 index 0000000..5d4e955 --- /dev/null +++ b/src/commentservice/kafka/moderaiton_service.py @@ -0,0 +1,73 @@ +import logging +from ..kafka.producer import ModerationRequestProducer +from ..kafka.consumer import ModerationResponseConsumer +from ..kafka.config import KafkaConfig +from commentservice.grpc import moderation_pb2 + +logger = logging.getLogger(__name__) + +class ModerationService: + + def __init__(self): + self.config = KafkaConfig() + + self.producer = ModerationRequestProducer(self.config) + + self.consumer = ModerationResponseConsumer( + self.config, + callback=self._handle_moderation_response + ) + + self.consumer.start() + + logger.info("ModerationService initialized") + + def request_moderaiton(self, comment_id: int, comment_text: str) -> bool: + try: + request = moderation_pb2.ModerateObjectRequest() + request.id = comment_id + request.text = comment_text + + success = 
self.producer.send_moderation_request(request) + + if success: + logger.info(f"Moderation request sent: comment_id={comment_id}") + else: + logger.error(f"Failed to send moderation request: comment_id={comment_id}") + + return success + + except Exception as e: + logger.error(f"Error requesting moderation: {e}") + return False + + def _handle_moderation_response(self, response: moderation_pb2.ModerateObjectResponse, request_id: int): + + try: + is_flagged = response.success + + logger.info( + f"Moderation result received: comment_id={request_id}, " + f"flagged={is_flagged}" + ) + + self._update_comment_status(request_id, is_flagged) + + except Exception as e: + logger.error(f"Error handling moderation response: {e}") + + def _update_comment_status(self, comment_id: int, is_flagged: bool): + # TODO: Implement database update + # Example: + # if is_flagged: + # db.update_comment_status(comment_id, status='FLAGGED') + # else: + # db.update_comment_status(comment_id, status='APPROVED') + logger.debug("Testing") + + def shutdown(self): + self.consumer.stop() + self.producer.flush() + logger.info("ModerationService shutdown complete") + + \ No newline at end of file diff --git a/src/commentservice/kafka/producer.py b/src/commentservice/kafka/producer.py new file mode 100644 index 0000000..e5c4d1c --- /dev/null +++ b/src/commentservice/kafka/producer.py @@ -0,0 +1,74 @@ +from confluent_kafka import Producer +from confluent_kafka import KafkaException +import logging +from .config import KafkaConfig +from commentservice.grpc import moderation_pb2 +import time + +logger = logging.getLogger(__name__) + +class ModerationRequestProducer: + + def __init__(self, config: KafkaConfig): + self.config = config + self.producer = Producer(config.get_producer_config()) + self.topic = config.request_topic + + logger.info(f"ModerationRequestProducer initialized for topic: {self.topic}") + + def send_moderation_request(self, request: moderation_pb2.ModerateObjectRequest, retries: int = 3) 
-> bool: + for _ in range(retries): + try: + serialized_request = request.SerializeToString() + + key = str(request.id).encode('utf-8') + + self.producer.produce( + topic=self.topic, + value=serialized_request, + key=key, + partition=-1, + callback=self._delivery_callback + ) + + self.producer.poll(0) + + logger.debug(f"Moderation request queued: ID={request.id}, text_length={len(request.text)}") + return True + + except BufferError as e: + logger.error(f"Producer buffer full, message not queued: {e}") + return False + except KafkaException as e: + logger.error(f"Failed to produce message: {e}") + return False + except Exception as e: + logger.error(f"Unexpected error producing message: {e}") + return False + return False + + + def _delivery_callback(self, err, msg): + + if err is not None: + logger.error(f"Message delivery failed: {err}") + else: + logger.debug( + f"Message delivered: topic={msg.topic()}, " + f"partition={msg.partition()}, offset={msg.offset()}" + ) + + def flush(self, timeout: float = 10.0): + + remaining = self.producer.flush(timeout) + + if remaining > 0: + logger.warning(f"Flush incomplete: {remaining} messages still pending") + else: + logger.debug("All messages flushed successfully") + + def __del__(self): + if hasattr(self, 'producer'): + self.flush() + + \ No newline at end of file diff --git a/src/commentservice/server.py b/src/commentservice/server.py index 26416a0..cfdd9d1 100644 --- a/src/commentservice/server.py +++ b/src/commentservice/server.py @@ -1,5 +1,6 @@ import asyncio import logging +import signal from concurrent import futures import asyncpg @@ -11,6 +12,7 @@ from commentservice.repository.repository import CommentRepository from commentservice.service.service import CommentService from commentservice.settings import Settings +from commentservice.kafka.moderaiton_service import ModerationService async def serve() -> None: @@ -24,8 +26,10 @@ async def serve() -> None: max_size=10, ) + moderation_service = ModerationService() 
+ repo = CommentRepository(db_pool) - service = CommentService(repo) + service = CommentService(repo, moderation_service) handler = CommentHandler(service) server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=5)) @@ -42,6 +46,7 @@ async def serve() -> None: server.add_insecure_port(f"{settings.host}:{settings.port}") await server.start() logger.info(f"gRPC server listening on {settings.host}:{settings.port}") + logger.info("Kafka consumer running in background") await server.wait_for_termination() diff --git a/src/commentservice/service/service.py b/src/commentservice/service/service.py index 98cd9f9..5369598 100644 --- a/src/commentservice/service/service.py +++ b/src/commentservice/service/service.py @@ -8,16 +8,22 @@ ) from commentservice.service.edit_comment import edit_comment as _edit_comment from commentservice.service.get_comments import get_comments as _get_comments +from commentservice.kafka.moderaiton_service import ModerationService class CommentService: - def __init__(self, repo: CommentRepository): + def __init__(self, repo: CommentRepository, moderation_service: ModerationService): self._repo = repo + self.moderation_service = moderation_service async def create_comment( self, mod_id: int, author_id: int, text: str ) -> int: - return await _create_comment(self._repo, mod_id, author_id, text) + + comment_id = await _create_comment(self._repo, mod_id, author_id, text) + + self.moderation_service.request_moderaiton(comment_id, text) + return comment_id async def edit_comment(self, comment_id: int, new_text: str) -> bool: return await _edit_comment(self._repo, comment_id, new_text) diff --git a/tools/common.just b/tools/common.just index b4cdc97..8c8a1ec 100644 --- a/tools/common.just +++ b/tools/common.just @@ -1,15 +1,19 @@ MKDIR := 'mkdir -p' -RM := 'rm -rf ' + TMP_DIR +RM-COMMENT := 'rm -rf ' + COMMENT_TMP_DIR +RM-MODERATION := 'rm -rf ' + MODERATION_TMP_DIR DOWN := 'curl -fsSL' DOWN_OUT := '-o' PY_FIX_IMPORTS := 'for f in ' + OUT_DIR + 
'/*_pb2_grpc.py; do [ -f "$f" ] && sed -i "s/^import \\(.*_pb2\\)/from . import \\1/" "$f"; done' clean: - {{RM}} + {{RM-MODERATION}} + {{RM-COMMENT}} fetch-proto: - {{MKDIR}} "{{TMP_DIR}}" - {{DOWN}} "https://raw.githubusercontent.com/esclient/protos/{{PROTO_TAG}}/{{PROTO_NAME}}" {{DOWN_OUT}} "{{TMP_DIR}}/{{PROTO_NAME}}" + {{MKDIR}} "{{COMMENT_TMP_DIR}}" + {{MKDIR}} "{{MODERATION_TMP_DIR}}" + {{DOWN}} "https://raw.githubusercontent.com/esclient/protos/{{MODERATION_PROTO_TAG}}/{{MODERATION_PROTO_NAME}}" {{DOWN_OUT}} "{{MODERATION_TMP_DIR}}/{{MODERATION_PROTO_NAME}}" + {{DOWN}} "https://raw.githubusercontent.com/esclient/protos/{{COMMENT_PROTO_TAG}}/{{COMMENT_PROTO_NAME}}" {{DOWN_OUT}} "{{COMMENT_TMP_DIR}}/{{COMMENT_PROTO_NAME}}" run: ENV=dev PYTHONPATH=src ./tools/load_envs.sh pdm run run-server @@ -17,15 +21,27 @@ run: gen-stubs: fetch-proto {{MKDIR}} "{{OUT_DIR}}" pdm run python -m grpc_tools.protoc \ - --proto_path="{{TMP_DIR}}" \ + --proto_path="{{COMMENT_TMP_DIR}}" \ --python_out="{{OUT_DIR}}" \ --grpc_python_out="{{OUT_DIR}}" \ --pyi_out="{{OUT_DIR}}" \ - "{{TMP_DIR}}/{{PROTO_NAME}}" + "{{COMMENT_TMP_DIR}}/{{COMMENT_PROTO_NAME}}" + {{PY_FIX_IMPORTS}} + +gen-stubs-moderation: fetch-proto + {{MKDIR}} "{{OUT_DIR}}" + pdm run python -m grpc_tools.protoc \ + --proto_path="{{MODERATION_TMP_DIR}}" \ + --python_out="{{OUT_DIR}}" \ + --grpc_python_out="{{OUT_DIR}}" \ + --pyi_out="{{OUT_DIR}}" \ + "{{MODERATION_TMP_DIR}}/{{MODERATION_PROTO_NAME}}" {{PY_FIX_IMPORTS}} update: gen-stubs clean +update-moderation: gen-stubs-moderation clean + format: black . isort .