Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
863ccd9
A threaded implemented with blocking connection
thompson318 Dec 4, 2025
9ca869e
Use source stream ID for file name, sourceSystem is not very interesting
thompson318 Dec 8, 2025
72f3293
Add the sourceSystemId to the csv
thompson318 Dec 8, 2025
3416fa8
Update query to use location visit admission times
thompson318 Dec 8, 2025
5cb051b
Check location discharge datetime too
thompson318 Dec 8, 2025
40f2f49
Added failing test
thompson318 Dec 9, 2025
41d9dce
Added ci
thompson318 Dec 9, 2025
b4b5b20
Fix with string substitution
thompson318 Dec 9, 2025
702d818
Also change % to percent
thompson318 Dec 9, 2025
e2d88ea
With uv
thompson318 Dec 9, 2025
338af4e
Tidy workflow
thompson318 Dec 9, 2025
7b81b05
Tidied up module layout. Implemented a connection pool to avoid creat…
thompson318 Dec 11, 2025
8ec74b8
Added location string to output to help with debugging. config for my…
thompson318 Dec 11, 2025
bd835ab
Merge branch '17-file-writer-fix' into 16-blocking_with_threading
thompson318 Dec 11, 2025
45d542a
Tidied variable names
thompson318 Dec 11, 2025
f4c8e2d
Handle thread start runtime error
thompson318 Dec 15, 2025
e98b7ea
Rough queue
thompson318 Dec 15, 2025
f6d7ba3
Try single thread with a queue
thompson318 Dec 15, 2025
5a25da7
Should only need one connection
thompson318 Dec 16, 2025
9a8be8b
Simplified SQL and improved error handling
thompson318 Dec 16, 2025
7dbb0dd
pytest is a dev dependency
thompson318 Dec 16, 2025
e1e48b8
Log error if missing data in waveform message
thompson318 Dec 16, 2025
b1b6f7e
Kill the worker thread if there is a no database error
thompson318 Dec 16, 2025
1d86954
Check rows after we've left the connection context, so the we don't e…
thompson318 Dec 16, 2025
d4d4899
Context doesn't seem to handle putconn
thompson318 Dec 16, 2025
8fe2751
If we can't match patient remove message from queue but add to an uma…
thompson318 Dec 16, 2025
28502f2
Return as tuple
thompson318 Dec 16, 2025
ee2f9a5
Remove callbacks for ack and reject as this is now a single threaded …
thompson318 Dec 16, 2025
268bd77
Avoid double message acknowledgement
thompson318 Dec 16, 2025
8e8785a
try without threading
thompson318 Dec 16, 2025
c778350
Add db connection and statement time outs and handle any resulting er…
thompson318 Dec 17, 2025
b4059ae
Fix variable name.
thompson318 Dec 17, 2025
45e5663
Specify timezone in datetime conversion.
thompson318 Dec 17, 2025
31a88b1
Import timezone
thompson318 Dec 18, 2025
f49f850
Improved test
thompson318 Dec 18, 2025
5f396d7
Add check for required data in waveform message and log and reject if…
thompson318 Dec 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Runs the pytest suite with uv on pushes and pull requests to main/dev.
name: Run pytest

on:
  push:
    branches: [main, dev]
  pull_request:
    branches: [main, dev]
    # "draft" is not a pull_request activity type and has been dropped; draft
    # pull requests already trigger this workflow via "opened"/"synchronize".
    types: ["opened", "reopened", "synchronize", "ready_for_review"]
  # Allow the workflow to be started manually.
  workflow_dispatch:

jobs:
  build:
    name: Run pytest
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v5
      - name: Install uv
        uses: astral-sh/setup-uv@v7

      # --locked fails the job if uv.lock is out of date with pyproject.toml.
      - name: Install dependencies
        run: uv sync --locked --all-extras --dev

      - name: Run the tests
        run: uv run pytest tests
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repos:
- id: ruff
args:
- --fix
- waveform_controller
- ./
- id: ruff-format
# Type-checking python code.
- repo: https://github.com/pre-commit/mirrors-mypy
Expand All @@ -23,7 +23,7 @@ repos:
"types-psycopg2",
"types-pika"
]
files: waveform_controller/
files: src/
# ----------
# Formats docstrings to comply with PEP257
- repo: https://github.com/PyCQA/docformatter
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ dependencies = [
"psycopg2-binary>=2.9.11",
]

[project.optional-dependencies]
dev = ["pytest>=9.0.2"]

