Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions configs/dev.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
HOST: 0.0.0.0
PORT: 8000
PORT: 7776

COMMENT_SERVICE_HOST: 0.0.0.0
COMMENT_SERVICE_HOST: localhost
COMMENT_SERVICE_PORT: 7002

RATING_SERVICE_HOST: 0.0.0.0
RATING_SERVICE_HOST: localhost
RATING_SERVICE_PORT: 7003

MOD_SERVICE_HOST: 0.0.0.0
MOD_SERVICE_HOST: localhost
MOD_SERVICE_PORT: 7004
55 changes: 55 additions & 0 deletions src/gateway/clients/base_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import logging
from typing import Self
from collections.abc import Awaitable, Callable

import grpc
import grpc.aio
from google.protobuf import message as _message

from ..helpers.retry import grpc_retry

logger = logging.getLogger(__name__)


class GrpcError(Exception):
pass


class GrpcClient:
def __init__(self, channel: grpc.aio.Channel):
self._channel = channel
self._stub = None
self._initialize_stub()

def _initialize_stub(self) -> None:
raise NotImplementedError()

@grpc_retry() # type: ignore
async def call(
self,
rpc_method: Callable[["_message.Message"], Awaitable["_message.Message"]],
request: "_message.Message",
*,
timeout: int = 30,
) -> "_message.Message":
try:
response: _message.Message = await rpc_method(request, timeout=timeout) # type: ignore[operator]
return response
except grpc.RpcError as e:
logger.error(f"gRPC ошибка {rpc_method.__name__ if hasattr(rpc_method, '__name__') else rpc_method}: {e}")
raise GrpcError(f"gRPC Ошибка вызова: {e}") from e
except Exception as e:
logger.error(
f"Неизвестная ошибка при вызове {rpc_method.__name__ if hasattr(rpc_method, '__name__') else rpc_method}: {e}"
)
raise

async def close(self) -> None:
if self._channel:
await self._channel.close()

async def __aenter__(self) -> Self:
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore
await self.close()
45 changes: 45 additions & 0 deletions src/gateway/clients/client_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import grpc

from gateway.clients.comment import CommentServiceClient
from gateway.clients.mod import ModServiceClient
from gateway.clients.rating import RatingServiceClient


class GrpcClientFactory:
def __init__(self, comment_service_url: str, mod_service_url: str, rating_service_url: str) -> None:
self._comment_service_url = comment_service_url
self._mod_service_url = mod_service_url
self._rating_service_url = rating_service_url
self._channels: dict[str, grpc.aio.Channel] = {} # На всякий случай

def _add_to_channels(self, url: str) -> None:
if url not in self._channels:
self._channels[url] = grpc.aio.insecure_channel(url)

def get_comment_client(self) -> CommentServiceClient:
url = self._comment_service_url
self._add_to_channels(url)

return CommentServiceClient(self._channels[url])

def get_mod_client(self) -> ModServiceClient:
url = self._mod_service_url
self._add_to_channels(url)

return ModServiceClient(self._channels[url])

def get_rating_client(self) -> RatingServiceClient:
url = self._rating_service_url
self._add_to_channels(url)

return RatingServiceClient(self._channels[url])

async def close_all(self) -> None:
for channel in self._channels.values():
await channel.close()

async def __aenter__(self): # type: ignore
return self

async def __aexit__(self, exc_type, exc, tb): # type: ignore
await self.close_all()
41 changes: 17 additions & 24 deletions src/gateway/clients/comment.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,26 @@
import grpc

import gateway.stubs.comment_pb2
import gateway.stubs.comment_pb2_grpc
from gateway.settings import Settings

_settings = Settings()
_channel = grpc.insecure_channel(_settings.comment_service_url)
_stub = gateway.stubs.comment_pb2_grpc.CommentServiceStub(_channel) # type: ignore
from gateway.stubs import comment_pb2, comment_pb2_grpc

from .base_client import GrpcClient

def create_comment_rpc(mod_id: int, author_id: int, text: str) -> gateway.stubs.comment_pb2.CreateCommentResponse:
req = gateway.stubs.comment_pb2.CreateCommentRequest(mod_id=mod_id, author_id=author_id, text=text)
return _stub.CreateComment(req) # type: ignore

class CommentServiceClient(GrpcClient):
def _initialize_stub(self) -> None:
self._stub = comment_pb2_grpc.CommentServiceStub(self._channel) # type: ignore

