From 1ecba5abc574d57e246a5353e79886cca88f43e4 Mon Sep 17 00:00:00 2001 From: MystiSs Date: Fri, 10 Oct 2025 18:21:58 +0300 Subject: [PATCH 1/7] =?UTF-8?q?=D0=A0=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3=20grpc=20=D0=BA=D0=BB=D0=B8=D0=B5?= =?UTF-8?q?=D0=BD=D1=82=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configs/dev.yaml | 8 +- src/gateway/clients/base_client.py | 136 ++++++++++++++++++++++ src/gateway/clients/client_factory.py | 68 +++++++++++ src/gateway/clients/comment.py | 109 +++++++++++++---- src/gateway/clients/mod.py | 113 +++++++++++++----- src/gateway/clients/rating.py | 69 ++++++++--- src/gateway/esclient_graphql.py | 7 ++ src/gateway/resolvers/mutation/comment.py | 14 +-- src/gateway/resolvers/mutation/mod.py | 7 +- src/gateway/resolvers/mutation/rating.py | 5 +- src/gateway/resolvers/query/comment.py | 5 +- src/gateway/resolvers/query/mod.py | 7 +- src/gateway/server.py | 18 ++- 13 files changed, 474 insertions(+), 92 deletions(-) create mode 100644 src/gateway/clients/base_client.py create mode 100644 src/gateway/clients/client_factory.py create mode 100644 src/gateway/esclient_graphql.py diff --git a/configs/dev.yaml b/configs/dev.yaml index 900a974..e843b98 100644 --- a/configs/dev.yaml +++ b/configs/dev.yaml @@ -1,11 +1,11 @@ HOST: 0.0.0.0 -PORT: 8000 +PORT: 7776 -COMMENT_SERVICE_HOST: 0.0.0.0 +COMMENT_SERVICE_HOST: localhost COMMENT_SERVICE_PORT: 7002 -RATING_SERVICE_HOST: 0.0.0.0 +RATING_SERVICE_HOST: localhost RATING_SERVICE_PORT: 7003 -MOD_SERVICE_HOST: 0.0.0.0 +MOD_SERVICE_HOST: localhost MOD_SERVICE_PORT: 7004 diff --git a/src/gateway/clients/base_client.py b/src/gateway/clients/base_client.py new file mode 100644 index 0000000..fb69f9c --- /dev/null +++ b/src/gateway/clients/base_client.py @@ -0,0 +1,136 @@ +import grpc +import grpc.aio +import logging +from typing import Dict, Any +from google.protobuf import message as _message + + +logger = logging.getLogger(__name__) + +class GRPCError(Exception): ... + +class RPCDoesNotExistException(GRPCError): + """Исключение когда RPC метод не найден в gRPC стабе""" + +class GRPCClient: + _RPC_REQUEST_CLASSES = { } + + def __init__(self, channel: grpc.Channel): + """ + Args: + channel: gRPC канал для соединения + """ + self._channel = channel + self._stub = None + self._initialize_stub() + + def _initialize_stub(self): + """Инициализация стаба""" + raise NotImplementedError() + + def call_rpc(self, rpc_name: str, request_data: Dict[str, Any], timeout: int = 30) -> _message.Message: + """ + Универсальный метод вызова RPC + + Args: + rpc_name: имя RPC метода + request_data: данные для запроса + timeout: таймаут в секундах + + Returns: + gRPC response message + """ + try: + rpc_method = self._get_rpc_method(rpc_name) + + request = self._create_request(rpc_name, request_data) + + logger.debug(f"Вызов gRPC: {rpc_name} Данные: {request_data}") + response = rpc_method(request, timeout=timeout) + + return response + + except grpc.RpcError as e: + logger.error(f"gRPC ошибка {rpc_name}: {e}") + raise GRPCError(f"gRPC Ошибка вызова: {e}") from e + except Exception as e: + logger.error(f"Неизвестная ошибка при вызове {rpc_name}: {e}") + raise + + def _get_rpc_method(self, rpc_name: str): + """Получаем RPC метод из стаба""" + rpc_method = getattr(self._stub, rpc_name, None) + if rpc_method is None: + raise RPCDoesNotExistException(f"RPC метод '{rpc_name}' не найден в стабе") + return rpc_method + + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + """ + Создает gRPC request message из словаря + """ + raise NotImplementedError() + + def close(self): + """Закрывает соединение""" + if self._channel: + self._channel.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + +class AsyncGRPCClient: + def __init__(self, channel: grpc.aio.Channel): + self._channel = channel + self._stub = None + self._initialize_stub() + + def _initialize_stub(self): + raise NotImplementedError() + + async def call_rpc(self, rpc_name: str, request_data: Dict[str, Any], timeout: int = 30) -> _message.Message: + """ + Универсальный метод вызова RPC (async) + + Args: + rpc_name: имя RPC метода + request_data: данные для запроса + timeout: таймаут в секундах + + Returns: + gRPC response message + """ + try: + rpc_method = self._get_rpc_method(rpc_name) + request = self._create_request(rpc_name, request_data) + + response = await rpc_method(request, timeout=timeout) + return response + + except grpc.RpcError as e: + logger.error(f"gRPC ошибка {rpc_name}: {e} (async)") + raise GRPCError(f"gRPC Ошибка вызова: {e} (async)") from e + except Exception as e: + logger.error(f"Неизвестная ошибка при вызове {rpc_name}: {e} (async)") + raise + + def _get_rpc_method(self, rpc_name: str): + rpc_method = getattr(self._stub, rpc_name, None) + if not rpc_method: + raise RPCDoesNotExistException(f"RPC метод '{rpc_name}' не найден в стабе (async)") + return rpc_method + + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + raise NotImplementedError() + + async def close(self): + if self._channel: + await self._channel.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() diff --git a/src/gateway/clients/client_factory.py b/src/gateway/clients/client_factory.py new file mode 100644 index 0000000..709bd7a --- /dev/null +++ b/src/gateway/clients/client_factory.py @@ -0,0 +1,68 @@ +import grpc +from gateway.settings import Settings +from gateway.clients.comment import CommentServiceClient, AsyncCommentServiceClient +from gateway.clients.mod import ModServiceClient, AsyncModServiceClient +from gateway.clients.rating import RatingServiceClient, AsyncRatingServiceClient + + +class GRPCClientFactory: + def __init__(self, settings: Settings): + self._settings = settings + self._channels = {} # На всякий случай + + def _add_to_channels_if_needed(self, url): + if url not in self._channels: + self._channels[url] = grpc.insecure_channel(url) + + def get_comment_client(self) -> CommentServiceClient: + url = self._settings.comment_service_url + self._add_to_channels_if_needed(url) + + return CommentServiceClient(self._channels[url]) + + def get_mod_client(self) -> ModServiceClient: + url = self._settings.mod_service_url + self._add_to_channels_if_needed(url) + + return ModServiceClient(self._channels[url]) + + def get_rating_client(self) -> RatingServiceClient: + url = self._settings.rating_service_url + self._add_to_channels_if_needed(url) + + return RatingServiceClient(self._channels[url]) + + def close_all(self): + for channel in self._channels.values(): + channel.close() + +class AsyncGRPCClientFactory: + def __init__(self, settings: Settings): + self._settings = settings + self._channels = {} # На всякий случай + + def _add_to_channels_if_needed(self, url): + if url not in self._channels: + self._channels[url] = grpc.aio.insecure_channel(url) + + async def get_comment_client(self) -> AsyncCommentServiceClient: + url = self._settings.comment_service_url + self._add_to_channels_if_needed(url) + + return AsyncCommentServiceClient(self._channels[url]) + + async def get_mod_client(self) -> AsyncModServiceClient: + url = self._settings.mod_service_url + self._add_to_channels_if_needed(url) + + return AsyncModServiceClient(self._channels[url]) + + async def get_rating_client(self) -> AsyncRatingServiceClient: + url = self._settings.rating_service_url + self._add_to_channels_if_needed(url) + + return AsyncRatingServiceClient(self._channels[url]) + + async def close_all(self): + for channel in self._channels.values(): + channel.close() diff --git a/src/gateway/clients/comment.py b/src/gateway/clients/comment.py index 8edb9f5..d2dcb3d 100644 --- a/src/gateway/clients/comment.py +++ b/src/gateway/clients/comment.py @@ -1,33 +1,92 @@ -import grpc +from gateway.stubs import comment_pb2, comment_pb2_grpc +from google.protobuf import message as _message +from typing import Dict, Any +from .base_client import GRPCClient, AsyncGRPCClient -import gateway.stubs.comment_pb2 -import gateway.stubs.comment_pb2_grpc -from gateway.settings import Settings -_settings = Settings() -_channel = grpc.insecure_channel(_settings.comment_service_url) -_stub = gateway.stubs.comment_pb2_grpc.CommentServiceStub(_channel) # type: ignore +class CommentServiceClient(GRPCClient): + _RPC_REQUEST_CLASSES = { + "CreateComment": comment_pb2.CreateCommentRequest, + "EditComment": comment_pb2.EditCommentRequest, + "DeleteComment": comment_pb2.DeleteCommentRequest, + "GetComments": comment_pb2.GetCommentsRequest, + } + def _initialize_stub(self): + self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) -def create_comment_rpc(mod_id: int, author_id: int, text: str) -> gateway.stubs.comment_pb2.CreateCommentResponse: - req = gateway.stubs.comment_pb2.CreateCommentRequest(mod_id=mod_id, author_id=author_id, text=text) - return _stub.CreateComment(req) # type: ignore + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + request_class = CommentServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + if not request_class: + raise ValueError(f"Неизветсный RPC метод: {rpc_name}") + + return request_class(**request_data) + + # Вообще я написал универсальный "call_rpc". Но обёртки никто не отменял :) + # Как минимум благодаря обёртке мы можем проставить типизацию + def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: + return self.call_rpc("CreateComment", { + "mod_id": mod_id, + "author_id": author_id, + "text": text + }) + + def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: + return self.call_rpc("EditComment", { + "comment_id": comment_id, + "text": text + }) + + def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: + return self.call_rpc("DeleteComment", { + "comment_id": comment_id + }) + + def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: + return self.call_rpc("GetComments", { + "mod_id": mod_id + }) + +class AsyncCommentServiceClient(AsyncGRPCClient): + _RPC_REQUEST_CLASSES = { + "CreateComment": comment_pb2.CreateCommentRequest, + "EditComment": comment_pb2.EditCommentRequest, + "DeleteComment": comment_pb2.DeleteCommentRequest, + "GetComments": comment_pb2.GetCommentsRequest, + } -def edit_comment_rpc(comment_id: int, text: str) -> gateway.stubs.comment_pb2.EditCommentResponse: - req = gateway.stubs.comment_pb2.EditCommentRequest(comment_id=comment_id, text=text) - return _stub.EditComment(req) # type: ignore + def _initialize_stub(self): + self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + request_class = CommentServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + if not request_class: + raise ValueError(f"Неизветсный RPC метод: {rpc_name}") + + return request_class(**request_data) + + # *** # -def delete_comment_rpc( - comment_id: int, -) -> gateway.stubs.comment_pb2.DeleteCommentResponse: - req = gateway.stubs.comment_pb2.DeleteCommentRequest(comment_id=comment_id) - return _stub.DeleteComment(req) # type: ignore - - -def get_comments_rpc( - mod_id: int, -) -> gateway.stubs.comment_pb2.GetCommentsResponse: - req = gateway.stubs.comment_pb2.GetCommentsRequest(mod_id=mod_id) - return _stub.GetComments(req) # type: ignore + async def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: + return self.call_rpc("CreateComment", { + "mod_id": mod_id, + "author_id": author_id, + "text": text + }) + + async def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: + return self.call_rpc("EditComment", { + "comment_id": comment_id, + "text": text + }) + + async def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: + return self.call_rpc("DeleteComment", { + "comment_id": comment_id + }) + + async def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: + return self.call_rpc("GetComments", { + "mod_id": mod_id + }) \ No newline at end of file diff --git a/src/gateway/clients/mod.py b/src/gateway/clients/mod.py index 52143ed..efcae0d 100644 --- a/src/gateway/clients/mod.py +++ b/src/gateway/clients/mod.py @@ -1,39 +1,94 @@ -import grpc +from gateway.stubs import mod_pb2, mod_pb2_grpc +from google.protobuf import message as _message +from typing import Dict, Any +from .base_client import GRPCClient, AsyncGRPCClient -import gateway.stubs.mod_pb2 -import gateway.stubs.mod_pb2_grpc -from gateway.converters.mod_status_converter import graphql_to_proto_mod_status -from gateway.settings import Settings +class ModServiceClient(GRPCClient): + _RPC_REQUEST_CLASSES = { + "CreateMod": mod_pb2.CreateModRequest, + "SetStatus": mod_pb2.SetStatusRequest, + "GetModDownloadLink": mod_pb2.GetModDownloadLinkRequest, + "GetMods": mod_pb2.GetModsRequest, + } -_settings = Settings() -_channel = grpc.insecure_channel(_settings.mod_service_url) -_stub = gateway.stubs.mod_pb2_grpc.ModServiceStub(_channel) # type: ignore + def _initialize_stub(self): + self._stub = mod_pb2_grpc.ModServiceStub(self._channel) + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + request_class = ModServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + if not request_class: + raise ValueError(f"Неизветсный RPC метод: {rpc_name}") + + return request_class(**request_data) + + # *** # -def create_mod_rpc( - title: str, author_id: int, filename: str, description: str -) -> gateway.stubs.mod_pb2.CreateModResponse: - req = gateway.stubs.mod_pb2.CreateModRequest( - title=title, - author_id=author_id, - filename=filename, - description=description, - ) - return _stub.CreateMod(req) # type: ignore + def create_mod(self, title: str, author_id: int, filename: str, description: str) -> mod_pb2.CreateModResponse: + return self.call_rpc("CreateMod", { + "title": title, + "author_id": author_id, + "filename": filename, + "description": description + }) -def set_status_mod_rpc(mod_id: int, status: str) -> gateway.stubs.mod_pb2.SetStatusResponse: - req = gateway.stubs.mod_pb2.SetStatusRequest(mod_id=mod_id, status=graphql_to_proto_mod_status(status)) # type: ignore - return _stub.SetStatus(req) # type: ignore + def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: + return self.call_rpc("SetStatus", { + "mod_id": mod_id, + "status": status + }) -def get_mod_download_link_rpc( - mod_id: int, -) -> gateway.stubs.mod_pb2.GetModDownloadLinkResponse: - req = gateway.stubs.mod_pb2.GetModDownloadLinkRequest(mod_id=mod_id) - return _stub.GetModDownloadLink(req) # type: ignore + def get_mod_download_link(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: + return self.call_rpc("GetModDownloadLink", { + "mod_id": mod_id + }) -def get_mods_rpc() -> gateway.stubs.mod_pb2.GetModsResponse: - req = gateway.stubs.mod_pb2.GetModsRequest() - return _stub.GetMods(req) # type: ignore + def get_mods(self) -> mod_pb2.GetModsResponse: + return self.call_rpc("GetMods", {}) + +class AsyncModServiceClient(AsyncGRPCClient): + _RPC_REQUEST_CLASSES = { + "CreateMod": mod_pb2.CreateModRequest, + "SetStatus": mod_pb2.SetStatusRequest, + "GetModDownloadLink": mod_pb2.GetModDownloadLinkRequest, + "GetMods": mod_pb2.GetModsRequest, + } + + def _initialize_stub(self): + self._stub = mod_pb2_grpc.ModServiceStub(self._channel) + + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + request_class = ModServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + if not request_class: + raise ValueError(f"Неизветсный RPC метод: {rpc_name}") + + return request_class(**request_data) + + # *** # + + async def create_mod(self, title: str, author_id: int, filename: str, description: str) -> mod_pb2.CreateModResponse: + return self.call_rpc("CreateMod", { + "title": title, + "author_id": author_id, + "filename": filename, + "description": description + }) + + + async def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: + return self.call_rpc("SetStatus", { + "mod_id": mod_id, + "status": status + }) + + + async def get_mod_download_link_rpc(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: + return self.call_rpc("GetModDownloadLink", { + "mod_id": mod_id + }) + + + async def get_mods(self) -> mod_pb2.GetModsResponse: + return self.call_rpc("GetMods", {}) diff --git a/src/gateway/clients/rating.py b/src/gateway/clients/rating.py index 822388b..679b7e8 100644 --- a/src/gateway/clients/rating.py +++ b/src/gateway/clients/rating.py @@ -1,19 +1,62 @@ -import grpc +from gateway.stubs import rating_pb2, rating_pb2_grpc +from google.protobuf import message as _message +from typing import Dict, Any +from .base_client import GRPCClient, AsyncGRPCClient -import gateway.stubs.rating_pb2 -import gateway.stubs.rating_pb2_grpc -from gateway.settings import Settings +class RatingServiceClient(GRPCClient): + _RPC_REQUEST_CLASSES = { + "RateMod": rating_pb2.RateModRequest, + "GetRates": rating_pb2.GetRatesRequest, + } -_settings = Settings() -_channel = grpc.insecure_channel(_settings.rating_service_url) -_stub = gateway.stubs.rating_pb2_grpc.RatingServiceStub(_channel) # type: ignore + def _initialize_stub(self): + self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + request_class = RatingServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + if not request_class: + raise ValueError(f"Неизветсный RPC метод: {rpc_name}") + + return request_class(**request_data) + + # *** # -def rate_mod_rpc(mod_id: int, author_id: int, rate: str) -> gateway.stubs.rating_pb2.RateModResponse: - req = gateway.stubs.rating_pb2.RateModRequest(mod_id=mod_id, author_id=author_id, rate=rate) - return _stub.RateMod(req) # type: ignore + def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: + return self.call_rpc("RateMod", { + "mod_id": mod_id, + "author_id": author_id, + "rate": rate + }) -def get_rates_rpc(mod_id: int) -> gateway.stubs.rating_pb2.GetRatesResponse: - req = gateway.stubs.rating_pb2.GetRatesRequest(mod_id=mod_id) - return _stub.GetRates(req) # type: ignore + def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: + return self.call_rpc("GetRates", { "mod_id": mod_id }) + +class AsyncRatingServiceClient(AsyncGRPCClient): + _RPC_REQUEST_CLASSES = { + "RateMod": rating_pb2.RateModRequest, + "GetRates": rating_pb2.GetRatesRequest, + } + + def _initialize_stub(self): + self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) + + def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + request_class = RatingServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + if not request_class: + raise ValueError(f"Неизветсный RPC метод: {rpc_name}") + + return request_class(**request_data) + + # *** # + + async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: + return self.call_rpc("RateMod", { + "mod_id": mod_id, + "author_id": author_id, + "rate": rate + }) + + + async def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: + return self.call_rpc("GetRates", { "mod_id": mod_id }) diff --git a/src/gateway/esclient_graphql.py b/src/gateway/esclient_graphql.py new file mode 100644 index 0000000..ecf31c3 --- /dev/null +++ b/src/gateway/esclient_graphql.py @@ -0,0 +1,7 @@ +class GQLContextViewer: + def __init__(self): + self.clients = { } + + def get_current(self, request): + return { "request": request, "clients": self.clients } + \ No newline at end of file diff --git a/src/gateway/resolvers/mutation/comment.py b/src/gateway/resolvers/mutation/comment.py index 1596843..98d779e 100644 --- a/src/gateway/resolvers/mutation/comment.py +++ b/src/gateway/resolvers/mutation/comment.py @@ -4,11 +4,6 @@ from graphql import GraphQLResolveInfo from pydantic import BaseModel, field_validator -from gateway.clients.comment import ( - create_comment_rpc, - delete_comment_rpc, - edit_comment_rpc, -) from gateway.helpers.id_helper import validate_and_convert_id comment_mutation = ObjectType("CommentMutation") @@ -31,7 +26,8 @@ def _author_id(cls, v: Any) -> int: @comment_mutation.field("createComment") def resolve_create_comment(parent: object, info: GraphQLResolveInfo, input: CreateCommentInput) -> str: data = CreateCommentInput.model_validate(input) - resp = create_comment_rpc(data.mod_id, data.author_id, data.text) + client = info.context["clients"]["comment_service"] + resp = client.create_comment(data.mod_id, data.author_id, data.text) return str(resp.comment_id) @@ -47,7 +43,8 @@ def _comment_id(cls, v: Any) -> int: @comment_mutation.field("editComment") def resolve_edit_comment(parent: object, info: GraphQLResolveInfo, input: EditCommentInput) -> bool: data = EditCommentInput.model_validate(input) - resp = edit_comment_rpc(data.comment_id, data.text) + client = info.context["clients"]["comment_service"] + resp = client.edit_comment(data.comment_id, data.text) return resp.success @@ -62,5 +59,6 @@ def _comment_id(cls, v: Any) -> int: @comment_mutation.field("deleteComment") def resolve_delete_comment(parent: object, info: GraphQLResolveInfo, input: DeleteCommentInput) -> bool: data = DeleteCommentInput.model_validate(input) - resp = delete_comment_rpc(data.comment_id) + client = info.context["clients"]["comment_service"] + resp = client.delete_comment(data.comment_id) return resp.success diff --git a/src/gateway/resolvers/mutation/mod.py b/src/gateway/resolvers/mutation/mod.py index b6117ba..b0b22c9 100644 --- a/src/gateway/resolvers/mutation/mod.py +++ b/src/gateway/resolvers/mutation/mod.py @@ -5,7 +5,6 @@ from graphql import GraphQLResolveInfo from pydantic import BaseModel, field_validator -from gateway.clients.mod import create_mod_rpc, set_status_mod_rpc from gateway.helpers.id_helper import validate_and_convert_id mod_mutation = ObjectType("ModMutation") @@ -38,7 +37,8 @@ class CreateModResult(BaseModel): @mod_mutation.field("createMod") def resolve_create_mod(parent: object, info: GraphQLResolveInfo, input: CreateModInput) -> dict[str, Any]: data = CreateModInput.model_validate(input) - resp = create_mod_rpc(data.title, data.author_id, data.filename, data.description) + client = info.context["clients"]["mod_service"] + resp = client.create_mod(data.title, data.author_id, data.filename, data.description) return CreateModResult(mod_id=resp.mod_id, s3_key=resp.s3_key, upload_url=resp.upload_url).model_dump() @@ -54,5 +54,6 @@ def validate_mod_id(cls, v: Any) -> int: @mod_mutation.field("setStatus") def resolve_set_status_mod(parent: object, info: GraphQLResolveInfo, input: SetStatusInput) -> bool: data = SetStatusInput.model_validate(input) - resp = set_status_mod_rpc(data.mod_id, data.status.value) + client = info.context["clients"]["mod_service"] + resp = client.set_status_mod(data.mod_id, data.status.value) return resp.success diff --git a/src/gateway/resolvers/mutation/rating.py b/src/gateway/resolvers/mutation/rating.py index a57bc48..7f83418 100644 --- a/src/gateway/resolvers/mutation/rating.py +++ b/src/gateway/resolvers/mutation/rating.py @@ -5,7 +5,6 @@ from graphql import GraphQLResolveInfo from pydantic import BaseModel, field_validator -from gateway.clients.rating import rate_mod_rpc from gateway.helpers.id_helper import validate_and_convert_id rating_mutation = ObjectType("RatingMutation") @@ -33,9 +32,9 @@ def validate_mod_id(cls, v: Any) -> int: def validate_author_id(cls, v: Any) -> int: return validate_and_convert_id(v, "author_id") - @rating_mutation.field("addRate") def resolve_add_rate(parent: object, info: GraphQLResolveInfo, input: AddRateInput) -> str: data = AddRateInput.model_validate(input) - resp = rate_mod_rpc(data.mod_id, data.author_id, data.rate.value) + client = info.context["clients"]["rating_service"] + resp = client.rate_mod(data.mod_id, data.author_id, data.rate.value) return str(resp.rate_id) diff --git a/src/gateway/resolvers/query/comment.py b/src/gateway/resolvers/query/comment.py index 8001d0c..039df7e 100644 --- a/src/gateway/resolvers/query/comment.py +++ b/src/gateway/resolvers/query/comment.py @@ -4,10 +4,8 @@ from graphql import GraphQLResolveInfo from pydantic import BaseModel, field_validator -from gateway.clients.comment import get_comments_rpc from gateway.helpers.id_helper import validate_and_convert_id - class GetCommentsInput(BaseModel): mod_id: int @@ -34,7 +32,8 @@ def _edited_at(cls, v: Any) -> Any | None: @comment_query.field("getComments") def resolve_get_comments(parent: object, info: GraphQLResolveInfo, input: GetCommentsInput) -> list[dict[str, Any]]: data = GetCommentsInput.model_validate(input) - resp = get_comments_rpc(data.mod_id) + client = info.context["clients"]["comment_service"] + resp = client.get_comments(data.mod_id) return [ GetCommentsResult( id=item.id, diff --git a/src/gateway/resolvers/query/mod.py b/src/gateway/resolvers/query/mod.py index 138b44b..96113db 100644 --- a/src/gateway/resolvers/query/mod.py +++ b/src/gateway/resolvers/query/mod.py @@ -4,7 +4,6 @@ from graphql import GraphQLResolveInfo from pydantic import BaseModel, field_validator -from gateway.clients.mod import get_mod_download_link_rpc, get_mods_rpc from gateway.converters.mod_status_converter import proto_to_graphql_mod_status from gateway.helpers.id_helper import validate_and_convert_id @@ -23,13 +22,15 @@ def _mod_id(cls, v: Any) -> int: @mod_query.field("getModDownloadLink") def resolve_get_mod_download_link(parent: object, info: GraphQLResolveInfo, input: GetModDownloadLinkInput) -> str: data = GetModDownloadLinkInput.model_validate(input) - resp = get_mod_download_link_rpc(data.mod_id) + client = info.context["clients"]["mod_service"] + resp = client.get_mod_download_link(data.mod_id) return resp.link_url @mod_query.field("getMods") def resolve_get_mods(parent: object, info: GraphQLResolveInfo) -> list[dict[str, Any]]: - resp = get_mods_rpc() + client = info.context["clients"]["mod_service"] + resp = client.get_mods() return [ { "id": item.id, diff --git a/src/gateway/server.py b/src/gateway/server.py index 02b4063..7118e7c 100644 --- a/src/gateway/server.py +++ b/src/gateway/server.py @@ -1,14 +1,16 @@ import uvicorn -from ariadne import load_schema_from_path, make_executable_schema from ariadne.asgi import GraphQL +from ariadne import load_schema_from_path, make_executable_schema from ariadne.explorer import ExplorerGraphiQL +from .esclient_graphql import ESClientGQL, GQLContextViewer from gateway.resolvers.mutation.comment import comment_mutation from gateway.resolvers.mutation.mod import mod_mutation from gateway.resolvers.mutation.root import mutation from gateway.resolvers.query.comment import comment_query from gateway.resolvers.query.mod import mod_query from gateway.resolvers.query.root import query +from gateway.clients.client_factory import GRPCClientFactory, AsyncGRPCClientFactory from gateway.settings import Settings settings = Settings() @@ -25,12 +27,26 @@ mod_mutation, ) +context_viewer = GQLContextViewer() + app = GraphQL( schema, debug=True, explorer=ExplorerGraphiQL(), + context_value=context_viewer.get_current ) +sync_clients_factory = GRPCClientFactory(settings) +async_clients_factory = AsyncGRPCClientFactory(settings) + +#<Потом это можно будет заменить балансировщиком># +#<Я ваще не понимаю, что я делаю 🙏🙏🙏># +context_viewer.clients = { + "comment_service": sync_clients_factory.get_comment_client(), + "mod_service": sync_clients_factory.get_mod_client(), + "rating_service": sync_clients_factory.get_rating_client() +} + if __name__ == "__main__": uvicorn.run( "gateway.server:app", From 78563c51f956d2c9a7e3a88003bf86a7bb511168 Mon Sep 17 00:00:00 2001 From: MystiSs Date: Fri, 10 Oct 2025 19:40:28 +0300 Subject: [PATCH 2/7] =?UTF-8?q?=D0=9B=D0=B8=D0=BD=D1=82=D0=B5=D1=80=D1=8B?= =?UTF-8?q?=20=D1=83=D1=85=D1=83=D0=B4=D1=88=D0=B8=D0=BB=D0=B8=20=D0=BA?= =?UTF-8?q?=D0=BE=D0=B4=20(=D1=83=D0=B2=D1=8B)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/clients/base_client.py | 95 ++++++++++++---------- src/gateway/clients/client_factory.py | 38 ++++----- src/gateway/clients/comment.py | 97 ++++++++++------------- src/gateway/clients/mod.py | 94 ++++++++++------------ src/gateway/clients/rating.py | 60 +++++++------- src/gateway/esclient_graphql.py | 14 ++-- src/gateway/resolvers/mutation/comment.py | 4 +- src/gateway/resolvers/mutation/mod.py | 2 +- src/gateway/resolvers/mutation/rating.py | 1 + src/gateway/resolvers/query/comment.py | 1 + src/gateway/resolvers/query/mod.py | 2 +- src/gateway/server.py | 20 ++--- 12 files changed, 203 insertions(+), 225 deletions(-) diff --git a/src/gateway/clients/base_client.py b/src/gateway/clients/base_client.py index fb69f9c..da3b29b 100644 --- a/src/gateway/clients/base_client.py +++ b/src/gateway/clients/base_client.py @@ -1,19 +1,23 @@ +import logging +from typing import Any, ClassVar, Self + import grpc import grpc.aio -import logging -from typing import Dict, Any from google.protobuf import message as _message - logger = logging.getLogger(__name__) -class GRPCError(Exception): ... + +class GRPCError(Exception): + pass + class RPCDoesNotExistException(GRPCError): """Исключение когда RPC метод не найден в gRPC стабе""" + class GRPCClient: - _RPC_REQUEST_CLASSES = { } + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = {} def __init__(self, channel: grpc.Channel): """ @@ -24,113 +28,116 @@ def __init__(self, channel: grpc.Channel): self._stub = None self._initialize_stub() - def _initialize_stub(self): + def _initialize_stub(self) -> None: """Инициализация стаба""" raise NotImplementedError() - def call_rpc(self, rpc_name: str, request_data: Dict[str, Any], timeout: int = 30) -> _message.Message: + def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: """ Универсальный метод вызова RPC - + Args: rpc_name: имя RPC метода request_data: данные для запроса timeout: таймаут в секундах - + Returns: gRPC response message """ try: rpc_method = self._get_rpc_method(rpc_name) - + request = self._create_request(rpc_name, request_data) - + logger.debug(f"Вызов gRPC: {rpc_name} Данные: {request_data}") - response = rpc_method(request, timeout=timeout) - + response: _message.Message = rpc_method(request, timeout=timeout) # type: ignore[operator] + return response - + except grpc.RpcError as e: logger.error(f"gRPC ошибка {rpc_name}: {e}") raise GRPCError(f"gRPC Ошибка вызова: {e}") from e except Exception as e: logger.error(f"Неизвестная ошибка при вызове {rpc_name}: {e}") raise - - def _get_rpc_method(self, rpc_name: str): + + def _get_rpc_method(self, rpc_name: str) -> _message.Message: """Получаем RPC метод из стаба""" - rpc_method = getattr(self._stub, rpc_name, None) + rpc_method: _message.Message = getattr(self._stub, rpc_name, None) # type: ignore[assignment] if rpc_method is None: raise RPCDoesNotExistException(f"RPC метод '{rpc_name}' не найден в стабе") return rpc_method - - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: """ Создает gRPC request message из словаря """ raise NotImplementedError() - - def close(self): + + def close(self) -> None: """Закрывает соединение""" if self._channel: self._channel.close() - - def __enter__(self): + + def __enter__(self) -> Self: return self - - def __exit__(self, exc_type, exc_val, exc_tb): + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore self.close() + class AsyncGRPCClient: + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = {} + def __init__(self, channel: grpc.aio.Channel): self._channel = channel self._stub = None self._initialize_stub() - def _initialize_stub(self): + def _initialize_stub(self) -> None: raise NotImplementedError() - - async def call_rpc(self, rpc_name: str, request_data: Dict[str, Any], timeout: int = 30) -> _message.Message: + + async def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: """ Универсальный метод вызова RPC (async) - + Args: rpc_name: имя RPC метода request_data: данные для запроса timeout: таймаут в секундах - + Returns: gRPC response message """ try: rpc_method = self._get_rpc_method(rpc_name) request = self._create_request(rpc_name, request_data) - - response = await rpc_method(request, timeout=timeout) + + response: _message.Message = await rpc_method(request, timeout=timeout) # type: ignore[operator] return response - + except grpc.RpcError as e: logger.error(f"gRPC ошибка {rpc_name}: {e} (async)") raise GRPCError(f"gRPC Ошибка вызова: {e} (async)") from e except Exception as e: logger.error(f"Неизвестная ошибка при вызове {rpc_name}: {e} (async)") raise - - def _get_rpc_method(self, rpc_name: str): - rpc_method = getattr(self._stub, rpc_name, None) + + def _get_rpc_method(self, rpc_name: str) -> _message.Message: + rpc_method: _message.Message | None = getattr(self._stub, rpc_name, None) if not rpc_method: raise RPCDoesNotExistException(f"RPC метод '{rpc_name}' не найден в стабе (async)") return rpc_method - - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: raise NotImplementedError() - - async def close(self): + + async def close(self) -> None: if self._channel: await self._channel.close() - - async def __aenter__(self): + + async def __aenter__(self) -> Self: return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore await self.close() diff --git a/src/gateway/clients/client_factory.py b/src/gateway/clients/client_factory.py index 709bd7a..5e2557d 100644 --- a/src/gateway/clients/client_factory.py +++ b/src/gateway/clients/client_factory.py @@ -1,25 +1,26 @@ import grpc + +from gateway.clients.comment import AsyncCommentServiceClient, CommentServiceClient +from gateway.clients.mod import AsyncModServiceClient, ModServiceClient +from gateway.clients.rating import AsyncRatingServiceClient, RatingServiceClient from gateway.settings import Settings -from gateway.clients.comment import CommentServiceClient, AsyncCommentServiceClient -from gateway.clients.mod import ModServiceClient, AsyncModServiceClient -from gateway.clients.rating import RatingServiceClient, AsyncRatingServiceClient class GRPCClientFactory: - def __init__(self, settings: Settings): + def __init__(self, settings: Settings) -> None: self._settings = settings - self._channels = {} # На всякий случай + self._channels: dict[str, grpc.Channel] = {} # На всякий случай - def _add_to_channels_if_needed(self, url): + def _add_to_channels_if_needed(self, url: str) -> None: if url not in self._channels: self._channels[url] = grpc.insecure_channel(url) def get_comment_client(self) -> CommentServiceClient: url = self._settings.comment_service_url self._add_to_channels_if_needed(url) - + return CommentServiceClient(self._channels[url]) - + def get_mod_client(self) -> ModServiceClient: url = self._settings.mod_service_url self._add_to_channels_if_needed(url) @@ -31,26 +32,27 @@ def get_rating_client(self) -> RatingServiceClient: self._add_to_channels_if_needed(url) return RatingServiceClient(self._channels[url]) - - def close_all(self): + + def close_all(self) -> None: for channel in self._channels.values(): channel.close() + class AsyncGRPCClientFactory: - def __init__(self, settings: Settings): + def __init__(self, settings: Settings) -> None: self._settings = settings - self._channels = {} # На всякий случай + self._channels: dict[str, grpc.aio.Channel] = {} # На всякий случай - def _add_to_channels_if_needed(self, url): + def _add_to_channels_if_needed(self, url: str) -> None: if url not in self._channels: self._channels[url] = grpc.aio.insecure_channel(url) async def get_comment_client(self) -> AsyncCommentServiceClient: url = self._settings.comment_service_url self._add_to_channels_if_needed(url) - + return AsyncCommentServiceClient(self._channels[url]) - + async def get_mod_client(self) -> AsyncModServiceClient: url = self._settings.mod_service_url self._add_to_channels_if_needed(url) @@ -62,7 +64,7 @@ async def get_rating_client(self) -> AsyncRatingServiceClient: self._add_to_channels_if_needed(url) return AsyncRatingServiceClient(self._channels[url]) - - async def close_all(self): + + async def close_all(self) -> None: for channel in self._channels.values(): - channel.close() + await channel.close() diff --git a/src/gateway/clients/comment.py b/src/gateway/clients/comment.py index d2dcb3d..15cb069 100644 --- a/src/gateway/clients/comment.py +++ b/src/gateway/clients/comment.py @@ -1,92 +1,77 @@ -from gateway.stubs import comment_pb2, comment_pb2_grpc +from collections.abc import Coroutine +from typing import Any, ClassVar + from google.protobuf import message as _message -from typing import Dict, Any -from .base_client import GRPCClient, AsyncGRPCClient + +from gateway.stubs import comment_pb2, comment_pb2_grpc + +from .base_client import AsyncGRPCClient, GRPCClient class CommentServiceClient(GRPCClient): - _RPC_REQUEST_CLASSES = { + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { "CreateComment": comment_pb2.CreateCommentRequest, "EditComment": comment_pb2.EditCommentRequest, "DeleteComment": comment_pb2.DeleteCommentRequest, "GetComments": comment_pb2.GetCommentsRequest, } - def _initialize_stub(self): - self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) + def _initialize_stub(self) -> None: + self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: request_class = CommentServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - + return request_class(**request_data) - + # Вообще я написал универсальный "call_rpc". Но обёртки никто не отменял :) # Как минимум благодаря обёртке мы можем проставить типизацию def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: - return self.call_rpc("CreateComment", { - "mod_id": mod_id, - "author_id": author_id, - "text": text - }) - + return self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore[return-value] + def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: - return self.call_rpc("EditComment", { - "comment_id": comment_id, - "text": text - }) - + return self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore[return-value] + def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: - return self.call_rpc("DeleteComment", { - "comment_id": comment_id - }) - + return self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore[return-value] + def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: - return self.call_rpc("GetComments", { - "mod_id": mod_id - }) - + return self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore[return-value] + + class AsyncCommentServiceClient(AsyncGRPCClient): - _RPC_REQUEST_CLASSES = { + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { "CreateComment": comment_pb2.CreateCommentRequest, "EditComment": comment_pb2.EditCommentRequest, "DeleteComment": comment_pb2.DeleteCommentRequest, "GetComments": comment_pb2.GetCommentsRequest, } - def _initialize_stub(self): - self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) + def _initialize_stub(self) -> None: + self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: request_class = CommentServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - + return request_class(**request_data) - + # *** # - async def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: - return self.call_rpc("CreateComment", { - "mod_id": mod_id, - "author_id": author_id, - "text": text - }) - - async def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: - return self.call_rpc("EditComment", { - "comment_id": comment_id, - "text": text - }) - - async def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: - return self.call_rpc("DeleteComment", { - "comment_id": comment_id - }) - - async def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: - return self.call_rpc("GetComments", { - "mod_id": mod_id - }) \ No newline at end of file + async def create_comment( + self, mod_id: int, author_id: int, text: str + ) -> Coroutine[Any, Any, comment_pb2.CreateCommentResponse]: + return self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore[return-value] + + async def edit_comment(self, comment_id: int, text: str) -> Coroutine[Any, Any, comment_pb2.EditCommentResponse]: + return self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore[return-value] + + async def delete_comment(self, comment_id: int) -> Coroutine[Any, Any, comment_pb2.DeleteCommentResponse]: + return self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore[return-value] + + async def get_comments(self, mod_id: int) -> Coroutine[Any, Any, comment_pb2.GetCommentsResponse]: + return self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore[return-value] diff --git a/src/gateway/clients/mod.py b/src/gateway/clients/mod.py index efcae0d..6b0fe30 100644 --- a/src/gateway/clients/mod.py +++ b/src/gateway/clients/mod.py @@ -1,94 +1,80 @@ -from gateway.stubs import mod_pb2, mod_pb2_grpc +from collections.abc import Coroutine +from typing import Any, ClassVar + from google.protobuf import message as _message -from typing import Dict, Any -from .base_client import GRPCClient, AsyncGRPCClient + +from gateway.stubs import mod_pb2, mod_pb2_grpc + +from .base_client import AsyncGRPCClient, GRPCClient + class ModServiceClient(GRPCClient): - _RPC_REQUEST_CLASSES = { + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { "CreateMod": mod_pb2.CreateModRequest, "SetStatus": mod_pb2.SetStatusRequest, "GetModDownloadLink": mod_pb2.GetModDownloadLinkRequest, "GetMods": mod_pb2.GetModsRequest, } - def _initialize_stub(self): - self._stub = mod_pb2_grpc.ModServiceStub(self._channel) + def _initialize_stub(self) -> None: + self._stub = mod_pb2_grpc.ModServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: request_class = ModServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - + return request_class(**request_data) - + # *** # def create_mod(self, title: str, author_id: int, filename: str, description: str) -> mod_pb2.CreateModResponse: - return self.call_rpc("CreateMod", { - "title": title, - "author_id": author_id, - "filename": filename, - "description": description - }) - + return self.call_rpc( + "CreateMod", {"title": title, "author_id": author_id, "filename": filename, "description": description} + ) # type: ignore[return-value] def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: - return self.call_rpc("SetStatus", { - "mod_id": mod_id, - "status": status - }) - + return self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore[return-value] def get_mod_download_link(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: - return self.call_rpc("GetModDownloadLink", { - "mod_id": mod_id - }) - + return self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore[return-value] def get_mods(self) -> mod_pb2.GetModsResponse: - return self.call_rpc("GetMods", {}) - + return self.call_rpc("GetMods", {}) # type: ignore[return-value] + + class AsyncModServiceClient(AsyncGRPCClient): - _RPC_REQUEST_CLASSES = { + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { "CreateMod": mod_pb2.CreateModRequest, "SetStatus": mod_pb2.SetStatusRequest, "GetModDownloadLink": mod_pb2.GetModDownloadLinkRequest, "GetMods": mod_pb2.GetModsRequest, } - def _initialize_stub(self): - self._stub = mod_pb2_grpc.ModServiceStub(self._channel) + def _initialize_stub(self) -> None: + self._stub = mod_pb2_grpc.ModServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: request_class = ModServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - - return request_class(**request_data) - - # *** # - - async def create_mod(self, title: str, author_id: int, filename: str, description: str) -> mod_pb2.CreateModResponse: - return self.call_rpc("CreateMod", { - "title": title, - "author_id": author_id, - "filename": filename, - "description": description - }) + return request_class(**request_data) - async def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: - return self.call_rpc("SetStatus", { - "mod_id": mod_id, - "status": status - }) + # *** # + async def create_mod( + self, title: str, author_id: int, filename: str, description: str + ) -> Coroutine[Any, Any, mod_pb2.CreateModResponse]: + return self.call_rpc( + "CreateMod", {"title": title, "author_id": author_id, "filename": filename, "description": description} + ) # type: ignore[return-value] - async def get_mod_download_link_rpc(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: - return self.call_rpc("GetModDownloadLink", { - "mod_id": mod_id - }) + async def set_status_mod(self, mod_id: int, status: str) -> Coroutine[Any, Any, mod_pb2.SetStatusResponse]: + return self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore[return-value] + async def get_mod_download_link_rpc(self, mod_id: int) -> Coroutine[Any, Any, mod_pb2.GetModDownloadLinkResponse]: + return self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore[return-value] - async def get_mods(self) -> mod_pb2.GetModsResponse: - return self.call_rpc("GetMods", {}) + async def get_mods(self) -> Coroutine[Any, Any, mod_pb2.GetModsResponse]: + return self.call_rpc("GetMods", {}) # type: ignore[return-value] diff --git a/src/gateway/clients/rating.py b/src/gateway/clients/rating.py index 679b7e8..6a6d5ed 100644 --- a/src/gateway/clients/rating.py +++ b/src/gateway/clients/rating.py @@ -1,62 +1,58 @@ -from gateway.stubs import rating_pb2, rating_pb2_grpc +from collections.abc import Coroutine +from typing import Any, ClassVar + from google.protobuf import message as _message -from typing import Dict, Any -from .base_client import GRPCClient, AsyncGRPCClient + +from gateway.stubs import rating_pb2, rating_pb2_grpc + +from .base_client import AsyncGRPCClient, GRPCClient + class RatingServiceClient(GRPCClient): - _RPC_REQUEST_CLASSES = { + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { "RateMod": rating_pb2.RateModRequest, "GetRates": rating_pb2.GetRatesRequest, } - def _initialize_stub(self): - self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) + def _initialize_stub(self) -> None: + self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: request_class = RatingServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - + return request_class(**request_data) - + # *** # def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: - return self.call_rpc("RateMod", { - "mod_id": mod_id, - "author_id": author_id, - "rate": rate - }) - + return self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore[return-value] def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: - return self.call_rpc("GetRates", { "mod_id": mod_id }) + return self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore[return-value] + class AsyncRatingServiceClient(AsyncGRPCClient): - _RPC_REQUEST_CLASSES = { + _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { "RateMod": rating_pb2.RateModRequest, "GetRates": rating_pb2.GetRatesRequest, } - def _initialize_stub(self): - self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) - - def _create_request(self, rpc_name: str, request_data: Dict[str, Any]) -> _message.Message: + def _initialize_stub(self) -> None: + self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) # type: ignore + + def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: request_class = RatingServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - + return request_class(**request_data) - - # *** # - async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: - return self.call_rpc("RateMod", { - "mod_id": mod_id, - "author_id": author_id, - "rate": rate - }) + # *** # + async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> Coroutine[Any, Any, rating_pb2.RateModResponse]: + return self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore[return-value] - async def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: - return self.call_rpc("GetRates", { "mod_id": mod_id }) + async def get_rates(self, mod_id: int) -> Coroutine[Any, Any, rating_pb2.GetRatesResponse]: + return self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore[return-value] diff --git a/src/gateway/esclient_graphql.py b/src/gateway/esclient_graphql.py index ecf31c3..7511658 100644 --- a/src/gateway/esclient_graphql.py +++ b/src/gateway/esclient_graphql.py @@ -1,7 +1,11 @@ +from typing import Any + +from gateway.clients.base_client import AsyncGRPCClient, GRPCClient + + class GQLContextViewer: - def __init__(self): - self.clients = { } + def __init__(self) -> None: + self.clients: dict[str, GRPCClient | AsyncGRPCClient] = {} - def get_current(self, request): - return { "request": request, "clients": self.clients } - \ No newline at end of file + def get_current(self, request: Any) -> dict[str, Any]: + return {"request": request, "clients": self.clients} diff --git a/src/gateway/resolvers/mutation/comment.py b/src/gateway/resolvers/mutation/comment.py index 98d779e..7badf2c 100644 --- a/src/gateway/resolvers/mutation/comment.py +++ b/src/gateway/resolvers/mutation/comment.py @@ -45,7 +45,7 @@ def resolve_edit_comment(parent: object, info: GraphQLResolveInfo, input: EditCo data = EditCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] resp = client.edit_comment(data.comment_id, data.text) - return resp.success + return resp.success # type: ignore class DeleteCommentInput(BaseModel): @@ -61,4 +61,4 @@ def resolve_delete_comment(parent: object, info: GraphQLResolveInfo, input: Dele data = DeleteCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] resp = client.delete_comment(data.comment_id) - return resp.success + return resp.success # type: ignore diff --git a/src/gateway/resolvers/mutation/mod.py b/src/gateway/resolvers/mutation/mod.py index b0b22c9..0a124c3 100644 --- a/src/gateway/resolvers/mutation/mod.py +++ b/src/gateway/resolvers/mutation/mod.py @@ -56,4 +56,4 @@ def resolve_set_status_mod(parent: object, info: GraphQLResolveInfo, input: SetS data = SetStatusInput.model_validate(input) client = info.context["clients"]["mod_service"] resp = client.set_status_mod(data.mod_id, data.status.value) - return resp.success + return resp.success # type: ignore diff --git a/src/gateway/resolvers/mutation/rating.py b/src/gateway/resolvers/mutation/rating.py index 7f83418..661a303 100644 --- a/src/gateway/resolvers/mutation/rating.py +++ b/src/gateway/resolvers/mutation/rating.py @@ -32,6 +32,7 @@ def validate_mod_id(cls, v: Any) -> int: def validate_author_id(cls, v: Any) -> int: return validate_and_convert_id(v, "author_id") + @rating_mutation.field("addRate") def resolve_add_rate(parent: object, info: GraphQLResolveInfo, input: AddRateInput) -> str: data = AddRateInput.model_validate(input) diff --git a/src/gateway/resolvers/query/comment.py b/src/gateway/resolvers/query/comment.py index 039df7e..e39703c 100644 --- a/src/gateway/resolvers/query/comment.py +++ b/src/gateway/resolvers/query/comment.py @@ -6,6 +6,7 @@ from gateway.helpers.id_helper import validate_and_convert_id + class GetCommentsInput(BaseModel): mod_id: int diff --git a/src/gateway/resolvers/query/mod.py b/src/gateway/resolvers/query/mod.py index 96113db..5f0d70d 100644 --- a/src/gateway/resolvers/query/mod.py +++ b/src/gateway/resolvers/query/mod.py @@ -24,7 +24,7 @@ def resolve_get_mod_download_link(parent: object, info: GraphQLResolveInfo, inpu data = GetModDownloadLinkInput.model_validate(input) client = info.context["clients"]["mod_service"] resp = client.get_mod_download_link(data.mod_id) - return resp.link_url + return resp.link_url # type: ignore @mod_query.field("getMods") diff --git a/src/gateway/server.py b/src/gateway/server.py index 7118e7c..6decfab 100644 --- a/src/gateway/server.py +++ b/src/gateway/server.py @@ -1,18 +1,19 @@ import uvicorn -from ariadne.asgi import GraphQL from ariadne import load_schema_from_path, make_executable_schema +from ariadne.asgi import GraphQL from ariadne.explorer import ExplorerGraphiQL -from .esclient_graphql import ESClientGQL, GQLContextViewer +from gateway.clients.client_factory import AsyncGRPCClientFactory, GRPCClientFactory from gateway.resolvers.mutation.comment import comment_mutation from gateway.resolvers.mutation.mod import mod_mutation from gateway.resolvers.mutation.root import mutation from gateway.resolvers.query.comment import comment_query from gateway.resolvers.query.mod import mod_query from gateway.resolvers.query.root import query -from gateway.clients.client_factory import GRPCClientFactory, AsyncGRPCClientFactory from gateway.settings import Settings +from .esclient_graphql import GQLContextViewer + settings = Settings() type_defs = load_schema_from_path("src/gateway/schema") @@ -29,22 +30,17 @@ context_viewer = GQLContextViewer() -app = GraphQL( - schema, - debug=True, - explorer=ExplorerGraphiQL(), - context_value=context_viewer.get_current -) +app = GraphQL(schema, debug=True, explorer=ExplorerGraphiQL(), context_value=context_viewer.get_current) sync_clients_factory = GRPCClientFactory(settings) async_clients_factory = AsyncGRPCClientFactory(settings) -#<Потом это можно будет заменить балансировщиком># -#<Я ваще не понимаю, что я делаю 🙏🙏🙏># +# <Потом это можно будет заменить балансировщиком># +# <Я ваще не понимаю, что я делаю 🙏🙏🙏># context_viewer.clients = { "comment_service": sync_clients_factory.get_comment_client(), "mod_service": sync_clients_factory.get_mod_client(), - "rating_service": sync_clients_factory.get_rating_client() + "rating_service": sync_clients_factory.get_rating_client(), } if __name__ == "__main__": From 299dc3d00b4d9075b5ae544778c08093c22ba302 Mon Sep 17 00:00:00 2001 From: MystiSs Date: Fri, 10 Oct 2025 23:24:42 +0300 Subject: [PATCH 3/7] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B0=20=D0=BE=D0=B1=D1=91=D1=80=D1=82=D0=BA=D0=B0=20gRP?= =?UTF-8?q?C=20=D0=BE=D1=88=D0=B8=D0=B1=D0=BE=D0=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/clients/comment.py | 11 ++++------- src/gateway/clients/mod.py | 9 ++++----- src/gateway/clients/rating.py | 5 ++--- src/gateway/resolvers/grpc_error_wrapper.py | 20 ++++++++++++++++++++ src/gateway/resolvers/mutation/comment.py | 5 +++++ src/gateway/resolvers/mutation/mod.py | 4 ++++ src/gateway/resolvers/mutation/rating.py | 3 +++ src/gateway/resolvers/mutation/root.py | 5 +++++ src/gateway/resolvers/query/comment.py | 3 +++ src/gateway/resolvers/query/mod.py | 4 ++++ src/gateway/resolvers/query/root.py | 4 ++++ 11 files changed, 58 insertions(+), 15 deletions(-) create mode 100644 src/gateway/resolvers/grpc_error_wrapper.py diff --git a/src/gateway/clients/comment.py b/src/gateway/clients/comment.py index 15cb069..b9727eb 100644 --- a/src/gateway/clients/comment.py +++ b/src/gateway/clients/comment.py @@ -1,4 +1,3 @@ -from collections.abc import Coroutine from typing import Any, ClassVar from google.protobuf import message as _message @@ -62,16 +61,14 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # - async def create_comment( - self, mod_id: int, author_id: int, text: str - ) -> Coroutine[Any, Any, comment_pb2.CreateCommentResponse]: + async def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: return self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore[return-value] - async def edit_comment(self, comment_id: int, text: str) -> Coroutine[Any, Any, comment_pb2.EditCommentResponse]: + async def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: return self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore[return-value] - async def delete_comment(self, comment_id: int) -> Coroutine[Any, Any, comment_pb2.DeleteCommentResponse]: + async def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: return self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore[return-value] - async def get_comments(self, mod_id: int) -> Coroutine[Any, Any, comment_pb2.GetCommentsResponse]: + async def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: return self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore[return-value] diff --git a/src/gateway/clients/mod.py b/src/gateway/clients/mod.py index 6b0fe30..5d23fed 100644 --- a/src/gateway/clients/mod.py +++ b/src/gateway/clients/mod.py @@ -1,4 +1,3 @@ -from collections.abc import Coroutine from typing import Any, ClassVar from google.protobuf import message as _message @@ -65,16 +64,16 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa async def create_mod( self, title: str, author_id: int, filename: str, description: str - ) -> Coroutine[Any, Any, mod_pb2.CreateModResponse]: + ) -> mod_pb2.CreateModResponse: return self.call_rpc( "CreateMod", {"title": title, "author_id": author_id, "filename": filename, "description": description} ) # type: ignore[return-value] - async def set_status_mod(self, mod_id: int, status: str) -> Coroutine[Any, Any, mod_pb2.SetStatusResponse]: + async def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: return self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore[return-value] - async def get_mod_download_link_rpc(self, mod_id: int) -> Coroutine[Any, Any, mod_pb2.GetModDownloadLinkResponse]: + async def get_mod_download_link_rpc(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: return self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore[return-value] - async def get_mods(self) -> Coroutine[Any, Any, mod_pb2.GetModsResponse]: + async def get_mods(self) -> mod_pb2.GetModsResponse: return self.call_rpc("GetMods", {}) # type: ignore[return-value] diff --git a/src/gateway/clients/rating.py b/src/gateway/clients/rating.py index 6a6d5ed..474ee87 100644 --- a/src/gateway/clients/rating.py +++ b/src/gateway/clients/rating.py @@ -1,4 +1,3 @@ -from collections.abc import Coroutine from typing import Any, ClassVar from google.protobuf import message as _message @@ -51,8 +50,8 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # - async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> Coroutine[Any, Any, rating_pb2.RateModResponse]: + async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: return self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore[return-value] - async def get_rates(self, mod_id: int) -> Coroutine[Any, Any, rating_pb2.GetRatesResponse]: + async def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: return self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore[return-value] diff --git a/src/gateway/resolvers/grpc_error_wrapper.py b/src/gateway/resolvers/grpc_error_wrapper.py new file mode 100644 index 0000000..d852e5c --- /dev/null +++ b/src/gateway/resolvers/grpc_error_wrapper.py @@ -0,0 +1,20 @@ +from collections.abc import Callable +from functools import wraps +from typing import Any + +from graphql import GraphQLError + +from ..clients.base_client import GRPCError + + +def handle_grpc_errors(func: Callable) -> Callable: # type: ignore + @wraps(func) + def wrapper(*args, **kwargs) -> Any: # type: ignore + try: + return func(*args, **kwargs) + except GRPCError as e: + raise GraphQLError(f"gRPC ошибка: {e}") from None + except Exception as e: + raise GraphQLError(f"Неизвестная ошибка: {e}") from None + + return wrapper diff --git a/src/gateway/resolvers/mutation/comment.py b/src/gateway/resolvers/mutation/comment.py index 7badf2c..988731b 100644 --- a/src/gateway/resolvers/mutation/comment.py +++ b/src/gateway/resolvers/mutation/comment.py @@ -6,6 +6,8 @@ from gateway.helpers.id_helper import validate_and_convert_id +from ..grpc_error_wrapper import handle_grpc_errors + comment_mutation = ObjectType("CommentMutation") @@ -24,6 +26,7 @@ def _author_id(cls, v: Any) -> int: @comment_mutation.field("createComment") +@handle_grpc_errors def resolve_create_comment(parent: object, info: GraphQLResolveInfo, input: CreateCommentInput) -> str: data = CreateCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] @@ -41,6 +44,7 @@ def _comment_id(cls, v: Any) -> int: @comment_mutation.field("editComment") +@handle_grpc_errors def resolve_edit_comment(parent: object, info: GraphQLResolveInfo, input: EditCommentInput) -> bool: data = EditCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] @@ -57,6 +61,7 @@ def _comment_id(cls, v: Any) -> int: @comment_mutation.field("deleteComment") +@handle_grpc_errors def resolve_delete_comment(parent: object, info: GraphQLResolveInfo, input: DeleteCommentInput) -> bool: data = DeleteCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] diff --git a/src/gateway/resolvers/mutation/mod.py b/src/gateway/resolvers/mutation/mod.py index 0a124c3..1c0ad8f 100644 --- a/src/gateway/resolvers/mutation/mod.py +++ b/src/gateway/resolvers/mutation/mod.py @@ -7,6 +7,8 @@ from gateway.helpers.id_helper import validate_and_convert_id +from ..grpc_error_wrapper import handle_grpc_errors + mod_mutation = ObjectType("ModMutation") @@ -35,6 +37,7 @@ class CreateModResult(BaseModel): @mod_mutation.field("createMod") +@handle_grpc_errors def resolve_create_mod(parent: object, info: GraphQLResolveInfo, input: CreateModInput) -> dict[str, Any]: data = CreateModInput.model_validate(input) client = info.context["clients"]["mod_service"] @@ -52,6 +55,7 @@ def validate_mod_id(cls, v: Any) -> int: @mod_mutation.field("setStatus") +@handle_grpc_errors def resolve_set_status_mod(parent: object, info: GraphQLResolveInfo, input: SetStatusInput) -> bool: data = SetStatusInput.model_validate(input) client = info.context["clients"]["mod_service"] diff --git a/src/gateway/resolvers/mutation/rating.py b/src/gateway/resolvers/mutation/rating.py index 661a303..b96ecf2 100644 --- a/src/gateway/resolvers/mutation/rating.py +++ b/src/gateway/resolvers/mutation/rating.py @@ -7,6 +7,8 @@ from gateway.helpers.id_helper import validate_and_convert_id +from ..grpc_error_wrapper import handle_grpc_errors + rating_mutation = ObjectType("RatingMutation") @@ -34,6 +36,7 @@ def validate_author_id(cls, v: Any) -> int: @rating_mutation.field("addRate") +@handle_grpc_errors def resolve_add_rate(parent: object, info: GraphQLResolveInfo, input: AddRateInput) -> str: data = AddRateInput.model_validate(input) client = info.context["clients"]["rating_service"] diff --git a/src/gateway/resolvers/mutation/root.py b/src/gateway/resolvers/mutation/root.py index 89efc59..11c2486 100644 --- a/src/gateway/resolvers/mutation/root.py +++ b/src/gateway/resolvers/mutation/root.py @@ -3,19 +3,24 @@ from ariadne import MutationType from graphql import GraphQLResolveInfo +from ..grpc_error_wrapper import handle_grpc_errors + mutation = MutationType() @mutation.field("comment") +@handle_grpc_errors def resolve_comment_root(obj: Any, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]: return {} @mutation.field("mod") +@handle_grpc_errors def resolve_mod_root(obj: Any, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]: return {} @mutation.field("rating") +@handle_grpc_errors def resolve_rating_root(obj: Any, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]: return {} diff --git a/src/gateway/resolvers/query/comment.py b/src/gateway/resolvers/query/comment.py index e39703c..5bd0a35 100644 --- a/src/gateway/resolvers/query/comment.py +++ b/src/gateway/resolvers/query/comment.py @@ -6,6 +6,8 @@ from gateway.helpers.id_helper import validate_and_convert_id +from ..grpc_error_wrapper import handle_grpc_errors + class GetCommentsInput(BaseModel): mod_id: int @@ -31,6 +33,7 @@ def _edited_at(cls, v: Any) -> Any | None: @comment_query.field("getComments") +@handle_grpc_errors def resolve_get_comments(parent: object, info: GraphQLResolveInfo, input: GetCommentsInput) -> list[dict[str, Any]]: data = GetCommentsInput.model_validate(input) client = info.context["clients"]["comment_service"] diff --git a/src/gateway/resolvers/query/mod.py b/src/gateway/resolvers/query/mod.py index 5f0d70d..73c7d3e 100644 --- a/src/gateway/resolvers/query/mod.py +++ b/src/gateway/resolvers/query/mod.py @@ -7,6 +7,8 @@ from gateway.converters.mod_status_converter import proto_to_graphql_mod_status from gateway.helpers.id_helper import validate_and_convert_id +from ..grpc_error_wrapper import handle_grpc_errors + class GetModDownloadLinkInput(BaseModel): mod_id: int @@ -20,6 +22,7 @@ def _mod_id(cls, v: Any) -> int: @mod_query.field("getModDownloadLink") +@handle_grpc_errors def resolve_get_mod_download_link(parent: object, info: GraphQLResolveInfo, input: GetModDownloadLinkInput) -> str: data = GetModDownloadLinkInput.model_validate(input) client = info.context["clients"]["mod_service"] @@ -28,6 +31,7 @@ def resolve_get_mod_download_link(parent: object, info: GraphQLResolveInfo, inpu @mod_query.field("getMods") +@handle_grpc_errors def resolve_get_mods(parent: object, info: GraphQLResolveInfo) -> list[dict[str, Any]]: client = info.context["clients"]["mod_service"] resp = client.get_mods() diff --git a/src/gateway/resolvers/query/root.py b/src/gateway/resolvers/query/root.py index abb9957..b1413c1 100644 --- a/src/gateway/resolvers/query/root.py +++ b/src/gateway/resolvers/query/root.py @@ -3,14 +3,18 @@ from ariadne import QueryType from graphql import GraphQLResolveInfo +from ..grpc_error_wrapper import handle_grpc_errors + query = QueryType() @query.field("comment") +@handle_grpc_errors def resolve_comment_root(obj: Any, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]: return {} @query.field("mod") +@handle_grpc_errors def resolve_mod_root(obj: Any, info: GraphQLResolveInfo, **kwargs: Any) -> dict[str, Any]: return {} From ba39bfcb330dc0587b5fcd28ce7b07f63c5fefd6 Mon Sep 17 00:00:00 2001 From: MystiSs Date: Sun, 12 Oct 2025 17:12:37 +0300 Subject: [PATCH 4/7] =?UTF-8?q?-=20=D0=9F=D0=B5=D1=80=D0=B5=D1=85=D0=BE?= =?UTF-8?q?=D0=B4=20=D0=BD=D0=B0=20=D0=B0=D1=81=D0=B8=D0=BD=D1=85=D1=80?= =?UTF-8?q?=D0=BE=D0=BD=D0=BD=D0=BE=D0=B5=20=D0=B2=D1=8B=D0=BF=D0=BE=D0=BB?= =?UTF-8?q?=D0=BD=D0=B5=D0=BD=D0=B8=D0=B5;=20-=20=D0=92=D0=BD=D0=B5=D0=B4?= =?UTF-8?q?=D1=80=D1=91=D0=BD=20tenacity=20=D0=B4=D0=BB=D1=8F=20=D0=BF?= =?UTF-8?q?=D0=BE=D0=B2=D1=82=D0=BE=D1=80=D0=BD=D1=8B=D1=85=20=D0=B2=D1=8B?= =?UTF-8?q?=D0=B7=D0=BE=D0=B2=D0=BE=D0=B2=20=D0=B7=D0=B0=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D1=81=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/clients/base_client.py | 8 ++- src/gateway/clients/client_factory.py | 12 +++-- src/gateway/clients/comment.py | 20 ++++---- src/gateway/clients/mod.py | 24 ++++----- src/gateway/clients/rating.py | 12 ++--- src/gateway/helpers/retry.py | 53 +++++++++++++++++++ src/gateway/resolvers/grpc_error_wrapper.py | 36 +++++++++---- src/gateway/resolvers/mutation/comment.py | 12 ++--- src/gateway/resolvers/mutation/mod.py | 8 +-- src/gateway/resolvers/mutation/rating.py | 4 +- src/gateway/resolvers/query/comment.py | 6 ++- src/gateway/resolvers/query/mod.py | 10 ++-- src/gateway/server.py | 56 +++++++++++++++------ 13 files changed, 184 insertions(+), 77 deletions(-) create mode 100644 src/gateway/helpers/retry.py diff --git a/src/gateway/clients/base_client.py b/src/gateway/clients/base_client.py index da3b29b..c3e0df6 100644 --- a/src/gateway/clients/base_client.py +++ b/src/gateway/clients/base_client.py @@ -5,6 +5,8 @@ import grpc.aio from google.protobuf import message as _message +from ..helpers.retry import grpc_retry + logger = logging.getLogger(__name__) @@ -32,7 +34,8 @@ def _initialize_stub(self) -> None: """Инициализация стаба""" raise NotImplementedError() - def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: + @grpc_retry() # type: ignore + def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int=30) -> _message.Message: """ Универсальный метод вызова RPC @@ -97,7 +100,8 @@ def __init__(self, channel: grpc.aio.Channel): def _initialize_stub(self) -> None: raise NotImplementedError() - async def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: + @grpc_retry() # type: ignore + async def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int=30) -> _message.Message: """ Универсальный метод вызова RPC (async) diff --git a/src/gateway/clients/client_factory.py b/src/gateway/clients/client_factory.py index 5e2557d..cceda63 100644 --- a/src/gateway/clients/client_factory.py +++ b/src/gateway/clients/client_factory.py @@ -47,19 +47,19 @@ def _add_to_channels_if_needed(self, url: str) -> None: if url not in self._channels: self._channels[url] = grpc.aio.insecure_channel(url) - async def get_comment_client(self) -> AsyncCommentServiceClient: + def get_comment_client(self) -> AsyncCommentServiceClient: url = self._settings.comment_service_url self._add_to_channels_if_needed(url) return AsyncCommentServiceClient(self._channels[url]) - async def get_mod_client(self) -> AsyncModServiceClient: + def get_mod_client(self) -> AsyncModServiceClient: url = self._settings.mod_service_url self._add_to_channels_if_needed(url) return AsyncModServiceClient(self._channels[url]) - async def get_rating_client(self) -> AsyncRatingServiceClient: + def get_rating_client(self) -> AsyncRatingServiceClient: url = self._settings.rating_service_url self._add_to_channels_if_needed(url) @@ -68,3 +68,9 @@ async def get_rating_client(self) -> AsyncRatingServiceClient: async def close_all(self) -> None: for channel in self._channels.values(): await channel.close() + + async def __aenter__(self): # type: ignore + return self + + async def __aexit__(self, exc_type, exc, tb): # type: ignore + await self.close_all() diff --git a/src/gateway/clients/comment.py b/src/gateway/clients/comment.py index b9727eb..a82e01c 100644 --- a/src/gateway/clients/comment.py +++ b/src/gateway/clients/comment.py @@ -19,7 +19,7 @@ def _initialize_stub(self) -> None: self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = CommentServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") @@ -29,16 +29,16 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # Как минимум благодаря обёртке мы можем проставить типизацию def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: - return self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore[return-value] + return self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: - return self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore[return-value] + return self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: - return self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore[return-value] + return self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: - return self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore[return-value] + return self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore class AsyncCommentServiceClient(AsyncGRPCClient): @@ -53,7 +53,7 @@ def _initialize_stub(self) -> None: self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = CommentServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") @@ -62,13 +62,13 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # async def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: - return self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore[return-value] + return await self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore async def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: - return self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore[return-value] + return await self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore async def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: - return self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore[return-value] + return await self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore async def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: - return self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore[return-value] + return await self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore diff --git a/src/gateway/clients/mod.py b/src/gateway/clients/mod.py index 5d23fed..2ba1542 100644 --- a/src/gateway/clients/mod.py +++ b/src/gateway/clients/mod.py @@ -19,7 +19,7 @@ def _initialize_stub(self) -> None: self._stub = mod_pb2_grpc.ModServiceStub(self._channel) # type: ignore def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = ModServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") @@ -28,18 +28,18 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # def create_mod(self, title: str, author_id: int, filename: str, description: str) -> mod_pb2.CreateModResponse: - return self.call_rpc( + return self.call_rpc( # type: ignore "CreateMod", {"title": title, "author_id": author_id, "filename": filename, "description": description} - ) # type: ignore[return-value] + ) def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: - return self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore[return-value] + return self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore def get_mod_download_link(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: - return self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore[return-value] + return self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore def get_mods(self) -> mod_pb2.GetModsResponse: - return self.call_rpc("GetMods", {}) # type: ignore[return-value] + return self.call_rpc("GetMods", {}) # type: ignore class AsyncModServiceClient(AsyncGRPCClient): @@ -54,7 +54,7 @@ def _initialize_stub(self) -> None: self._stub = mod_pb2_grpc.ModServiceStub(self._channel) # type: ignore def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = ModServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") @@ -65,15 +65,15 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa async def create_mod( self, title: str, author_id: int, filename: str, description: str ) -> mod_pb2.CreateModResponse: - return self.call_rpc( + return await self.call_rpc( # type: ignore "CreateMod", {"title": title, "author_id": author_id, "filename": filename, "description": description} - ) # type: ignore[return-value] + ) async def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: - return self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore[return-value] + return await self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore async def get_mod_download_link_rpc(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: - return self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore[return-value] + return await self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore async def get_mods(self) -> mod_pb2.GetModsResponse: - return self.call_rpc("GetMods", {}) # type: ignore[return-value] + return await self.call_rpc("GetMods", {}) # type: ignore diff --git a/src/gateway/clients/rating.py b/src/gateway/clients/rating.py index 474ee87..71ec0c4 100644 --- a/src/gateway/clients/rating.py +++ b/src/gateway/clients/rating.py @@ -17,7 +17,7 @@ def _initialize_stub(self) -> None: self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) # type: ignore def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = RatingServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") @@ -26,10 +26,10 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: - return self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore[return-value] + return self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: - return self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore[return-value] + return self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore class AsyncRatingServiceClient(AsyncGRPCClient): @@ -42,7 +42,7 @@ def _initialize_stub(self) -> None: self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) # type: ignore def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = RatingServiceClient._RPC_REQUEST_CLASSES.get(rpc_name) + request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) if not request_class: raise ValueError(f"Неизветсный RPC метод: {rpc_name}") @@ -51,7 +51,7 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: - return self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore[return-value] + return await self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore async def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: - return self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore[return-value] + return await self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore diff --git a/src/gateway/helpers/retry.py b/src/gateway/helpers/retry.py new file mode 100644 index 0000000..4eb8140 --- /dev/null +++ b/src/gateway/helpers/retry.py @@ -0,0 +1,53 @@ +import logging + +import grpc +from tenacity import RetryCallState, retry, retry_if_exception, stop_after_attempt, wait_exponential + +logger = logging.getLogger(__name__) + +NON_RETRYABLE = {grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.INVALID_ARGUMENT, grpc.StatusCode.NOT_FOUND} + +RETRY_ATTEMPTS = 3 +RETRY_DELAY_MULTIPLIER = 1 +RETRY_DELAY_MIN = 1 +RETRY_DELAY_MAX = 10 + + +def is_retryable_grpc_exception(exc: BaseException) -> bool: + if isinstance(exc, grpc.RpcError): + code = exc.code() if hasattr(exc, "code") else None + return code not in NON_RETRYABLE + return False + + +def log_retry_attempt(retry_state: RetryCallState) -> None: + if retry_state.attempt_number < 1: + return + + if retry_state.outcome is None or not retry_state.outcome.failed or not hasattr(retry_state.outcome, "exception"): + return + + try: + exception = retry_state.outcome.exception() + if exception is None: + return + + sleep_time = getattr(retry_state.next_action, "sleep", 0) if retry_state.next_action else 0 + + logger.warning( + f"Повторная попытка {retry_state.attempt_number}/{RETRY_ATTEMPTS} " + f"для gRPC вызова. Ошибка: {exception} " + f"(следующая попытка через {sleep_time:.1f} сек)" + ) + except Exception as e: + logger.debug(f"Ошибка при логировании повторной попытки: {e}") + + +def grpc_retry(): # type: ignore + return retry( + stop=stop_after_attempt(RETRY_ATTEMPTS), + wait=wait_exponential(multiplier=RETRY_DELAY_MULTIPLIER, min=RETRY_DELAY_MIN, max=RETRY_DELAY_MAX), + retry=retry_if_exception(is_retryable_grpc_exception), + reraise=True, + before_sleep=log_retry_attempt, + ) diff --git a/src/gateway/resolvers/grpc_error_wrapper.py b/src/gateway/resolvers/grpc_error_wrapper.py index d852e5c..5c96f29 100644 --- a/src/gateway/resolvers/grpc_error_wrapper.py +++ b/src/gateway/resolvers/grpc_error_wrapper.py @@ -1,3 +1,4 @@ +import asyncio from collections.abc import Callable from functools import wraps from typing import Any @@ -8,13 +9,28 @@ def handle_grpc_errors(func: Callable) -> Callable: # type: ignore - @wraps(func) - def wrapper(*args, **kwargs) -> Any: # type: ignore - try: - return func(*args, **kwargs) - except GRPCError as e: - raise GraphQLError(f"gRPC ошибка: {e}") from None - except Exception as e: - raise GraphQLError(f"Неизвестная ошибка: {e}") from None - - return wrapper + if asyncio.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs) -> Any: # type: ignore + try: + return await func(*args, **kwargs) + except GRPCError as e: + raise GraphQLError(f"gRPC ошибка: {e}") from None + except Exception as e: + raise GraphQLError(f"Неизвестная ошибка: {e}") from None + + return async_wrapper + + else: + + @wraps(func) + def sync_wrapper(*args, **kwargs) -> Any: # type: ignore + try: + return func(*args, **kwargs) + except GRPCError as e: + raise GraphQLError(f"gRPC ошибка: {e}") from None + except Exception as e: + raise GraphQLError(f"Неизвестная ошибка: {e}") from None + + return sync_wrapper diff --git a/src/gateway/resolvers/mutation/comment.py b/src/gateway/resolvers/mutation/comment.py index 988731b..97e1902 100644 --- a/src/gateway/resolvers/mutation/comment.py +++ b/src/gateway/resolvers/mutation/comment.py @@ -27,10 +27,10 @@ def _author_id(cls, v: Any) -> int: @comment_mutation.field("createComment") @handle_grpc_errors -def resolve_create_comment(parent: object, info: GraphQLResolveInfo, input: CreateCommentInput) -> str: +async def resolve_create_comment(parent: object, info: GraphQLResolveInfo, input: CreateCommentInput) -> str: data = CreateCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] - resp = client.create_comment(data.mod_id, data.author_id, data.text) + resp = await client.create_comment(data.mod_id, data.author_id, data.text) return str(resp.comment_id) @@ -45,10 +45,10 @@ def _comment_id(cls, v: Any) -> int: @comment_mutation.field("editComment") @handle_grpc_errors -def resolve_edit_comment(parent: object, info: GraphQLResolveInfo, input: EditCommentInput) -> bool: +async def resolve_edit_comment(parent: object, info: GraphQLResolveInfo, input: EditCommentInput) -> bool: data = EditCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] - resp = client.edit_comment(data.comment_id, data.text) + resp = await client.edit_comment(data.comment_id, data.text) return resp.success # type: ignore @@ -62,8 +62,8 @@ def _comment_id(cls, v: Any) -> int: @comment_mutation.field("deleteComment") @handle_grpc_errors -def resolve_delete_comment(parent: object, info: GraphQLResolveInfo, input: DeleteCommentInput) -> bool: +async def resolve_delete_comment(parent: object, info: GraphQLResolveInfo, input: DeleteCommentInput) -> bool: data = DeleteCommentInput.model_validate(input) client = info.context["clients"]["comment_service"] - resp = client.delete_comment(data.comment_id) + resp = await client.delete_comment(data.comment_id) return resp.success # type: ignore diff --git a/src/gateway/resolvers/mutation/mod.py b/src/gateway/resolvers/mutation/mod.py index 1c0ad8f..d343b64 100644 --- a/src/gateway/resolvers/mutation/mod.py +++ b/src/gateway/resolvers/mutation/mod.py @@ -38,10 +38,10 @@ class CreateModResult(BaseModel): @mod_mutation.field("createMod") @handle_grpc_errors -def resolve_create_mod(parent: object, info: GraphQLResolveInfo, input: CreateModInput) -> dict[str, Any]: +async def resolve_create_mod(parent: object, info: GraphQLResolveInfo, input: CreateModInput) -> dict[str, Any]: data = CreateModInput.model_validate(input) client = info.context["clients"]["mod_service"] - resp = client.create_mod(data.title, data.author_id, data.filename, data.description) + resp = await client.create_mod(data.title, data.author_id, data.filename, data.description) return CreateModResult(mod_id=resp.mod_id, s3_key=resp.s3_key, upload_url=resp.upload_url).model_dump() @@ -56,8 +56,8 @@ def validate_mod_id(cls, v: Any) -> int: @mod_mutation.field("setStatus") @handle_grpc_errors -def resolve_set_status_mod(parent: object, info: GraphQLResolveInfo, input: SetStatusInput) -> bool: +async def resolve_set_status_mod(parent: object, info: GraphQLResolveInfo, input: SetStatusInput) -> bool: data = SetStatusInput.model_validate(input) client = info.context["clients"]["mod_service"] - resp = client.set_status_mod(data.mod_id, data.status.value) + resp = await client.set_status_mod(data.mod_id, data.status.value) return resp.success # type: ignore diff --git a/src/gateway/resolvers/mutation/rating.py b/src/gateway/resolvers/mutation/rating.py index b96ecf2..8cc4c98 100644 --- a/src/gateway/resolvers/mutation/rating.py +++ b/src/gateway/resolvers/mutation/rating.py @@ -37,8 +37,8 @@ def validate_author_id(cls, v: Any) -> int: @rating_mutation.field("addRate") @handle_grpc_errors -def resolve_add_rate(parent: object, info: GraphQLResolveInfo, input: AddRateInput) -> str: +async def resolve_add_rate(parent: object, info: GraphQLResolveInfo, input: AddRateInput) -> str: data = AddRateInput.model_validate(input) client = info.context["clients"]["rating_service"] - resp = client.rate_mod(data.mod_id, data.author_id, data.rate.value) + resp = await client.rate_mod(data.mod_id, data.author_id, data.rate.value) return str(resp.rate_id) diff --git a/src/gateway/resolvers/query/comment.py b/src/gateway/resolvers/query/comment.py index 5bd0a35..00942ae 100644 --- a/src/gateway/resolvers/query/comment.py +++ b/src/gateway/resolvers/query/comment.py @@ -34,10 +34,12 @@ def _edited_at(cls, v: Any) -> Any | None: @comment_query.field("getComments") @handle_grpc_errors -def resolve_get_comments(parent: object, info: GraphQLResolveInfo, input: GetCommentsInput) -> list[dict[str, Any]]: +async def resolve_get_comments( + parent: object, info: GraphQLResolveInfo, input: GetCommentsInput +) -> list[dict[str, Any]]: data = GetCommentsInput.model_validate(input) client = info.context["clients"]["comment_service"] - resp = client.get_comments(data.mod_id) + resp = await client.get_comments(data.mod_id) return [ GetCommentsResult( id=item.id, diff --git a/src/gateway/resolvers/query/mod.py b/src/gateway/resolvers/query/mod.py index 73c7d3e..a91f488 100644 --- a/src/gateway/resolvers/query/mod.py +++ b/src/gateway/resolvers/query/mod.py @@ -23,18 +23,20 @@ def _mod_id(cls, v: Any) -> int: @mod_query.field("getModDownloadLink") @handle_grpc_errors -def resolve_get_mod_download_link(parent: object, info: GraphQLResolveInfo, input: GetModDownloadLinkInput) -> str: +async def resolve_get_mod_download_link( + parent: object, info: GraphQLResolveInfo, input: GetModDownloadLinkInput +) -> str: data = GetModDownloadLinkInput.model_validate(input) client = info.context["clients"]["mod_service"] - resp = client.get_mod_download_link(data.mod_id) + resp = await client.get_mod_download_link(data.mod_id) return resp.link_url # type: ignore @mod_query.field("getMods") @handle_grpc_errors -def resolve_get_mods(parent: object, info: GraphQLResolveInfo) -> list[dict[str, Any]]: +async def resolve_get_mods(parent: object, info: GraphQLResolveInfo) -> list[dict[str, Any]]: client = info.context["clients"]["mod_service"] - resp = client.get_mods() + resp = await client.get_mods() return [ { "id": item.id, diff --git a/src/gateway/server.py b/src/gateway/server.py index 6decfab..d8f9c65 100644 --- a/src/gateway/server.py +++ b/src/gateway/server.py @@ -1,9 +1,12 @@ +import asyncio +import logging + import uvicorn from ariadne import load_schema_from_path, make_executable_schema from ariadne.asgi import GraphQL from ariadne.explorer import ExplorerGraphiQL -from gateway.clients.client_factory import AsyncGRPCClientFactory, GRPCClientFactory +from gateway.clients.client_factory import AsyncGRPCClientFactory from gateway.resolvers.mutation.comment import comment_mutation from gateway.resolvers.mutation.mod import mod_mutation from gateway.resolvers.mutation.root import mutation @@ -14,6 +17,9 @@ from .esclient_graphql import GQLContextViewer +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + settings = Settings() type_defs = load_schema_from_path("src/gateway/schema") @@ -29,24 +35,42 @@ ) context_viewer = GQLContextViewer() +# sync_clients_factory = GRPCClientFactory(settings) +async_clients_factory = AsyncGRPCClientFactory(settings) app = GraphQL(schema, debug=True, explorer=ExplorerGraphiQL(), context_value=context_viewer.get_current) -sync_clients_factory = GRPCClientFactory(settings) -async_clients_factory = AsyncGRPCClientFactory(settings) -# <Потом это можно будет заменить балансировщиком># -# <Я ваще не понимаю, что я делаю 🙏🙏🙏># -context_viewer.clients = { - "comment_service": sync_clients_factory.get_comment_client(), - "mod_service": sync_clients_factory.get_mod_client(), - "rating_service": sync_clients_factory.get_rating_client(), -} +async def main(): # type: ignore + logger.info("Инициализация gRPC клиентов...") + comment_client = async_clients_factory.get_comment_client() + mod_client = async_clients_factory.get_mod_client() + rating_client = async_clients_factory.get_rating_client() + logger.info("gRPC клиенты инициализированы.") -if __name__ == "__main__": - uvicorn.run( - "gateway.server:app", - host=settings.host, - port=settings.port, - reload=True, + context_viewer.clients = { + "comment_service": comment_client, + "mod_service": mod_client, + "rating_service": rating_client, + } + + app = GraphQL( + schema, + debug=True, + explorer=ExplorerGraphiQL(), + context_value=context_viewer.get_current, ) + + config = uvicorn.Config(app=app, host=settings.host, port=settings.port, reload=True) + server = uvicorn.Server(config) + + try: + await server.serve() + finally: + logger.info("Закрытие gRPC каналов...") + await async_clients_factory.close_all() + logger.info("Все соединения закрыты.") + + +if __name__ == "__main__": + asyncio.run(main()) # type: ignore From e099cf3d4b756c131ffe034c2d142107710d4320 Mon Sep 17 00:00:00 2001 From: MystiSs Date: Sun, 12 Oct 2025 17:13:35 +0300 Subject: [PATCH 5/7] =?UTF-8?q?=D0=9B=D0=B8=D0=BD=D1=82=D0=B5=D1=80=20?= =?UTF-8?q?=D1=87=D1=82=D0=BE-=D1=82=D0=BE=20=D0=BF=D0=BE=D0=B4=D0=BF?= =?UTF-8?q?=D1=80=D0=B0=D0=B2=D0=B8=D0=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/clients/base_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/gateway/clients/base_client.py b/src/gateway/clients/base_client.py index c3e0df6..310551f 100644 --- a/src/gateway/clients/base_client.py +++ b/src/gateway/clients/base_client.py @@ -35,7 +35,7 @@ def _initialize_stub(self) -> None: raise NotImplementedError() @grpc_retry() # type: ignore - def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int=30) -> _message.Message: + def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: """ Универсальный метод вызова RPC @@ -101,7 +101,7 @@ def _initialize_stub(self) -> None: raise NotImplementedError() @grpc_retry() # type: ignore - async def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int=30) -> _message.Message: + async def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: """ Универсальный метод вызова RPC (async) From d1f52351e0e26b507f6677a9a0f387a3bd836094 Mon Sep 17 00:00:00 2001 From: Andrey Kataev Date: Mon, 13 Oct 2025 14:17:32 +0300 Subject: [PATCH 6/7] =?UTF-8?q?=D0=A3=D0=B1=D1=80=D0=B0=D0=BB=20=D1=81?= =?UTF-8?q?=D0=B8=D0=BD=D1=85=D1=80=D0=BE=D0=BD=D0=BD=D1=8B=D0=B9=20=D0=BA?= =?UTF-8?q?=D0=BE=D0=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/clients/base_client.py | 71 --------------------------- src/gateway/clients/client_factory.py | 42 ++-------------- src/gateway/clients/comment.py | 36 +------------- src/gateway/clients/mod.py | 39 +-------------- src/gateway/clients/rating.py | 27 +--------- src/gateway/esclient_graphql.py | 4 +- src/gateway/server.py | 13 +++-- 7 files changed, 17 insertions(+), 215 deletions(-) diff --git a/src/gateway/clients/base_client.py b/src/gateway/clients/base_client.py index 310551f..f71f9a1 100644 --- a/src/gateway/clients/base_client.py +++ b/src/gateway/clients/base_client.py @@ -21,77 +21,6 @@ class RPCDoesNotExistException(GRPCError): class GRPCClient: _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = {} - def __init__(self, channel: grpc.Channel): - """ - Args: - channel: gRPC канал для соединения - """ - self._channel = channel - self._stub = None - self._initialize_stub() - - def _initialize_stub(self) -> None: - """Инициализация стаба""" - raise NotImplementedError() - - @grpc_retry() # type: ignore - def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: - """ - Универсальный метод вызова RPC - - Args: - rpc_name: имя RPC метода - request_data: данные для запроса - timeout: таймаут в секундах - - Returns: - gRPC response message - """ - try: - rpc_method = self._get_rpc_method(rpc_name) - - request = self._create_request(rpc_name, request_data) - - logger.debug(f"Вызов gRPC: {rpc_name} Данные: {request_data}") - response: _message.Message = rpc_method(request, timeout=timeout) # type: ignore[operator] - - return response - - except grpc.RpcError as e: - logger.error(f"gRPC ошибка {rpc_name}: {e}") - raise GRPCError(f"gRPC Ошибка вызова: {e}") from e - except Exception as e: - logger.error(f"Неизвестная ошибка при вызове {rpc_name}: {e}") - raise - - def _get_rpc_method(self, rpc_name: str) -> _message.Message: - """Получаем RPC метод из стаба""" - rpc_method: _message.Message = getattr(self._stub, rpc_name, None) # type: ignore[assignment] - if rpc_method is None: - raise RPCDoesNotExistException(f"RPC метод '{rpc_name}' не найден в стабе") - return rpc_method - - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - """ - Создает gRPC request message из словаря - """ - raise NotImplementedError() - - def close(self) -> None: - """Закрывает соединение""" - if self._channel: - self._channel.close() - - def __enter__(self) -> Self: - return self - - def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore - self.close() - - -class AsyncGRPCClient: - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = {} - def __init__(self, channel: grpc.aio.Channel): self._channel = channel self._stub = None diff --git a/src/gateway/clients/client_factory.py b/src/gateway/clients/client_factory.py index cceda63..a7526e2 100644 --- a/src/gateway/clients/client_factory.py +++ b/src/gateway/clients/client_factory.py @@ -1,19 +1,19 @@ import grpc -from gateway.clients.comment import AsyncCommentServiceClient, CommentServiceClient -from gateway.clients.mod import AsyncModServiceClient, ModServiceClient -from gateway.clients.rating import AsyncRatingServiceClient, RatingServiceClient +from gateway.clients.comment import CommentServiceClient +from gateway.clients.mod import ModServiceClient +from gateway.clients.rating import RatingServiceClient from gateway.settings import Settings class GRPCClientFactory: def __init__(self, settings: Settings) -> None: self._settings = settings - self._channels: dict[str, grpc.Channel] = {} # На всякий случай + self._channels: dict[str, grpc.aio.Channel] = {} # На всякий случай def _add_to_channels_if_needed(self, url: str) -> None: if url not in self._channels: - self._channels[url] = grpc.insecure_channel(url) + self._channels[url] = grpc.aio.insecure_channel(url) def get_comment_client(self) -> CommentServiceClient: url = self._settings.comment_service_url @@ -33,38 +33,6 @@ def get_rating_client(self) -> RatingServiceClient: return RatingServiceClient(self._channels[url]) - def close_all(self) -> None: - for channel in self._channels.values(): - channel.close() - - -class AsyncGRPCClientFactory: - def __init__(self, settings: Settings) -> None: - self._settings = settings - self._channels: dict[str, grpc.aio.Channel] = {} # На всякий случай - - def _add_to_channels_if_needed(self, url: str) -> None: - if url not in self._channels: - self._channels[url] = grpc.aio.insecure_channel(url) - - def get_comment_client(self) -> AsyncCommentServiceClient: - url = self._settings.comment_service_url - self._add_to_channels_if_needed(url) - - return AsyncCommentServiceClient(self._channels[url]) - - def get_mod_client(self) -> AsyncModServiceClient: - url = self._settings.mod_service_url - self._add_to_channels_if_needed(url) - - return AsyncModServiceClient(self._channels[url]) - - def get_rating_client(self) -> AsyncRatingServiceClient: - url = self._settings.rating_service_url - self._add_to_channels_if_needed(url) - - return AsyncRatingServiceClient(self._channels[url]) - async def close_all(self) -> None: for channel in self._channels.values(): await channel.close() diff --git a/src/gateway/clients/comment.py b/src/gateway/clients/comment.py index a82e01c..9f0601d 100644 --- a/src/gateway/clients/comment.py +++ b/src/gateway/clients/comment.py @@ -4,7 +4,7 @@ from gateway.stubs import comment_pb2, comment_pb2_grpc -from .base_client import AsyncGRPCClient, GRPCClient +from .base_client import GRPCClient class CommentServiceClient(GRPCClient): @@ -15,40 +15,6 @@ class CommentServiceClient(GRPCClient): "GetComments": comment_pb2.GetCommentsRequest, } - def _initialize_stub(self) -> None: - self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore - - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) - if not request_class: - raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - - return request_class(**request_data) - - # Вообще я написал универсальный "call_rpc". Но обёртки никто не отменял :) - # Как минимум благодаря обёртке мы можем проставить типизацию - - def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: - return self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore - - def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: - return self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore - - def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: - return self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore - - def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: - return self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore - - -class AsyncCommentServiceClient(AsyncGRPCClient): - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { - "CreateComment": comment_pb2.CreateCommentRequest, - "EditComment": comment_pb2.EditCommentRequest, - "DeleteComment": comment_pb2.DeleteCommentRequest, - "GetComments": comment_pb2.GetCommentsRequest, - } - def _initialize_stub(self) -> None: self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore diff --git a/src/gateway/clients/mod.py b/src/gateway/clients/mod.py index 2ba1542..e4422c0 100644 --- a/src/gateway/clients/mod.py +++ b/src/gateway/clients/mod.py @@ -4,7 +4,7 @@ from gateway.stubs import mod_pb2, mod_pb2_grpc -from .base_client import AsyncGRPCClient, GRPCClient +from .base_client import GRPCClient class ModServiceClient(GRPCClient): @@ -27,41 +27,6 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # - def create_mod(self, title: str, author_id: int, filename: str, description: str) -> mod_pb2.CreateModResponse: - return self.call_rpc( # type: ignore - "CreateMod", {"title": title, "author_id": author_id, "filename": filename, "description": description} - ) - - def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: - return self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore - - def get_mod_download_link(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: - return self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore - - def get_mods(self) -> mod_pb2.GetModsResponse: - return self.call_rpc("GetMods", {}) # type: ignore - - -class AsyncModServiceClient(AsyncGRPCClient): - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { - "CreateMod": mod_pb2.CreateModRequest, - "SetStatus": mod_pb2.SetStatusRequest, - "GetModDownloadLink": mod_pb2.GetModDownloadLinkRequest, - "GetMods": mod_pb2.GetModsRequest, - } - - def _initialize_stub(self) -> None: - self._stub = mod_pb2_grpc.ModServiceStub(self._channel) # type: ignore - - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) - if not request_class: - raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - - return request_class(**request_data) - - # *** # - async def create_mod( self, title: str, author_id: int, filename: str, description: str ) -> mod_pb2.CreateModResponse: @@ -72,7 +37,7 @@ async def create_mod( async def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: return await self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore - async def get_mod_download_link_rpc(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: + async def get_mod_download_link(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: return await self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore async def get_mods(self) -> mod_pb2.GetModsResponse: diff --git a/src/gateway/clients/rating.py b/src/gateway/clients/rating.py index 71ec0c4..f548cf3 100644 --- a/src/gateway/clients/rating.py +++ b/src/gateway/clients/rating.py @@ -4,7 +4,7 @@ from gateway.stubs import rating_pb2, rating_pb2_grpc -from .base_client import AsyncGRPCClient, GRPCClient +from .base_client import GRPCClient class RatingServiceClient(GRPCClient): @@ -25,31 +25,6 @@ def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _messa # *** # - def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: - return self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore - - def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: - return self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore - - -class AsyncRatingServiceClient(AsyncGRPCClient): - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { - "RateMod": rating_pb2.RateModRequest, - "GetRates": rating_pb2.GetRatesRequest, - } - - def _initialize_stub(self) -> None: - self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) # type: ignore - - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) - if not request_class: - raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - - return request_class(**request_data) - - # *** # - async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: return await self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore diff --git a/src/gateway/esclient_graphql.py b/src/gateway/esclient_graphql.py index 7511658..c87bb85 100644 --- a/src/gateway/esclient_graphql.py +++ b/src/gateway/esclient_graphql.py @@ -1,11 +1,11 @@ from typing import Any -from gateway.clients.base_client import AsyncGRPCClient, GRPCClient +from gateway.clients.base_client import GRPCClient class GQLContextViewer: def __init__(self) -> None: - self.clients: dict[str, GRPCClient | AsyncGRPCClient] = {} + self.clients: dict[str, GRPCClient] = {} def get_current(self, request: Any) -> dict[str, Any]: return {"request": request, "clients": self.clients} diff --git a/src/gateway/server.py b/src/gateway/server.py index d8f9c65..124b784 100644 --- a/src/gateway/server.py +++ b/src/gateway/server.py @@ -6,7 +6,7 @@ from ariadne.asgi import GraphQL from ariadne.explorer import ExplorerGraphiQL -from gateway.clients.client_factory import AsyncGRPCClientFactory +from gateway.clients.client_factory import GRPCClientFactory from gateway.resolvers.mutation.comment import comment_mutation from gateway.resolvers.mutation.mod import mod_mutation from gateway.resolvers.mutation.root import mutation @@ -35,17 +35,16 @@ ) context_viewer = GQLContextViewer() -# sync_clients_factory = GRPCClientFactory(settings) -async_clients_factory = AsyncGRPCClientFactory(settings) +clients_factory = GRPCClientFactory(settings) app = GraphQL(schema, debug=True, explorer=ExplorerGraphiQL(), context_value=context_viewer.get_current) async def main(): # type: ignore logger.info("Инициализация gRPC клиентов...") - comment_client = async_clients_factory.get_comment_client() - mod_client = async_clients_factory.get_mod_client() - rating_client = async_clients_factory.get_rating_client() + comment_client = clients_factory.get_comment_client() + mod_client = clients_factory.get_mod_client() + rating_client = clients_factory.get_rating_client() logger.info("gRPC клиенты инициализированы.") context_viewer.clients = { @@ -68,7 +67,7 @@ async def main(): # type: ignore await server.serve() finally: logger.info("Закрытие gRPC каналов...") - await async_clients_factory.close_all() + await clients_factory.close_all() logger.info("Все соединения закрыты.") From 0ffa21962a9ac6819138fd0ae75d98825402e493 Mon Sep 17 00:00:00 2001 From: Andrey Kataev Date: Mon, 13 Oct 2025 14:31:43 +0300 Subject: [PATCH 7/7] =?UTF-8?q?=D0=9F=D1=80=D0=B0=D0=B2=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/clients/base_client.py | 53 +++++++-------------- src/gateway/clients/client_factory.py | 23 ++++----- src/gateway/clients/comment.py | 34 ++++--------- src/gateway/clients/mod.py | 36 ++++---------- src/gateway/clients/rating.py | 26 +++------- src/gateway/esclient_graphql.py | 4 +- src/gateway/resolvers/grpc_error_wrapper.py | 6 +-- src/gateway/server.py | 8 +++- 8 files changed, 65 insertions(+), 125 deletions(-) diff --git a/src/gateway/clients/base_client.py b/src/gateway/clients/base_client.py index f71f9a1..d818e79 100644 --- a/src/gateway/clients/base_client.py +++ b/src/gateway/clients/base_client.py @@ -1,5 +1,6 @@ import logging -from typing import Any, ClassVar, Self +from typing import Self +from collections.abc import Awaitable, Callable import grpc import grpc.aio @@ -10,17 +11,11 @@ logger = logging.getLogger(__name__) -class GRPCError(Exception): +class GrpcError(Exception): pass -class RPCDoesNotExistException(GRPCError): - """Исключение когда RPC метод не найден в gRPC стабе""" - - -class GRPCClient: - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = {} - +class GrpcClient: def __init__(self, channel: grpc.aio.Channel): self._channel = channel self._stub = None @@ -30,41 +25,25 @@ def _initialize_stub(self) -> None: raise NotImplementedError() @grpc_retry() # type: ignore - async def call_rpc(self, rpc_name: str, request_data: dict[str, Any], timeout: int = 30) -> _message.Message: - """ - Универсальный метод вызова RPC (async) - - Args: - rpc_name: имя RPC метода - request_data: данные для запроса - timeout: таймаут в секундах - - Returns: - gRPC response message - """ + async def call( + self, + rpc_method: Callable[["_message.Message"], Awaitable["_message.Message"]], + request: "_message.Message", + *, + timeout: int = 30, + ) -> "_message.Message": try: - rpc_method = self._get_rpc_method(rpc_name) - request = self._create_request(rpc_name, request_data) - response: _message.Message = await rpc_method(request, timeout=timeout) # type: ignore[operator] return response - except grpc.RpcError as e: - logger.error(f"gRPC ошибка {rpc_name}: {e} (async)") - raise GRPCError(f"gRPC Ошибка вызова: {e} (async)") from e + logger.error(f"gRPC ошибка {rpc_method.__name__ if hasattr(rpc_method, '__name__') else rpc_method}: {e}") + raise GrpcError(f"gRPC Ошибка вызова: {e}") from e except Exception as e: - logger.error(f"Неизвестная ошибка при вызове {rpc_name}: {e} (async)") + logger.error( + f"Неизвестная ошибка при вызове {rpc_method.__name__ if hasattr(rpc_method, '__name__') else rpc_method}: {e}" + ) raise - def _get_rpc_method(self, rpc_name: str) -> _message.Message: - rpc_method: _message.Message | None = getattr(self._stub, rpc_name, None) - if not rpc_method: - raise RPCDoesNotExistException(f"RPC метод '{rpc_name}' не найден в стабе (async)") - return rpc_method - - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - raise NotImplementedError() - async def close(self) -> None: if self._channel: await self._channel.close() diff --git a/src/gateway/clients/client_factory.py b/src/gateway/clients/client_factory.py index a7526e2..a585626 100644 --- a/src/gateway/clients/client_factory.py +++ b/src/gateway/clients/client_factory.py @@ -3,33 +3,34 @@ from gateway.clients.comment import CommentServiceClient from gateway.clients.mod import ModServiceClient from gateway.clients.rating import RatingServiceClient -from gateway.settings import Settings -class GRPCClientFactory: - def __init__(self, settings: Settings) -> None: - self._settings = settings +class GrpcClientFactory: + def __init__(self, comment_service_url: str, mod_service_url: str, rating_service_url: str) -> None: + self._comment_service_url = comment_service_url + self._mod_service_url = mod_service_url + self._rating_service_url = rating_service_url self._channels: dict[str, grpc.aio.Channel] = {} # На всякий случай - def _add_to_channels_if_needed(self, url: str) -> None: + def _add_to_channels(self, url: str) -> None: if url not in self._channels: self._channels[url] = grpc.aio.insecure_channel(url) def get_comment_client(self) -> CommentServiceClient: - url = self._settings.comment_service_url - self._add_to_channels_if_needed(url) + url = self._comment_service_url + self._add_to_channels(url) return CommentServiceClient(self._channels[url]) def get_mod_client(self) -> ModServiceClient: - url = self._settings.mod_service_url - self._add_to_channels_if_needed(url) + url = self._mod_service_url + self._add_to_channels(url) return ModServiceClient(self._channels[url]) def get_rating_client(self) -> RatingServiceClient: - url = self._settings.rating_service_url - self._add_to_channels_if_needed(url) + url = self._rating_service_url + self._add_to_channels(url) return RatingServiceClient(self._channels[url]) diff --git a/src/gateway/clients/comment.py b/src/gateway/clients/comment.py index 9f0601d..8447c97 100644 --- a/src/gateway/clients/comment.py +++ b/src/gateway/clients/comment.py @@ -1,40 +1,26 @@ -from typing import Any, ClassVar -from google.protobuf import message as _message from gateway.stubs import comment_pb2, comment_pb2_grpc -from .base_client import GRPCClient +from .base_client import GrpcClient -class CommentServiceClient(GRPCClient): - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { - "CreateComment": comment_pb2.CreateCommentRequest, - "EditComment": comment_pb2.EditCommentRequest, - "DeleteComment": comment_pb2.DeleteCommentRequest, - "GetComments": comment_pb2.GetCommentsRequest, - } - +class CommentServiceClient(GrpcClient): def _initialize_stub(self) -> None: self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) - if not request_class: - raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - - return request_class(**request_data) - - # *** # - async def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse: - return await self.call_rpc("CreateComment", {"mod_id": mod_id, "author_id": author_id, "text": text}) # type: ignore + request = comment_pb2.CreateCommentRequest(mod_id=mod_id, author_id=author_id, text=text) + return await self.call(self._stub.CreateComment, request) async def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse: - return await self.call_rpc("EditComment", {"comment_id": comment_id, "text": text}) # type: ignore + request = comment_pb2.EditCommentRequest(comment_id=comment_id, text=text) + return await self.call(self._stub.EditComment, request) async def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse: - return await self.call_rpc("DeleteComment", {"comment_id": comment_id}) # type: ignore + request = comment_pb2.DeleteCommentRequest(comment_id=comment_id) + return await self.call(self._stub.DeleteComment, request) async def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse: - return await self.call_rpc("GetComments", {"mod_id": mod_id}) # type: ignore + request = comment_pb2.GetCommentsRequest(mod_id=mod_id) + return await self.call(self._stub.GetComments, request) diff --git a/src/gateway/clients/mod.py b/src/gateway/clients/mod.py index e4422c0..749877f 100644 --- a/src/gateway/clients/mod.py +++ b/src/gateway/clients/mod.py @@ -1,44 +1,28 @@ -from typing import Any, ClassVar -from google.protobuf import message as _message from gateway.stubs import mod_pb2, mod_pb2_grpc -from .base_client import GRPCClient +from .base_client import GrpcClient -class ModServiceClient(GRPCClient): - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { - "CreateMod": mod_pb2.CreateModRequest, - "SetStatus": mod_pb2.SetStatusRequest, - "GetModDownloadLink": mod_pb2.GetModDownloadLinkRequest, - "GetMods": mod_pb2.GetModsRequest, - } - +class ModServiceClient(GrpcClient): def _initialize_stub(self) -> None: self._stub = mod_pb2_grpc.ModServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) - if not request_class: - raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - - return request_class(**request_data) - - # *** # - async def create_mod( self, title: str, author_id: int, filename: str, description: str ) -> mod_pb2.CreateModResponse: - return await self.call_rpc( # type: ignore - "CreateMod", {"title": title, "author_id": author_id, "filename": filename, "description": description} - ) + request = mod_pb2.CreateModRequest(title=title, author_id=author_id, filename=filename, description=description) + return await self.call(self._stub.CreateMod, request) async def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse: - return await self.call_rpc("SetStatus", {"mod_id": mod_id, "status": status}) # type: ignore + request = mod_pb2.SetStatusRequest(mod_id=mod_id, status=status) + return await self.call(self._stub.SetStatus, request) async def get_mod_download_link(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse: - return await self.call_rpc("GetModDownloadLink", {"mod_id": mod_id}) # type: ignore + request = mod_pb2.GetModDownloadLinkRequest(mod_id=mod_id) + return await self.call(self._stub.GetModDownloadLink, request) async def get_mods(self) -> mod_pb2.GetModsResponse: - return await self.call_rpc("GetMods", {}) # type: ignore + request = mod_pb2.GetModsRequest() + return await self.call(self._stub.GetMods, request) diff --git a/src/gateway/clients/rating.py b/src/gateway/clients/rating.py index f548cf3..4df61d5 100644 --- a/src/gateway/clients/rating.py +++ b/src/gateway/clients/rating.py @@ -1,32 +1,18 @@ -from typing import Any, ClassVar -from google.protobuf import message as _message from gateway.stubs import rating_pb2, rating_pb2_grpc -from .base_client import GRPCClient +from .base_client import GrpcClient -class RatingServiceClient(GRPCClient): - _RPC_REQUEST_CLASSES: ClassVar[dict[str, type[_message.Message]]] = { - "RateMod": rating_pb2.RateModRequest, - "GetRates": rating_pb2.GetRatesRequest, - } - +class RatingServiceClient(GrpcClient): def _initialize_stub(self) -> None: self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) # type: ignore - def _create_request(self, rpc_name: str, request_data: dict[str, Any]) -> _message.Message: - request_class = self._RPC_REQUEST_CLASSES.get(rpc_name) - if not request_class: - raise ValueError(f"Неизветсный RPC метод: {rpc_name}") - - return request_class(**request_data) - - # *** # - async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse: - return await self.call_rpc("RateMod", {"mod_id": mod_id, "author_id": author_id, "rate": rate}) # type: ignore + request = rating_pb2.RateModRequest(mod_id=mod_id, author_id=author_id, rate=rate) + return await self.call(self._stub.RateMod, request) async def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse: - return await self.call_rpc("GetRates", {"mod_id": mod_id}) # type: ignore + request = rating_pb2.GetRatesRequest(mod_id=mod_id) + return await self.call(self._stub.GetRates, request) diff --git a/src/gateway/esclient_graphql.py b/src/gateway/esclient_graphql.py index c87bb85..aa7bd70 100644 --- a/src/gateway/esclient_graphql.py +++ b/src/gateway/esclient_graphql.py @@ -1,11 +1,11 @@ from typing import Any -from gateway.clients.base_client import GRPCClient +from gateway.clients.base_client import GrpcClient class GQLContextViewer: def __init__(self) -> None: - self.clients: dict[str, GRPCClient] = {} + self.clients: dict[str, GrpcClient] = {} def get_current(self, request: Any) -> dict[str, Any]: return {"request": request, "clients": self.clients} diff --git a/src/gateway/resolvers/grpc_error_wrapper.py b/src/gateway/resolvers/grpc_error_wrapper.py index 5c96f29..315ef59 100644 --- a/src/gateway/resolvers/grpc_error_wrapper.py +++ b/src/gateway/resolvers/grpc_error_wrapper.py @@ -5,7 +5,7 @@ from graphql import GraphQLError -from ..clients.base_client import GRPCError +from ..clients.base_client import GrpcError def handle_grpc_errors(func: Callable) -> Callable: # type: ignore @@ -15,7 +15,7 @@ def handle_grpc_errors(func: Callable) -> Callable: # type: ignore async def async_wrapper(*args, **kwargs) -> Any: # type: ignore try: return await func(*args, **kwargs) - except GRPCError as e: + except GrpcError as e: raise GraphQLError(f"gRPC ошибка: {e}") from None except Exception as e: raise GraphQLError(f"Неизвестная ошибка: {e}") from None @@ -28,7 +28,7 @@ async def async_wrapper(*args, **kwargs) -> Any: # type: ignore def sync_wrapper(*args, **kwargs) -> Any: # type: ignore try: return func(*args, **kwargs) - except GRPCError as e: + except GrpcError as e: raise GraphQLError(f"gRPC ошибка: {e}") from None except Exception as e: raise GraphQLError(f"Неизвестная ошибка: {e}") from None diff --git a/src/gateway/server.py b/src/gateway/server.py index 124b784..9f2b914 100644 --- a/src/gateway/server.py +++ b/src/gateway/server.py @@ -6,7 +6,7 @@ from ariadne.asgi import GraphQL from ariadne.explorer import ExplorerGraphiQL -from gateway.clients.client_factory import GRPCClientFactory +from gateway.clients.client_factory import GrpcClientFactory from gateway.resolvers.mutation.comment import comment_mutation from gateway.resolvers.mutation.mod import mod_mutation from gateway.resolvers.mutation.root import mutation @@ -35,7 +35,11 @@ ) context_viewer = GQLContextViewer() -clients_factory = GRPCClientFactory(settings) +clients_factory = GrpcClientFactory( + comment_service_url=settings.comment_service_url, + mod_service_url=settings.mod_service_url, + rating_service_url=settings.rating_service_url, +) app = GraphQL(schema, debug=True, explorer=ExplorerGraphiQL(), context_value=context_viewer.get_current)