Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest
import asyncio
from fasta2a.broker import InMemoryBroker
from fasta2a.schema import TaskSendParams

pytestmark = pytest.mark.anyio

@pytest.fixture
def broker():
return InMemoryBroker()

async def test_send_and_receive_task(broker: InMemoryBroker):
task_params: TaskSendParams = {
'id': 'task_123',
'context_id': 'ctx_abc',
'message': {
'role': 'user',
'parts': [{'kind': 'text', 'text': 'test message'}],
'kind': 'message',
'message_id': 'msg_xyz'
}
}

result_queue = asyncio.Queue()

async def consumer():
async for op in broker.receive_task_operations():
await result_queue.put(op)
break

async with broker:
consumer_task = asyncio.create_task(consumer())
await asyncio.sleep(0.01)

await broker.run_task(task_params)

try:
received_op = await asyncio.wait_for(result_queue.get(), timeout=1)
finally:
consumer_task.cancel()
try:
await consumer_task
except asyncio.CancelledError:
pass

assert received_op is not None
assert received_op['operation'] == 'run'
assert received_op['params']['id'] == 'task_123'
55 changes: 55 additions & 0 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import pytest
from fasta2a.storage import InMemoryStorage, ContextT
from fasta2a.schema import Message, TextPart

pytestmark = pytest.mark.anyio

@pytest.fixture
def storage() -> InMemoryStorage[ContextT]:
return InMemoryStorage()

@pytest.mark.asyncio
async def test_submit_and_load_task(storage: InMemoryStorage):
message = Message(
role='user',
parts=[TextPart(text='hello', kind='text')],
kind='message',
message_id='msg1'
)

task = await storage.submit_task(context_id="ctx1", message=message)
assert task is not None
assert task['id'] is not None
assert task['status']['state'] == 'submitted'
assert task['context_id'] == 'ctx1'

loaded_task = await storage.load_task(task['id'])
assert loaded_task is not None
assert loaded_task['id'] == task['id']

@pytest.mark.asyncio
async def test_update_task(storage: InMemoryStorage):
message = Message(
role='user',
parts=[TextPart(text='hello', kind='text')],
kind='message',
message_id='msg2'
)
task = await storage.submit_task(context_id="ctx2", message=message)

await storage.update_task(task['id'], state='completed')

updated_task = await storage.load_task(task['id'])
assert updated_task is not None
assert updated_task['status']['state'] == 'completed'

@pytest.mark.asyncio
async def test_context_storage(storage: InMemoryStorage):
context = await storage.load_context("ctx3")
assert context is None

await storage.update_context("ctx3", ["message1", "message2"])

loaded_context = await storage.load_context("ctx3")
assert loaded_context is not None
assert loaded_context == ["message1", "message2"]
105 changes: 105 additions & 0 deletions tests/test_task_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# tests/test_task_manager.py

import pytest
import asyncio
from typing import Any

from fasta2a.task_manager import TaskManager
from fasta2a.broker import InMemoryBroker
from fasta2a.storage import InMemoryStorage, ContextT
from fasta2a.worker import Worker
from fasta2a.schema import (
Artifact,
Message,
TextPart,
TaskSendParams,
TaskIdParams,
SendMessageRequest,
GetTaskRequest
)

pytestmark = pytest.mark.anyio

class MockWorker(Worker[ContextT]):
def __init__(self, storage, broker):
super().__init__(storage=storage, broker=broker)
self.executed_tasks = []
self.canceled_tasks = []
self.is_running = asyncio.Event()

async def run_task(self, params: TaskSendParams) -> None:
self.executed_tasks.append(params['id'])
await self.storage.update_task(params['id'], state='working')
await asyncio.sleep(0.1)
response_message = Message(
role='agent',
parts=[TextPart(text=f"Completed task {params['id']}", kind='text')],
kind='message',
message_id='response_msg'
)
await self.storage.update_task(
params['id'],
state='completed',
new_messages=[response_message]
)

async def cancel_task(self, params: TaskIdParams) -> None:
self.canceled_tasks.append(params['id'])
await self.storage.update_task(params['id'], state='canceled')

def build_message_history(self, history: list[Message]) -> list[Any]:
return []

def build_artifacts(self, result: Any) -> list[Artifact]:
return []

async def _loop(self) -> None:
self.is_running.set()
await super()._loop()

@pytest.fixture
def storage() -> InMemoryStorage[ContextT]:
return InMemoryStorage()

@pytest.fixture
def broker():
return InMemoryBroker()

@pytest.fixture
def worker(storage, broker):
return MockWorker(storage=storage, broker=broker)

@pytest.fixture
def task_manager(storage, broker):
return TaskManager(storage=storage, broker=broker)

async def test_task_lifecycle(task_manager: TaskManager, worker: MockWorker, storage: InMemoryStorage):
async with task_manager, worker.run():
await worker.is_running.wait()
send_request = SendMessageRequest(
jsonrpc='2.0',
id='req1',
method='message/send',
params={
'message': {
'role': 'user',
'parts': [{'kind': 'text', 'text': 'Please run a test task.'}],
'kind': 'message',
'message_id': 'user_msg1'
}
}
)
response = await task_manager.send_message(send_request)
task_id = response['result']['id']
await asyncio.sleep(0.2)
assert task_id in worker.executed_tasks
get_request = GetTaskRequest(
jsonrpc='2.0',
id='req2',
method='tasks/get',
params={'id': task_id}
)
get_response = await task_manager.get_task(get_request)
assert get_response['result']['status']['state'] == 'completed'
assert len(get_response['result']['history']) == 2
assert get_response['result']['history'][-1]['role'] == 'agent'
Loading