Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/report-execution/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dev = [
"pytest>=9.0.2,<10.0.0",
"pytest-mock>=3.15.1",
"ruff>=0.15.0,<1.0.0",
"tablefaker>=1.8.0",
"testcontainers>=4.14.1",
]

Expand Down
4 changes: 3 additions & 1 deletion apps/report-execution/pytest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ addopts = [
"--strict-config",
"--showlocals",
"--import-mode=importlib"
]
]
log_cli = true
log_cli_level = "INFO"
25 changes: 18 additions & 7 deletions apps/report-execution/src/db_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,33 @@ class Transaction:
"""A database transaction abstraction for use in libraries."""

def __init__(self, cursor):
self.cursor = cursor
self._cursor = cursor

def execute(self, query: str, parameters: tuple = ()) -> Table:
def query(self, query: str, parameters: tuple = ()) -> Table:
"""Execute a query and have the data returned as a Table.

DO NOT EXECUTE ANY PERMANENT CREATE, UPDATE, OR DELETE STATEMENTS

Positional `?` placeholders can be used in the query and values passed as
parameters in a tuple.

If the query inserts or updates a temporary table, then the returned table
will be empty.
"""
data = self.cursor.execute(query, parameters).fetchall()
data = self._cursor.execute(query, parameters).fetchall()
columns = self._column_names()
return Table(columns=columns, data=data)

def execute(self, query: str, parameters: tuple = ()) -> None:
"""Execute a SQL statement and do not return any result.

DO NOT EXECUTE ANY PERMANENT CREATE, UPDATE, OR DELETE STATEMENTS

Positional `?` placeholders can be used in the query and values passed as
parameters in a tuple.
"""
self._cursor.execute(query, parameters)
return None

def _column_names(self) -> list[str]:
return [c[0] for c in self.cursor.description]
return [c[0] for c in self._cursor.description]


@contextmanager
Expand All @@ -40,3 +48,6 @@ def db_transaction(connection_string):
with connection.cursor() as cursor:
trx = Transaction(cursor)
yield trx

