Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added agave/tools/__init__.py
Empty file.
Empty file added agave/tools/asyncio/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions agave/tools/asyncio/sqs_celery_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
from dataclasses import dataclass
from typing import Iterable, Optional

from ..celery import build_celery_message
from .sqs_client import SqsClient


@dataclass
class SqsCeleryClient(SqsClient):
async def send_task(
self,
name: str,
args: Optional[Iterable] = None,
kwargs: Optional[dict] = None,
) -> None:
celery_message = build_celery_message(name, args or (), kwargs or {})
await super().send_message(celery_message)

def send_background_task(
self,
name: str,
args: Optional[Iterable] = None,
kwargs: Optional[dict] = None,
) -> asyncio.Task:
celery_message = build_celery_message(name, args or (), kwargs or {})
return super().send_message_async(celery_message)
13 changes: 9 additions & 4 deletions agave/tasks/sqs_client.py → agave/tools/asyncio/sqs_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from types_aiobotocore_sqs import SQSClient
except ImportError:
raise ImportError(
"You must install agave with [fastapi, tasks] option.\n"
"You can install it with: pip install agave[fastapi, tasks]"
"You must install agave with [asyncio_aws_tools] option.\n"
"You can install it with: pip install agave[asyncio_aws_tools]"
)


Expand All @@ -25,11 +25,16 @@ class SqsClient:
def background_tasks(self) -> set:
return self._background_tasks

async def __aenter__(self):
async def __aenter__(self) -> "SqsClient":
await self.start()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
async def __aexit__(
self,
exc_type: Optional[type],
exc_val: Optional[Exception],
exc_tb: Optional[object],
) -> None:
await self.close()

async def start(self):
Expand Down
37 changes: 6 additions & 31 deletions agave/tasks/sqs_celery_client.py → agave/tools/celery.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
import json
from base64 import b64encode
from dataclasses import dataclass
from typing import Iterable, Optional
from typing import Iterable
from uuid import uuid4

from agave.tasks.sqs_client import SqsClient

def _b64_encode(value: str) -> str:
encoded = b64encode(bytes(value, 'utf-8'))
return encoded.decode('utf-8')

