Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions configs/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ DATABASE_URL: "postgresql://esclient_devcomm:oBFG3H!7XYSJCSNK@pg4.sweb.ru:5433/e
HOST: "0.0.0.0"
PORT: "7002"

KAFKA_BROKERS: "localhost:9092"
KAFKA_REQUEST_TOPIC: "moderation-request"
KAFKA_RESULT_TOPIC: "moderation-result"
KAFKA_CONSUMER_GROUP_ID: "moderation-consumer-group"
KAFKA_MAX_RETRIES: "3"
KAFKA_RETRY_BACKOFF_MS: "1000"
KAFKA_ENABLE_SSL: false

LOG_LEVEL: "INFO"
LOG_FORMAT: "%(asctime)s %(levelname)-8s [%(name)s] %(message)s"
LOG_DATEFMT: "%Y-%m-%d %H:%M:%S"
11 changes: 8 additions & 3 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ set dotenv-load := true
COMMON_JUST_URL := 'https://raw.githubusercontent.com/esclient/tools/refs/heads/main/python/common.just'
LOAD_ENVS_URL := 'https://raw.githubusercontent.com/esclient/tools/refs/heads/main/load_envs.sh'

PROTO_TAG := 'v0.0.17'
PROTO_NAME := 'comment.proto'
TMP_DIR := '.proto'
COMMENT_PROTO_TAG := 'v0.0.17'
COMMENT_PROTO_NAME := 'comment.proto'
COMMENT_TMP_DIR := '.proto'
SOURCE := 'commentservice'
OUT_DIR := 'src/' + SOURCE + '/grpc'

MODERATION_PROTO_TAG := 'v0.1.3'
MODERATION_PROTO_NAME := 'moderation.proto'
MODERATION_TMP_DIR := '.proto'
SERVICE_NAME := 'moderation'

MKDIR_TOOLS := 'mkdir -p tools'

FETCH_COMMON_JUST := 'curl -fsSL ' + COMMON_JUST_URL + ' -o tools/common.just'
Expand Down
21 changes: 21 additions & 0 deletions migrations/20251114232657_migrations_create_comments_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE IF NOT EXISTS comments (
id BIGSERIAL PRIMARY KEY,
mod_id BIGINT NOT NULL,
author_id BIGINT NOT NULL,
text TEXT NOT NULL,
status VARCHAR(50) DEFAULT 'pending_moderation',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
edited_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_comments_mod_id ON comments(mod_id);
CREATE INDEX idx_comments_author_id ON comments(author_id);
CREATE INDEX idx_comments_status ON comments(status);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE IF EXISTS comments CASCADE;
-- +goose StatementEnd
26 changes: 25 additions & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ dependencies = [
"psycopg2-binary==2.9.10",
"watchfiles==1.1.0",
"asyncpg>=0.30.0",
"confluent-kafka>=2.12.2",
]
requires-python = ">=3.13"
readme = "README.md"
Expand Down
42 changes: 42 additions & 0 deletions src/commentservice/grpc/moderation_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions src/commentservice/grpc/moderation_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor

class ObjectType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
OBJECT_TYPE_UNSPECIFIED: _ClassVar[ObjectType]
OBJECT_TYPE_MOD_DESCRIPTION: _ClassVar[ObjectType]
OBJECT_TYPE_COMMENT_TEXT: _ClassVar[ObjectType]
OBJECT_TYPE_USER_NAME: _ClassVar[ObjectType]
OBJECT_TYPE_UNSPECIFIED: ObjectType
OBJECT_TYPE_MOD_DESCRIPTION: ObjectType
OBJECT_TYPE_COMMENT_TEXT: ObjectType
OBJECT_TYPE_USER_NAME: ObjectType

class ModerateObjectRequest(_message.Message):
__slots__ = ("id", "type", "text")
ID_FIELD_NUMBER: _ClassVar[int]
TYPE_FIELD_NUMBER: _ClassVar[int]
TEXT_FIELD_NUMBER: _ClassVar[int]
id: int
type: ObjectType
text: str
def __init__(self, id: _Optional[int] = ..., type: _Optional[_Union[ObjectType, str]] = ..., text: _Optional[str] = ...) -> None: ...

class ModerateObjectResponse(_message.Message):
__slots__ = ("success",)
SUCCESS_FIELD_NUMBER: _ClassVar[int]
success: bool
def __init__(self, success: bool = ...) -> None: ...
97 changes: 97 additions & 0 deletions src/commentservice/grpc/moderation_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

from . import moderation_pb2 as moderation__pb2

GRPC_GENERATED_VERSION = '1.75.1'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True

if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in moderation_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)


class ModerationServiceStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.ModerateObject = channel.unary_unary(
'/moderation.ModerationService/ModerateObject',
request_serializer=moderation__pb2.ModerateObjectRequest.SerializeToString,
response_deserializer=moderation__pb2.ModerateObjectResponse.FromString,
_registered_method=True)


class ModerationServiceServicer(object):
"""Missing associated documentation comment in .proto file."""

def ModerateObject(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_ModerationServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'ModerateObject': grpc.unary_unary_rpc_method_handler(
servicer.ModerateObject,
request_deserializer=moderation__pb2.ModerateObjectRequest.FromString,
response_serializer=moderation__pb2.ModerateObjectResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'moderation.ModerationService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('moderation.ModerationService', rpc_method_handlers)


# This class is part of an EXPERIMENTAL API.
class ModerationService(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def ModerateObject(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/moderation.ModerationService/ModerateObject',
moderation__pb2.ModerateObjectRequest.SerializeToString,
moderation__pb2.ModerateObjectResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
38 changes: 38 additions & 0 deletions src/commentservice/kafka/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os

class KafkaConfig:

def __init__(self):
self.brokers = os.getenv('KAFKA_BROKERS', 'localhost:9092')
self.request_topic = os.getenv('KAFKA_REQUEST_TOPIC', 'moderation-request')
self.result_topic = os.getenv('KAFKA_RESULT', 'moderation-result')
self.consumer_group_id = os.getenv('KAFKA_CONSUMER_GROUP_ID', 'comment-service-group')
self.max_retries = int(os.getenv('KAFKA_MAX_RETRIES', '3'))
self.retry_backoff_ms = int(os.getenv('KAFKA_RETRY_BACKOFF_MS', '1000'))
self.enable_ssl = os.getenv('KAFKA_ENABLE_SSL', 'false').lower() == 'true'

def get_producer_config(self):

config = {
'bootstrap.servers': self.brokers,
'client.id': 'comment_service_producer',
'acks': 'all',
'retries': self.max_retries,
'retry.backoff.ms': self.retry_backoff_ms,
'enable.idempotence': True,
'compression.type': 'none'
}
return config

def get_consumer_config(self):

config = {
'bootstrap.servers': self.brokers,
'group.id': self.consumer_group_id,
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 1000,
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000
}
return config
Loading
Loading