diff --git a/.github/workflows/check-in-container.yml b/.github/workflows/check-in-container.yml index 49e5b48d..323cbc09 100644 --- a/.github/workflows/check-in-container.yml +++ b/.github/workflows/check-in-container.yml @@ -16,6 +16,7 @@ jobs: include: - target: check-agents-in-container - target: check-mcp-server-in-container + - target: check-jira-issue-fetcher-in-container steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/beeai/Containerfile.tests b/beeai/Containerfile.tests index f9516085..dbb36e7f 100644 --- a/beeai/Containerfile.tests +++ b/beeai/Containerfile.tests @@ -11,11 +11,17 @@ RUN dnf -y install \ python3-pytest \ python3-pytest-asyncio \ python3-specfile \ + python3-redis \ + python3-requests \ + python3-backoff \ && dnf clean all RUN git config --global user.email "jotnar-tests@example.com" \ && git config --global user.name "Jotnar Tests" +# Set PYTHONPATH so agents module can be imported +ENV PYTHONPATH=/src:$PYTHONPATH + # Install BeeAI Framework and FastMCP RUN pip3 install --no-cache-dir beeai-framework fastmcp redis diff --git a/beeai/Makefile b/beeai/Makefile index 5cb7b1d9..4e56e905 100644 --- a/beeai/Makefile +++ b/beeai/Makefile @@ -135,3 +135,5 @@ check-agents-in-container: $(MAKE) -f Makefile.tests check-agents-in-container check-mcp-server-in-container: $(MAKE) -f Makefile.tests check-mcp-server-in-container +check-jira-issue-fetcher-in-container: + $(MAKE) -f Makefile.tests check-jira-issue-fetcher-in-container diff --git a/beeai/Makefile.tests b/beeai/Makefile.tests index 8bae8b6a..202cff79 100644 --- a/beeai/Makefile.tests +++ b/beeai/Makefile.tests @@ -7,17 +7,26 @@ CONTAINER_ENGINE ?= $(shell command -v podman 2>/dev/null || echo "docker") build-test-image: $(CONTAINER_ENGINE) build --rm --tag $(TEST_IMAGE) -f Containerfile.tests -.PHONY: check check-agents check-mcp-server check-in-container check-agents-in-container check-mcp-server-in-container +.PHONY: check check-agents check-mcp-server check-in-container \ + check-agents-in-container check-mcp-server-in-container check-jira-issue-fetcher-in-container + check-agents: cd ./agents && \ PYTHONPATH=$(CURDIR) PYTHONDONTWRITEBYTECODE=1 python3 -m pytest --verbose --showlocals $(TEST_TARGET) check-mcp-server: cd ./mcp_server && \ PYTHONPATH=$(CURDIR) PYTHONDONTWRITEBYTECODE=1 python3 -m pytest --verbose --showlocals $(TEST_TARGET) -check: check-agents check-mcp-server +check-jira-issue-fetcher: + cd ./jira_issue_fetcher && \ + PYTHONPATH=$(CURDIR) PYTHONDONTWRITEBYTECODE=1 python3 -m pytest --verbose --showlocals $(TEST_TARGET) + +check: check-agents check-mcp-server check-jira-issue-fetcher check-agents-in-container: $(CONTAINER_ENGINE) run --rm -it -v $(CURDIR):/src:z --env TEST_TARGET $(TEST_IMAGE) make -f Makefile.tests check-agents check-mcp-server-in-container: $(CONTAINER_ENGINE) run --rm -it -v $(CURDIR):/src:z --env TEST_TARGET $(TEST_IMAGE) make -f Makefile.tests check-mcp-server -check-in-container: check-agents-in-container check-mcp-server-in-container +check-jira-issue-fetcher-in-container: + $(CONTAINER_ENGINE) run --rm -it -v $(CURDIR):/src:z --env TEST_TARGET $(TEST_IMAGE) make -f Makefile.tests check-jira-issue-fetcher + +check-in-container: check-agents-in-container check-mcp-server-in-container check-jira-issue-fetcher-in-container diff --git a/beeai/agents/backport_agent.py b/beeai/agents/backport_agent.py index e3ed80b8..175f769e 100644 --- a/beeai/agents/backport_agent.py +++ b/beeai/agents/backport_agent.py @@ -29,7 +29,8 @@ from agents.build_agent import create_build_agent, get_prompt as get_build_prompt from common.models import BackportInputSchema, BackportOutputSchema, BuildInputSchema, BuildOutputSchema, Task from common.utils import redis_client, fix_await -from constants import I_AM_JOTNAR, CAREFULLY_REVIEW_CHANGES, JiraLabels +from constants import I_AM_JOTNAR, CAREFULLY_REVIEW_CHANGES +from common.constants import JiraLabels, RedisQueues from observability import setup_observability from tools.commands import RunShellCommandTool from tools.specfile import AddChangelogEntryTool, BumpReleaseTool @@ -346,7 +347,7 @@ async def comment_in_jira(state): while True: logger.info("Waiting for tasks from backport_queue (timeout: 30s)...") - element = await fix_await(redis.brpop(["backport_queue"], timeout=30)) + element = await fix_await(redis.brpop([RedisQueues.BACKPORT_QUEUE.value], timeout=30)) if element is None: logger.info("No tasks received, continuing to wait...") continue @@ -371,7 +372,7 @@ async def retry(task, error): f"Task failed (attempt {task.attempts}/{max_retries}), " f"re-queuing for retry: {backport_data.jira_issue}" ) - await fix_await(redis.lpush("backport_queue", task.model_dump_json())) + await fix_await(redis.lpush(RedisQueues.BACKPORT_QUEUE.value, task.model_dump_json())) else: logger.error( f"Task failed after {max_retries} attempts, " @@ -383,7 +384,7 @@ async def retry(task, error): labels_to_remove=[JiraLabels.BACKPORT_IN_PROGRESS.value], dry_run=dry_run ) - await fix_await(redis.lpush("error_list", error)) + await fix_await(redis.lpush(RedisQueues.ERROR_LIST.value, error)) try: logger.info(f"Starting backport processing for {backport_data.jira_issue}") @@ -411,7 +412,7 @@ async def retry(task, error): labels_to_remove=[JiraLabels.BACKPORT_IN_PROGRESS.value], dry_run=dry_run ) - await redis.lpush("completed_backport_list", state.backport_result.model_dump_json()) + await redis.lpush(RedisQueues.COMPLETED_BACKPORT_LIST.value, state.backport_result.model_dump_json()) else: logger.warning(f"Backport failed for {backport_data.jira_issue}: {state.backport_result.error}") await tasks.set_jira_labels( diff --git a/beeai/agents/constants.py b/beeai/agents/constants.py index 90b7ec86..6aef442b 100644 --- a/beeai/agents/constants.py +++ b/beeai/agents/constants.py @@ -13,27 +13,3 @@ I_AM_JOTNAR = "by Jotnar, a Red Hat Enterprise Linux software maintenance AI agent." CAREFULLY_REVIEW_CHANGES = "Carefully review the changes and make sure they are correct." - -class JiraLabels(Enum): - """Constants for Jira labels used by Jotnar agents""" - REBASE_IN_PROGRESS = "jotnar_rebase_in_progress" - BACKPORT_IN_PROGRESS = "jotnar_backport_in_progress" - NEEDS_ATTENTION = "jotnar_needs_attention" - NO_ACTION_NEEDED = "jotnar_no_action_needed" - - REBASED = "jotnar_rebased" - BACKPORTED = "jotnar_backported" - - REBASE_ERRORED = "jotnar_rebase_errored" - BACKPORT_ERRORED = "jotnar_backport_errored" - TRIAGE_ERRORED = "jotnar_triage_errored" - - REBASE_FAILED = "jotnar_rebase_failed" - BACKPORT_FAILED = "jotnar_backport_failed" - - RETRY_NEEDED = "jotnar_retry_needed" - - @classmethod - def all_labels(cls) -> set[str]: - """Return all Jotnar labels for cleanup operations""" - return {label.value for label in cls} diff --git a/beeai/agents/rebase_agent.py b/beeai/agents/rebase_agent.py index 5cc0a9a8..3e853d78 100644 --- a/beeai/agents/rebase_agent.py +++ b/beeai/agents/rebase_agent.py @@ -28,7 +28,8 @@ from common.config import get_package_instructions from common.models import BuildInputSchema, BuildOutputSchema, RebaseInputSchema, RebaseOutputSchema, Task from common.utils import redis_client, fix_await -from constants import I_AM_JOTNAR, CAREFULLY_REVIEW_CHANGES, JiraLabels +from constants import I_AM_JOTNAR, CAREFULLY_REVIEW_CHANGES +from common.constants import JiraLabels, RedisQueues from observability import setup_observability from tools.commands import RunShellCommandTool from tools.specfile import AddChangelogEntryTool @@ -343,7 +344,7 @@ async def comment_in_jira(state): while True: logger.info("Waiting for tasks from rebase_queue (timeout: 30s)...") - element = await fix_await(redis.brpop(["rebase_queue"], timeout=30)) + element = await fix_await(redis.brpop([RedisQueues.REBASE_QUEUE.value], timeout=30)) if element is None: logger.info("No tasks received, continuing to wait...") continue @@ -368,7 +369,7 @@ async def retry(task, error): f"Task failed (attempt {task.attempts}/{max_retries}), " f"re-queuing for retry: {rebase_data.jira_issue}" ) - await fix_await(redis.lpush("rebase_queue", task.model_dump_json())) + await fix_await(redis.lpush(RedisQueues.REBASE_QUEUE.value, task.model_dump_json())) else: logger.error( f"Task failed after {max_retries} attempts, " @@ -380,7 +381,7 @@ async def retry(task, error): labels_to_remove=[JiraLabels.REBASE_IN_PROGRESS.value], dry_run=dry_run ) - await fix_await(redis.lpush("error_list", error)) + await fix_await(redis.lpush(RedisQueues.ERROR_LIST.value, error)) try: logger.info(f"Starting rebase processing for {rebase_data.jira_issue}") @@ -407,7 +408,7 @@ async def retry(task, error): labels_to_remove=[JiraLabels.REBASE_IN_PROGRESS.value], dry_run=dry_run ) - await fix_await(redis.lpush("completed_rebase_list", state.rebase_result.model_dump_json())) + await fix_await(redis.lpush(RedisQueues.COMPLETED_REBASE_LIST.value, state.rebase_result.model_dump_json())) else: logger.warning(f"Rebase failed for {rebase_data.jira_issue}: {state.rebase_result.error}") await tasks.set_jira_labels( diff --git a/beeai/agents/tasks.py b/beeai/agents/tasks.py index 7b8e5f99..a2f0b392 100644 --- a/beeai/agents/tasks.py +++ b/beeai/agents/tasks.py @@ -8,7 +8,7 @@ from beeai_framework.tools import Tool from common.utils import is_cs_branch -from constants import BRANCH_PREFIX, JIRA_COMMENT_TEMPLATE, JiraLabels +from constants import BRANCH_PREFIX, JIRA_COMMENT_TEMPLATE from utils import check_subprocess, run_subprocess, run_tool, mcp_tools logger = logging.getLogger(__name__) diff --git a/beeai/agents/triage_agent.py b/beeai/agents/triage_agent.py index 84af006e..fd59bad9 100644 --- a/beeai/agents/triage_agent.py +++ b/beeai/agents/triage_agent.py @@ -38,7 +38,7 @@ CVEEligibilityResult, ) from common.utils import redis_client, fix_await -from constants import JiraLabels +from common.constants import JiraLabels, RedisQueues from observability import setup_observability from tools.commands import RunShellCommandTool from tools.patch_validator import PatchValidatorTool @@ -451,7 +451,7 @@ async def comment_in_jira(state): while True: logger.info("Waiting for tasks from triage_queue (timeout: 30s)...") - element = await fix_await(redis.brpop(["triage_queue"], timeout=30)) + element = await fix_await(redis.brpop([RedisQueues.TRIAGE_QUEUE.value], timeout=30)) if element is None: logger.info("No tasks received, continuing to wait...") continue @@ -470,7 +470,7 @@ async def retry(task, error): f"Task failed (attempt {task.attempts}/{max_retries}), " f"re-queuing for retry: {input.issue}" ) - await fix_await(redis.lpush("triage_queue", task.model_dump_json())) + await fix_await(redis.lpush(RedisQueues.TRIAGE_QUEUE.value, task.model_dump_json())) else: logger.error( f"Task failed after {max_retries} attempts, " f"moving to error list: {input.issue}" @@ -480,7 +480,7 @@ async def retry(task, error): labels_to_add=[JiraLabels.TRIAGE_ERRORED.value], dry_run=dry_run ) - await fix_await(redis.lpush("error_list", error)) + await fix_await(redis.lpush(RedisQueues.ERROR_LIST.value, error)) try: await tasks.set_jira_labels( @@ -514,7 +514,7 @@ async def retry(task, error): dry_run=dry_run ) task = Task(metadata=state.model_dump()) - await redis.lpush("rebase_queue", task.model_dump_json()) + await redis.lpush(RedisQueues.REBASE_QUEUE.value, task.model_dump_json()) elif output.resolution == Resolution.BACKPORT: logger.info(f"Triage resolved as BACKPORT for {input.issue}, " f"adding to backport queue") await tasks.set_jira_labels( @@ -523,7 +523,7 @@ async def retry(task, error): dry_run=dry_run ) task = Task(metadata=state.model_dump()) - await redis.lpush("backport_queue", task.model_dump_json()) + await redis.lpush(RedisQueues.BACKPORT_QUEUE.value, task.model_dump_json()) elif output.resolution == Resolution.CLARIFICATION_NEEDED: logger.info( f"Triage resolved as CLARIFICATION_NEEDED for {input.issue}, " @@ -535,7 +535,7 @@ async def retry(task, error): dry_run=dry_run ) task = Task(metadata=state.model_dump()) - await redis.lpush("clarification_needed_queue", task.model_dump_json()) + await redis.lpush(RedisQueues.CLARIFICATION_NEEDED_QUEUE.value, task.model_dump_json()) elif output.resolution == Resolution.NO_ACTION: logger.info(f"Triage resolved as NO_ACTION for {input.issue}, " f"adding to no action list") await tasks.set_jira_labels( @@ -543,7 +543,7 @@ async def retry(task, error): labels_to_add=[JiraLabels.NO_ACTION_NEEDED.value], dry_run=dry_run ) - await fix_await(redis.lpush("no_action_list", output.data.model_dump_json())) + await fix_await(redis.lpush(RedisQueues.NO_ACTION_LIST.value, output.data.model_dump_json())) elif output.resolution == Resolution.ERROR: logger.warning(f"Triage resolved as ERROR for {input.issue}, retrying") await tasks.set_jira_labels( diff --git a/beeai/common/constants.py b/beeai/common/constants.py new file mode 100644 index 00000000..f4168830 --- /dev/null +++ b/beeai/common/constants.py @@ -0,0 +1,54 @@ +from enum import Enum + +class RedisQueues(Enum): + """Constants for Redis queue names used by Jotnar agents""" + TRIAGE_QUEUE = "triage_queue" + REBASE_QUEUE = "rebase_queue" + BACKPORT_QUEUE = "backport_queue" + CLARIFICATION_NEEDED_QUEUE = "clarification_needed_queue" + ERROR_LIST = "error_list" + NO_ACTION_LIST = "no_action_list" + COMPLETED_REBASE_LIST = "completed_rebase_list" + COMPLETED_BACKPORT_LIST = "completed_backport_list" + + @classmethod + def all_queues(cls) -> set[str]: + """Return all Redis queue names for operations that need to check all queues""" + return {queue.value for queue in cls} + + @classmethod + def input_queues(cls) -> set[str]: + """Return input queue names that contain Task objects with metadata""" + return {cls.TRIAGE_QUEUE.value, cls.REBASE_QUEUE.value, cls.BACKPORT_QUEUE.value, cls.CLARIFICATION_NEEDED_QUEUE.value} + + @classmethod + def data_queues(cls) -> set[str]: + """Return data queue names that contain schema objects""" + return {cls.ERROR_LIST.value, + cls.NO_ACTION_LIST.value, cls.COMPLETED_REBASE_LIST.value, + cls.COMPLETED_BACKPORT_LIST.value} + + +class JiraLabels(Enum): + """Constants for Jira labels used by Jotnar agents""" + REBASE_IN_PROGRESS = "jotnar_rebase_in_progress" + BACKPORT_IN_PROGRESS = "jotnar_backport_in_progress" + NEEDS_ATTENTION = "jotnar_needs_attention" + NO_ACTION_NEEDED = "jotnar_no_action_needed" + + REBASED = "jotnar_rebased" + BACKPORTED = "jotnar_backported" + + REBASE_ERRORED = "jotnar_rebase_errored" + BACKPORT_ERRORED = "jotnar_backport_errored" + TRIAGE_ERRORED = "jotnar_triage_errored" + + REBASE_FAILED = "jotnar_rebase_failed" + BACKPORT_FAILED = "jotnar_backport_failed" + + RETRY_NEEDED = "jotnar_retry_needed" + + @classmethod + def all_labels(cls) -> set[str]: + """Return all Jotnar labels for cleanup operations""" + return {label.value for label in cls} diff --git a/beeai/jira_issue_fetcher/jira_issue_fetcher.py b/beeai/jira_issue_fetcher/jira_issue_fetcher.py index febd1315..ec54483a 100644 --- a/beeai/jira_issue_fetcher/jira_issue_fetcher.py +++ b/beeai/jira_issue_fetcher/jira_issue_fetcher.py @@ -42,6 +42,7 @@ ErrorData ) from common.utils import redis_client, fix_await +from common.constants import JiraLabels, RedisQueues # Configure logging logging.basicConfig( @@ -185,16 +186,7 @@ async def _get_existing_issue_keys(self, redis_conn: redis.Redis) -> set[str]: """ try: # All Redis queues and lists to check - queue_names = [ - "triage_queue", - "rebase_queue", - "backport_queue", - "clarification_needed_queue", - "error_list", - "no_action_list", - "completed_rebase_list", - "completed_backport_list" - ] + queue_names = list(RedisQueues.all_queues()) existing_keys = set() @@ -209,41 +201,38 @@ async def _get_existing_issue_keys(self, redis_conn: redis.Redis) -> set[str]: issue_key = None # For input queues, parse as Task and extract from metadata - if queue_name in ["triage_queue", "rebase_queue", "backport_queue"]: + if queue_name in RedisQueues.input_queues(): task = Task.model_validate_json(item) if task.metadata: - if queue_name == "triage_queue": - schema = TriageInputSchema.model_validate(task.metadata) - issue_key = schema.issue.upper() - elif queue_name == "rebase_queue": - schema = RebaseInputSchema.model_validate(task.metadata) - issue_key = schema.jira_issue.upper() - elif queue_name == "backport_queue": - schema = BackportInputSchema.model_validate(task.metadata) - issue_key = schema.jira_issue.upper() + match queue_name: + case RedisQueues.TRIAGE_QUEUE.value: + schema = TriageInputSchema.model_validate(task.metadata) + issue_key = schema.issue.upper() + case RedisQueues.REBASE_QUEUE.value | RedisQueues.BACKPORT_QUEUE.value | RedisQueues.CLARIFICATION_NEEDED_QUEUE.value: + issue_key = task.metadata.get("jira_issue", "").upper() + case _: + continue # For result/data queues, parse the data directly else: try: - if queue_name in ["completed_rebase_list"]: - schema = RebaseOutputSchema.model_validate_json(item) - # Output schemas don't have issue keys, skip these - # hopefully we get it from the jira labels query - continue - elif queue_name in ["completed_backport_list"]: - schema = BackportOutputSchema.model_validate_json(item) - # Output schemas don't have issue keys, skip these - # hopefully we get it from the jira labels query - continue - elif queue_name in ["clarification_needed_queue"]: - schema = ClarificationNeededData.model_validate_json(item) - issue_key = schema.jira_issue.upper() - elif queue_name in ["no_action_list"]: - schema = NoActionData.model_validate_json(item) - issue_key = schema.jira_issue.upper() - elif queue_name in ["error_list"]: - schema = ErrorData.model_validate_json(item) - issue_key = schema.jira_issue.upper() + match queue_name: + case RedisQueues.COMPLETED_REBASE_LIST.value: + schema = RebaseOutputSchema.model_validate_json(item) + # Schema doesn't have issue keys, skip these + continue + case RedisQueues.COMPLETED_BACKPORT_LIST.value: + schema = BackportOutputSchema.model_validate_json(item) + # Schema doesn't have issue keys, skip these + continue + case RedisQueues.NO_ACTION_LIST.value: + schema = NoActionData.model_validate_json(item) + issue_key = schema.jira_issue.upper() + case RedisQueues.ERROR_LIST.value: + schema = ErrorData.model_validate_json(item) + issue_key = schema.jira_issue.upper() + case _: + continue except ValueError: # Fallback to task parsing for these queues if direct parsing fails task = Task.model_validate_json(item) @@ -294,17 +283,15 @@ async def push_issues_to_queue(self, issues: List[Dict[str, Any]]) -> int: jotnar_labels = [label for label in labels if label.startswith('jotnar_')] # If issue has jötnar labels and there is no jotnar_retry_needed label, mark as existing - if jotnar_labels and 'jotnar_retry_needed' not in jotnar_labels: + if jotnar_labels and JiraLabels.RETRY_NEEDED.value not in jotnar_labels: existing_keys.add(issue_key) logger.info(f"Issue {issue_key} has jötnar labels {jotnar_labels} - marking as existing") - elif 'jotnar_retry_needed' in jotnar_labels: + elif JiraLabels.RETRY_NEEDED.value in jotnar_labels: logger.info(f"Issue {issue_key} has jotnar_retry_needed label - marking for retry") remove_issues_for_retry.add(issue_key) elif not jotnar_labels: - # TODO: uncomment this when we have implemented applying the labels to the issues in all the agents - # logger.info(f"Issue {issue_key} has no jötnar labels - marking for retry") - # remove_issues_for_retry.add(issue_key) - pass + logger.info(f"Issue {issue_key} has no jötnar labels - marking for retry") + remove_issues_for_retry.add(issue_key) pushed_count = 0 skipped_count = 0 @@ -321,7 +308,7 @@ async def push_issues_to_queue(self, issues: List[Dict[str, Any]]) -> int: # Create task using shared Pydantic model task = Task.from_issue(issue_key) - await fix_await(redis_conn.lpush("triage_queue", task.to_json())) + await fix_await(redis_conn.lpush(RedisQueues.TRIAGE_QUEUE.value, task.to_json())) pushed_count += 1 # Add to existing_keys to avoid duplicates within this batch diff --git a/beeai/jira_issue_fetcher/tests/unit/test_jira_issue_fetcher.py b/beeai/jira_issue_fetcher/tests/unit/test_jira_issue_fetcher.py new file mode 100644 index 00000000..8cd28eab --- /dev/null +++ b/beeai/jira_issue_fetcher/tests/unit/test_jira_issue_fetcher.py @@ -0,0 +1,402 @@ +import asyncio +import json +import os +import sys +import time +import jira_issue_fetcher +from contextlib import asynccontextmanager + +import pytest +import requests +from flexmock import flexmock + +from jira_issue_fetcher import JiraIssueFetcher +from common.models import Task, TriageInputSchema, RebaseInputSchema, BackportInputSchema, RebaseOutputSchema, BackportOutputSchema, ClarificationNeededData, NoActionData, ErrorData, RebaseData, BackportData +from common.constants import JiraLabels, RedisQueues +from common.utils import redis_client + + +@pytest.fixture +def mock_env_vars(monkeypatch): + """Mock environment variables.""" + monkeypatch.setenv('JIRA_URL', 'https://jira.test.com') + monkeypatch.setenv('JIRA_TOKEN', 'test_token') + monkeypatch.setenv('REDIS_URL', 'redis://localhost:6379') + monkeypatch.setenv('QUERY', 'filter = "Jotnar_1000_packages"') + +@pytest.fixture +def fetcher(mock_env_vars): + """Create a JiraIssueFetcher instance with mocked environment.""" + return JiraIssueFetcher() + +@pytest.fixture +def mock_redis_context(): + """Create a mock Redis context manager for testing.""" + # Create mock Redis object + mock_redis = flexmock() + + @asynccontextmanager + async def mock_context_manager(*_, **__): + yield mock_redis + + # Mock the redis_client function in the jira_issue_fetcher module where it's imported + flexmock(jira_issue_fetcher).should_receive('redis_client').replace_with(mock_context_manager) + return mock_redis, mock_context_manager + +def create_async_mock_return_value(value): + """Create a mock awaitable that returns the given value when awaited.""" + async def async_return(): + return value + return async_return() + +def test_init(mock_env_vars): + """Test JiraIssueFetcher initialization.""" + fetcher = JiraIssueFetcher() + + assert fetcher.jira_url == 'https://jira.test.com' + assert fetcher.jira_token == 'test_token' + assert fetcher.redis_url == 'redis://localhost:6379' + assert fetcher.query == 'filter = "Jotnar_1000_packages"' + assert fetcher.max_results_per_page == 500 + assert 'Bearer test_token' in fetcher.headers['Authorization'] + +@pytest.mark.asyncio +async def test_rate_limit(fetcher): + """Test rate limiting functionality.""" + flexmock(time).should_receive('time').and_return(0.0, 0.2).one_by_one() + + # Mock asyncio.sleep to return an awaitable coroutine + async def mock_sleep(sleep_time): + pass + + flexmock(asyncio).should_receive('sleep').and_return(mock_sleep(0.2)).once() + + fetcher.last_request_time = 0.0 + await fetcher._rate_limit() + + # Should have updated last_request_time + assert fetcher.last_request_time == 0.2 + +def test_make_request_with_retries_success(fetcher): + """Test successful HTTP request.""" + mock_response = flexmock() + mock_response.should_receive('raise_for_status').once() + mock_response.should_receive('json').and_return({'issues': []}).once() + mock_response.status_code = 200 + + flexmock(requests).should_receive('post').with_args( + 'https://jira.test.com/rest/api/2/search', + json={'jql': 'test query', 'startAt': 0, 'maxResults': 50}, + headers=fetcher.headers, + timeout=90 + ).and_return(mock_response).once() + + result = fetcher._make_request_with_retries( + 'https://jira.test.com/rest/api/2/search', + {'jql': 'test query', 'startAt': 0, 'maxResults': 50} + ) + + assert result == {'issues': []} + +def test_make_request_with_retries_rate_limited(fetcher): + """Test HTTP request with rate limiting (429 error).""" + mock_response = flexmock() + mock_response.status_code = 429 + + flexmock(requests).should_receive('post').and_return(mock_response) + flexmock(requests.HTTPError) + + # Mock the logger that's defined in the jira_issue_fetcher module + mock_logger = flexmock() + mock_logger.should_receive('warning') + flexmock(sys.modules['jira_issue_fetcher']).should_receive('logger').and_return(mock_logger) + + with pytest.raises(requests.HTTPError): + fetcher._make_request_with_retries( + 'https://jira.test.com/rest/api/2/search', + {'jql': 'test query'} + ) + +@pytest.mark.asyncio +async def test_search_issues_single_page(fetcher): + """Test searching issues with single page result.""" + mock_issues = [ + {'key': 'TEST-1', 'fields': {'labels': []}}, + {'key': 'TEST-2', 'fields': {'labels': [JiraLabels.RETRY_NEEDED.value]}} + ] + + # Mock _rate_limit to return an awaitable coroutine + async def mock_rate_limit(): + pass + flexmock(fetcher).should_receive('_rate_limit').and_return(mock_rate_limit()).once() + flexmock(fetcher).should_receive('_make_request_with_retries').with_args( + 'https://jira.test.com/rest/api/2/search', + json_data={ + 'jql': 'filter = "Jotnar_1000_packages"', + 'startAt': 0, + 'maxResults': 500, + 'fields': ['key', 'labels'] + } + ).and_return({ + 'issues': mock_issues, + 'total': 2, + 'startAt': 0, + 'maxResults': 500 + }).once() + + result = await fetcher.search_issues() + + assert len(result) == 2 + assert result[0]['key'] == 'TEST-1' + assert result[1]['key'] == 'TEST-2' + +@pytest.mark.asyncio +async def test_search_issues_multiple_pages(fetcher): + """Test searching issues with pagination.""" + mock_issues_page1 = [{'key': 'TEST-1', 'fields': {'labels': []}}] + mock_issues_page2 = [{'key': 'TEST-2', 'fields': {'labels': []}}] + + # Mock _rate_limit to return an awaitable coroutine, we can't reuse an awaitable coroutine + async def mock_rate_limit_1(): + pass + async def mock_rate_limit_2(): + pass + flexmock(fetcher).should_receive('_rate_limit').and_return(mock_rate_limit_1()).and_return(mock_rate_limit_2()) + flexmock(fetcher).should_receive('_make_request_with_retries').and_return( + { + 'issues': mock_issues_page1, + 'total': 2, + 'startAt': 0, + 'maxResults': 1 + } + ).and_return( + { + 'issues': mock_issues_page2, + 'total': 2, + 'startAt': 1, + 'maxResults': 1 + } + ) + + # Override max results for this test + fetcher.max_results_per_page = 1 + + result = await fetcher.search_issues() + + assert len(result) == 2 + +@pytest.mark.asyncio +async def test_get_existing_issue_keys(fetcher, mock_redis_context): + """Test getting existing issue keys from Redis queues.""" + # Mock the Task and schema imports + # Create actual Task and TriageInputSchema instances + task_data = {'metadata': {'issue': 'EXISTING-1'}, 'attempts': 0} + task_json = json.dumps(task_data) + + mock_redis, _ = mock_redis_context + mock_redis.should_receive('lrange').with_args(RedisQueues.TRIAGE_QUEUE.value, 0, -1).and_return( + create_async_mock_return_value([task_json]) + ) + + # Mock other queues as empty + for queue in [RedisQueues.REBASE_QUEUE.value, RedisQueues.BACKPORT_QUEUE.value, RedisQueues.CLARIFICATION_NEEDED_QUEUE.value, + RedisQueues.ERROR_LIST.value, RedisQueues.NO_ACTION_LIST.value, RedisQueues.COMPLETED_REBASE_LIST.value, RedisQueues.COMPLETED_BACKPORT_LIST.value]: + mock_redis.should_receive('lrange').with_args(queue, 0, -1).and_return( + create_async_mock_return_value([]) + ) + + result = await fetcher._get_existing_issue_keys(mock_redis) + + assert 'EXISTING-1' in result + +@pytest.mark.asyncio +async def test_push_issues_to_queue(fetcher, mock_redis_context): + """Test pushing new issues to the triage queue.""" + mock_redis, _ = mock_redis_context + # Create a real task and get its JSON representation + task = Task.from_issue('NEW-1') + task_json = task.to_json() + + mock_redis.should_receive('lpush').with_args(RedisQueues.TRIAGE_QUEUE.value, task_json).and_return( + create_async_mock_return_value(1) + ).once() + + issues = [{'key': 'NEW-1', 'fields': {'labels': []}}] + existing_keys = set() + + flexmock(fetcher).should_receive('_get_existing_issue_keys').and_return(create_async_mock_return_value(existing_keys)) + + result = await fetcher.push_issues_to_queue(issues) + + assert result == 1 + +@pytest.mark.asyncio +async def test_push_issues_to_queue_skip_existing(fetcher, mock_redis_context): + """Test that existing issues are skipped.""" + mock_redis, _ = mock_redis_context + + issues = [ + {'key': 'EXISTING-1', 'fields': {'labels': []}}, + {'key': 'NEW-1', 'fields': {'labels': []}} + ] + existing_keys = {'EXISTING-1'} + + flexmock(fetcher).should_receive('_get_existing_issue_keys').and_return(create_async_mock_return_value(existing_keys)) + + # Create a real task and get its JSON representation + task = Task.from_issue('NEW-1') + task_json = task.to_json() + + mock_redis.should_receive('lpush').with_args(RedisQueues.TRIAGE_QUEUE.value, task_json).and_return( + create_async_mock_return_value(1) + ).once() + + result = await fetcher.push_issues_to_queue(issues) + + assert result == 1 # Only NEW-1 should be pushed + +@pytest.mark.asyncio +async def test_push_issues_to_queue_skip_labeled_issues(fetcher, mock_redis_context): + """Test that issues with jotnar labels (except retry_needed) are skipped.""" + mock_redis, _ = mock_redis_context + + issues = [ + {'key': 'LABELED-1', 'fields': {'labels': [JiraLabels.REBASE_IN_PROGRESS.value]}}, + {'key': 'RETRY-1', 'fields': {'labels': [JiraLabels.RETRY_NEEDED.value]}}, + {'key': 'CLEAN-1', 'fields': {'labels': []}} + ] + existing_keys = set() + + flexmock(fetcher).should_receive('_get_existing_issue_keys').and_return(create_async_mock_return_value(existing_keys)) + + # Create real tasks and get their JSON representations + task1 = Task.from_issue('RETRY-1') + task2 = Task.from_issue('CLEAN-1') + task1_json = task1.to_json() + task2_json = task2.to_json() + + mock_redis.should_receive('lpush').with_args(RedisQueues.TRIAGE_QUEUE.value, task1_json).and_return( + create_async_mock_return_value(1) + ).once() + mock_redis.should_receive('lpush').with_args(RedisQueues.TRIAGE_QUEUE.value, task2_json).and_return( + create_async_mock_return_value(1) + ).once() + + result = await fetcher.push_issues_to_queue(issues) + + assert result == 2 # RETRY-1 and CLEAN-1 should be pushed + +@pytest.mark.asyncio +async def test_run_full_workflow(fetcher): + """Test the complete run workflow.""" + mock_issues = [{'key': 'TEST-1', 'fields': {'labels': []}}] + + # Mock all the methods + flexmock(fetcher).should_receive('search_issues').and_return(create_async_mock_return_value(mock_issues)).once() + flexmock(fetcher).should_receive('push_issues_to_queue').with_args(mock_issues).and_return(create_async_mock_return_value(1)).once() + + await fetcher.run() + +@pytest.mark.asyncio +async def test_run_full_workflow_with_labeled_issues(fetcher, mock_redis_context): + """Test the complete run workflow with issues that have different label states.""" + mock_redis, mock_context_manager = mock_redis_context + + # Create test issues with different label states + mock_issues = [ + {'key': 'ISSUE-1', 'fields': {'labels': []}}, # No labels - should be pushed + {'key': 'ISSUE-2', 'fields': {'labels': ['jotnar_rebase_in_progress']}}, # Has jotnar label - should be skipped + {'key': 'ISSUE-3', 'fields': {'labels': ['jotnar_backport_in_progress']}}, # Has jotnar label - should be skipped + {'key': 'ISSUE-4', 'fields': {'labels': ['jotnar_retry_needed']}}, # Has retry label - should be pushed + {'key': 'ISSUE-5', 'fields': {'labels': []}}, # No labels - should be pushed + {'key': 'ISSUE-6', 'fields': {'labels': ['jotnar_completed']}}, # Has jotnar label - should be skipped + ] + + # Create existing issues that are already in Redis queues using the correct data structures + # Input queues (REBASE_QUEUE, BACKPORT_QUEUE) contain Task objects with triage_agent.State metadata + # Data queues contain the appropriate schema objects directly + + # Create Task objects for input queues with proper triage_agent.State metadata + triage_state_for_rebase = { + "jira_issue": "ISSUE-2", + "triage_result": { + "resolution": "rebase", + "data": RebaseData(jira_issue="ISSUE-2", package="test-package", version="1.0.0").model_dump() + } + } + task_for_rebase = Task(metadata=triage_state_for_rebase).model_dump_json() + + triage_state_for_backport = { + "jira_issue": "ISSUE-3", + "triage_result": { + "resolution": "backport", + "data": BackportData(jira_issue="ISSUE-3", package="test-package", patch_url="https://example.com/patch", justification="Security fix", cve_id="CVE-2023-1234").model_dump() + } + } + task_for_backport = Task(metadata=triage_state_for_backport).model_dump_json() + + # Create schema objects for data queues + existing_issues = { + 'ISSUE-1': NoActionData(jira_issue="ISSUE-1", reasoning="Issue requires no action").model_dump_json(), + 'ISSUE-2': task_for_rebase, # Task object for input queue + 'ISSUE-3': task_for_backport, # Task object for input queue + 'ISSUE-4': NoActionData(jira_issue="ISSUE-4", reasoning="Issue requires no action").model_dump_json(), + 'ISSUE-5': Task(metadata={"jira_issue": "ISSUE-5", "triage_result": {"resolution": "clarification-needed", "data": ClarificationNeededData(jira_issue="ISSUE-5", findings="Investigation incomplete", additional_info_needed="More details needed").model_dump()}}).model_dump_json(), + 'ISSUE-6': ErrorData(jira_issue="ISSUE-6", details="Build failed").model_dump_json(), # Use ErrorData for error_list + } + + # Mock lrange calls for existing issues distributed across different queues + # Distribute issues across different queues to test the logic + mock_redis.should_receive('lrange').with_args(RedisQueues.NO_ACTION_LIST.value, 0, -1).and_return( + create_async_mock_return_value([existing_issues['ISSUE-1'], existing_issues['ISSUE-4']]) + ) + + mock_redis.should_receive('lrange').with_args(RedisQueues.REBASE_QUEUE.value, 0, -1).and_return( + create_async_mock_return_value([existing_issues['ISSUE-2']]) + ) + + mock_redis.should_receive('lrange').with_args(RedisQueues.BACKPORT_QUEUE.value, 0, -1).and_return( + create_async_mock_return_value([existing_issues['ISSUE-3']]) + ) + + mock_redis.should_receive('lrange').with_args(RedisQueues.CLARIFICATION_NEEDED_QUEUE.value, 0, -1).and_return( + create_async_mock_return_value([existing_issues['ISSUE-5']]) + ) + + mock_redis.should_receive('lrange').with_args(RedisQueues.ERROR_LIST.value, 0, -1).and_return( + create_async_mock_return_value([existing_issues['ISSUE-6']]) + ) + + # Mock other queues as empty to avoid flexmock errors + mock_redis.should_receive('lrange').with_args(RedisQueues.TRIAGE_QUEUE.value, 0, -1).and_return( + create_async_mock_return_value([]) + ) + mock_redis.should_receive('lrange').with_args(RedisQueues.COMPLETED_REBASE_LIST.value, 0, -1).and_return( + create_async_mock_return_value([]) + ) + mock_redis.should_receive('lrange').with_args(RedisQueues.COMPLETED_BACKPORT_LIST.value, 0, -1).and_return( + create_async_mock_return_value([]) + ) + + # Mock lpush calls for issues that should be pushed despite already existing + # ISSUE-1, ISSUE-4, and ISSUE-5 should be pushed (no labels or retry_needed) + # ISSUE-2, ISSUE-3, and ISSUE-6 should be skipped (have jotnar labels) + # The actual code pushes JSON strings, not just issue keys + task1 = Task.from_issue("ISSUE-1") + task4 = Task.from_issue("ISSUE-4") + task5 = Task.from_issue("ISSUE-5") + mock_redis.should_receive('lpush').with_args(RedisQueues.TRIAGE_QUEUE.value, task1.to_json()).and_return( + create_async_mock_return_value(1) + ).once() + mock_redis.should_receive('lpush').with_args(RedisQueues.TRIAGE_QUEUE.value, task4.to_json()).and_return( + create_async_mock_return_value(1) + ).once() + mock_redis.should_receive('lpush').with_args(RedisQueues.TRIAGE_QUEUE.value, task5.to_json()).and_return( + create_async_mock_return_value(1) + ).once() + + # Mock the methods that are called internally + flexmock(fetcher).should_receive('search_issues').and_return(create_async_mock_return_value(mock_issues)).once() + + # Run the workflow + await fetcher.run() diff --git a/beeai/pyproject.toml b/beeai/pyproject.toml index d9d454f3..0598c408 100644 --- a/beeai/pyproject.toml +++ b/beeai/pyproject.toml @@ -1,3 +1,10 @@ +[build-system] +requires = ["setuptools>=45", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +include = ["agents*", "common*", "mcp_server*", "jira_issue_fetcher*"] + [project] name = "ai-workflows-beeai" version = "0.1.0" @@ -9,13 +16,25 @@ dependencies = [ "aiofiles>=24.1.0", "arize-phoenix-otel>=0.13.0", "beeai-framework[mcp,duckduckgo]>=0.1.36", + "copr>=1.129", "fastmcp>=2.11.3", - "flexmock>=0.12.2", "ogr>=0.55.0", "openinference-instrumentation-beeai>=0.1.8", - "pytest>=8.4.1", - "pytest-asyncio>=1.1.0", "redis>=6.4.0", "rpm>=0.4.0", "specfile>=0.36.0", + "backoff>=2.2.1" +] + +[project.optional-dependencies] +test = [ + "pytest>=8.4.1", + "pytest-asyncio>=1.1.0", + "pytest-cov>=6.0.0", + "pytest-mock>=3.14.0", + "flexmock>=0.12.2", + "coverage>=7.6.0", + "pytest-xdist>=3.6.0", + "pytest-html>=4.1.1", + "httpx>=0.28.0", ]