Skip to content

Commit c1df458

Browse files
Avoid memory leak with weakref, avoid race condition, log closures (#17)
This suggestion implements my comments on [1408](interuss#1408) plus a few additional things: * Avoids memory leak by using weakref in periodic closure check * Avoids race condition between closure and new request by adding _closure_lock * Informs user when automatic closure occurs via log message, including differentiating UTMClientSession instances with int ID * Increases keepalive period to just under max keepalive time likely used by external infrastructure Tested in current state and verified a large number of sessions (90+) and closures. Tested cherrypicking [1411](interuss#1411) and verified a smaller number of sessions and closures.
1 parent 48e5736 commit c1df458

1 file changed

Lines changed: 70 additions & 17 deletions

File tree

monitoring/monitorlib/infrastructure.py

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1+
from __future__ import annotations
2+
13
import asyncio
24
import datetime
35
import functools
46
import threading
57
import time
68
import urllib.parse
9+
import weakref
710
from enum import Enum
811

912
import jwt
1013
import requests
1114
from aiohttp import ClientSession
15+
from loguru import logger
1216

1317
ALL_SCOPES = [
1418
"dss.write.identification_service_areas",
@@ -22,7 +26,7 @@
2226
EPOCH = datetime.datetime.fromtimestamp(0, datetime.UTC)
2327
TOKEN_REFRESH_MARGIN = datetime.timedelta(seconds=15)
2428
CLIENT_TIMEOUT = 10 # seconds
25-
SOCKET_KEEP_ALIVE_LIMIT = 15 # seconds.
29+
SOCKET_KEEP_ALIVE_LIMIT = 57 # seconds.
2630

2731

2832
AuthSpec = str
@@ -89,6 +93,14 @@ class UTMClientSession(requests.Session):
8993
DSS).
9094
"""
9195

96+
_next_session_id: int = 1
97+
_session_id: int
98+
_session_id_lock = threading.Lock()
99+
100+
_closure_timer: threading.Timer | None = None
101+
_closure_lock: threading.Lock
102+
_last_used: float | None = None
103+
92104
def __init__(
93105
self,
94106
prefix_url: str,
@@ -97,14 +109,64 @@ def __init__(
97109
):
98110
super().__init__()
99111

112+
with UTMClientSession._session_id_lock:
113+
self._session_id = UTMClientSession._next_session_id
114+
UTMClientSession._next_session_id += 1
115+
100116
self._prefix_url = prefix_url[0:-1] if prefix_url[-1] == "/" else prefix_url
101117
self.auth_adapter = auth_adapter
102118
self.default_scopes: list[str] | None = None
103119
self.timeout_seconds = timeout_seconds or CLIENT_TIMEOUT
104-
self._last_used = None
120+
self._closure_lock = threading.Lock()
121+
122+
UTMClientSession._start_closure_timer(weakref.ref(self))
123+
124+
@staticmethod
125+
def _start_closure_timer(wref: weakref.ReferenceType[UTMClientSession]) -> None:
126+
def wrapper():
127+
weak_self_final = wref()
128+
if weak_self_final is not None:
129+
try:
130+
weak_self_final.close_if_idle()
131+
finally:
132+
UTMClientSession._start_closure_timer(wref)
133+
134+
weak_self_initial = wref()
135+
if weak_self_initial:
136+
weak_self_initial._closure_timer = threading.Timer(
137+
weak_self_initial.seconds_until_idle(), wrapper
138+
)
139+
weak_self_initial._closure_timer.daemon = True
140+
weak_self_initial._closure_timer.start()
105141

106-
t = threading.Thread(target=self._idle_watchdog, daemon=True)
107-
t.start()
142+
def close_if_idle(self) -> None:
143+
with self._closure_lock:
144+
if (
145+
self._last_used
146+
and time.monotonic() - self._last_used > SOCKET_KEEP_ALIVE_LIMIT
147+
):
148+
logger.debug(
149+
"Closing idle UTMClientSession {} to {} with default scopes {} last used {} (now {})",
150+
self._session_id,
151+
self._prefix_url,
152+
", ".join(self.default_scopes) if self.default_scopes else "<none>",
153+
self._last_used,
154+
time.monotonic(),
155+
)
156+
self.close()
157+
self._last_used = None
158+
159+
def seconds_until_idle(self) -> float:
160+
if self._last_used is None:
161+
return SOCKET_KEEP_ALIVE_LIMIT
162+
else:
163+
return max(
164+
0.0, SOCKET_KEEP_ALIVE_LIMIT - (time.monotonic() - self._last_used)
165+
)
166+
167+
def __del__(self):
168+
if timer := self._closure_timer:
169+
timer.cancel()
108170

109171
# Overrides method on requests.Session
110172
def prepare_request(self, request, **kwargs):
@@ -142,9 +204,10 @@ def request(self, method, url, *args, **kwargs):
142204
if "auth" not in kwargs:
143205
kwargs = self.adjust_request_kwargs(kwargs)
144206

145-
self._last_used = time.monotonic()
146-
147-
return super().request(method, url, *args, **kwargs)
207+
with self._closure_lock:
208+
result = super().request(method, url, *args, **kwargs)
209+
self._last_used = time.monotonic()
210+
return result
148211

149212
def get_prefix_url(self):
150213
return self._prefix_url
@@ -155,16 +218,6 @@ def get(self, *args, **kwargs):
155218
def delete(self, *args, **kwargs):
156219
return super().delete(*args, **kwargs)
157220

158-
def _idle_watchdog(self):
159-
while True:
160-
time.sleep(SOCKET_KEEP_ALIVE_LIMIT)
161-
if (
162-
self._last_used
163-
and time.monotonic() - self._last_used > SOCKET_KEEP_ALIVE_LIMIT
164-
):
165-
self.close()
166-
self._last_used = None
167-
168221

169222
class AsyncUTMTestSession:
170223
"""

0 commit comments

Comments
 (0)