def edit_comment_rpc(comment_id: int, text: str) -> gateway.stubs.comment_pb2.EditCommentResponse:
req = gateway.stubs.comment_pb2.EditCommentRequest(comment_id=comment_id, text=text)
return _stub.EditComment(req) # type: ignore
async def create_comment(self, mod_id: int, author_id: int, text: str) -> comment_pb2.CreateCommentResponse:
request = comment_pb2.CreateCommentRequest(mod_id=mod_id, author_id=author_id, text=text)
return await self.call(self._stub.CreateComment, request)

async def edit_comment(self, comment_id: int, text: str) -> comment_pb2.EditCommentResponse:
request = comment_pb2.EditCommentRequest(comment_id=comment_id, text=text)
return await self.call(self._stub.EditComment, request)

def delete_comment_rpc(
comment_id: int,
) -> gateway.stubs.comment_pb2.DeleteCommentResponse:
req = gateway.stubs.comment_pb2.DeleteCommentRequest(comment_id=comment_id)
return _stub.DeleteComment(req) # type: ignore
async def delete_comment(self, comment_id: int) -> comment_pb2.DeleteCommentResponse:
request = comment_pb2.DeleteCommentRequest(comment_id=comment_id)
return await self.call(self._stub.DeleteComment, request)


def get_comments_rpc(
mod_id: int,
) -> gateway.stubs.comment_pb2.GetCommentsResponse:
req = gateway.stubs.comment_pb2.GetCommentsRequest(mod_id=mod_id)
return _stub.GetComments(req) # type: ignore
async def get_comments(self, mod_id: int) -> comment_pb2.GetCommentsResponse:
request = comment_pb2.GetCommentsRequest(mod_id=mod_id)
return await self.call(self._stub.GetComments, request)
49 changes: 19 additions & 30 deletions src/gateway/clients/mod.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,28 @@
import grpc

import gateway.stubs.mod_pb2
import gateway.stubs.mod_pb2_grpc
from gateway.converters.mod_status_converter import graphql_to_proto_mod_status
from gateway.settings import Settings

_settings = Settings()
_channel = grpc.insecure_channel(_settings.mod_service_url)
_stub = gateway.stubs.mod_pb2_grpc.ModServiceStub(_channel) # type: ignore
from gateway.stubs import mod_pb2, mod_pb2_grpc

from .base_client import GrpcClient

def create_mod_rpc(
title: str, author_id: int, filename: str, description: str
) -> gateway.stubs.mod_pb2.CreateModResponse:
req = gateway.stubs.mod_pb2.CreateModRequest(
title=title,
author_id=author_id,
filename=filename,
description=description,
)
return _stub.CreateMod(req) # type: ignore

class ModServiceClient(GrpcClient):
def _initialize_stub(self) -> None:
self._stub = mod_pb2_grpc.ModServiceStub(self._channel) # type: ignore

def set_status_mod_rpc(mod_id: int, status: str) -> gateway.stubs.mod_pb2.SetStatusResponse:
req = gateway.stubs.mod_pb2.SetStatusRequest(mod_id=mod_id, status=graphql_to_proto_mod_status(status)) # type: ignore
return _stub.SetStatus(req) # type: ignore
async def create_mod(
self, title: str, author_id: int, filename: str, description: str
) -> mod_pb2.CreateModResponse:
request = mod_pb2.CreateModRequest(title=title, author_id=author_id, filename=filename, description=description)
return await self.call(self._stub.CreateMod, request)

async def set_status_mod(self, mod_id: int, status: str) -> mod_pb2.SetStatusResponse:
request = mod_pb2.SetStatusRequest(mod_id=mod_id, status=status)
return await self.call(self._stub.SetStatus, request)

def get_mod_download_link_rpc(
mod_id: int,
) -> gateway.stubs.mod_pb2.GetModDownloadLinkResponse:
req = gateway.stubs.mod_pb2.GetModDownloadLinkRequest(mod_id=mod_id)
return _stub.GetModDownloadLink(req) # type: ignore
async def get_mod_download_link(self, mod_id: int) -> mod_pb2.GetModDownloadLinkResponse:
request = mod_pb2.GetModDownloadLinkRequest(mod_id=mod_id)
return await self.call(self._stub.GetModDownloadLink, request)


def get_mods_rpc() -> gateway.stubs.mod_pb2.GetModsResponse:
req = gateway.stubs.mod_pb2.GetModsRequest()
return _stub.GetMods(req) # type: ignore
async def get_mods(self) -> mod_pb2.GetModsResponse:
request = mod_pb2.GetModsRequest()
return await self.call(self._stub.GetMods, request)
25 changes: 12 additions & 13 deletions src/gateway/clients/rating.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import grpc

import gateway.stubs.rating_pb2
import gateway.stubs.rating_pb2_grpc
from gateway.settings import Settings