# not sure why this is needed - it shouldn't be per docs
connection.commit()
2 changes: 1 addition & 1 deletion apps/report-execution/src/libraries/nbs_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def execute(
* Lifted the size check to a global check and refined the env var names
* Date formatting still needs to be figured out
"""
content = trx.execute(subset_query)
content = trx.query(subset_query)

header = f'Custom Report For Table: {data_source_name}'
subheader = None
Expand Down
137 changes: 137 additions & 0 deletions apps/report-execution/src/libraries/nbs_sr_05.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from src.db_transaction import Transaction
from src.models import ReportResult, TimeRange


def execute(
    trx: Transaction,
    subset_query: str,
    data_source_name: str,
    time_range: TimeRange | None = None,
    **kwargs,
):
    """Standard Report 05: Cases of Reportable Diseases for a specific state.

    Each row is a disease with columns for the:
    * Current month total
    * YTD total
    * Prior YTD total
    * 5 Year Median YTD total
    * % change Current YTD vs 5 Year Median YTD

    Conversion notes:
    * Export included the year as a column
    * Export has columns in different order
    """
    content = trx.query(
        # State filtering is assumed to happen in the filters
        f'WITH subset as ({subset_query})\n'
        # base_data CTE: case counts per disease/month/year, limited to the
        # year-to-date window (same day-of-year or earlier) of recent years
        ', base_data as ('
        'SELECT phc_code_short_desc, MONTH(event_date) as month, '
        'YEAR(event_date) as year, sum(group_case_cnt) as cases\n'
        'FROM subset\n'
        # BUG FIX: the first predicate must start with WHERE (it was AND,
        # which is invalid SQL directly after FROM)
        'WHERE event_date is not NULL\n'
        'AND DATEPART(dayofyear, event_date) <= '
        ' DATEPART(dayofyear, CURRENT_TIMESTAMP)\n'
        # NOTE(review): >= (current year - 5) keeps 6 calendar years,
        # including the current one, in the median window - confirm intended
        'AND YEAR(event_date) >= (YEAR(CURRENT_TIMESTAMP) - 5)\n'
        'GROUP BY phc_code_short_desc, MONTH(event_date), YEAR(event_date)\n'
        ')\n'
        # diseases CTE: one row per disease seen anywhere in the window
        ', diseases as (\n'
        'SELECT DISTINCT phc_code_short_desc\n'
        'FROM base_data\n'
        ')\n'
        # year_data CTE: YTD total per disease per year
        ', year_data as (\n'
        'SELECT phc_code_short_desc, year, SUM(cases) as cases\n'
        'FROM base_data\n'
        'GROUP BY phc_code_short_desc, year'
        ')\n'
        # this_month CTE: totals for the current calendar month
        ', this_month as (\n'
        'SELECT phc_code_short_desc, SUM(cases) as curr_month\n'
        'FROM base_data\n'
        'WHERE month = MONTH(CURRENT_TIMESTAMP)\n'
        'AND year = YEAR(CURRENT_TIMESTAMP)\n'
        'GROUP BY phc_code_short_desc'
        ')\n'
        # this_year CTE: current-year YTD totals
        ', this_year as (\n'
        'SELECT phc_code_short_desc, SUM(cases) as curr_ytd\n'
        'FROM year_data\n'
        'WHERE year = YEAR(CURRENT_TIMESTAMP)\n'
        'GROUP BY phc_code_short_desc'
        ')\n'
        # last_year CTE: prior-year YTD totals
        ', last_year as (\n'
        'SELECT phc_code_short_desc, SUM(cases) as last_ytd\n'
        'FROM year_data\n'
        'WHERE year = (YEAR(CURRENT_TIMESTAMP) - 1)\n'
        'GROUP BY phc_code_short_desc'
        ')\n'
        # median_year CTE: median of the yearly YTD totals per disease
        ', median_year as (\n'
        'SELECT DISTINCT phc_code_short_desc, PERCENTILE_CONT(0.5) WITHIN GROUP '
        '(ORDER BY cases) OVER (PARTITION BY phc_code_short_desc) as median_ytd\n'
        'FROM year_data\n'
        ')\n'
        # Result select: one row per disease with zero-filled metrics
        'SELECT d.phc_code_short_desc, COALESCE(curr_month, 0) as curr_month, \n'
        'COALESCE(curr_ytd, 0) as curr_ytd, COALESCE(last_ytd, 0) as last_ytd, \n'
        'COALESCE(median_ytd, 0) as median_ytd, \n'
        'IIF('
        ' COALESCE(median_ytd, 0) = 0, '
        ' 0, '
        # BUG FIX: multiply by 1.0 so T-SQL performs decimal division;
        # integer case counts would otherwise truncate pct_chg toward zero
        ' COALESCE((curr_ytd - median_ytd) * 1.0 / median_ytd, 0)) as pct_chg\n'
        'FROM diseases d\n'
        'LEFT JOIN this_month tm on tm.phc_code_short_desc = d.phc_code_short_desc\n'
        'LEFT JOIN this_year ty on ty.phc_code_short_desc = d.phc_code_short_desc\n'
        'LEFT JOIN last_year ly on ly.phc_code_short_desc = d.phc_code_short_desc\n'
        'LEFT JOIN median_year my on my.phc_code_short_desc = d.phc_code_short_desc\n'
        'ORDER BY d.phc_code_short_desc asc'
    )

    # TODO: # noqa: FIX002
    # column header names
    # sub header

    header = 'SR5: Cases of Reportable Diseases by State'
    subheader = None
    if time_range is not None:
        subheader = f'{time_range.start} - {time_range.end}'

    description = (
        '*Data Source:* nbs_ods.PHCDemographic (publichealthcasefact)\n'
        '*Output:* Report demonstrates, in table form, the total number of '
        'Investigation(s) [both Individual and Summary] irrespective of Case Status.\n'
        'Output:\n'
        '* Does not include Investigation(s) that have been logically deleted\n'
        '* Is filtered based on the state, disease(s) and advanced criteria selected '
        'by user\n'
        '* Will not include Investigation(s) that do not have a value for the State '
        'selected by the user\n'
        '* Is based on month and year of the calculated Event Date\n'
        # BUG FIX: trailing newline was missing, which glued this heading onto
        # the first calculation bullet
        '*Calculations:*\n'
        '* *Current Month Totals by disease:* Total Investigation(s) [both Individual '
        'and Summary] where the Year and Month of the Event Date equal the current '
        'Year and Month\n'
        '* *Current Year Totals by disease:* Total Investigation(s) [both Individual '
        'and Summary] where the Year of the Event Date equal the current Year\n'
        '* *Previous Year Totals by disease:* Total Investigation(s) [both '
        'Individual and Summary] where the Year of the Event Date equal last Year\n'
        '* *5-Year median:* Median number of Investigation(s) [both Individual and '
        'Summary] for the past five years\n'
        '* *Percentage change (current year vs. 5 year median):* Percentage change '
        'between the Current Year Totals by disease and the 5-Year median\n'
        ' * *Event Date:* Derived using the hierarchy of Onset Date, Diagnosis Date, '
        'Report to County, Report to State and Date the Investigation was created in '
        'the NBS.\n'
    )

    return ReportResult(
        content_type='table',
        content=content,
        header=header,
        subheader=subheader,
        description=description,
    )
106 changes: 101 additions & 5 deletions apps/report-execution/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
from contextlib import contextmanager

import pytest
from testcontainers.compose import DockerCompose
import tablefaker
from testcontainers.compose import ContainerIsNotRunning, DockerCompose

from src import utils
from src.db_transaction import db_transaction
from src.models import Table


Expand Down Expand Up @@ -81,27 +84,120 @@
"""Set up DB and report execution containers."""
logging.info('Setting up containers tests...')
compose_path = os.path.join(os.path.dirname(__file__), '../../../cdc-sandbox')
services = ['report-execution', 'nbs-mssql']
compose_file_names = [
'docker-compose.yml',
'../apps/report-execution/tests/integration/docker-compose.yml',
]
containers = DockerCompose(
compose_path,
compose_file_name=compose_file_names,
services=['report-execution', 'nbs-mssql'],
build=True,
services=services,
env_file=['../sample.env', '../apps/report-execution/sample.env'],
build=True,
)
report_exec_url = 'http://0.0.0.0:8001/status'

def maybe_get_container(name):
try:
containers.get_container(name)
except ContainerIsNotRunning:
return None

containers_to_stop = [
maybe_get_container(service)
for service in services
if maybe_get_container(service) is not None
]

containers.start()
containers.wait_for(report_exec_url)
logging.info('Ingestion ready to test!')

    def teardown():
        # Dump container logs before stopping so post-mortem info survives teardown.
        logging.info('Service logs...\n')
        logging.info(containers.get_logs())
        logging.info('Tests finished! Tearing down.')
        containers.stop()
        # Also stop any containers that were captured as already running before
        # this fixture started them (see containers_to_stop above).
        for container in containers_to_stop:
            container.stop()

Check failure on line 121 in apps/report-execution/tests/conftest.py

View workflow job for this annotation

GitHub Actions / Lint & Unit Test

Pyrefly missing-attribute

Object of class `NoneType` has no attribute `stop`

request.addfinalizer(teardown)


def get_faker_sql(schema_name: str) -> str:
    """Process a tablefaker schema and return the generated SQL as a string.

    The schema is read from tests/integration/assets/tablefaker_schema/ and
    rendered to a scratch fake.sql file that is always removed afterwards,
    even if reading it back fails.
    """
    faker_path = os.path.join(
        os.path.dirname(__file__),
        'integration',
        'assets',
        'tablefaker_schema',
        schema_name,
    )
    target_file_path = os.path.join(os.path.dirname(__file__), 'fake.sql')
    tablefaker.to_sql(faker_path, target_file_path=target_file_path)
    try:
        with open(target_file_path) as f:
            result = f.read()
    finally:
        # BUG FIX: previously the scratch file leaked if the read raised;
        # clean it up unconditionally.
        os.remove(target_file_path)

    # KLUDGE: NULL writing is not always correct - normalize pandas-style
    # missing-value tokens into SQL NULLs.
    for bad, good in (
        (' nan,', ' NULL,'),
        (' nan)', ' NULL)'),
        (' <NA>,', ' NULL,'),
        (' <NA>)', ' NULL)'),
    ):
        result = result.replace(bad, good)
    return result


def temp_name(table_name: str) -> str:
    """Derive the backup-table name for `table_name`.

    Assumes `[schema].[dbo].[table name]` format: the closing bracket is
    dropped and `_temp]` is appended.

    Not using temp tables as the usage spans connections.
    """
    return f'{table_name[:-1]}_temp]'


@pytest.fixture(scope='class')
def fake_db_table(request):
    """Replace a DB table with fake table per the tablefaker schema.

    Reads three attributes from the requesting test module:
    * `db_table` (required): fully-qualified table to replace
    * `db_fk_tables` (optional): tables whose FKs reference `db_table`
    * `faker_schema` (required): tablefaker schema file name

    The original rows are backed up into `*_temp` tables (see temp_name) for
    the duration of the test class, then restored on teardown.
    """
    db_table = request.module.db_table
    fk_tables = getattr(request.module, 'db_fk_tables', [])
    faker_schema = request.module.faker_schema
    faker_sql = get_faker_sql(faker_schema)

    conn_string = utils.get_env_or_error('DATABASE_CONN_STRING')

    # swap out original data for fake data
    with db_transaction(conn_string) as trx:
        # Tables with foreign keys pointing to the table we want to replace need to
        # be backed up and cleared out to avoid FK constraint violations
        for fk_table in fk_tables:
            temp_fk_table = temp_name(fk_table)
            # Drop any stale backup left over from a previous aborted run.
            trx.execute(
                f"IF OBJECT_ID('{temp_fk_table}') IS NOT NULL "
                f'DROP TABLE {temp_fk_table}'
            )
            # SELECT INTO creates the backup table and copies the rows in one step.
            trx.execute(f'SELECT * INTO {temp_fk_table} FROM {fk_table}')
            trx.execute(f'DELETE {fk_table}')
            logging.info(f'cleared FK table: {fk_table}')

        temp_db_table = temp_name(db_table)
        trx.execute(
            f"IF OBJECT_ID('{temp_db_table}') IS NOT NULL DROP TABLE {temp_db_table}"
        )
        trx.execute(f'SELECT * INTO {temp_db_table} FROM {db_table}')
        trx.execute(f'DELETE {db_table}')
        logging.info(f'cleared table: {db_table}')
        trx.execute(faker_sql)
        logging.info(f'Inserted fake data: {db_table}')

    # avoid connection inside connection
    yield

    # restore the original data
    # NOTE(review): the *_temp backup tables are not dropped after the restore;
    # the next run's OBJECT_ID check cleans them up - confirm that is intended.
    with db_transaction(conn_string) as trx:
        trx.execute(f'DELETE {db_table}')
        trx.execute(f'INSERT INTO {db_table} SELECT * FROM {temp_db_table}')
        logging.info(f'Restored table: {db_table}')

        # FK tables are refilled last, after their referenced rows are back.
        for fk_table in fk_tables:
            trx.execute(f'INSERT INTO {fk_table} SELECT * FROM {temp_name(fk_table)}')
            logging.info(f'Restored FK table: {fk_table}')
Loading
Loading