From 5608750e814f7b308f2b68f96e3bd8f56a2f6759 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Mon, 5 Jan 2026 21:19:16 -0800 Subject: [PATCH] feat: separate process handles SQS upkeep --- zetta_utils/mazepa/upkeep_handlers.py | 327 ++++++++++++++++++ zetta_utils/mazepa/worker.py | 211 ++++++++--- .../configurations/worker_pool.py | 51 +-- zetta_utils/message_queues/sqs/queue.py | 2 +- zetta_utils/message_queues/sqs/utils.py | 56 ++- 5 files changed, 546 insertions(+), 101 deletions(-) create mode 100644 zetta_utils/mazepa/upkeep_handlers.py diff --git a/zetta_utils/mazepa/upkeep_handlers.py b/zetta_utils/mazepa/upkeep_handlers.py new file mode 100644 index 000000000..5a85086be --- /dev/null +++ b/zetta_utils/mazepa/upkeep_handlers.py @@ -0,0 +1,327 @@ +from __future__ import annotations + +import logging +import multiprocessing +import threading +import time +from dataclasses import dataclass +from typing import Callable + +import tenacity + +from zetta_utils import log +from zetta_utils.common.partial import ComparablePartial + +logger = log.get_logger("mazepa") + + +def perform_direct_upkeep( + extend_lease_fn: Callable, + extend_duration: int, + task_start_time: float, +) -> None: + """ + Perform upkeep by directly calling extend_lease_fn. + + Used as a fallback for non-SQS queues where the process-based handler + cannot be used. + """ + current_time = time.time() + elapsed_since_start = current_time - task_start_time + logger.debug( + f"UPKEEP: [T+{elapsed_since_start:.1f}s] Timer fired, calling extend_lease_fn directly" + ) + try: + start_time = time.time() + extend_lease_fn(extend_duration) + api_duration = time.time() - start_time + logger.debug( + f"UPKEEP: [T+{elapsed_since_start:.1f}s] Successfully extended lease by " + f"{extend_duration}s (API call took {api_duration:.1f}s)" + ) + except tenacity.RetryError as e: # pragma: no cover + logger.error(f"UPKEEP: Failed to extend lease after retries: {e}") + except Exception as e: # pragma: no cover # pylint: disable=broad-except + logger.error(f"UPKEEP: Unexpected error: {type(e).__name__}: {e}") + + +def extract_sqs_metadata(extend_lease_fn: Callable) -> dict | None: + """ + Extract SQS metadata from extend_lease_fn if it's a ComparablePartial wrapping + an SQS queue's _extend_msg_lease method. + + Returns a dict with queue_name, region_name, endpoint_url, receipt_handle if found, + or None if this is not an SQS-based extend function. + """ + if not isinstance(extend_lease_fn, ComparablePartial): + return None + + msg = extend_lease_fn.kwargs.get("msg") + if msg is None: + return None + + # Check if msg has the SQS-specific attributes + if not all(hasattr(msg, attr) for attr in ("receipt_handle", "queue_name", "region_name")): + return None + + return { + "receipt_handle": msg.receipt_handle, + "queue_name": msg.queue_name, + "region_name": msg.region_name, + "endpoint_url": getattr(msg, "endpoint_url", None), + } + + +@dataclass +class UpkeepCommand: + """Command sent to the SQS upkeep handler process.""" + + action: str # "start_upkeep", "stop_upkeep", or "shutdown" + # Required for start_upkeep + task_id: str | None = None + receipt_handle: str | None = None + visibility_timeout: int | None = None + interval_sec: float | None = None + queue_name: str | None = None + region_name: str | None = None + endpoint_url: str | None = None + + +def run_sqs_upkeep_handler( + command_queue: multiprocessing.Queue, + log_level: str = "INFO", +) -> None: + """ + Main loop for the SQS upkeep handler process. 
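+    Consumes UpkeepCommand objects from command_queue until a "shutdown"
+    command is received.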
+ + Runs in a separate process to handle SQS visibility extensions. This isolates + SQS operations from the main worker process's GIL, ensuring that heavy CPU work + in the main process doesn't delay upkeep operations. + + The handler manages its own timer, so timing is not affected by the main + process's CPU usage. + """ + # pylint: disable=import-outside-toplevel + from zetta_utils.mazepa.worker import worker_init + from zetta_utils.message_queues.sqs import utils + + # Initialize the process (logging, signal handlers, etc.) + # Don't set start method or load train/inference for the upkeep handler + worker_init(log_level=log_level) + + logger.info( + "SQS_HANDLER: Upkeep handler process started (PID: %d)", + multiprocessing.current_process().pid, + ) + + # Track active upkeep tasks: task_id -> (stop_event, thread) + active_upkeeps: dict[str, tuple[threading.Event, threading.Thread]] = {} + + def _run_upkeep_loop( + task_id: str, + stop_event: threading.Event, + receipt_handle: str, + visibility_timeout: int, + interval_sec: float, + queue_name: str, + region_name: str, + endpoint_url: str | None, + ): + """Timer loop that extends visibility at regular intervals.""" + task_start_time = time.time() + logger.info( + f"SQS_HANDLER: [{task_id}] Starting upkeep loop: interval={interval_sec}s, " + f"extend_by={visibility_timeout}s" + ) + + while not stop_event.wait(timeout=interval_sec): + elapsed = time.time() - task_start_time + logger.info( + f"SQS_HANDLER: [{task_id}] [T+{elapsed:.1f}s] Extending visibility to " + f"{visibility_timeout}s for queue '{queue_name}'" + ) + try: + api_start = time.time() + utils.change_message_visibility( + receipt_handle=receipt_handle, + visibility_timeout=visibility_timeout, + queue_name=queue_name, + region_name=region_name, + endpoint_url=endpoint_url, + ) + api_duration = time.time() - api_start + logger.info( + f"SQS_HANDLER: [{task_id}] [T+{elapsed:.1f}s] Successfully extended " + f"(API took {api_duration:.1f}s)" + ) + except Exception as e: # pylint: disable=broad-except + logger.error( + f"SQS_HANDLER: [{task_id}] Failed to extend visibility: " + f"{type(e).__name__}: {e}" + ) + + elapsed = time.time() - task_start_time + logger.info(f"SQS_HANDLER: [{task_id}] Upkeep loop stopped after {elapsed:.1f}s") + + while True: + try: + cmd = command_queue.get() + + if cmd.action == "shutdown": + logger.info("SQS_HANDLER: Received shutdown command") + # Stop all active upkeeps + for task_id, (stop_event, thread) in active_upkeeps.items(): + logger.info(f"SQS_HANDLER: Stopping upkeep for task {task_id}") + stop_event.set() + thread.join(timeout=2.0) + break + + if cmd.action == "start_upkeep": + if cmd.task_id in active_upkeeps: + logger.warning( + f"SQS_HANDLER: Upkeep already active for task {cmd.task_id}, ignoring" + ) + continue + + stop_event = threading.Event() + thread = threading.Thread( + target=_run_upkeep_loop, + args=( + cmd.task_id, + stop_event, + cmd.receipt_handle, + cmd.visibility_timeout, + cmd.interval_sec, + cmd.queue_name, + cmd.region_name, + cmd.endpoint_url, + ), + daemon=True, + name=f"upkeep-{cmd.task_id[:8]}", + ) + thread.start() + active_upkeeps[cmd.task_id] = (stop_event, thread) + logger.info(f"SQS_HANDLER: Started upkeep for task {cmd.task_id}") + + elif cmd.action == "stop_upkeep": + if cmd.task_id not in active_upkeeps: + logger.warning( + f"SQS_HANDLER: No active upkeep for task {cmd.task_id}, ignoring" + ) + continue + + stop_event, thread = active_upkeeps.pop(cmd.task_id) + stop_event.set() + thread.join(timeout=2.0) + 
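+                # upkeep threads are daemons, so a timed-out join cannot block process exit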
logger.info(f"SQS_HANDLER: Stopped upkeep for task {cmd.task_id}") + + else: + logger.warning(f"SQS_HANDLER: Unknown action: {cmd.action}") + + except Exception as e: # pylint: disable=broad-except + logger.error(f"SQS_HANDLER: Error processing command: {type(e).__name__}: {e}") + + logger.info("SQS_HANDLER: Handler process exiting") + + +class SQSUpkeepHandlerManager: + """ + Manages the lifecycle of an SQS upkeep handler process. + + The handler process manages its own timers for visibility extensions, + completely isolated from the main process's GIL. + + Usage: + manager = SQSUpkeepHandlerManager() + manager.start() + try: + manager.start_upkeep(task_id, ...) # Handler starts its own timer + # ... task runs ... + manager.stop_upkeep(task_id) # Handler stops the timer + finally: + manager.shutdown() + """ + + def __init__(self): + self._command_queue: multiprocessing.Queue | None = None + self._handler_process: multiprocessing.Process | None = None + + def start(self) -> None: + """Start the handler process.""" + if self._command_queue is not None: + return # Already running + + # Get current log level to pass to handler process + current_log_level = logging.getLevelName(logging.getLogger("mazepa").getEffectiveLevel()) + + self._command_queue = multiprocessing.Queue() + self._handler_process = multiprocessing.Process( + target=run_sqs_upkeep_handler, + args=(self._command_queue, current_log_level), + daemon=True, + name="sqs-upkeep-handler", + ) + self._handler_process.start() + logger.info(f"Started SQS upkeep handler process (PID: {self._handler_process.pid})") + + def shutdown(self, timeout: float = 10.0) -> None: + """Shutdown the handler process gracefully.""" + if self._command_queue is None: + return # Not running + + logger.info("Shutting down SQS upkeep handler process...") + self._command_queue.put(UpkeepCommand(action="shutdown")) + + if self._handler_process is not None: + self._handler_process.join(timeout=timeout) + if self._handler_process.is_alive(): + logger.warning("Handler process did not stop gracefully, terminating...") + self._handler_process.terminate() + self._handler_process.join(timeout=1.0) + + self._command_queue = None + self._handler_process = None + logger.info("SQS upkeep handler process stopped.") + + def start_upkeep( + self, + task_id: str, + receipt_handle: str, + visibility_timeout: int, + interval_sec: float, + queue_name: str, + region_name: str, + endpoint_url: str | None = None, + ) -> None: + """ + Start upkeep for a task. The handler process will manage its own timer + and extend visibility at regular intervals. 
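+        Each call should be paired with a later stop_upkeep for the same
+        task_id; a task_id that is already active is ignored by the handler.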
+ """ + if self._command_queue is None: + logger.warning("SQS_HANDLER: Handler not running, start_upkeep ignored") + return + + cmd = UpkeepCommand( + action="start_upkeep", + task_id=task_id, + receipt_handle=receipt_handle, + visibility_timeout=visibility_timeout, + interval_sec=interval_sec, + queue_name=queue_name, + region_name=region_name, + endpoint_url=endpoint_url, + ) + self._command_queue.put_nowait(cmd) + + def stop_upkeep(self, task_id: str) -> None: + """Stop upkeep for a task.""" + if self._command_queue is None: + logger.warning("SQS_HANDLER: Handler not running, stop_upkeep ignored") + return + + self._command_queue.put_nowait(UpkeepCommand(action="stop_upkeep", task_id=task_id)) + + @property + def is_running(self) -> bool: + """Check if the handler process is running.""" + return self._handler_process is not None and self._handler_process.is_alive() diff --git a/zetta_utils/mazepa/worker.py b/zetta_utils/mazepa/worker.py index 81a0b6361..67611a7fd 100644 --- a/zetta_utils/mazepa/worker.py +++ b/zetta_utils/mazepa/worker.py @@ -1,15 +1,15 @@ from __future__ import annotations import math +import multiprocessing +import os import sys import time import traceback from typing import Any, Callable, Optional -import tenacity - -from zetta_utils import builder, log -from zetta_utils.common import RepeatTimer, monitor_resources +from zetta_utils import builder, log, try_load_train_inference +from zetta_utils.common import RepeatTimer, monitor_resources, reset_signal_handlers from zetta_utils.mazepa import constants, exceptions from zetta_utils.mazepa.exceptions import MazepaCancel, MazepaTimeoutError from zetta_utils.mazepa.pool_activity import PoolActivityTracker @@ -18,11 +18,67 @@ MAX_TRANSIENT_RETRIES, TRANSIENT_ERROR_CONDITIONS, ) +from zetta_utils.mazepa.upkeep_handlers import ( + SQSUpkeepHandlerManager, + extract_sqs_metadata, + perform_direct_upkeep, +) from zetta_utils.message_queues.base import MessageQueue, ReceivedMessage from . import Task +class DummyBuffer: + def read(self, data): + pass + + def write(self, data): + pass + + def flush(self): + pass + + +def redirect_buffers() -> None: + sys.stdin = DummyBuffer() # type: ignore + sys.stdout = DummyBuffer() # type: ignore + sys.stderr = DummyBuffer() # type: ignore + + +def worker_init( + log_level: str, + suppress_logs: bool = False, + set_start_method: bool = False, + multiprocessing_start_method: str = "spawn", + load_train_inference: bool = False, +) -> None: + """ + Initialize a worker process with proper logging and signal handling. 
+ + Args: + log_level: Log level string (e.g., "INFO", "DEBUG") + suppress_logs: If True, redirect stdout/stderr to dummy buffers + set_start_method: If True, set multiprocessing start method (for worker pools) + multiprocessing_start_method: The start method to use if set_start_method is True + load_train_inference: If True, try to load train/inference modules (for worker pools) + """ + # Reset signal handlers inherited from parent to default behavior + reset_signal_handlers() + # For Kubernetes compatibility, ensure unbuffered output + os.environ["PYTHONUNBUFFERED"] = "1" + + if suppress_logs: + redirect_buffers() + else: + log.configure_logger(level=log_level, force=True) + + if set_start_method: + multiprocessing.set_start_method(multiprocessing_start_method, force=True) + + if load_train_inference: + try_load_train_inference() + + class AcceptAllTasks: def __call__(self, task: Task): return True @@ -90,6 +146,7 @@ def _process_task_batch( outcome_queue: MessageQueue[OutcomeReport], activity_tracker: PoolActivityTracker | None, debug: bool, + upkeep_handler: SQSUpkeepHandlerManager | None = None, ) -> None: logger.info("STARTING: task batch execution.") @@ -104,15 +161,15 @@ def _process_task_batch( with log.logging_tag_ctx("task_id", task.id_): with log.logging_tag_ctx("execution_id", task.execution_id): if task_filter_fn(task): - ack_task, outcome = process_task_message(msg=msg, debug=debug) + ack_task, outcome = process_task_message( + msg=msg, debug=debug, upkeep_handler=upkeep_handler + ) else: ack_task = True outcome = TaskOutcome(exception=MazepaCancel()) if ack_task: - outcome_report = OutcomeReport( - task_id=msg.payload.id_, outcome=outcome - ) + outcome_report = OutcomeReport(task_id=msg.payload.id_, outcome=outcome) outcome_queue.push([outcome_report]) msg.acknowledge_fn() @@ -154,38 +211,57 @@ def run_worker( idle_timeout: float | None = None, pool_name: str | None = None, ) -> str: - with monitor_resources(resource_monitor_interval): - start_time = time.time() - activity_tracker = PoolActivityTracker(pool_name) if pool_name else None + # Start SQS upkeep handler process for handling visibility extensions + upkeep_handler = SQSUpkeepHandlerManager() + upkeep_handler.start() - while True: - task_msgs = _pull_tasks_with_error_handling( - task_queue, outcome_queue, max_pull_num - ) + try: + with monitor_resources(resource_monitor_interval): + start_time = time.time() + activity_tracker = PoolActivityTracker(pool_name) if pool_name else None - if len(task_msgs) == 0: - _handle_idle_state(sleep_sec, idle_timeout, activity_tracker) - else: - _process_task_batch( - task_msgs, task_filter_fn, outcome_queue, activity_tracker, debug + while True: + task_msgs = _pull_tasks_with_error_handling( + task_queue, outcome_queue, max_pull_num ) - should_exit, reason = _check_exit_conditions( - start_time, max_runtime, idle_timeout, activity_tracker - ) - if should_exit: - assert reason is not None - logger.info(f"Worker exiting: {reason}") - return reason + if len(task_msgs) == 0: + _handle_idle_state(sleep_sec, idle_timeout, activity_tracker) + else: + _process_task_batch( + task_msgs, + task_filter_fn, + outcome_queue, + activity_tracker, + debug, + upkeep_handler, + ) + + should_exit, reason = _check_exit_conditions( + start_time, max_runtime, idle_timeout, activity_tracker + ) + if should_exit: + assert reason is not None + logger.info(f"Worker exiting: {reason}") + return reason + finally: + upkeep_handler.shutdown() def process_task_message( - msg: ReceivedMessage[Task], debug: bool, 
handle_exceptions: bool = True + msg: ReceivedMessage[Task], + debug: bool, + handle_exceptions: bool = True, + upkeep_handler: SQSUpkeepHandlerManager | None = None, ) -> tuple[bool, TaskOutcome]: task = msg.payload if task.upkeep_settings.perform_upkeep: outcome = _run_task_with_upkeep( - task, msg.extend_lease_fn, debug=debug, handle_exceptions=handle_exceptions + task, + msg.extend_lease_fn, + debug=debug, + handle_exceptions=handle_exceptions, + upkeep_handler=upkeep_handler, ) else: outcome = task(debug=debug, handle_exceptions=handle_exceptions) @@ -209,23 +285,70 @@ def process_task_message( def _run_task_with_upkeep( - task: Task, extend_lease_fn: Callable, debug: bool, handle_exceptions: bool + task: Task, + extend_lease_fn: Callable, + debug: bool, + handle_exceptions: bool, + upkeep_handler: SQSUpkeepHandlerManager | None = None, ) -> TaskOutcome: - def _perform_upkeep_callbacks(): - assert task.upkeep_settings.interval_sec is not None + task_start_time = time.time() + assert task.upkeep_settings.interval_sec is not None + extend_duration = math.ceil(task.upkeep_settings.interval_sec * 10) + + # Try to extract SQS metadata and use process-based handler + sqs_metadata = extract_sqs_metadata(extend_lease_fn) + use_process_handler = sqs_metadata is not None and upkeep_handler is not None + + if use_process_handler: + # Handler process manages its own timer - completely isolated from main process GIL + logger.debug( + f"UPKEEP: Starting upkeep via handler process: " + f"interval={task.upkeep_settings.interval_sec}s, extend_by={extend_duration}s" + ) + assert sqs_metadata is not None + assert upkeep_handler is not None + upkeep_handler.start_upkeep( + task_id=task.id_, + receipt_handle=sqs_metadata["receipt_handle"], + visibility_timeout=extend_duration, + interval_sec=task.upkeep_settings.interval_sec, + queue_name=sqs_metadata["queue_name"], + region_name=sqs_metadata["region_name"], + endpoint_url=sqs_metadata["endpoint_url"], + ) try: - extend_lease_fn(math.ceil(task.upkeep_settings.interval_sec * 5)) - except tenacity.RetryError as e: # pragma: no cover - logger.info(f"Couldn't perform upkeep: {e}") + logger.info("Task execution starting") + result = task(debug=debug, handle_exceptions=handle_exceptions) + elapsed = time.time() - task_start_time + logger.info(f"Task execution completed successfully after {elapsed:.2f}s") + except Exception as e: # pragma: no cover # pylint: disable=broad-except + elapsed = time.time() - task_start_time + logger.error( + f"Task execution failed with {type(e).__name__}: {e} after {elapsed:.2f}s" + ) + raise e + finally: + logger.debug("UPKEEP: Stopping upkeep via handler process") + upkeep_handler.stop_upkeep(task.id_) + else: + # Fallback: use RepeatTimer in main process for non-SQS queues + def upkeep_callback(): + perform_direct_upkeep(extend_lease_fn, extend_duration, task_start_time) - assert task.upkeep_settings.interval_sec is not None - upkeep = RepeatTimer(task.upkeep_settings.interval_sec, _perform_upkeep_callbacks) - upkeep.start() - try: - result = task(debug=debug, handle_exceptions=handle_exceptions) - except Exception as e: # pragma: no cover # pylint: disable=broad-except - raise e - finally: - upkeep.cancel() + upkeep = RepeatTimer(task.upkeep_settings.interval_sec, upkeep_callback) + upkeep.start() + try: + logger.info("Task execution starting") + result = task(debug=debug, handle_exceptions=handle_exceptions) + elapsed = time.time() - task_start_time + logger.info(f"Task execution completed successfully after {elapsed:.2f}s") + 
except Exception as e: # pragma: no cover # pylint: disable=broad-except + elapsed = time.time() - task_start_time + logger.error( + f"Task execution failed with {type(e).__name__}: {e} after {elapsed:.2f}s" + ) + raise e + finally: + upkeep.cancel() return result diff --git a/zetta_utils/mazepa_addons/configurations/worker_pool.py b/zetta_utils/mazepa_addons/configurations/worker_pool.py index 78eccf7d6..9a4394b34 100644 --- a/zetta_utils/mazepa_addons/configurations/worker_pool.py +++ b/zetta_utils/mazepa_addons/configurations/worker_pool.py @@ -3,59 +3,22 @@ import contextlib import logging import multiprocessing -import os -import sys import time from contextlib import ExitStack import pebble -from zetta_utils import builder, log, try_load_train_inference -from zetta_utils.common import monitor_resources, reset_signal_handlers +from zetta_utils import builder, log +from zetta_utils.common import monitor_resources from zetta_utils.mazepa import SemaphoreType, Task, configure_semaphores, run_worker from zetta_utils.mazepa.pool_activity import PoolActivityTracker from zetta_utils.mazepa.task_outcome import OutcomeReport +from zetta_utils.mazepa.worker import worker_init from zetta_utils.message_queues import FileQueue, SQSQueue logger = log.get_logger("mazepa") -class DummyBuffer: - def read(self, data): - pass - - def write(self, data): - pass - - def flush(self): - pass - - -def redirect_buffers() -> None: # Do not need to implement 14 passes for typing.FileIO - sys.stdin = DummyBuffer() # type: ignore - sys.stdout = DummyBuffer() # type: ignore - sys.stderr = DummyBuffer() # type: ignore - - -def worker_init( - suppress_worker_logs: bool, multiprocessing_start_method: str, log_level: str -) -> None: - # Reset signal handlers inherited from parent to default behavior - # This prevents parent's signal handlers from interfering with worker cleanup - reset_signal_handlers() - # For Kubernetes compatibility, ensure unbuffered output - os.environ["PYTHONUNBUFFERED"] = "1" - if suppress_worker_logs: - redirect_buffers() - else: - # Reconfigure logging in worker process with parent's log level - log.configure_logger(level=log_level, force=True) - - # Inherit the start method from the calling process - multiprocessing.set_start_method(multiprocessing_start_method, force=True) - try_load_train_inference() - - def run_local_worker( task_queue_name: str, outcome_queue_name: str, @@ -115,9 +78,11 @@ def setup_local_worker_pool( ), # 'fork' has issues with CV sharded reads initializer=worker_init, initargs=[ - suppress_worker_logs, - multiprocessing.get_start_method(), - current_log_level, + current_log_level, # log_level + suppress_worker_logs, # suppress_logs + True, # set_start_method + multiprocessing.get_start_method(), # multiprocessing_start_method + True, # load_train_inference ], ) try: diff --git a/zetta_utils/message_queues/sqs/queue.py b/zetta_utils/message_queues/sqs/queue.py index d4aebb84e..f033a6244 100644 --- a/zetta_utils/message_queues/sqs/queue.py +++ b/zetta_utils/message_queues/sqs/queue.py @@ -42,7 +42,7 @@ class SQSQueue(MessageQueue[T]): insertion_threads: int = 5 _queue: Any = attrs.field(init=False, default=None) pull_wait_sec: int = 0 - pull_lease_sec: int = 10 # TODO: get a better value + pull_lease_sec: int = 30 def _get_tq_queue(self) -> Any: if self._queue is None: diff --git a/zetta_utils/message_queues/sqs/utils.py b/zetta_utils/message_queues/sqs/utils.py index d13b3f5ce..5806865a1 100644 --- a/zetta_utils/message_queues/sqs/utils.py +++ 
b/zetta_utils/message_queues/sqs/utils.py
@@ -48,6 +48,10 @@ def receive_msgs(
     msg_batch_size: int = 10,
     visibility_timeout: int = 60,
 ) -> list[SQSReceivedMsg]:
+    logger.debug(
+        f"RECEIVE: Attempting to receive messages from queue '{queue_name}' "
+        f"with visibility_timeout={visibility_timeout}s"
+    )
     result = []  # type: list[SQSReceivedMsg]
     start_ts = time.time()
     while True:
@@ -73,6 +77,12 @@ def receive_msgs(
             )
             for message in resp["Messages"]
         ]
+        for msg in message_batch:
+            logger.debug(
+                f"RECEIVE: Message from queue '{queue_name}' with handle "
+                f"{msg.receipt_handle[:30]}... has an initial visibility timeout of "
+                f"{visibility_timeout}s (expires at T+{visibility_timeout}s from receipt)"
+            )
         result += message_batch

         if len(result) >= max_msg_num:
@@ -91,14 +101,22 @@ def delete_msg_by_receipt_handle(
     region_name: str,
     endpoint_url: Optional[str] = None,
 ):
+    queue_url = get_queue_url(queue_name, region_name, endpoint_url=endpoint_url)
     logger.debug(
-        f"Deleting message with handle '{receipt_handle}' from queue '{queue_name}'"
-        f"in region '{region_name}'"
-    )
-    get_sqs_client(region_name, endpoint_url=endpoint_url).delete_message(
-        QueueUrl=get_queue_url(queue_name, region_name, endpoint_url=endpoint_url),
-        ReceiptHandle=receipt_handle,
+        f"DELETE: Deleting message from queue '{queue_name}' (URL: {queue_url}), "
+        f"handle: {receipt_handle[:30]}..."
     )
+    try:
+        get_sqs_client(region_name, endpoint_url=endpoint_url).delete_message(
+            QueueUrl=queue_url,
+            ReceiptHandle=receipt_handle,
+        )
+        logger.debug(f"DELETE: Successfully deleted message from queue '{queue_name}'")
+    except Exception as e:  # pylint: disable=broad-except
+        logger.error(
+            f"DELETE: Failed to delete message from queue '{queue_name}': {type(e).__name__}: {e}"
+        )
+        raise


 @retry(stop=stop_after_attempt(10), wait=wait_random(min=0.1, max=5))
@@ -109,15 +127,27 @@ def change_message_visibility(
     region_name: str,
     endpoint_url: Optional[str] = None,
 ):
+    queue_url = get_queue_url(queue_name, region_name, endpoint_url=endpoint_url)
     logger.debug(
-        f"Changing visibility of the message with handle '{receipt_handle}' "
-        f"from queue '{queue_name}' in region '{region_name}' to {visibility_timeout}."
-    )
-    get_sqs_client(region_name, endpoint_url=endpoint_url).change_message_visibility(
-        QueueUrl=get_queue_url(queue_name, region_name, endpoint_url=endpoint_url),
-        ReceiptHandle=receipt_handle,
-        VisibilityTimeout=visibility_timeout,
+        f"VISIBILITY: Changing visibility to {visibility_timeout}s for queue '{queue_name}' "
+        f"(URL: {queue_url}), handle: {receipt_handle[:30]}..."
     )
+    try:
+        get_sqs_client(region_name, endpoint_url=endpoint_url).change_message_visibility(
+            QueueUrl=queue_url,
+            ReceiptHandle=receipt_handle,
+            VisibilityTimeout=visibility_timeout,
+        )
+        logger.debug(
+            f"VISIBILITY: Successfully changed visibility to {visibility_timeout}s "
+            f"for queue '{queue_name}'"
+        )
+    except Exception as e:  # pylint: disable=broad-except
+        logger.error(
+            f"VISIBILITY: Failed to change visibility for queue '{queue_name}' "
+            f"to {visibility_timeout}s: {type(e).__name__}: {e}"
+        )
+        raise

 # To be revived if we need batch deletes:
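
Usage sketch: a minimal, illustrative driver for the new handler, assuming a
message has already been pulled from SQS. The task id, receipt handle, queue
name, and region below are placeholders, not values from this patch.

    from zetta_utils.mazepa.upkeep_handlers import SQSUpkeepHandlerManager

    manager = SQSUpkeepHandlerManager()
    manager.start()  # spawns the daemon handler process
    try:
        # Ask the handler to re-extend visibility to 300s every 30s.
        manager.start_upkeep(
            task_id="task-0001",            # placeholder
            receipt_handle="AQEB...",       # placeholder receipt handle
            visibility_timeout=300,
            interval_sec=30.0,
            queue_name="my-task-queue",     # placeholder
            region_name="us-east-1",        # placeholder
        )
        long_running_task()  # stand-in for task(debug=..., handle_exceptions=...)
    finally:
        manager.stop_upkeep("task-0001")    # handler joins its timer thread
        manager.shutdown()

In the patch itself this wiring is automatic: run_worker starts one manager per
worker, _run_task_with_upkeep starts and stops upkeep around each task, and
non-SQS queues fall back to the in-process RepeatTimer path.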