Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
docker network create --driver bridge delphi-net
docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata
docker run --rm -d -p 6379:6379 --network delphi-net --env "REDIS_PASSWORD=1234" --name delphi_redis delphi_redis


- run: |
wget https://raw.githubusercontent.com/eficode/wait-for/master/wait-for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def setUp(self):
cur.execute('truncate table covid_hosp_facility_key')
cur.execute('truncate table covid_hosp_meta')
cur.execute('delete from api_user')
cur.execute('insert into api_user(api_key, email) values ("key", "emai")')
cur.execute('insert into api_user(api_key, email) values ("key", "email")')

@freeze_time("2021-03-16")
def test_acquire_dataset(self):
Expand Down
243 changes: 183 additions & 60 deletions integrations/acquisition/rvdss/test_scenarios.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
"""Integration tests for acquisition of rvdss data."""
# standard library
import unittest
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
from copy import copy

# first party
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.rvdss.database import update
from delphi.epidata.acquisition.rvdss.database import update, rvdss_cols, get_num_rows
import delphi.operations.secrets as secrets
from delphi_utils import get_structured_logger

# third party
import mysql.connector
import mysql.connector
from mysql.connector.errors import IntegrityError
import pandas as pd
import numpy as np
from pathlib import Path
import pdb

# py3tester coverage target (equivalent to `import *`)
# __test_target__ = 'delphi.epidata.acquisition.covid_hosp.facility.update'

NEWLINE="\n"

class AcquisitionTests(unittest.TestCase):
logger = get_structured_logger()

def setUp(self):
"""Perform per-test setup."""
Expand All @@ -25,74 +33,189 @@
# self.test_utils = UnitTestUtils(__file__)

# use the local instance of the Epidata API
Epidata.BASE_URL = 'https://delphi_web_epidata/epidata'
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata'
Epidata.auth = ('epidata', 'key')

# use the local instance of the epidata database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

# clear relevant tables
u, p = secrets.db.epi
cnx = mysql.connector.connect(user=u, password=p, database="epidata")
cur = cnx.cursor()
epidata_cnx = mysql.connector.connect(
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
epidata_cur = epidata_cnx.cursor()

epidata_cur.execute('truncate table rvdss')
epidata_cur.execute('DELETE from api_user')
epidata_cur.execute('INSERT INTO api_user(api_key, email) VALUES ("key", "email")')
epidata_cnx.commit()
epidata_cur.close()
#epidata_cnx.close()

# make connection and cursor available to test cases
self.cnx = epidata_cnx
self.cur = epidata_cnx.cursor()

def tearDown(self):
"""Perform per-test teardown."""
self.cur.close()
self.cnx.close()

@patch("mysql.connector.connect")
def test_rvdss_repiratory_detections(self, mock_sql):
connection_mock = MagicMock()

TEST_DIR = Path(__file__).parent.parent.parent.parent
detection_data = pd.read_csv(str(TEST_DIR) + "/testdata/acquisition/rvdss/RVD_CurrentWeekTable_Formatted.csv")
detection_data['time_type'] = "week"

# get the index of the subset of data we want to use
subset_index = detection_data[(detection_data['geo_value'].isin(['nl', 'nb'])) &
(detection_data['time_value'].isin([20240831, 20240907]))].index


# change the issue of this subset so the data contains more than one issue
detection_data.loc[subset_index,"issue"] = 20250227

# take a small subset just for testing insertion
detection_subset = detection_data.loc[subset_index]

# build the expected response when calling the API:
# reindex the dataframe to the full rvdss column list (adding any missing
# columns) and replace NaN with None, since that is what the API returns
df = detection_subset.reindex(rvdss_cols,axis=1)
df = df.replace({np.nan: None}).sort_values(by=["epiweek","geo_value"])
df = df.to_dict(orient = "records")

expected_response = {"epidata": df,
"result": 1,
"message": "success",
}

# get a second subset of the data, with different geo_values than the first subset, to test more calling options
detection_subset2 = detection_data[(detection_data['geo_value'].isin(['nu', 'nt'])) & (detection_data['time_value'].isin([20240831, 20240907])) ]

df2 = detection_subset2.reindex(rvdss_cols,axis=1)
df2 = df2.replace({np.nan: None}).sort_values(by=["epiweek","geo_value"])
df2 = df2.to_dict(orient = "records")

expected_response2 = {"epidata": df2,
"result": 1,
"message": "success",
}

# get another subset of the data for a single geo_value with multiple issues
subset_index2 = detection_data[(detection_data['geo_value'].isin(['ouest du québec'])) &
(detection_data['time_value'].isin([20240831, 20240907]))].index

detection_data.loc[subset_index2,"issue"] = [20250220,20250227]
detection_data.loc[subset_index2,"epiweek"] = [202435,202435]
detection_data.loc[subset_index2,"time_value"] = [20240831,20240831]

detection_subset3 = detection_data.loc[subset_index2]
df3 = detection_subset3.reindex(rvdss_cols,axis=1)
df3 = df3.replace({np.nan: None}).sort_values(by=["epiweek","geo_value"])
df3 = df3.to_dict(orient = "records")

expected_response3 = {"epidata": df3,
"result": 1,
"message": "success",
}

cur.execute('truncate table rvdss_repiratory_detections')
cur.execute('delete from api_user')
cur.execute('insert into api_user(api_key, email) values ("key", "emai")')

def test_rvdss_repiratory_detections(self):
# make sure the data does not yet exist
with self.subTest(name='no data yet'):
response = Epidata.rvdss_repiratory_detections(
'450822', Epidata.range(20200101, 20210101))
response = Epidata.rvdss(geo_type='province',
time_values= [202435, 202436],
geo_value = ['nl','nb'])
self.assertEqual(response['result'], -2, response)

# acquire sample data into local database
# TODO: Define example data
with self.subTest(name='first acquisition'):
acquired = Update.run(network=mock_network)
#self.assertTrue(acquired)

# make sure the data now exists
with self.subTest(name='initial data checks'):
expected_spotchecks = {
"hospital_pk": "450822",
"collection_week": 20201030,
"publication_date": 20210315,
"previous_day_total_ed_visits_7_day_sum": 536,
"total_personnel_covid_vaccinated_doses_all_7_day_sum": 18,
"total_beds_7_day_avg": 69.3,
"previous_day_admission_influenza_confirmed_7_day_sum": -999999
}
response = Epidata.covid_hosp_facility(
'450822', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], 1)
self.assertEqual(len(response['epidata']), 2)
row = response['epidata'][0]
for k,v in expected_spotchecks.items():
self.assertTrue(
k in row,
f"no '{k}' in row:\n{NEWLINE.join(sorted(row.keys()))}"
)
if isinstance(v, float):
self.assertAlmostEqual(row[k], v, f"row[{k}] is {row[k]} not {v}")
else:
self.assertEqual(row[k], v, f"row[{k}] is {row[k]} not {v}")