[project.scripts]
emap-extract-waveform = "waveform_controller.controller:receiver"
emap-extract-waveform = "controller:receiver"
2 changes: 2 additions & 0 deletions settings.env.EXAMPLE
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ UDS_USERNAME="inform_user"
UDS_PASSWORD="inform"
UDS_HOST="localhost"
UDS_PORT="5433"
UDS_CONNECT_TIMEOUT="10" # in seconds
UDS_QUERY_TIMEOUT="3000" # in milliseconds
SCHEMA_NAME="star_dev"
RABBITMQ_USERNAME="my_name"
RABBITMQ_PASSWORD="my_pw"
Expand Down
File renamed without changes.
116 changes: 116 additions & 0 deletions src/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
A script to receive messages in the waveform queue, match each one to a
patient via the EMAP database and append it to a csv file.
Based on https://www.rabbitmq.com/tutorials/tutorial-one-python
"""

import json
from datetime import datetime, timezone
import logging
import pika
import db as db # type:ignore
import settings as settings # type:ignore
import csv_writer as writer # type:ignore

# Set the log line format and create this module's logger.
logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)


# Module-level database handle used by waveform_callback. Importing this
# module therefore loads the SQL query and opens the connection pool.
emap_db = db.starDB()
emap_db.init_query()
emap_db.connect()


class waveform_message:
    """Plain holder bundling a received message with its delivery details.

    Attributes are stored exactly as given: the channel (``ch``), the
    ``delivery_tag`` and the raw ``body``.
    """

    def __init__(self, ch, delivery_tag, body):
        self.ch, self.delivery_tag, self.body = ch, delivery_tag, body


def ack_message(ch, delivery_tag):
    """Acknowledge the message identified by `delivery_tag` on channel `ch`.

    `ch` must be the same pika channel instance the message was received on
    (AMQP protocol constraint). If the channel is no longer open, nothing is
    sent and a warning is logged instead.
    """
    if not ch.is_open:
        logger.warning("Attempting to acknowledge a message on a closed channel.")
        return
    ch.basic_ack(delivery_tag)


def reject_message(ch, delivery_tag, requeue):
    """Reject the message identified by `delivery_tag` on channel `ch`.

    `requeue` is passed straight through to ``basic_reject``. If the channel
    is no longer open, nothing is sent and a warning is logged instead.
    """
    if not ch.is_open:
        logger.warning("Attempting to not acknowledge a message on a closed channel.")
        return
    ch.basic_reject(delivery_tag, requeue)


def waveform_callback(ch, method_frame, _header_frame, body):
    """Handle one waveform message: match it to a patient and write it to csv.

    On every path that returns normally the message is settled exactly once:

    * body is not valid JSON, or a required field is missing: rejected
      without requeue (retrying cannot fix the message);
    * database connection problem: rejected with requeue so it is retried;
    * row written and patient matched: acknowledged;
    * row written but no unique patient match: rejected without requeue (the
      row is still written, using placeholder identifiers);
    * writer reports failure: rejected with requeue.

    :param ch: pika channel the message was delivered on.
    :param method_frame: delivery metadata; only ``delivery_tag`` is used.
    :param _header_frame: message properties (unused).
    :param body: raw message payload, expected to be a JSON object.
    """
    delivery_tag = method_frame.delivery_tag

    try:
        data = json.loads(body)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        reject_message(ch, delivery_tag, False)
        logger.error(f"Waveform message {delivery_tag} is not valid JSON: {e}.")
        return

    try:
        mapped_location_string = data["mappedLocationString"]
        observation_timestamp = data["observationTime"]
        source_stream_id = data["sourceStreamId"]
        sampling_rate = data["samplingRate"]
        units = data["unit"]
        waveform_data = data["numericValues"]
    except (KeyError, TypeError) as e:
        # KeyError: a required field is absent. TypeError: the payload is
        # valid JSON but not an object (e.g. a list or a number).
        reject_message(ch, delivery_tag, False)
        logger.error(
            f"Waveform message {delivery_tag} is missing required data {e}."
        )
        return

    observation_time = datetime.fromtimestamp(observation_timestamp, tz=timezone.utc)
    lookup_success = True
    try:
        matched_mrn = emap_db.get_row(mapped_location_string, observation_time)
    except ValueError as e:
        lookup_success = False
        logger.error(f"Ambiguous or non existent match: {e}")
        # Placeholders so the data is still written and can be matched later.
        matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn")
    except ConnectionError as e:
        logger.error(f"Database error, will try again: {e}")
        reject_message(ch, delivery_tag, True)
        return

    written = writer.write_frame(
        waveform_data,
        source_stream_id,
        observation_timestamp,
        units,
        sampling_rate,
        mapped_location_string,
        matched_mrn[2],
        matched_mrn[0],
    )
    if not written:
        # Never leave the message unsettled: with prefetch_count=1 an
        # unsettled message would stop any further deliveries.
        logger.error(f"Failed to write waveform message {delivery_tag}, will try again.")
        reject_message(ch, delivery_tag, True)
        return

    if lookup_success:
        ack_message(ch, delivery_tag)
    else:
        reject_message(ch, delivery_tag, False)


def receiver():
    """Consume waveform messages from RabbitMQ until interrupted.

    Connects using the RABBITMQ_* settings, registers `waveform_callback` on
    the configured queue with manual acknowledgement, and blocks in
    ``start_consuming``. Ctrl-C stops consuming. The connection is closed on
    the way out whether consumption ended normally or with an exception.
    """
    # Set up the RabbitMQ connection.
    rabbitmq_credentials = pika.PlainCredentials(
        username=settings.RABBITMQ_USERNAME, password=settings.RABBITMQ_PASSWORD
    )
    connection_parameters = pika.ConnectionParameters(
        credentials=rabbitmq_credentials,
        host=settings.RABBITMQ_HOST,
        port=settings.RABBITMQ_PORT,
    )
    connection = pika.BlockingConnection(connection_parameters)
    try:
        channel = connection.channel()
        channel.basic_qos(prefetch_count=1)

        channel.basic_consume(
            queue=settings.RABBITMQ_QUEUE,
            auto_ack=False,
            on_message_callback=waveform_callback,
        )
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
    finally:
        # The connection may already be closed if the failure was a lost
        # connection; closing it again would raise and hide the real error.
        if connection.is_open:
            connection.close()
59 changes: 59 additions & 0 deletions src/csv_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Writes a frame of waveform data to a csv file."""

