From ee763c6ff961a7777842a17a59f1874ed4903964 Mon Sep 17 00:00:00 2001 From: Johan Berggren Date: Wed, 11 Mar 2026 13:28:50 +0100 Subject: [PATCH] Improve database connection reliability and session management --- src/datastores/sql/database.py | 2 +- src/importers/gcp/importer.py | 15 ++++----------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/datastores/sql/database.py b/src/datastores/sql/database.py index af181ea..072c88e 100644 --- a/src/datastores/sql/database.py +++ b/src/datastores/sql/database.py @@ -54,7 +54,7 @@ # SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} # ) -engine = create_engine(SQLALCHEMY_DATABASE_URL) +engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine) diff --git a/src/importers/gcp/importer.py b/src/importers/gcp/importer.py index d4a0225..74fc14d 100644 --- a/src/importers/gcp/importer.py +++ b/src/importers/gcp/importer.py @@ -139,17 +139,14 @@ def main() -> None: """ Main function to subscribe to Pub/Sub messages and process GCS events. - This function initializes a database session, creates a Pub/Sub subscriber client, - subscribes to the specified subscription path, and listens for incoming messages. + This function initializes a Pub/Sub subscriber client, subscribes to the + specified subscription path, and listens for incoming messages. Each message is then processed by the process_gcs_message function. """ if not ROBOT_ACCOUNT_USER_ID: logger.error("ROBOT_ACCOUNT_USER_ID environment variable is not set.") return try: - # Initialize database session - db = database.SessionLocal() - # Create a Pub/Sub subscriber client subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID) @@ -158,7 +155,8 @@ def main() -> None: # Define a callback function to process incoming messages def callback(message: pubsub_v1.subscriber.message.Message) -> None: - process_gcs_message(message, db) + with database.SessionLocal() as db: + process_gcs_message(message, db) # Subscribe to the subscription path and start listening for messages listener = subscriber.subscribe(subscription_path, callback=callback) @@ -168,11 +166,6 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: except Exception as e: # Broad exception handling to catch all errors in main. logger.exception(f"An error occurred during processing: {e}") - finally: - # Clean up resources by closing the database session - if db: - db.close() - if __name__ == "__main__": main()