Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions pals/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,17 @@ class Locker:

It holds the name of the application (so lock names are namespaced and less likely to
collide) and the SQLAlchemy engine instance (and therefore the connection pool).

If `unlock_all_on_checkin` is true (default), it will defensively do a
unlock all operation when connections are given back to the connection
pool. It is unlikely that a lock is left un-released (esp when using
the context manager), but this step adds safety.
The operation comes with some cost, so it may be desireable to disable
in high-frequency use cases. Users should monitor locks in this case
and recycle connections regularly.
"""
def __init__(self, app_name, db_url=None, blocking_default=True, acquire_timeout_default=30000,
create_engine_callable=None):
create_engine_callable=None, unlock_all_on_checkin=True):
self.app_name = app_name
self.blocking_default = blocking_default
self.acquire_timeout_default = acquire_timeout_default
Expand All @@ -40,28 +48,31 @@ def __init__(self, app_name, db_url=None, blocking_default=True, acquire_timeout
else:
self.engine = sa.create_engine(db_url)

@sa.event.listens_for(self.engine, 'checkin')
def on_conn_checkin(dbapi_connection, connection_record):
"""
This function will be called when a connection is checked back into the connection
pool. That should happen when .close() is called on it or when the connection
proxy goes out of scope and is garbage collected.
"""
if dbapi_connection is None:
# This may occur in rare circumstances where the connection is already closed or an
# error occurred while connecting to the database. In these cases any held locks
# should already be released when the connection terminated.
return

with dbapi_connection.cursor() as cur:
# If the connection is "closed" we want all locks to be cleaned up since this
# connection is going to be recycled. This step is to take extra care that we don't
# accidentally leave a lock acquired.
cur.execute('select pg_advisory_unlock_all()')
# Connections in the pool should not have an open transaction, therefore we need to roll back
# before we return the connection because the execute above might have triggered an automatic
# BEGIN.
dbapi_connection.rollback()
if unlock_all_on_checkin:
sa.event.listen(self.engine, 'checkin', self.on_conn_checkin)

def on_conn_checkin(self, dbapi_connection, connection_record):
"""
This method will be called when a connection is checked back into the connection
pool and `unlock_all_on_checkin` is true. A connection is checked
in to the pool when .close() is called on it or when the connection
proxy goes out of scope and is garbage collected.
"""
if dbapi_connection is None:
# This may occur in rare circumstances where the connection is already closed or an
# error occurred while connecting to the database. In these cases any held locks
# should already be released when the connection terminated.
return

with dbapi_connection.cursor() as cur:
# If the connection is "closed" we want all locks to be cleaned up since this
# connection is going to be recycled. This step is to take extra care that we don't
# accidentally leave a lock acquired.
cur.execute('select pg_advisory_unlock_all()')
# Connections in the pool should not have an open transaction, therefore we need to roll back
# before we return the connection because the execute above might have triggered an automatic
# BEGIN.
dbapi_connection.rollback()

def _lock_name(self, name):
if self.app_name is None:
Expand Down
4 changes: 4 additions & 0 deletions readme.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ connection (a.k.a session) ends, even if the client disconnects ungracefully. S
crashes or otherwise disappears, PostgreSQL should notice and remove all locks held by that
connection/session.

Please note that `pg_advisory_unlock_all` takes some time. In high frequency cases, you might
want to disable it (by passing `unlock_all_on_checkin=False` to the `Locker`).
Users should monitor locks in this case and recycle connections regularly.

The possibility could exist that PostgreSQL does not detect a connection has closed and keeps
a lock open indefinitely. However, in manual testing using `scripts/hang.py` no way was found
to end the Python process without PostgreSQL detecting it.
Expand Down