import csv
from datetime import datetime
from pathlib import Path


def create_file_name(
    source_stream_id: str, observation_time: datetime, csn: str, units: str
) -> str:
    """Build the csv file name for one day, patient contact, stream and unit.

    The name has the form ``<YYYY-MM-DD>.<csn>.<source_stream_id>.<units>.csv``.
    Characters in `units` that are awkward in file names are spelled out:
    ``/`` becomes ``p`` and ``%`` becomes ``percent``.
    """
    # Neither replacement text contains "/" or "%", so a single translate pass
    # gives the same result as replacing one character after the other.
    safe_units = units.translate(str.maketrans({"/": "p", "%": "percent"}))
    day = observation_time.strftime("%Y-%m-%d")
    return f"{day}.{csn}.{source_stream_id}.{safe_units}.csv"


def write_frame(
    waveform_data: dict,
    source_stream_id: str,
    observation_timestamp: float,
    units: str,
    sampling_rate: int,
    mapped_location_string: str,
    csn: str,
    mrn: str,
) -> bool:
    """Append a frame of waveform data to a csv file (creating the file if it
    doesn't exist).

    The file lives in ``waveform-export/`` (created on demand, relative to the
    current working directory) and is named by `create_file_name`. The date
    part of the name is taken from `observation_timestamp` interpreted as UTC,
    so it does not depend on the time zone of the machine running the writer.

    :param waveform_data: mapping whose ``"value"`` entry holds the samples;
        an empty string is written if the entry is absent.
    :param observation_timestamp: POSIX timestamp of the frame, in seconds.
    :return: True if write was successful.
    """
    # Local import: this module's top-level imports only bring in `datetime`.
    from datetime import timezone

    observation_datetime = datetime.fromtimestamp(
        observation_timestamp, tz=timezone.utc
    )

    out_dir = Path("waveform-export")
    out_dir.mkdir(exist_ok=True)
    file_path = out_dir / create_file_name(
        source_stream_id, observation_datetime, csn, units
    )

    values = waveform_data.get("value", "")

    # newline="" lets the csv module control line endings itself, as its
    # documentation requires; utf-8 keeps non-ASCII units portable.
    with open(file_path, "a", newline="", encoding="utf-8") as fileout:
        wv_writer = csv.writer(fileout, delimiter=",")
        wv_writer.writerow(
            [
                csn,
                mrn,
                source_stream_id,
                units,
                sampling_rate,
                observation_timestamp,
                mapped_location_string,
                values,
            ]
        )

    return True
55 changes: 55 additions & 0 deletions src/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from datetime import datetime
import psycopg2
from psycopg2 import sql, pool
import logging

import settings as settings # type:ignore

# Set the log line format and create this module's logger.
logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)


