Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions litellm/proxy/db/db_spend_update_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
):
self.redis_cache = redis_cache
self.redis_update_buffer = RedisUpdateBuffer(redis_cache=self.redis_cache)
self.pod_lock_manager = PodLockManager()
self.pod_lock_manager = PodLockManager(redis_cache=self.redis_cache)
self.spend_update_queue = SpendUpdateQueue()
self.daily_spend_update_queue = DailySpendUpdateQueue()
self.daily_team_spend_update_queue = DailySpendUpdateQueue()
Expand Down Expand Up @@ -839,6 +839,7 @@ async def _update_daily_spend(
Generic function to update daily spend for any entity type (user, team, tag)
"""
from litellm.proxy.utils import _raise_failed_update_spend_exception
from litellm.proxy.db.exception_handler import is_deadlock_error

verbose_proxy_logger.debug(
f"Daily {entity_type.capitalize()} Spend transactions: {len(daily_spend_transactions)}"
Expand All @@ -849,9 +850,9 @@ async def _update_daily_spend(
try:
for i in range(n_retry_times + 1):
try:
transactions_to_process = dict(
list(daily_spend_transactions.items())[:BATCH_SIZE]
)
# Deterministic order to reduce lock-order inversions across workers
ordered_items = sorted(daily_spend_transactions.items(), key=lambda kv: kv[0])
transactions_to_process = dict(ordered_items[:BATCH_SIZE])

if len(transactions_to_process) == 0:
verbose_proxy_logger.debug(
Expand Down Expand Up @@ -977,6 +978,19 @@ async def _update_daily_spend(
proxy_logging_obj=proxy_logging_obj,
)
await asyncio.sleep(2**i)
except Exception as e:
# Retry on Postgres deadlock (40P01)
if is_deadlock_error(e):
if i >= n_retry_times:
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
# small jitter to desynchronize conflicting workers
await asyncio.sleep((2**i) + (0.001 * (i + 1)))
continue
raise

except Exception as e:
if "transactions_to_process" in locals():
Expand Down
25 changes: 25 additions & 0 deletions litellm/proxy/db/exception_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,28 @@ def handle_db_exception(e: Exception):
):
return None
raise e


def is_deadlock_error(e: Exception) -> bool:
"""
Best-effort detector for Postgres deadlocks (SQLSTATE 40P01) coming through Prisma.

We can't rely on a strongly-typed error from the Prisma Python client for this,
so we match on the SQLSTATE code or message text.
"""
message = str(e)
if "40P01" in message:
return True
if "deadlock detected" in message.lower():
return True
try:
import prisma # type: ignore

if isinstance(e, prisma.errors.PrismaError) and (
"40P01" in message or "deadlock" in message.lower()
):
return True
except Exception:
# If prisma isn't available or anything goes wrong, fall back to string checks
pass
return False