# expect 113 fields per row (114 database columns, except `id`)
self.assertEqual(len(row), 113)

# re-acquisition of the same dataset should be a no-op
with self.subTest(name='second acquisition'):
acquired = Update.run(network=mock_network)
self.assertFalse(acquired)

# make sure the data still exists
with self.subTest(name='final data checks'):
response = Epidata.covid_hosp_facility(
'450822', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], 1)
self.assertEqual(len(response['epidata']), 2)


# When the MagicMock connection's `cursor()` method is called, return
# a real cursor made from the current open connection `cnx`.
connection_mock.cursor.return_value = self.cnx.cursor()
# Commit via the current open connection `cnx`, from which the cursor
# is derived
connection_mock.commit = self.cnx.commit
mock_sql.return_value = connection_mock

update(detection_subset, self.logger)

response = Epidata.rvdss(geo_type='province',
time_values= [202435, 202436],
geo_value = ['nl','nb'])

self.assertEqual(response,expected_response)

with self.subTest(name='duplicate aquisition'):
# The main run function checks if the update has already been fetched/updated
# so it should never run twice, and duplicate acquisitions should never
# occur. Running the update twice will result in an error

# When the MagicMock connection's `cursor()` method is called, return
# a real cursor made from the current open connection `cnx`.
connection_mock.cursor.return_value = self.cnx.cursor()
# Commit via the current open connection `cnx`, from which the cursor
# is derived
connection_mock.commit = self.cnx.commit
mock_sql.return_value = connection_mock

with self.assertRaises(mysql.connector.errors.IntegrityError):
update(detection_subset, self.logger)

# Request with exact column order
with self.subTest(name='exact column order'):
rvdss_cols_subset = [col for col in detection_subset2.columns if col in rvdss_cols]
ordered_cols = [col for col in rvdss_cols if col in rvdss_cols_subset]
ordered_df = detection_subset2[ordered_cols]

connection_mock.cursor.return_value = self.cnx.cursor()
connection_mock.commit = self.cnx.commit
mock_sql.return_value = connection_mock

update(ordered_df, self.logger)

response = Epidata.rvdss(geo_type='province',
time_values= [202435, 202436],
geo_value = ['nt','nu'])

self.assertEqual(response,expected_response2)


# request by issue
with self.subTest(name='issue request'):
response = Epidata.rvdss(geo_type='province',
time_values= [202435, 202436],
geo_value = ['nl','nb'],
issues = 20250227)

self.assertEqual(response,expected_response)


# check requesting lists vs single values
with self.subTest(name='duplicate aquisition'):
# * with geo_value, single geo_type, time_value, issue
connection_mock.cursor.return_value = self.cnx.cursor()
connection_mock.commit = self.cnx.commit
mock_sql.return_value = connection_mock

update(detection_subset3, self.logger)

response = Epidata.rvdss(geo_type='province',
time_values= [202435, 202436],
geo_value = "*",
issues = 20250227)

response2 = Epidata.rvdss(geo_type='lab',
time_values= 202435,
geo_value = 'ouest du québec',
issues = [20250220,20250227])

self.assertEqual(response,expected_response)
self.assertEqual(response2,expected_response3)



Loading
Loading