class starDB:
    """Looks up a single database row for a location at a point in time.

    Call `init_query` and `connect` once before the first `get_row`.
    """

    # Lookup query. Empty until init_query() replaces it with the composed
    # psycopg2 ``sql`` object built from the .sql file.
    sql_query: str = ""
    # Connection string built from the settings module when the class is
    # defined; includes the connect timeout and the per-statement timeout.
    connection_string: str = "dbname={} user={} password={} host={} port={} connect_timeout={} options='-c statement_timeout={}'".format(
        settings.UDS_DBNAME,  # type:ignore
        settings.UDS_USERNAME,  # type:ignore
        settings.UDS_PASSWORD,  # type:ignore
        settings.UDS_HOST,  # type:ignore
        settings.UDS_PORT,  # type:ignore
        settings.UDS_CONNECT_TIMEOUT,  # type:ignore
        settings.UDS_QUERY_TIMEOUT,  # type:ignore
    )
    connection_pool: pool.SimpleConnectionPool

    def connect(self):
        """Create the connection pool (exactly one connection)."""
        self.connection_pool = pool.SimpleConnectionPool(1, 1, self.connection_string)

    def init_query(self):
        """Load the lookup SQL and fill in the schema name from settings.

        The path is relative to the current working directory.
        """
        with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file:
            query_template = sql.SQL(file.read())
        # sql.Identifier quotes the schema name safely.
        self.sql_query = query_template.format(
            schema_name=sql.Identifier(settings.SCHEMA_NAME)
        )

    def get_row(self, location_string: str, observation_datetime: datetime):
        """Return the single row matching a location at a given time.

        :param location_string: value bound to ``%(location_string)s``.
        :param observation_datetime: value bound to
            ``%(observation_datetime)s``.
        :return: the one row returned by the query.
            NOTE(review): the caller appears to treat it as
            (mrn, nhs number, csn) - confirm against the SQL file.
        :raises ConnectionError: on a psycopg2 OperationalError, including a
            failure to obtain a connection from the pool.
        :raises ValueError: if the query does not return exactly one row.
        """
        parameters = {
            "location_string": location_string,
            "observation_datetime": observation_datetime,
        }
        # Stays None if getconn() itself fails, so cleanup below never
        # touches an unbound name.
        db_connection = None
        try:
            db_connection = self.connection_pool.getconn()
            # The connection context manager ends the transaction (commit or
            # rollback); it does not close or return the connection.
            with db_connection:
                with db_connection.cursor() as curs:
                    curs.execute(self.sql_query, parameters)
                    rows = curs.fetchall()
        except psycopg2.errors.OperationalError as e:
            raise ConnectionError(f"Data base error: {e}") from e
        finally:
            # Hand the connection back exactly once, whatever happened, so
            # the single pool slot is never leaked.
            if db_connection is not None:
                self.connection_pool.putconn(db_connection)

        if len(rows) != 1:
            raise ValueError(
                f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}"
            )

        return rows[0]
2 changes: 2 additions & 0 deletions waveform_controller/settings.py → src/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def get_from_env(env_var, setting_name=None):
get_from_env("UDS_PASSWORD")
get_from_env("UDS_HOST")
get_from_env("UDS_PORT")
get_from_env("UDS_CONNECT_TIMEOUT")
get_from_env("UDS_QUERY_TIMEOUT")
get_from_env("SCHEMA_NAME")
get_from_env("RABBITMQ_USERNAME")
get_from_env("RABBITMQ_PASSWORD")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ INNER JOIN {schema_name}.location_visit lv
INNER JOIN {schema_name}.location loc
ON lv.location_id = loc.location_id
WHERE loc.location_string = %(location_string)s
AND hv.valid_from BETWEEN %(start_datetime)s AND %(end_datetime)s
ORDER by hv.valid_from DESC
AND lv.admission_datetime <= %(observation_datetime)s
AND ( lv.discharge_datetime >= %(observation_datetime)s OR lv.discharge_datetime IS NULL )
Empty file added tests/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions tests/test_file_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import pytest
from src.csv_writer import create_file_name
from datetime import datetime, timezone


@pytest.mark.parametrize(
    "units, expected_filename",
    [
        ("uV", "2025-01-01.12345678.11.uV.csv"),
        ("mL/s", "2025-01-01.12345678.11.mLps.csv"),
        ("%", "2025-01-01.12345678.11.percent.csv"),
    ],
)
def test_create_file_name_handles_units(units, expected_filename, tmp_path):
    """Unit strings are turned into file names that can actually be created."""
    observed_at = datetime(2025, 1, 1, 10, 10, 10, tzinfo=timezone.utc)

    filename = create_file_name("11", observed_at, "12345678", units)

    assert filename == expected_filename

    # The name must also be usable on the real file system.
    with open(f"{tmp_path}/{filename}", "w") as fileout:
        fileout.write("Test string")
Loading