def _build_celery_message(

def build_celery_message(
task_name: str, args_: Iterable, kwargs_: dict
) -> str:
task_id = str(uuid4())
Expand Down Expand Up @@ -47,29 +48,3 @@ def _build_celery_message(

encoded = _b64_encode(json.dumps(message))
return encoded


def _b64_encode(value: str) -> str:
encoded = b64encode(bytes(value, 'utf-8'))
return encoded.decode('utf-8')


@dataclass
class SqsCeleryClient(SqsClient):
async def send_task(
self,
name: str,
args: Optional[Iterable] = None,
kwargs: Optional[dict] = None,
) -> None:
celery_message = _build_celery_message(name, args or (), kwargs or {})
await super().send_message(celery_message)

def send_background_task(
self,
name: str,
args: Optional[Iterable] = None,
kwargs: Optional[dict] = None,
) -> asyncio.Task:
celery_message = _build_celery_message(name, args or (), kwargs or {})
return super().send_message_async(celery_message)
Empty file added agave/tools/sync/__init__.py
Empty file.
17 changes: 17 additions & 0 deletions agave/tools/sync/sqs_celery_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dataclasses import dataclass
from typing import Iterable, Optional

from ..celery import build_celery_message
from .sqs_client import SqsClient


@dataclass
class SqsCeleryClient(SqsClient):
def send_task(
self,
name: str,
args: Optional[Iterable] = None,
kwargs: Optional[dict] = None,
) -> None:
celery_message = build_celery_message(name, args or (), kwargs or {})
self.send_message(celery_message)
34 changes: 34 additions & 0 deletions agave/tools/sync/sqs_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
from dataclasses import dataclass, field
from typing import Optional, Union
from uuid import uuid4

try:
import boto3
from types_boto3_sqs import SQSClient as Boto3SQSClient
except ImportError:
raise ImportError(
"You must install agave with [sync_aws_tools] option.\n"
"You can install it with: pip install agave[sync_aws_tools]"
)


@dataclass
class SqsClient:
queue_url: str
region_name: str
_sqs: Boto3SQSClient = field(init=False)

def __post_init__(self) -> None:
self._sqs = boto3.client('sqs', region_name=self.region_name)

def send_message(
self,
data: Union[str, dict],
message_group_id: Optional[str] = None,
) -> None:
self._sqs.send_message(
QueueUrl=self.queue_url,
MessageBody=data if isinstance(data, str) else json.dumps(data),
MessageGroupId=message_group_id or str(uuid4()),
)
2 changes: 1 addition & 1 deletion agave/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.1.0'
__version__ = '1.2.0'
2 changes: 1 addition & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ mypy==1.14.1
mongomock==4.3.0
mock==5.1.0
pytest-freezegun==0.4.2
pytest-chalice==0.0.5
click==8.1.8
moto[server]==5.0.26
pytest-vcr==1.0.2
pytest-asyncio==0.18.*
requests==2.32.3
httpx==0.28.1
typing_extensions==4.12.2
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
cuenca-validations==2.1.0
boto3==1.35.74
types-boto3[sqs]==1.35.74
chalice==1.31.3
mongoengine==0.29.1
fastapi==0.115.6
Expand Down
8 changes: 8 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
'aiobotocore>=2.0.0,<3.0.0',
'types-aiobotocore-sqs>=2.1.0,<3.0.0',
],
'sync_aws_tools': [
'boto3>=1.34.106,<2.0.0',
'types-boto3[sqs]>=1.34.106,<2.0.0',
],
'asyncio_aws_tools': [
'aiobotocore>=2.0.0,<3.0.0',
'types-aiobotocore-sqs>=2.1.0,<3.0.0',
],
},
classifiers=[
'Programming Language :: Python :: 3.9',
Expand Down
103 changes: 103 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import datetime as dt
import functools
import json
import os
from functools import partial
from typing import Callable, Generator

import aiobotocore
import boto3
import pytest
from _pytest.monkeypatch import MonkeyPatch
from aiobotocore.session import AioSession
from chalice.test import Client as OriginalChaliceClient
from fastapi.testclient import TestClient as FastAPIClient
from mongoengine import Document
from typing_extensions import deprecated

from agave.tasks import sqs_tasks
from examples.config import (
TEST_DEFAULT_PLATFORM_ID,
TEST_DEFAULT_USER_ID,
Expand Down Expand Up @@ -216,3 +224,98 @@ def chalice_client() -> Generator[ChaliceClient, None, None]:

client = ChaliceClient(app)
yield client


@deprecated('Use fixtures from cuenca-test-fixtures')
@pytest.fixture(scope='session')
def aws_credentials() -> None:
"""Mocked AWS Credentials for moto."""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
boto3.setup_default_session()


@deprecated('Use fixtures from cuenca-test-fixtures')
@pytest.fixture(scope='session')
def aws_endpoint_urls(
aws_credentials,
) -> Generator[dict[str, str], None, None]:
from moto.server import ThreadedMotoServer

server = ThreadedMotoServer(port=4000)
server.start()

endpoints = dict(
sqs='http://127.0.0.1:4000/',
)
yield endpoints

server.stop()


@pytest.fixture(autouse=True)
def patch_tasks_count(monkeypatch: MonkeyPatch) -> None:
def one_loop(*_, **__):
# Para pruebas solo unos cuantos ciclos
for i in range(5):
yield i

monkeypatch.setattr(sqs_tasks, 'count', one_loop)


@deprecated('Use fixtures from cuenca-test-fixtures')
@pytest.fixture(autouse=True)
def patch_aiobotocore_create_client(
aws_endpoint_urls, monkeypatch: MonkeyPatch
) -> None:
create_client = AioSession.create_client

def mock_create_client(*args, **kwargs):
service_name = next(a for a in args if type(a) is str)
kwargs['endpoint_url'] = aws_endpoint_urls[service_name]

return create_client(*args, **kwargs)

monkeypatch.setattr(AioSession, 'create_client', mock_create_client)


@deprecated('Use fixtures from cuenca-test-fixtures')
@pytest.fixture(autouse=True)
def patch_boto3_create_client(
aws_endpoint_urls, monkeypatch: MonkeyPatch
) -> None:
create_client = boto3.Session.client

def mock_client(*args, **kwargs):
service_name = next(a for a in args if type(a) is str)
if service_name in aws_endpoint_urls:
kwargs['endpoint_url'] = aws_endpoint_urls[service_name]
return create_client(*args, **kwargs)

monkeypatch.setattr(boto3.Session, 'client', mock_client)


@deprecated('Use fixtures from cuenca-test-fixtures')
@pytest.fixture
async def sqs_client():
session = aiobotocore.session.get_session()
async with session.create_client('sqs', 'us-east-1') as sqs:
await sqs.create_queue(
QueueName='core.fifo',
Attributes={
'FifoQueue': 'true',
'ContentBasedDeduplication': 'true',
},
)
resp = await sqs.get_queue_url(QueueName='core.fifo')
sqs.send_message = partial(sqs.send_message, QueueUrl=resp['QueueUrl'])
sqs.receive_message = partial(
sqs.receive_message,
QueueUrl=resp['QueueUrl'],
AttributeNames=['ApproximateReceiveCount'],
)
sqs.queue_url = resp['QueueUrl']
yield sqs
await sqs.purge_queue(QueueUrl=resp['QueueUrl'])
Loading
Loading