_settings = Settings()
_channel = grpc.insecure_channel(_settings.rating_service_url)
_stub = gateway.stubs.rating_pb2_grpc.RatingServiceStub(_channel) # type: ignore
from gateway.stubs import rating_pb2, rating_pb2_grpc

from .base_client import GrpcClient

def rate_mod_rpc(mod_id: int, author_id: int, rate: str) -> gateway.stubs.rating_pb2.RateModResponse:
req = gateway.stubs.rating_pb2.RateModRequest(mod_id=mod_id, author_id=author_id, rate=rate)
return _stub.RateMod(req) # type: ignore

class RatingServiceClient(GrpcClient):
def _initialize_stub(self) -> None:
self._stub = rating_pb2_grpc.RatingServiceStub(self._channel) # type: ignore

def get_rates_rpc(mod_id: int) -> gateway.stubs.rating_pb2.GetRatesResponse:
req = gateway.stubs.rating_pb2.GetRatesRequest(mod_id=mod_id)
return _stub.GetRates(req) # type: ignore
async def rate_mod(self, mod_id: int, author_id: int, rate: str) -> rating_pb2.RateModResponse:
request = rating_pb2.RateModRequest(mod_id=mod_id, author_id=author_id, rate=rate)
return await self.call(self._stub.RateMod, request)

async def get_rates(self, mod_id: int) -> rating_pb2.GetRatesResponse:
request = rating_pb2.GetRatesRequest(mod_id=mod_id)
return await self.call(self._stub.GetRates, request)
11 changes: 11 additions & 0 deletions src/gateway/esclient_graphql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Any

from gateway.clients.base_client import GrpcClient


class GQLContextViewer:
def __init__(self) -> None:
self.clients: dict[str, GrpcClient] = {}

def get_current(self, request: Any) -> dict[str, Any]:
return {"request": request, "clients": self.clients}
53 changes: 53 additions & 0 deletions src/gateway/helpers/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging

import grpc
from tenacity import RetryCallState, retry, retry_if_exception, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

NON_RETRYABLE = {grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.INVALID_ARGUMENT, grpc.StatusCode.NOT_FOUND}

RETRY_ATTEMPTS = 3
RETRY_DELAY_MULTIPLIER = 1
RETRY_DELAY_MIN = 1
RETRY_DELAY_MAX = 10


def is_retryable_grpc_exception(exc: BaseException) -> bool:
if isinstance(exc, grpc.RpcError):
code = exc.code() if hasattr(exc, "code") else None
return code not in NON_RETRYABLE
return False


def log_retry_attempt(retry_state: RetryCallState) -> None:
if retry_state.attempt_number < 1:
return

if retry_state.outcome is None or not retry_state.outcome.failed or not hasattr(retry_state.outcome, "exception"):
return

try:
exception = retry_state.outcome.exception()
if exception is None:
return

sleep_time = getattr(retry_state.next_action, "sleep", 0) if retry_state.next_action else 0

logger.warning(
f"Повторная попытка {retry_state.attempt_number}/{RETRY_ATTEMPTS} "
f"для gRPC вызова. Ошибка: {exception} "
f"(следующая попытка через {sleep_time:.1f} сек)"
)
except Exception as e:
logger.debug(f"Ошибка при логировании повторной попытки: {e}")


def grpc_retry(): # type: ignore
return retry(
stop=stop_after_attempt(RETRY_ATTEMPTS),
wait=wait_exponential(multiplier=RETRY_DELAY_MULTIPLIER, min=RETRY_DELAY_MIN, max=RETRY_DELAY_MAX),
retry=retry_if_exception(is_retryable_grpc_exception),
reraise=True,
before_sleep=log_retry_attempt,
)
36 changes: 36 additions & 0 deletions src/gateway/resolvers/grpc_error_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
from collections.abc import Callable
from functools import wraps
from typing import Any

from graphql import GraphQLError

from ..clients.base_client import GrpcError


def handle_grpc_errors(func: Callable) -> Callable: # type: ignore
if asyncio.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any: # type: ignore
try:
return await func(*args, **kwargs)
except GrpcError as e:
raise GraphQLError(f"gRPC ошибка: {e}") from None
except Exception as e:
raise GraphQLError(f"Неизвестная ошибка: {e}") from None

return async_wrapper

else:

@wraps(func)
def sync_wrapper(*args, **kwargs) -> Any: # type: ignore
try:
return func(*args, **kwargs)
except GrpcError as e:
raise GraphQLError(f"gRPC ошибка: {e}") from None
except Exception as e:
raise GraphQLError(f"Неизвестная ошибка: {e}") from None

return sync_wrapper
Loading
Loading