From 863ccd9d13a5533dc05ac054a1ee5b5ff9e81d5f Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 4 Dec 2025 16:00:16 +0000 Subject: [PATCH 01/35] A threaded implemented with blocking connection --- waveform_controller/controller.py | 34 ++++++++++++++++++++++++++----- waveform_controller/db.py | 18 +++++++++++++--- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/waveform_controller/controller.py b/waveform_controller/controller.py index e5c73dc..fd8d150 100644 --- a/waveform_controller/controller.py +++ b/waveform_controller/controller.py @@ -3,16 +3,28 @@ based on https://www.rabbitmq.com/tutorials/tutorial-one-python """ +import functools +import threading import pika import waveform_controller.db as db import waveform_controller.settings as settings +emap_db = db.starDB() +emap_db.init_query() + + +def on_message(ch, method_frame, _header_frame, body, args): + thrds = args + delivery_tag = method_frame.delivery_tag + t = threading.Thread( + target=emap_db.waveform_callback, args=(ch, delivery_tag, body) + ) + t.start() + thrds.append(t) + def receiver(): # set up database connection - emap_db = db.starDB() - emap_db.init_query() - rabbitmq_credentials = pika.PlainCredentials( username=settings.RABBITMQ_USERNAME, password=settings.RABBITMQ_PASSWORD ) @@ -23,9 +35,21 @@ def receiver(): ) connection = pika.BlockingConnection(connection_parameters) channel = connection.channel() + channel.basic_qos(prefetch_count=1) + threads = [] + on_message_callback = functools.partial(on_message, args=(threads)) channel.basic_consume( queue=settings.RABBITMQ_QUEUE, auto_ack=False, - on_message_callback=emap_db.waveform_callback, + on_message_callback=on_message_callback, ) - channel.start_consuming() + try: + channel.start_consuming() + except KeyboardInterrupt: + channel.stop_consuming() + + # Wait for all to complete + for thread in threads: + thread.join() + + connection.close() diff --git a/waveform_controller/db.py b/waveform_controller/db.py index c03f6f7..5839ba8 100644 --- a/waveform_controller/db.py +++ b/waveform_controller/db.py @@ -2,11 +2,23 @@ from psycopg2 import sql import json from datetime import datetime, timedelta +import functools import waveform_controller.settings as settings import waveform_controller.csv_writer as writer +def ack_message(ch, delivery_tag): + """Note that `ch` must be the same pika channel instance via which the + message being ACKed was retrieved (AMQP protocol constraint).""" + if ch.is_open: + ch.basic_ack(delivery_tag) + else: + # Channel is already closed, so we can't ACK this message; + # log and/or do something that makes sense for your app in this case. + pass + + class starDB: sql_query: str = "" connection_string: str = "dbname={} user={} password={} host={} port={}".format( @@ -42,7 +54,7 @@ def get_row(self, location_string: str, start_datetime: str, end_datetime: str): return single_row - def waveform_callback(self, ch, method, properties, body): + def waveform_callback(self, ch, delivery_tag, body): data = json.loads(body) location_string = data.get("mappedLocationString", "unknown") observation_time = data.get("observationTime", "NaT") @@ -54,6 +66,6 @@ def waveform_callback(self, ch, method, properties, body): obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S") start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S") matched_mrn = self.get_row(location_string, start_time_str, obs_time_str) - if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): - ch.basic_ack(method.delivery_tag) + cb = functools.partial(ack_message, ch, delivery_tag) + ch.connection.add_callback_threadsafe(cb) From 9ca869e78c6f6c17910854d4dff5207eaaef651e Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Mon, 8 Dec 2025 16:11:36 +0000 Subject: [PATCH 02/35] Use source stream ID for file name, sourceSystem is not very interesting --- waveform_controller/csv_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waveform_controller/csv_writer.py b/waveform_controller/csv_writer.py index 3b8a9b1..c449b6b 100644 --- a/waveform_controller/csv_writer.py +++ b/waveform_controller/csv_writer.py @@ -20,7 +20,7 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool: :return: True if write was successful. """ - sourceSystem = waveform_message.get("sourceSystem", None) + sourceSystem = waveform_message.get("sourceStreamId", None) observationTime = waveform_message.get("observationTime", False) if not observationTime: From 72f32932b5ecb83238ae319fe7023a544c24d9be Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Mon, 8 Dec 2025 16:17:23 +0000 Subject: [PATCH 03/35] Add the sourceSystemId to the csv --- waveform_controller/csv_writer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/waveform_controller/csv_writer.py b/waveform_controller/csv_writer.py index c449b6b..d9cce02 100644 --- a/waveform_controller/csv_writer.py +++ b/waveform_controller/csv_writer.py @@ -20,7 +20,7 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool: :return: True if write was successful. """ - sourceSystem = waveform_message.get("sourceStreamId", None) + sourceStreamId = waveform_message.get("sourceStreamId", None) observationTime = waveform_message.get("observationTime", False) if not observationTime: @@ -33,7 +33,7 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool: Path(out_path).mkdir(exist_ok=True) filename = out_path + create_file_name( - sourceSystem, observation_datetime, csn, units + sourceStreamId, observation_datetime, csn, units ) with open(filename, "a") as fileout: wv_writer = csv.writer(fileout, delimiter=",") @@ -45,6 +45,7 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool: [ csn, mrn, + sourceStreamId, units, waveform_message.get("samplingRate", ""), observationTime, From 3416fa8cd28c729b9b9f8e0ed9609b262225efac Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Mon, 8 Dec 2025 17:05:31 +0000 Subject: [PATCH 04/35] Update query to use location visit admission times --- waveform_controller/sql/mrn_based_on_bed_and_datetime.sql | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql b/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql index 2f998f4..f14f21b 100644 --- a/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql +++ b/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql @@ -14,5 +14,6 @@ INNER JOIN {schema_name}.location_visit lv INNER JOIN {schema_name}.location loc ON lv.location_id = loc.location_id WHERE loc.location_string = %(location_string)s - AND hv.valid_from BETWEEN %(start_datetime)s AND %(end_datetime)s -ORDER by hv.valid_from DESC + AND lv.admission_datetime BETWEEN %(start_datetime)s AND %(end_datetime)s +ORDER by lv.admission_datetime DESC +LIMIT 1 From 5cb051b78fb4e82dcd8c68e71a7985f4e6feff08 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Mon, 8 Dec 2025 17:41:43 +0000 Subject: [PATCH 05/35] Check location discharge datetime too --- waveform_controller/sql/mrn_based_on_bed_and_datetime.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql b/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql index f14f21b..ff76e8f 100644 --- a/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql +++ b/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql @@ -15,5 +15,6 @@ INNER JOIN {schema_name}.location loc ON lv.location_id = loc.location_id WHERE loc.location_string = %(location_string)s AND lv.admission_datetime BETWEEN %(start_datetime)s AND %(end_datetime)s + AND ( lv.discharge_datetime > %(end_datetime)s OR lv.discharge_datetime is NULL ) ORDER by lv.admission_datetime DESC LIMIT 1 From 40f2f49d24d05363ff61950b6f590b768c95c50c Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 9 Dec 2025 10:22:40 +0000 Subject: [PATCH 06/35] Added failing test --- pyproject.toml | 1 + tests/__init__.py | 0 tests/test_file_writer.py | 24 +++++++++++++++ uv.lock | 63 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/test_file_writer.py diff --git a/pyproject.toml b/pyproject.toml index fc9c7bb..153c6e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "pika>=1.3.2", "pre-commit>=4.5.0", "psycopg2-binary>=2.9.11", + "pytest>=9.0.2", ] [project.scripts] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py new file mode 100644 index 0000000..53d26e4 --- /dev/null +++ b/tests/test_file_writer.py @@ -0,0 +1,24 @@ +import pytest +from waveform_controller.csv_writer import create_file_name +from datetime import datetime, timezone + + +@pytest.mark.parametrize( + "units, expected_filename", + [ + ("uV", "2025-01-01.12345678.11.uV.csv"), + ("mL/s", "2025-01-01.12345678.11.mL/s.csv"), + ], +) +def test_create_file_name_handles_units(units, expected_filename, tmp_path): + sourceSystem = "11" + observationTime = datetime(2025, 1, 1, tzinfo=timezone.utc) + csn = "12345678" + + filename = create_file_name(sourceSystem, observationTime, csn, units) + + assert filename == expected_filename + + # check we can write to it + with open(f"{tmp_path}/{filename}", "w") as fileout: + fileout.write("Test string") diff --git a/uv.lock b/uv.lock index a953717..922bbd2 100644 --- a/uv.lock +++ b/uv.lock @@ -11,6 +11,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/db/3c/33bac158f8ab7f89b2e59426d5fe2e4f63f7ed25df84c036890172b412b5/cfgv-3.5.0-py2.py3-none-any.whl", hash = "sha256:a8dc6b26ad22ff227d2634a65cb388215ce6cc96bbcc5cfde7641ae87e8dacc0", size = 7445, upload-time = "2025-11-19T20:55:50.744Z" }, ] +[[package]] +name = "colorama" +version = "0.4.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, +] + [[package]] name = "distlib" version = "0.4.0" @@ -38,6 +47,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0f/1c/e5fd8f973d4f375adb21565739498e2e9a1e54c858a97b9a8ccfdc81da9b/identify-2.6.15-py2.py3-none-any.whl", hash = "sha256:1181ef7608e00704db228516541eb83a88a9f94433a8c80bb9b5bd54b1d81757", size = 99183, upload-time = "2025-10-02T17:43:39.137Z" }, ] +[[package]] +name = "iniconfig" +version = "2.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" }, +] + [[package]] name = "nodeenv" version = "1.9.1" @@ -47,6 +65,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d2/1d/1b658dbd2b9fa9c4c9f32accbfc0205d532c8c6194dc0f2a4c0428e7128a/nodeenv-1.9.1-py2.py3-none-any.whl", hash = "sha256:ba11c9782d29c27c70ffbdda2d7415098754709be8a7056d79a737cd901155c9", size = 22314, upload-time = "2024-06-04T18:44:08.352Z" }, ] +[[package]] +name = "packaging" +version = "25.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" }, +] + [[package]] name = "pika" version = "1.3.2" @@ -65,6 +92,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/cb/ac7874b3e5d58441674fb70742e6c374b28b0c7cb988d37d991cde47166c/platformdirs-4.5.0-py3-none-any.whl", hash = "sha256:e578a81bb873cbb89a41fcc904c7ef523cc18284b7e3b3ccf06aca1403b7ebd3", size = 18651, upload-time = "2025-10-08T17:44:47.223Z" }, ] +[[package]] +name = "pluggy" +version = "1.6.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, +] + [[package]] name = "pre-commit" version = "4.5.0" @@ -133,6 +169,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e1/36/9c0c326fe3a4227953dfb29f5d0c8ae3b8eb8c1cd2967aa569f50cb3c61f/psycopg2_binary-2.9.11-cp314-cp314-win_amd64.whl", hash = "sha256:4012c9c954dfaccd28f94e84ab9f94e12df76b4afb22331b1f0d3154893a6316", size = 2803913, upload-time = "2025-10-10T11:13:57.058Z" }, ] +[[package]] +name = "pygments" +version = "2.19.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, +] + +[[package]] +name = "pytest" +version = "9.0.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, + { name = "iniconfig" }, + { name = "packaging" }, + { name = "pluggy" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, +] + [[package]] name = "pyyaml" version = "6.0.3" @@ -210,6 +271,7 @@ dependencies = [ { name = "pika" }, { name = "pre-commit" }, { name = "psycopg2-binary" }, + { name = "pytest" }, ] [package.metadata] @@ -217,4 +279,5 @@ requires-dist = [ { name = "pika", specifier = ">=1.3.2" }, { name = "pre-commit", specifier = ">=4.5.0" }, { name = "psycopg2-binary", specifier = ">=2.9.11" }, + { name = "pytest", specifier = ">=9.0.2" }, ] From 41d9dceaa7236da3146aaaf864172107c4d0573c Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 9 Dec 2025 10:25:55 +0000 Subject: [PATCH 07/35] Added ci --- .github/workflows/pytest.yml | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 .github/workflows/pytest.yml diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 0000000..65af8da --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,28 @@ +name: Run Python tests + +on: + push: + branches: [main, dev] + pull_request: + branches: [main, dev] + types: ["opened", "reopened", "synchronize", "ready_for_review", "draft"] + workflow_dispatch: + +jobs: + build: + name: Run tests + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: pip install pytest + - name: run pytest + run: pytest tests/* From b4b5b20b7dadfe3eb86775c54745b9976bba0e3b Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 9 Dec 2025 10:31:13 +0000 Subject: [PATCH 08/35] Fix with string substition --- tests/test_file_writer.py | 2 +- waveform_controller/csv_writer.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py index 53d26e4..514e8e3 100644 --- a/tests/test_file_writer.py +++ b/tests/test_file_writer.py @@ -7,7 +7,7 @@ "units, expected_filename", [ ("uV", "2025-01-01.12345678.11.uV.csv"), - ("mL/s", "2025-01-01.12345678.11.mL/s.csv"), + ("mL/s", "2025-01-01.12345678.11.mLps.csv"), ], ) def test_create_file_name_handles_units(units, expected_filename, tmp_path): diff --git a/waveform_controller/csv_writer.py b/waveform_controller/csv_writer.py index 3b8a9b1..6a0a3b5 100644 --- a/waveform_controller/csv_writer.py +++ b/waveform_controller/csv_writer.py @@ -11,6 +11,7 @@ def create_file_name( """Create a unique file name based on the patient contact serial number (csn) the date, and the source system.""" datestring = observationTime.strftime("%Y-%m-%d") + units = units.replace("/", "p") return f"{datestring}.{csn}.{sourceSystem}.{units}.csv" From 702d818c1878c67e5b735a3368dc0f7b72285fa1 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 9 Dec 2025 11:19:36 +0000 Subject: [PATCH 09/35] Also change % to percent --- tests/test_file_writer.py | 1 + waveform_controller/csv_writer.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py index 514e8e3..b1ccf2c 100644 --- a/tests/test_file_writer.py +++ b/tests/test_file_writer.py @@ -8,6 +8,7 @@ [ ("uV", "2025-01-01.12345678.11.uV.csv"), ("mL/s", "2025-01-01.12345678.11.mLps.csv"), + ("%", "2025-01-01.12345678.11.percent.csv"), ], ) def test_create_file_name_handles_units(units, expected_filename, tmp_path): diff --git a/waveform_controller/csv_writer.py b/waveform_controller/csv_writer.py index 6a0a3b5..0b0483b 100644 --- a/waveform_controller/csv_writer.py +++ b/waveform_controller/csv_writer.py @@ -12,6 +12,7 @@ def create_file_name( (csn) the date, and the source system.""" datestring = observationTime.strftime("%Y-%m-%d") units = units.replace("/", "p") + units = units.replace("%", "percent") return f"{datestring}.{csn}.{sourceSystem}.{units}.csv" From e2d88ea4a86f37dbf57e4ae6212ff8ca461b21c6 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 9 Dec 2025 11:39:32 +0000 Subject: [PATCH 10/35] With uv --- .github/workflows/pytest.yml | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 65af8da..a1adbb9 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -12,17 +12,15 @@ jobs: build: name: Run tests runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.11"] steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: pip install pytest - - name: run pytest - run: pytest tests/* + - uses: actions/checkout@v5 + - name: Install uv + uses: astral-sh/setup-uv@v7 + + - name: Install the project + run: uv sync --locked --all-extras --dev + + - name: Run tests + # For example, using `pytest` + run: uv run pytest tests From 338af4ea068fb93ef4e3c9a74fef47e3ea00c607 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 9 Dec 2025 11:43:55 +0000 Subject: [PATCH 11/35] Tidy workflow --- .github/workflows/pytest.yml | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index a1adbb9..e85121a 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -1,4 +1,4 @@ -name: Run Python tests +name: Run pytest on: push: @@ -10,7 +10,7 @@ on: jobs: build: - name: Run tests + name: Run pytest runs-on: ubuntu-latest steps: @@ -18,9 +18,8 @@ jobs: - name: Install uv uses: astral-sh/setup-uv@v7 - - name: Install the project + - name: Install dependencies run: uv sync --locked --all-extras --dev - - name: Run tests - # For example, using `pytest` + - name: Run the tests run: uv run pytest tests From 7b81b051ee01d57f62f432e6f783063672ac0f02 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 11 Dec 2025 08:46:37 +0000 Subject: [PATCH 12/35] Tidied up module layout. Implemented a connection pool to avoid creating a new connection for each message. Added nack messages for some failures --- .pre-commit-config.yaml | 2 +- pyproject.toml | 2 +- {waveform_controller => src}/__init__.py | 0 {waveform_controller => src}/controller.py | 5 +- {waveform_controller => src}/csv_writer.py | 0 {waveform_controller => src}/db.py | 48 +++++++++++++------ {waveform_controller => src}/settings.py | 0 .../sql/mrn_based_on_bed_and_datetime.sql | 0 8 files changed, 39 insertions(+), 18 deletions(-) rename {waveform_controller => src}/__init__.py (100%) rename {waveform_controller => src}/controller.py (94%) rename {waveform_controller => src}/csv_writer.py (100%) rename {waveform_controller => src}/db.py (61%) rename {waveform_controller => src}/settings.py (100%) rename {waveform_controller => src}/sql/mrn_based_on_bed_and_datetime.sql (100%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a991089..e14bfa0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: - id: ruff args: - --fix - - waveform_controller + - ./ - id: ruff-format # Type-checking python code. - repo: https://github.com/pre-commit/mirrors-mypy diff --git a/pyproject.toml b/pyproject.toml index fc9c7bb..c055252 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,4 +11,4 @@ dependencies = [ ] [project.scripts] -emap-extract-waveform = "waveform_controller.controller:receiver" +emap-extract-waveform = "controller:receiver" diff --git a/waveform_controller/__init__.py b/src/__init__.py similarity index 100% rename from waveform_controller/__init__.py rename to src/__init__.py diff --git a/waveform_controller/controller.py b/src/controller.py similarity index 94% rename from waveform_controller/controller.py rename to src/controller.py index fd8d150..d1a7e81 100644 --- a/waveform_controller/controller.py +++ b/src/controller.py @@ -6,11 +6,12 @@ import functools import threading import pika -import waveform_controller.db as db -import waveform_controller.settings as settings +import db as db +import settings as settings emap_db = db.starDB() emap_db.init_query() +emap_db.create_connection_pool() def on_message(ch, method_frame, _header_frame, body, args): diff --git a/waveform_controller/csv_writer.py b/src/csv_writer.py similarity index 100% rename from waveform_controller/csv_writer.py rename to src/csv_writer.py diff --git a/waveform_controller/db.py b/src/db.py similarity index 61% rename from waveform_controller/db.py rename to src/db.py index 5839ba8..64f2eaf 100644 --- a/waveform_controller/db.py +++ b/src/db.py @@ -1,11 +1,15 @@ import psycopg2 -from psycopg2 import sql +from psycopg2 import sql, pool import json from datetime import datetime, timedelta import functools +import logging -import waveform_controller.settings as settings -import waveform_controller.csv_writer as writer +import settings as settings +import csv_writer as writer + +logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") +logger = logging.getLogger(__name__) def ack_message(ch, delivery_tag): @@ -14,9 +18,14 @@ def ack_message(ch, delivery_tag): if ch.is_open: ch.basic_ack(delivery_tag) else: - # Channel is already closed, so we can't ACK this message; - # log and/or do something that makes sense for your app in this case. - pass + logger.warning("Attempting to acknowledge a message on a closed channel.") + + +def nack_message(ch, delivery_tag): + if ch.is_open: + ch.basic_nack(delivery_tag) + else: + logger.warning("Attempting to not acknowledge a message on a closed channel.") class starDB: @@ -28,11 +37,13 @@ class starDB: settings.UDS_HOST, # type:ignore settings.UDS_PORT, # type:ignore ) + connection_pool: pool.ThreadedConnectionPool + + def create_connection_pool(self): + self.connection_pool = pool.ThreadedConnectionPool(2, 5, self.connection_string) def init_query(self): - with open( - "waveform_controller/sql/mrn_based_on_bed_and_datetime.sql", "r" - ) as file: + with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file: self.sql_query = sql.SQL(file.read()) self.sql_query = self.sql_query.format( schema_name=sql.Identifier(settings.SCHEMA_NAME) @@ -45,11 +56,14 @@ def get_row(self, location_string: str, start_datetime: str, end_datetime: str): "end_datetime": end_datetime, } try: - with psycopg2.connect(self.connection_string) as db_connection: - with db_connection.cursor() as curs: - curs.execute(self.sql_query, parameters) - single_row = curs.fetchone() + db_connection = self.connection_pool.getconn() + with db_connection.cursor() as curs: + curs.execute(self.sql_query, parameters) + single_row = curs.fetchone() + self.connection_pool.putconn(db_connection) except psycopg2.errors.UndefinedTable: + logger.error("Failed to find required tables in database.") + self.connection_pool.putconn(db_connection) raise ConnectionError("There is no table in your data base") return single_row @@ -65,7 +79,13 @@ def waveform_callback(self, ch, delivery_tag, body): start_time = observation_time - timedelta(weeks=52) obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S") start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S") - matched_mrn = self.get_row(location_string, start_time_str, obs_time_str) + try: + matched_mrn = self.get_row(location_string, start_time_str, obs_time_str) + except ConnectionError: + cb = functools.partial(nack_message, ch, delivery_tag) + ch.connection.add_callback_threadsafe(cb) + return + if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): cb = functools.partial(ack_message, ch, delivery_tag) ch.connection.add_callback_threadsafe(cb) diff --git a/waveform_controller/settings.py b/src/settings.py similarity index 100% rename from waveform_controller/settings.py rename to src/settings.py diff --git a/waveform_controller/sql/mrn_based_on_bed_and_datetime.sql b/src/sql/mrn_based_on_bed_and_datetime.sql similarity index 100% rename from waveform_controller/sql/mrn_based_on_bed_and_datetime.sql rename to src/sql/mrn_based_on_bed_and_datetime.sql From 8ec74b8bcbe00845ce41bc41d7dfa27c53334fc4 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 11 Dec 2025 09:13:00 +0000 Subject: [PATCH 13/35] Added location string to output to help with debugging. config for mypy and ruff --- .pre-commit-config.yaml | 2 +- src/controller.py | 4 ++-- src/csv_writer.py | 1 + src/db.py | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e14bfa0..f202a2e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,7 @@ repos: "types-psycopg2", "types-pika" ] - files: waveform_controller/ + files: src/ # ---------- # Formats docstrings to comply with PEP257 - repo: https://github.com/PyCQA/docformatter diff --git a/src/controller.py b/src/controller.py index d1a7e81..884e55e 100644 --- a/src/controller.py +++ b/src/controller.py @@ -6,8 +6,8 @@ import functools import threading import pika -import db as db -import settings as settings +import db as db # type:ignore +import settings as settings # type:ignore emap_db = db.starDB() emap_db.init_query() diff --git a/src/csv_writer.py b/src/csv_writer.py index d9cce02..d0f1801 100644 --- a/src/csv_writer.py +++ b/src/csv_writer.py @@ -49,6 +49,7 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool: units, waveform_message.get("samplingRate", ""), observationTime, + waveform_message.get("mappedLocationString", ""), waveform_data, ] ) diff --git a/src/db.py b/src/db.py index 64f2eaf..b3c78af 100644 --- a/src/db.py +++ b/src/db.py @@ -5,8 +5,8 @@ import functools import logging -import settings as settings -import csv_writer as writer +import settings as settings # type:ignore +import csv_writer as writer # type:ignore logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") logger = logging.getLogger(__name__) From 45d542a874f95b8987afbe8cbaadc187174141b5 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 11 Dec 2025 09:18:38 +0000 Subject: [PATCH 14/35] Tidied variable names --- src/csv_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/csv_writer.py b/src/csv_writer.py index a44305b..d99a9e1 100644 --- a/src/csv_writer.py +++ b/src/csv_writer.py @@ -6,14 +6,14 @@ def create_file_name( - sourceSystem: str, observationTime: datetime, csn: str, units: str + sourceStreamId: str, observationTime: datetime, csn: str, units: str ) -> str: """Create a unique file name based on the patient contact serial number (csn) the date, and the source system.""" datestring = observationTime.strftime("%Y-%m-%d") units = units.replace("/", "p") units = units.replace("%", "percent") - return f"{datestring}.{csn}.{sourceSystem}.{units}.csv" + return f"{datestring}.{csn}.{sourceStreamId}.{units}.csv" def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool: From f4c8e2d40e676d78bc4ad91eb66566badf6b1387 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Mon, 15 Dec 2025 09:15:16 +0000 Subject: [PATCH 15/35] Handle thread start runtime error --- src/controller.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/controller.py b/src/controller.py index 884e55e..e5a9f2b 100644 --- a/src/controller.py +++ b/src/controller.py @@ -5,10 +5,14 @@ import functools import threading +import logging import pika import db as db # type:ignore import settings as settings # type:ignore +logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") +logger = logging.getLogger(__name__) + emap_db = db.starDB() emap_db.init_query() emap_db.create_connection_pool() @@ -20,7 +24,15 @@ def on_message(ch, method_frame, _header_frame, body, args): t = threading.Thread( target=emap_db.waveform_callback, args=(ch, delivery_tag, body) ) - t.start() + try: + t.start() + except RuntimeError as e: + db.nack_message(ch, delivery_tag) + logger.error( + f"Failed to start thread, got {e}. nb. There are {len(thrds)} active threads." + ) + return + thrds.append(t) From e98b7ea70b5dfed0105596505daa2cea5638a3d7 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Mon, 15 Dec 2025 14:42:56 +0000 Subject: [PATCH 16/35] Rough queue --- src/controller.py | 70 +++++++++++++++++++++++++++++++++++++---------- src/db.py | 25 ++--------------- 2 files changed, 58 insertions(+), 37 deletions(-) diff --git a/src/controller.py b/src/controller.py index e5a9f2b..2a427f5 100644 --- a/src/controller.py +++ b/src/controller.py @@ -4,7 +4,10 @@ """ import functools +import json +from datetime import datetime, timedelta import threading +import queue import logging import pika import db as db # type:ignore @@ -17,23 +20,54 @@ emap_db.init_query() emap_db.create_connection_pool() +worker_queue = queue.Queue(maxsize = 1) + +class waveform_message (): + def __init__(self, ch, delivery_tag, body): + self.ch = ch + self.delivery_tag = delivery_tag + self.body = body + +def waveform_callback(): + message = worker_queue.get() + logger.warn(f"Got a message {message.delivery_tag}") + if message is not None: + data = json.loads(message.body) + location_string = data.get("mappedLocationString", "unknown") + observation_time = data.get("observationTime", "NaT") + observation_time = datetime.fromtimestamp(observation_time) + # I found in testing that to find the first patient I had to go back 7 months. I'm not sure this + # is expected, but I suppose an ICU patient could occupy a bed for a long time. Let's use + # 52 weeks for now. + start_time = observation_time - timedelta(weeks=52) + obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S") + start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S") + try: + logger.warn(f"Looking for mrn") + matched_mrn = emap_db.get_row(location_string, start_time_str, obs_time_str) + except ConnectionError: + cb = functools.partial(nack_message, message.ch, message.delivery_tag) + ch.connection.add_callback_threadsafe(cb) + return + + if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): + cb = functools.partial(ack_message, ch, delivery_tag) + ch.connection.add_callback_threadsafe(cb) + + worker_queue.task_done() + else: + logger.warning("empty message") + def on_message(ch, method_frame, _header_frame, body, args): - thrds = args - delivery_tag = method_frame.delivery_tag - t = threading.Thread( - target=emap_db.waveform_callback, args=(ch, delivery_tag, body) - ) - try: - t.start() - except RuntimeError as e: - db.nack_message(ch, delivery_tag) - logger.error( - f"Failed to start thread, got {e}. nb. There are {len(thrds)} active threads." - ) - return + worker_queue = args - thrds.append(t) + logger.warn("Got a message") + wf_message = waveform_message(ch, method_frame.delivery_tag, body) + if not worker_queue.full(): + worker_queue.put(wf_message) + else: + logger.warning("Working is queue is full.") def receiver(): @@ -49,8 +83,14 @@ def receiver(): connection = pika.BlockingConnection(connection_parameters) channel = connection.channel() channel.basic_qos(prefetch_count=1) + threads = [] - on_message_callback = functools.partial(on_message, args=(threads)) + # I just want on thread, but in theory this should work for more + worker_thread = threading.Thread(target=waveform_callback) + worker_thread.start() + threads.append(worker_thread) + + on_message_callback = functools.partial(on_message, args=(worker_queue)) channel.basic_consume( queue=settings.RABBITMQ_QUEUE, auto_ack=False, diff --git a/src/db.py b/src/db.py index b3c78af..58b0ca2 100644 --- a/src/db.py +++ b/src/db.py @@ -58,34 +58,15 @@ def get_row(self, location_string: str, start_datetime: str, end_datetime: str): try: db_connection = self.connection_pool.getconn() with db_connection.cursor() as curs: + logger.error("I get data") curs.execute(self.sql_query, parameters) + logger.error("I got data") single_row = curs.fetchone() self.connection_pool.putconn(db_connection) except psycopg2.errors.UndefinedTable: + return ("null", "null", "null") logger.error("Failed to find required tables in database.") self.connection_pool.putconn(db_connection) raise ConnectionError("There is no table in your data base") return single_row - - def waveform_callback(self, ch, delivery_tag, body): - data = json.loads(body) - location_string = data.get("mappedLocationString", "unknown") - observation_time = data.get("observationTime", "NaT") - observation_time = datetime.fromtimestamp(observation_time) - # I found in testing that to find the first patient I had to go back 7 months. I'm not sure this - # is expected, but I suppose an ICU patient could occupy a bed for a long time. Let's use - # 52 weeks for now. - start_time = observation_time - timedelta(weeks=52) - obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S") - start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S") - try: - matched_mrn = self.get_row(location_string, start_time_str, obs_time_str) - except ConnectionError: - cb = functools.partial(nack_message, ch, delivery_tag) - ch.connection.add_callback_threadsafe(cb) - return - - if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): - cb = functools.partial(ack_message, ch, delivery_tag) - ch.connection.add_callback_threadsafe(cb) From f6d7ba3a79379452c7a79f122b876992de016fb2 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Mon, 15 Dec 2025 17:30:51 +0000 Subject: [PATCH 17/35] Try single thread with a queue --- src/controller.py | 92 ++++++++++++++++++++++++++++------------------- src/db.py | 45 ++++++++--------------- 2 files changed, 69 insertions(+), 68 deletions(-) diff --git a/src/controller.py b/src/controller.py index 2a427f5..010c9ee 100644 --- a/src/controller.py +++ b/src/controller.py @@ -12,62 +12,81 @@ import pika import db as db # type:ignore import settings as settings # type:ignore +import csv_writer as writer # type:ignore +max_threads = 1 logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") logger = logging.getLogger(__name__) -emap_db = db.starDB() -emap_db.init_query() -emap_db.create_connection_pool() -worker_queue = queue.Queue(maxsize = 1) +worker_queue: queue.Queue = queue.Queue(maxsize=max_threads) -class waveform_message (): + +class waveform_message: def __init__(self, ch, delivery_tag, body): self.ch = ch self.delivery_tag = delivery_tag self.body = body + +def ack_message(ch, delivery_tag): + """Note that `ch` must be the same pika channel instance via which the + message being ACKed was retrieved (AMQP protocol constraint).""" + if ch.is_open: + ch.basic_ack(delivery_tag) + else: + logger.warning("Attempting to acknowledge a message on a closed channel.") + + +def nack_message(ch, delivery_tag): + if ch.is_open: + ch.basic_nack(delivery_tag) + else: + logger.warning("Attempting to not acknowledge a message on a closed channel.") + + def waveform_callback(): - message = worker_queue.get() - logger.warn(f"Got a message {message.delivery_tag}") - if message is not None: - data = json.loads(message.body) - location_string = data.get("mappedLocationString", "unknown") - observation_time = data.get("observationTime", "NaT") - observation_time = datetime.fromtimestamp(observation_time) - # I found in testing that to find the first patient I had to go back 7 months. I'm not sure this - # is expected, but I suppose an ICU patient could occupy a bed for a long time. Let's use - # 52 weeks for now. - start_time = observation_time - timedelta(weeks=52) - obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S") - start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S") - try: - logger.warn(f"Looking for mrn") - matched_mrn = emap_db.get_row(location_string, start_time_str, obs_time_str) - except ConnectionError: - cb = functools.partial(nack_message, message.ch, message.delivery_tag) - ch.connection.add_callback_threadsafe(cb) - return + emap_db = db.starDB() + emap_db.init_query() + emap_db.connect() + emap_db.create_connection_pool(5) + while True: + message = worker_queue.get() + if message is not None: + data = json.loads(message.body) + location_string = data.get("mappedLocationString", "unknown") + observation_time = data.get("observationTime", "NaT") + observation_time = datetime.fromtimestamp(observation_time) + # I found in testing that to find the first patient I had to go back 7 months. I'm not sure this + # is expected, but I suppose an ICU patient could occupy a bed for a long time. Let's use + # 52 weeks for now. + start_time = observation_time - timedelta(weeks=52) + obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S") + start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S") + try: + matched_mrn = emap_db.get_row( + location_string, start_time_str, obs_time_str + ) + except ConnectionError: + cb = functools.partial(nack_message, message.ch, message.delivery_tag) + message.ch.connection.add_callback_threadsafe(cb) + break if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): - cb = functools.partial(ack_message, ch, delivery_tag) - ch.connection.add_callback_threadsafe(cb) + cb = functools.partial(ack_message, message.ch, message.delivery_tag) + message.ch.connection.add_callback_threadsafe(cb) worker_queue.task_done() - else: - logger.warning("empty message") - + else: + logger.warning("No message in queue.") -def on_message(ch, method_frame, _header_frame, body, args): - worker_queue = args - logger.warn("Got a message") +def on_message(ch, method_frame, _header_frame, body): wf_message = waveform_message(ch, method_frame.delivery_tag, body) if not worker_queue.full(): worker_queue.put(wf_message) else: - logger.warning("Working is queue is full.") + logger.warning("Working queue is full.") def receiver(): @@ -85,16 +104,15 @@ def receiver(): channel.basic_qos(prefetch_count=1) threads = [] - # I just want on thread, but in theory this should work for more + # I just want on thread, but in theory this should work for more worker_thread = threading.Thread(target=waveform_callback) worker_thread.start() threads.append(worker_thread) - on_message_callback = functools.partial(on_message, args=(worker_queue)) channel.basic_consume( queue=settings.RABBITMQ_QUEUE, auto_ack=False, - on_message_callback=on_message_callback, + on_message_callback=on_message, ) try: channel.start_consuming() diff --git a/src/db.py b/src/db.py index 58b0ca2..9df125a 100644 --- a/src/db.py +++ b/src/db.py @@ -1,33 +1,13 @@ import psycopg2 from psycopg2 import sql, pool -import json -from datetime import datetime, timedelta -import functools import logging import settings as settings # type:ignore -import csv_writer as writer # type:ignore logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") logger = logging.getLogger(__name__) -def ack_message(ch, delivery_tag): - """Note that `ch` must be the same pika channel instance via which the - message being ACKed was retrieved (AMQP protocol constraint).""" - if ch.is_open: - ch.basic_ack(delivery_tag) - else: - logger.warning("Attempting to acknowledge a message on a closed channel.") - - -def nack_message(ch, delivery_tag): - if ch.is_open: - ch.basic_nack(delivery_tag) - else: - logger.warning("Attempting to not acknowledge a message on a closed channel.") - - class starDB: sql_query: str = "" connection_string: str = "dbname={} user={} password={} host={} port={}".format( @@ -39,8 +19,15 @@ class starDB: ) connection_pool: pool.ThreadedConnectionPool - def create_connection_pool(self): - self.connection_pool = pool.ThreadedConnectionPool(2, 5, self.connection_string) + def create_connection_pool(self, max_connections): + # connection pool may not be necessary, but use a simplepool as this process + # will be single threaded. + self.connection_pool = pool.SimpleConnectionPool( + 1, max_connections, self.connection_string + ) + + def connect(self): + self.db_connection = psycopg2.connect(self.connection_string) def init_query(self): with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file: @@ -56,17 +43,13 @@ def get_row(self, location_string: str, start_datetime: str, end_datetime: str): "end_datetime": end_datetime, } try: - db_connection = self.connection_pool.getconn() - with db_connection.cursor() as curs: - logger.error("I get data") - curs.execute(self.sql_query, parameters) - logger.error("I got data") - single_row = curs.fetchone() - self.connection_pool.putconn(db_connection) + with self.connection_pool.getconn() as db_connection: + with db_connection.cursor() as curs: + curs.execute(self.sql_query, parameters) + single_row = curs.fetchone() except psycopg2.errors.UndefinedTable: - return ("null", "null", "null") - logger.error("Failed to find required tables in database.") self.connection_pool.putconn(db_connection) + logger.error("Failed to find required tables in database.") raise ConnectionError("There is no table in your data base") return single_row From 5a25da7e5fb12d0af5223b494b1f1858cf4d73c6 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 11:20:15 +0000 Subject: [PATCH 18/35] Should only need one connection --- src/controller.py | 2 +- src/db.py | 15 ++++----------- waveform_controller.py | 4 ---- 3 files changed, 5 insertions(+), 16 deletions(-) delete mode 100644 waveform_controller.py diff --git a/src/controller.py b/src/controller.py index 010c9ee..4d40471 100644 --- a/src/controller.py +++ b/src/controller.py @@ -49,7 +49,6 @@ def waveform_callback(): emap_db = db.starDB() emap_db.init_query() emap_db.connect() - emap_db.create_connection_pool(5) while True: message = worker_queue.get() if message is not None: @@ -70,6 +69,7 @@ def waveform_callback(): except ConnectionError: cb = functools.partial(nack_message, message.ch, message.delivery_tag) message.ch.connection.add_callback_threadsafe(cb) + worker_queue.task_done() break if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): diff --git a/src/db.py b/src/db.py index 9df125a..69b5a9f 100644 --- a/src/db.py +++ b/src/db.py @@ -19,15 +19,8 @@ class starDB: ) connection_pool: pool.ThreadedConnectionPool - def create_connection_pool(self, max_connections): - # connection pool may not be necessary, but use a simplepool as this process - # will be single threaded. - self.connection_pool = pool.SimpleConnectionPool( - 1, max_connections, self.connection_string - ) - def connect(self): - self.db_connection = psycopg2.connect(self.connection_string) + self.connection_pool = pool.ThreadedConnectionPool(1, 1, self.connection_string) def init_query(self): with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file: @@ -47,9 +40,9 @@ def get_row(self, location_string: str, start_datetime: str, end_datetime: str): with db_connection.cursor() as curs: curs.execute(self.sql_query, parameters) single_row = curs.fetchone() - except psycopg2.errors.UndefinedTable: + except psycopg2.errors.UndefinedTable as e: self.connection_pool.putconn(db_connection) - logger.error("Failed to find required tables in database.") - raise ConnectionError("There is no table in your data base") + logger.error(f"Failed to find required tables in database: {e}") + raise ConnectionError("Missing tables in database.") return single_row diff --git a/waveform_controller.py b/waveform_controller.py deleted file mode 100644 index 5b5eacb..0000000 --- a/waveform_controller.py +++ /dev/null @@ -1,4 +0,0 @@ -import waveform_controller.controller as controller - -if __name__ == "__main__": - controller.receiver() From 9a8be8ba9214c95bda1e895d32a5dd086995f8ae Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 13:14:48 +0000 Subject: [PATCH 19/35] Simplified SQL and improved error handling --- src/controller.py | 23 +++++++++++------------ src/db.py | 17 ++++++++++------- src/sql/mrn_based_on_bed_and_datetime.sql | 6 ++---- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/controller.py b/src/controller.py index 4d40471..6c151ac 100644 --- a/src/controller.py +++ b/src/controller.py @@ -5,7 +5,7 @@ import functools import json -from datetime import datetime, timedelta +from datetime import datetime import threading import queue import logging @@ -56,21 +56,20 @@ def waveform_callback(): location_string = data.get("mappedLocationString", "unknown") observation_time = data.get("observationTime", "NaT") observation_time = datetime.fromtimestamp(observation_time) - # I found in testing that to find the first patient I had to go back 7 months. I'm not sure this - # is expected, but I suppose an ICU patient could occupy a bed for a long time. Let's use - # 52 weeks for now. - start_time = observation_time - timedelta(weeks=52) - obs_time_str = observation_time.strftime("%Y-%m-%d:%H:%M:%S") - start_time_str = start_time.strftime("%Y-%m-%d:%H:%M:%S") try: - matched_mrn = emap_db.get_row( - location_string, start_time_str, obs_time_str - ) - except ConnectionError: + matched_mrn = emap_db.get_row(location_string, observation_time) + except ConnectionError as e: cb = functools.partial(nack_message, message.ch, message.delivery_tag) message.ch.connection.add_callback_threadsafe(cb) + logger.error(f"Failed to find required tables in database: {e}") worker_queue.task_done() - break + continue + except ValueError as e: + cb = functools.partial(nack_message, message.ch, message.delivery_tag) + message.ch.connection.add_callback_threadsafe(cb) + logger.error(f"Ambiguous or non existent match: {e}") + worker_queue.task_done() + continue if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): cb = functools.partial(ack_message, message.ch, message.delivery_tag) diff --git a/src/db.py b/src/db.py index 69b5a9f..7e03d6f 100644 --- a/src/db.py +++ b/src/db.py @@ -1,3 +1,4 @@ +from datetime import datetime import psycopg2 from psycopg2 import sql, pool import logging @@ -29,20 +30,22 @@ def init_query(self): schema_name=sql.Identifier(settings.SCHEMA_NAME) ) - def get_row(self, location_string: str, start_datetime: str, end_datetime: str): + def get_row(self, location_string: str, observation_datetime: datetime): parameters = { "location_string": location_string, - "start_datetime": start_datetime, - "end_datetime": end_datetime, + "observation_datetime": observation_datetime, } try: with self.connection_pool.getconn() as db_connection: with db_connection.cursor() as curs: curs.execute(self.sql_query, parameters) - single_row = curs.fetchone() + rows = curs.fetchall() + if len(rows) != 1: + raise ValueError( + f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}" + ) except psycopg2.errors.UndefinedTable as e: self.connection_pool.putconn(db_connection) - logger.error(f"Failed to find required tables in database: {e}") - raise ConnectionError("Missing tables in database.") + raise ConnectionError(f"Missing tables in database: {e}") - return single_row + return rows diff --git a/src/sql/mrn_based_on_bed_and_datetime.sql b/src/sql/mrn_based_on_bed_and_datetime.sql index ff76e8f..e473be1 100644 --- a/src/sql/mrn_based_on_bed_and_datetime.sql +++ b/src/sql/mrn_based_on_bed_and_datetime.sql @@ -14,7 +14,5 @@ INNER JOIN {schema_name}.location_visit lv INNER JOIN {schema_name}.location loc ON lv.location_id = loc.location_id WHERE loc.location_string = %(location_string)s - AND lv.admission_datetime BETWEEN %(start_datetime)s AND %(end_datetime)s - AND ( lv.discharge_datetime > %(end_datetime)s OR lv.discharge_datetime is NULL ) -ORDER by lv.admission_datetime DESC -LIMIT 1 + AND lv.admission_datetime <= %(observation_datetime)s + AND ( lv.discharge_datetime >= %(observation_datetime)s OR lv.discharge_datetime IS NULL ) From 7dbb0dd77c5f12b8f00c5bf20ecb35c7ce429cc6 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 13:22:51 +0000 Subject: [PATCH 20/35] pytest is a dev dependency --- pyproject.toml | 4 +++- tests/test_file_writer.py | 2 +- uv.lock | 7 ++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ab7df2f..24d837c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,8 +8,10 @@ dependencies = [ "pika>=1.3.2", "pre-commit>=4.5.0", "psycopg2-binary>=2.9.11", - "pytest>=9.0.2", ] +[project.optional-dependencies] +dev = ["pytest>=9.0.2"] + [project.scripts] emap-extract-waveform = "controller:receiver" diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py index b1ccf2c..112fda7 100644 --- a/tests/test_file_writer.py +++ b/tests/test_file_writer.py @@ -1,5 +1,5 @@ import pytest -from waveform_controller.csv_writer import create_file_name +from src.csv_writer import create_file_name from datetime import datetime, timezone diff --git a/uv.lock b/uv.lock index 922bbd2..37c1376 100644 --- a/uv.lock +++ b/uv.lock @@ -271,6 +271,10 @@ dependencies = [ { name = "pika" }, { name = "pre-commit" }, { name = "psycopg2-binary" }, +] + +[package.optional-dependencies] +dev = [ { name = "pytest" }, ] @@ -279,5 +283,6 @@ requires-dist = [ { name = "pika", specifier = ">=1.3.2" }, { name = "pre-commit", specifier = ">=4.5.0" }, { name = "psycopg2-binary", specifier = ">=2.9.11" }, - { name = "pytest", specifier = ">=9.0.2" }, + { name = "pytest", marker = "extra == 'dev'", specifier = ">=9.0.2" }, ] +provides-extras = ["dev"] From e1e48b884aad69a87f1da7a9adbc4c57a931f036 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 13:56:15 +0000 Subject: [PATCH 21/35] Log error if missing data in waveform message --- src/controller.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/controller.py b/src/controller.py index 6c151ac..74fffe8 100644 --- a/src/controller.py +++ b/src/controller.py @@ -53,8 +53,16 @@ def waveform_callback(): message = worker_queue.get() if message is not None: data = json.loads(message.body) - location_string = data.get("mappedLocationString", "unknown") - observation_time = data.get("observationTime", "NaT") + try: + location_string = data["mappedLocationString"] + observation_time = data["observationTime"] + except IndexError as e: + cb = functools.partial(nack_message, message.ch, message.delivery_tag) + message.ch.connection.add_callback_threadsafe(cb) + logger.error(f"Waveform message is missing required data {e}") + worker_queue.task_done() + continue + observation_time = datetime.fromtimestamp(observation_time) try: matched_mrn = emap_db.get_row(location_string, observation_time) From b1b6f7e59b40e3f18651c9626f7d86461c8a2836 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 15:25:36 +0000 Subject: [PATCH 22/35] Kill the worker thread if there is a no database error --- src/controller.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/controller.py b/src/controller.py index 74fffe8..cb4588b 100644 --- a/src/controller.py +++ b/src/controller.py @@ -66,12 +66,6 @@ def waveform_callback(): observation_time = datetime.fromtimestamp(observation_time) try: matched_mrn = emap_db.get_row(location_string, observation_time) - except ConnectionError as e: - cb = functools.partial(nack_message, message.ch, message.delivery_tag) - message.ch.connection.add_callback_threadsafe(cb) - logger.error(f"Failed to find required tables in database: {e}") - worker_queue.task_done() - continue except ValueError as e: cb = functools.partial(nack_message, message.ch, message.delivery_tag) message.ch.connection.add_callback_threadsafe(cb) From 1d8695494613036c670501b4d850d9d673eefee0 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 15:33:12 +0000 Subject: [PATCH 23/35] Check rows after we've left the connection context, so the we don't exhaust the connections. --- src/db.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/db.py b/src/db.py index 7e03d6f..bd0e9e4 100644 --- a/src/db.py +++ b/src/db.py @@ -40,12 +40,13 @@ def get_row(self, location_string: str, observation_datetime: datetime): with db_connection.cursor() as curs: curs.execute(self.sql_query, parameters) rows = curs.fetchall() - if len(rows) != 1: - raise ValueError( - f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}" - ) except psycopg2.errors.UndefinedTable as e: self.connection_pool.putconn(db_connection) raise ConnectionError(f"Missing tables in database: {e}") + if len(rows) != 1: + raise ValueError( + f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}" + ) + return rows From d4d48996dd4ce0af7ac7dade9e6cec2f05ed6136 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 16:26:52 +0000 Subject: [PATCH 24/35] Context doesn't seem to handle putconn --- src/db.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/db.py b/src/db.py index bd0e9e4..cdc10cd 100644 --- a/src/db.py +++ b/src/db.py @@ -40,6 +40,7 @@ def get_row(self, location_string: str, observation_datetime: datetime): with db_connection.cursor() as curs: curs.execute(self.sql_query, parameters) rows = curs.fetchall() + self.connection_pool.putconn(db_connection) except psycopg2.errors.UndefinedTable as e: self.connection_pool.putconn(db_connection) raise ConnectionError(f"Missing tables in database: {e}") From 8fe2751b5f5c1005b6c7686680d36a70c0a984c5 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 16:30:42 +0000 Subject: [PATCH 25/35] If we can't match patient remove message from queue but add to an umatched csn file so we don't lose data yet' --- src/controller.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/controller.py b/src/controller.py index cb4588b..5b7d8d2 100644 --- a/src/controller.py +++ b/src/controller.py @@ -38,9 +38,9 @@ def ack_message(ch, delivery_tag): logger.warning("Attempting to acknowledge a message on a closed channel.") -def nack_message(ch, delivery_tag): +def nack_message(ch, delivery_tag, requeue): if ch.is_open: - ch.basic_nack(delivery_tag) + ch.basic_nack(delivery_tag, requeue) else: logger.warning("Attempting to not acknowledge a message on a closed channel.") @@ -57,7 +57,9 @@ def waveform_callback(): location_string = data["mappedLocationString"] observation_time = data["observationTime"] except IndexError as e: - cb = functools.partial(nack_message, message.ch, message.delivery_tag) + cb = functools.partial( + nack_message, message.ch, message.delivery_tag, True + ) message.ch.connection.add_callback_threadsafe(cb) logger.error(f"Waveform message is missing required data {e}") worker_queue.task_done() @@ -67,11 +69,12 @@ def waveform_callback(): try: matched_mrn = emap_db.get_row(location_string, observation_time) except ValueError as e: - cb = functools.partial(nack_message, message.ch, message.delivery_tag) + cb = functools.partial( + nack_message, message.ch, message.delivery_tag, False + ) message.ch.connection.add_callback_threadsafe(cb) logger.error(f"Ambiguous or non existent match: {e}") - worker_queue.task_done() - continue + matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn") if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): cb = functools.partial(ack_message, message.ch, message.delivery_tag) From 28502f2bf5051741c4f8ab35930e90cbe57f1335 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 17:38:22 +0000 Subject: [PATCH 26/35] Return as tuple --- src/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.py b/src/db.py index cdc10cd..396f442 100644 --- a/src/db.py +++ b/src/db.py @@ -50,4 +50,4 @@ def get_row(self, location_string: str, observation_datetime: datetime): f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}" ) - return rows + return rows[0] From ee2f9a55fd73e93a08564f12a99cecfd2936918f Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 17:45:24 +0000 Subject: [PATCH 27/35] Remove callbacks for ack and reject as this is now a single threaded process --- src/controller.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/controller.py b/src/controller.py index 5b7d8d2..c2921dc 100644 --- a/src/controller.py +++ b/src/controller.py @@ -3,7 +3,6 @@ based on https://www.rabbitmq.com/tutorials/tutorial-one-python """ -import functools import json from datetime import datetime import threading @@ -38,9 +37,9 @@ def ack_message(ch, delivery_tag): logger.warning("Attempting to acknowledge a message on a closed channel.") -def nack_message(ch, delivery_tag, requeue): +def reject_message(ch, delivery_tag, requeue): if ch.is_open: - ch.basic_nack(delivery_tag, requeue) + ch.basic_reject(delivery_tag, requeue) else: logger.warning("Attempting to not acknowledge a message on a closed channel.") @@ -57,11 +56,10 @@ def waveform_callback(): location_string = data["mappedLocationString"] observation_time = data["observationTime"] except IndexError as e: - cb = functools.partial( - nack_message, message.ch, message.delivery_tag, True + reject_message(message.ch, message.delivery_tag, False) + logger.error( + f"Waveform message {message.delivery_tag} is missing required data {e}." ) - message.ch.connection.add_callback_threadsafe(cb) - logger.error(f"Waveform message is missing required data {e}") worker_queue.task_done() continue @@ -69,16 +67,12 @@ def waveform_callback(): try: matched_mrn = emap_db.get_row(location_string, observation_time) except ValueError as e: - cb = functools.partial( - nack_message, message.ch, message.delivery_tag, False - ) - message.ch.connection.add_callback_threadsafe(cb) + reject_message(message.ch, message.delivery_tag, False) logger.error(f"Ambiguous or non existent match: {e}") matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn") if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): - cb = functools.partial(ack_message, message.ch, message.delivery_tag) - message.ch.connection.add_callback_threadsafe(cb) + ack_message(message.ch, message.delivery_tag) worker_queue.task_done() else: From 268bd77288524fa5df5c75c86e83e2b8425234e4 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 19:07:28 +0000 Subject: [PATCH 28/35] Avoid double message acknol=wlement --- src/controller.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/controller.py b/src/controller.py index c2921dc..3c362f5 100644 --- a/src/controller.py +++ b/src/controller.py @@ -13,7 +13,7 @@ import settings as settings # type:ignore import csv_writer as writer # type:ignore -max_threads = 1 +max_threads = 1 # this needs to stay at 1 as pika is not thread safe. logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") logger = logging.getLogger(__name__) @@ -64,15 +64,19 @@ def waveform_callback(): continue observation_time = datetime.fromtimestamp(observation_time) + lookup_success = True try: matched_mrn = emap_db.get_row(location_string, observation_time) except ValueError as e: - reject_message(message.ch, message.delivery_tag, False) + lookup_success = False logger.error(f"Ambiguous or non existent match: {e}") matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn") if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): - ack_message(message.ch, message.delivery_tag) + if lookup_success: + ack_message(message.ch, message.delivery_tag) + else: + reject_message(message.ch, message.delivery_tag, False) worker_queue.task_done() else: @@ -85,6 +89,7 @@ def on_message(ch, method_frame, _header_frame, body): worker_queue.put(wf_message) else: logger.warning("Working queue is full.") + reject_message(ch, method_frame.delivery_tag, True) def receiver(): From 8e8785a8aa1dd58f22f450cddae2826a54bb021e Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Tue, 16 Dec 2025 21:05:11 +0000 Subject: [PATCH 29/35] try without threading --- src/controller.py | 89 +++++++++++++++-------------------------------- 1 file changed, 29 insertions(+), 60 deletions(-) diff --git a/src/controller.py b/src/controller.py index 3c362f5..598da1d 100644 --- a/src/controller.py +++ b/src/controller.py @@ -5,20 +5,19 @@ import json from datetime import datetime -import threading -import queue import logging import pika import db as db # type:ignore import settings as settings # type:ignore import csv_writer as writer # type:ignore -max_threads = 1 # this needs to stay at 1 as pika is not thread safe. logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") logger = logging.getLogger(__name__) -worker_queue: queue.Queue = queue.Queue(maxsize=max_threads) +emap_db = db.starDB() +emap_db.init_query() +emap_db.connect() class waveform_message: @@ -44,52 +43,32 @@ def reject_message(ch, delivery_tag, requeue): logger.warning("Attempting to not acknowledge a message on a closed channel.") -def waveform_callback(): - emap_db = db.starDB() - emap_db.init_query() - emap_db.connect() - while True: - message = worker_queue.get() - if message is not None: - data = json.loads(message.body) - try: - location_string = data["mappedLocationString"] - observation_time = data["observationTime"] - except IndexError as e: - reject_message(message.ch, message.delivery_tag, False) - logger.error( - f"Waveform message {message.delivery_tag} is missing required data {e}." - ) - worker_queue.task_done() - continue - - observation_time = datetime.fromtimestamp(observation_time) - lookup_success = True - try: - matched_mrn = emap_db.get_row(location_string, observation_time) - except ValueError as e: - lookup_success = False - logger.error(f"Ambiguous or non existent match: {e}") - matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn") - - if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): - if lookup_success: - ack_message(message.ch, message.delivery_tag) - else: - reject_message(message.ch, message.delivery_tag, False) - - worker_queue.task_done() +def waveform_callback(ch, method_frame, _header_frame, body): + data = json.loads(body) + try: + location_string = data["mappedLocationString"] + observation_time = data["observationTime"] + except IndexError as e: + reject_message(ch, method_frame.delivery_tag, False) + logger.error( + f"Waveform message {method_frame.delivery_tag} is missing required data {e}." + ) + return + + observation_time = datetime.fromtimestamp(observation_time) + lookup_success = True + try: + matched_mrn = emap_db.get_row(location_string, observation_time) + except ValueError as e: + lookup_success = False + logger.error(f"Ambiguous or non existent match: {e}") + matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn") + + if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): + if lookup_success: + ack_message(ch, method_frame.delivery_tag) else: - logger.warning("No message in queue.") - - -def on_message(ch, method_frame, _header_frame, body): - wf_message = waveform_message(ch, method_frame.delivery_tag, body) - if not worker_queue.full(): - worker_queue.put(wf_message) - else: - logger.warning("Working queue is full.") - reject_message(ch, method_frame.delivery_tag, True) + reject_message(ch, method_frame.delivery_tag, False) def receiver(): @@ -106,24 +85,14 @@ def receiver(): channel = connection.channel() channel.basic_qos(prefetch_count=1) - threads = [] - # I just want on thread, but in theory this should work for more - worker_thread = threading.Thread(target=waveform_callback) - worker_thread.start() - threads.append(worker_thread) - channel.basic_consume( queue=settings.RABBITMQ_QUEUE, auto_ack=False, - on_message_callback=on_message, + on_message_callback=waveform_callback, ) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() - # Wait for all to complete - for thread in threads: - thread.join() - connection.close() From c77835087641828ba2321e5aa013efa8d1554c94 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 17 Dec 2025 09:52:12 +0000 Subject: [PATCH 30/35] Add db connection and statement time outs and handle any resulting errors --- settings.env.EXAMPLE | 2 ++ src/controller.py | 4 ++++ src/db.py | 10 ++++++---- src/settings.py | 2 ++ 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/settings.env.EXAMPLE b/settings.env.EXAMPLE index 35027b6..bd1964a 100644 --- a/settings.env.EXAMPLE +++ b/settings.env.EXAMPLE @@ -5,6 +5,8 @@ UDS_USERNAME="inform_user" UDS_PASSWORD="inform" UDS_HOST="localhost" UDS_PORT="5433" +UDS_CONNECT_TIMEOUT="10" # in seconds +UDS_QUERY_TIMEOUT="3000" # in milliseconds SCHEMA_NAME="star_dev" RABBITMQ_USERNAME="my_name" RABBITMQ_PASSWORD="my_pw" diff --git a/src/controller.py b/src/controller.py index 598da1d..a3ba219 100644 --- a/src/controller.py +++ b/src/controller.py @@ -63,6 +63,10 @@ def waveform_callback(ch, method_frame, _header_frame, body): lookup_success = False logger.error(f"Ambiguous or non existent match: {e}") matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn") + except ConnectionError as e: + logger.error(f"Database error, will try again: {e}") + reject_message(ch, method_frame.delivery_tag, True) + return if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): if lookup_success: diff --git a/src/db.py b/src/db.py index 396f442..ebab871 100644 --- a/src/db.py +++ b/src/db.py @@ -11,17 +11,19 @@ class starDB: sql_query: str = "" - connection_string: str = "dbname={} user={} password={} host={} port={}".format( + connection_string: str = "dbname={} user={} password={} host={} port={} connect_timeout={} options='-c statement_timeout={}'".format( settings.UDS_DBNAME, # type:ignore settings.UDS_USERNAME, # type:ignore settings.UDS_PASSWORD, # type:ignore settings.UDS_HOST, # type:ignore settings.UDS_PORT, # type:ignore + settings.UDS_CONNECT_TIMEOUT, # type:ignore + settings.UDS_QUERY_TIMEOUT, # type:ignore ) connection_pool: pool.ThreadedConnectionPool def connect(self): - self.connection_pool = pool.ThreadedConnectionPool(1, 1, self.connection_string) + self.connection_pool = pool.SimpleConnectionPool(1, 1, self.connection_string) def init_query(self): with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file: @@ -41,9 +43,9 @@ def get_row(self, location_string: str, observation_datetime: datetime): curs.execute(self.sql_query, parameters) rows = curs.fetchall() self.connection_pool.putconn(db_connection) - except psycopg2.errors.UndefinedTable as e: + except psycopg2.errors.OperationalError as e: self.connection_pool.putconn(db_connection) - raise ConnectionError(f"Missing tables in database: {e}") + raise ConnectionError(f"Data base error: {e}") if len(rows) != 1: raise ValueError( diff --git a/src/settings.py b/src/settings.py index 2ec92c9..c452f99 100644 --- a/src/settings.py +++ b/src/settings.py @@ -13,6 +13,8 @@ def get_from_env(env_var, setting_name=None): get_from_env("UDS_PASSWORD") get_from_env("UDS_HOST") get_from_env("UDS_PORT") +get_from_env("UDS_CONNECT_TIMEOUT") +get_from_env("UDS_QUERY_TIMEOUT") get_from_env("SCHEMA_NAME") get_from_env("RABBITMQ_USERNAME") get_from_env("RABBITMQ_PASSWORD") From b4059ae615d2745add20e34141424b8cc7561000 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 17 Dec 2025 16:36:40 +0000 Subject: [PATCH 31/35] Fix variable name. Co-authored-by: Jeremy Stein --- tests/test_file_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py index 112fda7..460d14f 100644 --- a/tests/test_file_writer.py +++ b/tests/test_file_writer.py @@ -12,11 +12,11 @@ ], ) def test_create_file_name_handles_units(units, expected_filename, tmp_path): - sourceSystem = "11" + sourceStreamId = "11" observationTime = datetime(2025, 1, 1, tzinfo=timezone.utc) csn = "12345678" - filename = create_file_name(sourceSystem, observationTime, csn, units) + filename = create_file_name(sourceStreamId, observationTime, csn, units) assert filename == expected_filename From 45e56637d1f06c8d52eda9465a83195bd28f2600 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 17 Dec 2025 16:48:33 +0000 Subject: [PATCH 32/35] Specify timezone in datetime conversion. Co-authored-by: Jeremy Stein --- src/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/controller.py b/src/controller.py index a3ba219..30a2071 100644 --- a/src/controller.py +++ b/src/controller.py @@ -55,7 +55,7 @@ def waveform_callback(ch, method_frame, _header_frame, body): ) return - observation_time = datetime.fromtimestamp(observation_time) + observation_time = datetime.fromtimestamp(observation_time, tz=timezone.utc) lookup_success = True try: matched_mrn = emap_db.get_row(location_string, observation_time) From 31a88b170ff1d103b78f7ff4d9cb281fe1d3ba27 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 18 Dec 2025 08:24:12 +0000 Subject: [PATCH 33/35] Impor timezone Co-authored-by: Jeremy Stein --- src/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/controller.py b/src/controller.py index 30a2071..d79b2ff 100644 --- a/src/controller.py +++ b/src/controller.py @@ -4,7 +4,7 @@ """ import json -from datetime import datetime +from datetime import datetime, timezone import logging import pika import db as db # type:ignore From f49f850a63146e68d343970a06da4b080b928038 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 18 Dec 2025 08:25:02 +0000 Subject: [PATCH 34/35] Improved test Co-authored-by: Jeremy Stein --- tests/test_file_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_file_writer.py b/tests/test_file_writer.py index 460d14f..1d15b51 100644 --- a/tests/test_file_writer.py +++ b/tests/test_file_writer.py @@ -13,7 +13,7 @@ ) def test_create_file_name_handles_units(units, expected_filename, tmp_path): sourceStreamId = "11" - observationTime = datetime(2025, 1, 1, tzinfo=timezone.utc) + observationTime = datetime(2025, 1, 1, 10, 10, 10, tzinfo=timezone.utc) csn = "12345678" filename = create_file_name(sourceStreamId, observationTime, csn, units) From 5f396d755bae9e44a748295989769a5386431542 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 18 Dec 2025 09:15:25 +0000 Subject: [PATCH 35/35] Add check for required data in waveform message and log and reject if not present --- src/controller.py | 20 +++++++++++++++++--- src/csv_writer.py | 44 ++++++++++++++++++++++---------------------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/src/controller.py b/src/controller.py index d79b2ff..15a7e60 100644 --- a/src/controller.py +++ b/src/controller.py @@ -47,7 +47,12 @@ def waveform_callback(ch, method_frame, _header_frame, body): data = json.loads(body) try: location_string = data["mappedLocationString"] - observation_time = data["observationTime"] + observation_timestamp = data["observationTime"] + source_stream_id = data["sourceStreamId"] + sampling_rate = data["samplingRate"] + units = data["unit"] + waveform_data = data["numericValues"] + mapped_location_string = data["mappedLocationString"] except IndexError as e: reject_message(ch, method_frame.delivery_tag, False) logger.error( @@ -55,7 +60,7 @@ def waveform_callback(ch, method_frame, _header_frame, body): ) return - observation_time = datetime.fromtimestamp(observation_time, tz=timezone.utc) + observation_time = datetime.fromtimestamp(observation_timestamp, tz=timezone.utc) lookup_success = True try: matched_mrn = emap_db.get_row(location_string, observation_time) @@ -68,7 +73,16 @@ def waveform_callback(ch, method_frame, _header_frame, body): reject_message(ch, method_frame.delivery_tag, True) return - if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): + if writer.write_frame( + waveform_data, + source_stream_id, + observation_timestamp, + units, + sampling_rate, + mapped_location_string, + matched_mrn[2], + matched_mrn[0], + ): if lookup_success: ack_message(ch, method_frame.delivery_tag) else: diff --git a/src/csv_writer.py b/src/csv_writer.py index d99a9e1..ff9942a 100644 --- a/src/csv_writer.py +++ b/src/csv_writer.py @@ -6,52 +6,52 @@ def create_file_name( - sourceStreamId: str, observationTime: datetime, csn: str, units: str + source_stream_id: str, observation_time: datetime, csn: str, units: str ) -> str: """Create a unique file name based on the patient contact serial number (csn) the date, and the source system.""" - datestring = observationTime.strftime("%Y-%m-%d") + datestring = observation_time.strftime("%Y-%m-%d") units = units.replace("/", "p") units = units.replace("%", "percent") - return f"{datestring}.{csn}.{sourceStreamId}.{units}.csv" - - -def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool: + return f"{datestring}.{csn}.{source_stream_id}.{units}.csv" + + +def write_frame( + waveform_data: dict, + source_stream_id: str, + observation_timestamp: float, + units: str, + sampling_rate: int, + mapped_location_string: str, + csn: str, + mrn: str, +) -> bool: """Appends a frame of waveform data to a csv file (creates file if it doesn't exist. :return: True if write was successful. """ - sourceStreamId = waveform_message.get("sourceStreamId", None) - observationTime = waveform_message.get("observationTime", False) - - if not observationTime: - raise ValueError("waveform_message is missing observationTime") - - observation_datetime = datetime.fromtimestamp(observationTime) - units = waveform_message.get("unit", "") + observation_datetime = datetime.fromtimestamp(observation_timestamp) out_path = "waveform-export/" Path(out_path).mkdir(exist_ok=True) filename = out_path + create_file_name( - sourceStreamId, observation_datetime, csn, units + source_stream_id, observation_datetime, csn, units ) with open(filename, "a") as fileout: wv_writer = csv.writer(fileout, delimiter=",") - waveform_data = waveform_message.get("numericValues", "") - if waveform_data != "": - waveform_data = waveform_data.get("value", "") + waveform_data = waveform_data.get("value", "") wv_writer.writerow( [ csn, mrn, - sourceStreamId, + source_stream_id, units, - waveform_message.get("samplingRate", ""), - observationTime, - waveform_message.get("mappedLocationString", ""), + sampling_rate, + observation_timestamp, + mapped_location_string, waveform_data, ] )