Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/datastores/sql/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
# SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
# )

engine = create_engine(SQLALCHEMY_DATABASE_URL)
engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)


Expand Down
15 changes: 4 additions & 11 deletions src/importers/gcp/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,14 @@ def main() -> None:
"""
Main function to subscribe to Pub/Sub messages and process GCS events.

This function initializes a database session, creates a Pub/Sub subscriber client,
subscribes to the specified subscription path, and listens for incoming messages.
This function initializes a Pub/Sub subscriber client, subscribes to the
specified subscription path, and listens for incoming messages.
Each message is then processed by the process_gcs_message function.
"""
if not ROBOT_ACCOUNT_USER_ID:
logger.error("ROBOT_ACCOUNT_USER_ID environment variable is not set.")
return
try:
# Initialize database session
db = database.SessionLocal()

# Create a Pub/Sub subscriber client
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
Expand All @@ -158,7 +155,8 @@ def main() -> None:

# Define a callback function to process incoming messages
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
process_gcs_message(message, db)
with database.SessionLocal() as db:
process_gcs_message(message, db)

# Subscribe to the subscription path and start listening for messages
listener = subscriber.subscribe(subscription_path, callback=callback)
Expand All @@ -168,11 +166,6 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
except Exception as e: # Broad exception handling to catch all errors in main.
logger.exception(f"An error occurred during processing: {e}")

finally:
# Clean up resources by closing the database session
if db:
db.close()


if __name__ == "__main__":
main()
Loading