From d8bb1fcc682c72bb7fe6f58feb239f07776cf5ed Mon Sep 17 00:00:00 2001 From: Lucas Armand Date: Tue, 11 Nov 2025 18:53:17 -0800 Subject: [PATCH 1/2] add fifo queue Bump pyworker version --- lib/backend.py | 153 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 116 insertions(+), 37 deletions(-) diff --git a/lib/backend.py b/lib/backend.py index 5cbb7ff..639aba1 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -9,6 +9,7 @@ from typing import Tuple, Awaitable, NoReturn, List, Union, Callable, Optional from functools import cached_property from distutils.util import strtobool +from collections import deque from anyio import open_file from aiohttp import web, ClientResponse, ClientSession, ClientConnectorError, ClientTimeout, TCPConnector @@ -30,7 +31,7 @@ BenchmarkResult ) -VERSION = "0.2.0" +VERSION = "0.2.1" MSG_HISTORY_LEN = 100 log = logging.getLogger(__file__) @@ -63,6 +64,7 @@ class Backend: version = VERSION msg_history = [] sem: Semaphore = dataclasses.field(default_factory=Semaphore) + queue: deque = dataclasses.field(default_factory=deque, repr=False) unsecured: bool = dataclasses.field( default_factory=lambda: bool(strtobool(os.environ.get("UNSECURED", "false"))), ) @@ -141,6 +143,19 @@ async def __handle_request( workload = payload.count_workload() request_metrics: RequestMetrics = RequestMetrics(request_idx=auth_data.request_idx, reqnum=auth_data.reqnum, workload=workload, status="Created") + + def advance_queue_after_completion(event: asyncio.Event): + """Pop current head and wake next waiter, if any.""" + if self.queue and self.queue[0] is event: + self.queue.popleft() + if self.queue: + self.queue[0].set() + else: + try: + self.queue.remove(event) + except ValueError: + pass + async def cancel_api_call_if_disconnected() -> web.Response: await request.wait_for_disconnection() log.debug(f"request with reqnum: {request_metrics.reqnum} was canceled") @@ -162,7 +177,7 @@ async def make_request() -> Union[web.Response, web.StreamResponse]: res = await handler.generate_client_response(request, response) self.metrics._request_success(request_metrics) return res - except requests.exceptions.RequestException as e: + except Exception as e: log.debug(f"[backend] Request error: {e}") self.metrics._request_errored(request_metrics) return web.Response(status=500) @@ -177,46 +192,110 @@ async def make_request() -> Union[web.Response, web.StreamResponse]: self.metrics._request_reject(request_metrics) return web.Response(status=429) - acquired = False - try: - self.metrics._request_start(request_metrics) - if self.allow_parallel_requests is False: - log.debug(f"Waiting to aquire Sem for reqnum:{request_metrics.reqnum}") - await self.sem.acquire() - acquired = True - log.debug( - f"Sem acquired for reqnum:{request_metrics.reqnum}, starting request..." - ) - else: + disconnect_task = create_task(cancel_api_call_if_disconnected()) + self.metrics._request_start(request_metrics) + + if self.allow_parallel_requests: + try: log.debug(f"Starting request for reqnum:{request_metrics.reqnum}") - done, pending = await wait( - [ - create_task(make_request()), - create_task(cancel_api_call_if_disconnected()), - ], - return_when=FIRST_COMPLETED, - ) - for t in pending: - t.cancel() - await asyncio.gather(*pending, return_exceptions=True) + work_task = create_task(make_request()) + done, pending = await wait([work_task, disconnect_task], return_when=FIRST_COMPLETED) + + for t in pending: + t.cancel() + await asyncio.gather(*pending, return_exceptions=True) + + if disconnect_task in done: + # Make sure work_task is settled/cancelled + try: + await work_task + except Exception: + pass + return web.Response(status=499) + + # otherwise work_task completed + return await work_task + + except asyncio.CancelledError: + return web.Response(status=499) + except Exception as e: + log.debug(f"Exception in main handler loop {e}") + return web.Response(status=500) + finally: + self.metrics._request_end(request_metrics) + + else: + # Insert a Event into the queue for this request + # Event.set() == our request is up next + event = asyncio.Event() + self.queue.append(event) + if self.queue and self.queue[0] is event: + event.set() - done_task = done.pop() try: - return done_task.result() + # Race between our request being next and request being cancelled + next_request_task = create_task(event.wait()) + first_done, first_pending = await wait( + [next_request_task, disconnect_task], return_when=FIRST_COMPLETED + ) + + # If the disconnect task wins the race + if disconnect_task in first_done and not event.is_set(): + was_head = (self.queue and self.queue[0] is event) + try: + self.queue.remove(event) + except ValueError: + pass + if was_head and self.queue: + self.queue[0].set() + + for t in first_pending: + t.cancel() + await asyncio.gather(*first_pending, return_exceptions=True) + return web.Response(status=499) + + # We are the next-up request in the queue + log.debug(f"Starting work on request {request_metrics.reqnum}...") + + # Race the backend API call with the disconnect task + work_task = create_task(make_request()) + done, pending = await wait([work_task, disconnect_task], return_when=FIRST_COMPLETED) + for t in pending: + t.cancel() + await asyncio.gather(*pending, return_exceptions=True) + + if disconnect_task in done: + # ensure work is cancelled and accounted for + try: + await work_task + except Exception: + pass + return web.Response(status=499) + + # otherwise work_task completed + return await work_task + + except asyncio.CancelledError: + # Cleanup if request was cancelled + was_head = (self.queue and self.queue[0] is event) + try: + self.queue.remove(event) + except ValueError: + pass + if was_head and self.queue: + self.queue[0].set() + + return web.Response(status=499) + except Exception as e: - log.debug(f"Request task raised exception: {e}") + log.debug(f"Exception in main handler loop {e}") return web.Response(status=500) - except asyncio.CancelledError: - # Client is gone. Do not write a response; just unwind. - return web.Response(status=499) - except Exception as e: - log.debug(f"Exception in main handler loop {e}") - return web.Response(status=500) - finally: - # Always release the semaphore if it was acquired - if acquired: - self.sem.release() - self.metrics._request_end(request_metrics) + + finally: + self.metrics._request_end(request_metrics) + if event.is_set(): + # The request is done, advance the queue + advance_queue_after_completion(event) @cached_property def healthcheck_session(self): From 249ca2eb9964ac1d45d0c4aaf2a6cae64320da21 Mon Sep 17 00:00:00 2001 From: Lucas Armand Date: Wed, 12 Nov 2025 15:23:42 -0800 Subject: [PATCH 2/2] refactor, handle zombie tasks --- lib/backend.py | 105 ++++++++++++++++++++----------------------------- 1 file changed, 43 insertions(+), 62 deletions(-) diff --git a/lib/backend.py b/lib/backend.py index 639aba1..39ef7c7 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -146,21 +146,23 @@ async def __handle_request( def advance_queue_after_completion(event: asyncio.Event): """Pop current head and wake next waiter, if any.""" + # If this event is current head, wake next waiter if self.queue and self.queue[0] is event: self.queue.popleft() if self.queue: self.queue[0].set() else: + # Else, remove it from the queue try: self.queue.remove(event) except ValueError: pass - async def cancel_api_call_if_disconnected() -> web.Response: + async def cancel_api_call_if_disconnected() -> None: await request.wait_for_disconnection() - log.debug(f"request with reqnum: {request_metrics.reqnum} was canceled") + log.debug(f"Request with reqnum: {request_metrics.reqnum} was canceled") self.metrics._request_canceled(request_metrics) - raise asyncio.CancelledError + return async def make_request() -> Union[web.Response, web.StreamResponse]: try: @@ -177,6 +179,8 @@ async def make_request() -> Union[web.Response, web.StreamResponse]: res = await handler.generate_client_response(request, response) self.metrics._request_success(request_metrics) return res + except asyncio.CancelledError: + raise except Exception as e: log.debug(f"[backend] Request error: {e}") self.metrics._request_errored(request_metrics) @@ -193,10 +197,14 @@ async def make_request() -> Union[web.Response, web.StreamResponse]: return web.Response(status=429) disconnect_task = create_task(cancel_api_call_if_disconnected()) + next_request_task = None + work_task = None + event = asyncio.Event() # Used in finally block, so initialize here + self.metrics._request_start(request_metrics) - if self.allow_parallel_requests: - try: + try: + if self.allow_parallel_requests: log.debug(f"Starting request for reqnum:{request_metrics.reqnum}") work_task = create_task(make_request()) done, pending = await wait([work_task, disconnect_task], return_when=FIRST_COMPLETED) @@ -206,33 +214,19 @@ async def make_request() -> Union[web.Response, web.StreamResponse]: await asyncio.gather(*pending, return_exceptions=True) if disconnect_task in done: - # Make sure work_task is settled/cancelled - try: - await work_task - except Exception: - pass return web.Response(status=499) # otherwise work_task completed return await work_task + + # FIFO-queue branch + else: + # Insert a Event into the queue for this request + # Event.set() == our request is up next + self.queue.append(event) + if self.queue and self.queue[0] is event: + event.set() - except asyncio.CancelledError: - return web.Response(status=499) - except Exception as e: - log.debug(f"Exception in main handler loop {e}") - return web.Response(status=500) - finally: - self.metrics._request_end(request_metrics) - - else: - # Insert a Event into the queue for this request - # Event.set() == our request is up next - event = asyncio.Event() - self.queue.append(event) - if self.queue and self.queue[0] is event: - event.set() - - try: # Race between our request being next and request being cancelled next_request_task = create_task(event.wait()) first_done, first_pending = await wait( @@ -240,15 +234,8 @@ async def make_request() -> Union[web.Response, web.StreamResponse]: ) # If the disconnect task wins the race - if disconnect_task in first_done and not event.is_set(): - was_head = (self.queue and self.queue[0] is event) - try: - self.queue.remove(event) - except ValueError: - pass - if was_head and self.queue: - self.queue[0].set() - + if disconnect_task in first_done: + # Clean up the next_request_task, then exit for t in first_pending: t.cancel() await asyncio.gather(*first_pending, return_exceptions=True) @@ -259,43 +246,37 @@ async def make_request() -> Union[web.Response, web.StreamResponse]: # Race the backend API call with the disconnect task work_task = create_task(make_request()) + done, pending = await wait([work_task, disconnect_task], return_when=FIRST_COMPLETED) for t in pending: t.cancel() await asyncio.gather(*pending, return_exceptions=True) if disconnect_task in done: - # ensure work is cancelled and accounted for - try: - await work_task - except Exception: - pass return web.Response(status=499) # otherwise work_task completed return await work_task + + except asyncio.CancelledError: + return web.Response(status=499) + + except Exception as e: + log.debug(f"Exception in main handler loop {e}") + return web.Response(status=500) + + finally: + if not self.allow_parallel_requests: + advance_queue_after_completion(event) + + self.metrics._request_end(request_metrics) + cleanup_tasks = [t for t in (next_request_task, work_task, disconnect_task) if t] + for t in cleanup_tasks: + if not t.done(): + t.cancel() + if cleanup_tasks: + await asyncio.gather(*cleanup_tasks, return_exceptions=True) - except asyncio.CancelledError: - # Cleanup if request was cancelled - was_head = (self.queue and self.queue[0] is event) - try: - self.queue.remove(event) - except ValueError: - pass - if was_head and self.queue: - self.queue[0].set() - - return web.Response(status=499) - - except Exception as e: - log.debug(f"Exception in main handler loop {e}") - return web.Response(status=500) - - finally: - self.metrics._request_end(request_metrics) - if event.is_set(): - # The request is done, advance the queue - advance_queue_after_completion(event) @cached_property def healthcheck_session(self):