diff --git a/litellm/proxy/db/db_spend_update_writer.py b/litellm/proxy/db/db_spend_update_writer.py index 3277f19c7de0..9c83dc433be3 100644 --- a/litellm/proxy/db/db_spend_update_writer.py +++ b/litellm/proxy/db/db_spend_update_writer.py @@ -58,7 +58,7 @@ def __init__( ): self.redis_cache = redis_cache self.redis_update_buffer = RedisUpdateBuffer(redis_cache=self.redis_cache) - self.pod_lock_manager = PodLockManager() + self.pod_lock_manager = PodLockManager(redis_cache=self.redis_cache) self.spend_update_queue = SpendUpdateQueue() self.daily_spend_update_queue = DailySpendUpdateQueue() self.daily_team_spend_update_queue = DailySpendUpdateQueue() @@ -839,6 +839,7 @@ async def _update_daily_spend( Generic function to update daily spend for any entity type (user, team, tag) """ from litellm.proxy.utils import _raise_failed_update_spend_exception + from litellm.proxy.db.exception_handler import is_deadlock_error verbose_proxy_logger.debug( f"Daily {entity_type.capitalize()} Spend transactions: {len(daily_spend_transactions)}" @@ -849,9 +850,9 @@ async def _update_daily_spend( try: for i in range(n_retry_times + 1): try: - transactions_to_process = dict( - list(daily_spend_transactions.items())[:BATCH_SIZE] - ) + # Deterministic order to reduce lock-order inversions across workers + ordered_items = sorted(daily_spend_transactions.items(), key=lambda kv: kv[0]) + transactions_to_process = dict(ordered_items[:BATCH_SIZE]) if len(transactions_to_process) == 0: verbose_proxy_logger.debug( @@ -977,6 +978,19 @@ async def _update_daily_spend( proxy_logging_obj=proxy_logging_obj, ) await asyncio.sleep(2**i) + except Exception as e: + # Retry on Postgres deadlock (40P01) + if is_deadlock_error(e): + if i >= n_retry_times: + _raise_failed_update_spend_exception( + e=e, + start_time=start_time, + proxy_logging_obj=proxy_logging_obj, + ) + # small jitter to desynchronize conflicting workers + await asyncio.sleep((2**i) + (0.001 * (i + 1))) + continue + raise except Exception as e: if "transactions_to_process" in locals(): diff --git a/litellm/proxy/db/exception_handler.py b/litellm/proxy/db/exception_handler.py index db73f9e9c939..50d69f317146 100644 --- a/litellm/proxy/db/exception_handler.py +++ b/litellm/proxy/db/exception_handler.py @@ -59,3 +59,28 @@ def handle_db_exception(e: Exception): ): return None raise e + + +def is_deadlock_error(e: Exception) -> bool: + """ + Best-effort detector for Postgres deadlocks (SQLSTATE 40P01) coming through Prisma. + + We can't rely on a strongly-typed error from the Prisma Python client for this, + so we match on the SQLSTATE code or message text. + """ + message = str(e) + if "40P01" in message: + return True + if "deadlock detected" in message.lower(): + return True + try: + import prisma # type: ignore + + if isinstance(e, prisma.errors.PrismaError) and ( + "40P01" in message or "deadlock" in message.lower() + ): + return True + except Exception: + # If prisma isn't available or anything goes wrong, fall back to string checks + pass + return False