From aca94eeb5b7196655350cce2320e921c247f6b52 Mon Sep 17 00:00:00 2001 From: Kairsten Fay Date: Fri, 26 Jun 2020 15:40:32 -0700 Subject: [PATCH 1/4] Deprecate enrollments ETL The enrollments ETL (`id3c etl enrollments`) was used in Y1 for Audere data. We no longer partner with Audere, so deprecate this ETL. --- lib/id3c/cli/command/etl/__init__.py | 1 - lib/id3c/cli/command/etl/enrollments.py | 410 ------------------------ 2 files changed, 411 deletions(-) delete mode 100644 lib/id3c/cli/command/etl/enrollments.py diff --git a/lib/id3c/cli/command/etl/__init__.py b/lib/id3c/cli/command/etl/__init__.py index c311697be..613b6b63c 100644 --- a/lib/id3c/cli/command/etl/__init__.py +++ b/lib/id3c/cli/command/etl/__init__.py @@ -21,7 +21,6 @@ def etl(): # Load all ETL subcommands. __all__ = [ - "enrollments", "manifest", "presence_absence", "kit", diff --git a/lib/id3c/cli/command/etl/enrollments.py b/lib/id3c/cli/command/etl/enrollments.py deleted file mode 100644 index 0c1f8b97c..000000000 --- a/lib/id3c/cli/command/etl/enrollments.py +++ /dev/null @@ -1,410 +0,0 @@ -""" -Process enrollment documents into the relational warehouse -""" -import click -import logging -from datetime import datetime, timezone -from itertools import groupby -from operator import itemgetter -from typing import Any, Optional -from id3c.cli.command import with_database_session -from id3c.db import find_identifier -from id3c.db.session import DatabaseSession -from id3c.db.datatypes import Json -from . import ( - etl, - - find_or_create_site, - find_location, - upsert_individual, - upsert_encounter, - upsert_location, - upsert_encounter_location, - upsert_sample -) - - -LOG = logging.getLogger(__name__) - - -# The revision number and etl name are stored in the processing_log of each -# enrollment record when the enrollment is successfully processed by -# this ETL routine. The routine finds new-to-it records to process by looking -# for enrollments lacking this etl revision number and etl name in their log. -# If a change to the ETL routine necessitates re-processing all enrollments, -# this revision number should be incremented. -# The etl name has been added to allow multiple etls to process the same -# receiving table -REVISION = 5 -ETL_NAME = "enrollments" - - -# XXX TODO: Stop hardcoding valid identifier sets. Instead, accept them as -# an option or config (and validate option choices against what's actually -# in the database). We won't want to validate using click.option(), -# because that would necessitate a database connection simply to run -# bin/id3c at all. -# -trs, 13 May 2019 -EXPECTED_COLLECTION_IDENTIFIER_SETS = { - "collections-seattleflu.org", - "collections-fluathome.org", -} - - -@etl.command("enrollments", help = __doc__) -@with_database_session - -def etl_enrollments(*, db: DatabaseSession): - LOG.debug(f"Starting the enrollment ETL routine, revision {REVISION}") - - # Fetch and iterate over enrollments that aren't processed - # - # Use a server-side cursor by providing a name. This ensures we limit how - # much data we fetch at once, to limit local process size. Each enrollment - # document is ~10 KB and the default fetch size (cursor.itersize) is 2,000, - # thus we'll get ~20 MB on each fetch of 2,000 enrollments. - # - # Rows we fetch are locked for update so that two instances of this - # command don't try to process the same enrollments. - LOG.debug("Fetching unprocessed enrollments") - - enrollments = db.cursor("enrollments") - enrollments.execute(""" - select enrollment_id as id, document - from receiving.enrollment - where not processing_log @> %s - order by id - for update - """, (Json([{ "etl": ETL_NAME, "revision": REVISION }]),)) - - for enrollment in enrollments: - with db.savepoint(f"enrollment {enrollment.id}"): - LOG.info(f"Processing enrollment {enrollment.id}") - - # Out of an abundance of caution, fail when the schema version - # of the enrollment document changes. This ensures manual - # intervention happens on document structure changes. After - # seeing how versions are handled over time, this restriction - # may be toned down a bit. - known_versions = {"1.1.0", "1.0.0"} - - assert enrollment.document["schemaVersion"] in known_versions, \ - f"Document schema version {enrollment.document['schemaVersion']} is not in {known_versions}" - - # Most of the time we expect to see existing sites so a - # select-first approach makes the most sense to avoid useless - # updates. - site = find_or_create_site(db, - identifier = enrollment.document["site"]["name"], - details = site_details(enrollment.document["site"])) - - # Most of the time we expect to see new individuals and new - # encounters, so an insert-first approach makes more sense. - # Encounters we see more than once are presumed to be - # corrections. - individual = upsert_individual(db, - identifier = enrollment.document["participant"], - sex = assigned_sex(enrollment.document)) - - encounter = upsert_encounter(db, - identifier = enrollment.document["id"], - encountered = enrollment.document["startTimestamp"], - individual_id = individual.id, - site_id = site.id, - age = age(enrollment.document), - details = encounter_details(enrollment.document)) - - process_samples(db, encounter.id, enrollment.document) - process_locations(db, encounter.id, enrollment.document) - - mark_processed(db, enrollment.id) - - LOG.info(f"Finished processing enrollment {enrollment.id}") - - -def process_samples(db: DatabaseSession, - encounter_id: int, - document: dict): - """ - Process an enrollment *document*'s samples. - - Find existing collected samples, or create skeletal sample records - containing just the collection barcode linked back to this *encounter_id*. - Sample manifests generated by the processing lab will usually be loaded - later and fill in the rest of the sample record. - """ - for sample in document["sampleCodes"]: - barcode = sample.get("code") - - if not barcode: - LOG.warning(f"Skipping collected sample with no barcode") - continue - - # XXX TODO: Stop hardcoding this and handle other types. - # ScannedSelfSwab and ManualSelfSwabbed are kit barcodes, - # not collection barcodes. TestStrip is an identifier - # UUID, not barcode. - # - trs, 17 May 2019 - if sample["type"] != "ClinicSwab": - LOG.warning(f"Skipping collected sample with unknown type {sample['type']}") - continue - - LOG.debug(f"Looking up collected sample code «{barcode}»") - identifier = find_identifier(db, barcode) - - if not identifier: - LOG.warning(f"Skipping collected {sample['type']} sample with unknown barcode «{barcode}»") - continue - - assert identifier.set_name in EXPECTED_COLLECTION_IDENTIFIER_SETS, \ - f"{sample['type']} sample with unexpected «{identifier.set_name}» barcode «{barcode}»" - - # XXX TODO: Relationally model sample type after we choose - # a standard vocabulary (LOINC or SNOMED or whatever FHIR - # normalizes?) - # -trs, 8 May 2019 - details = { - "type": sample["type"], - } - - sample = upsert_sample(db, - collection_identifier = identifier.uuid, - encounter_id = encounter_id, - details = details) - - # XXX TODO: Should this delete existing linked samples which - # weren't mentioned in this enrollment document? This would - # support the case of an incorrectly included sample from an - # earlier enrollment document being corrected by a later - # enrollment document. - # -trs, 8 May 2019 - - -def process_locations(db: DatabaseSession, encounter_id: int, document: dict): - """ - Process an enrollment *document*'s locations and attach them to *encounter_id*. - """ - locations = encounter_locations(document) - - for (use, location) in locations.items(): - # Find the tract, if we know it. Tracts are reasonably enumerable, so - # we require that they already exist. - tract_identifier = location.get("region") - - if tract_identifier: - tract = find_location(db, "tract", tract_identifier) - assert tract, f"Tract «{tract_identifier}» is unknown" - else: - tract = None - - # If we have an address identifier ("household id"), we upsert a - # location record for it. Addresses are not reasonably enumerable, so - # we don't require they exist. - address_identifier = location.get("id") - - if address_identifier: - address = upsert_location(db, - scale = "address", - identifier = address_identifier, - hierarchy = tract.hierarchy if tract else None) - else: - address = None - - if not (tract or address): - LOG.warning(f"No tract or address location available for «{use}»") - continue - - # Audere calls this "use", but I think "relation" is a more appropriate - # term. We map to preferred nomenclature based loosely on FHIR. - relation = { - "home": "residence", - "work": "workplace", - "temp": "lodging", - } - - upsert_encounter_location(db, - encounter_id = encounter_id, - relation = relation[use], - location_id = address.id if address else tract.id) - - -def site_details(site: dict) -> dict: - """ - Describe site details in a simple data structure designed to be used from - SQL. - """ - return { - "type": site.get("type"), - } - - -def age(document: dict) -> Optional[str]: - """ - Retrieves the age of the individual at the time of encounter from - *document*. - - Converts age value from int to string to fit interval format. - """ - age_dict = document.get("age") - if not age_dict: - return None - if age_dict.get("ninetyOrAbove"): - return "90 years" - age = float(age_dict.get("value")) - # XXX TODO: Determine how Audere will send age in months for < 1 year olds. - return f"{age} years" - - -def encounter_details(document: dict) -> dict: - """ - Describe encounter details in a simple data structure designed to be used - from SQL. - - Interpreting the contained question → answer ``responses`` map may require - the data dictionary. - """ - return { - "age": document.get("age"), # XXX TODO: Remove age from details - "locations": encounter_locations(document), # XXX TODO: Remove locations from details - "language": document["localeLanguageCode"], - "responses": { - response["question"]["token"]: decode_answer(response) - for response in document["responses"] - }, - } - - -def encounter_locations(document: dict) -> dict: - """ - Return the encounter *document*'s locations array as a dictionary keyed by - lowercase location use (``home``, ``work``, ``temp``). - - Raises an :class:`AssertionError` if there's more than one location for a - use type. - """ - locations = document["locations"] - - def use_of(location): - return location["use"].lower() - - duplicate_uses = [ - use for use, locations - in groupby(sorted(locations, key = use_of), key = use_of) - if len(list(locations)) > 1 - ] - - assert not duplicate_uses, \ - f"Document {document['id']} contains more than one location for uses: {duplicate_uses}" - - return { - use_of(location): location - for location in locations - } - - -def mark_processed(db, enrollment_id: int) -> None: - LOG.debug(f"Marking enrollment {enrollment_id} as processed") - - data = { - "enrollment_id": enrollment_id, - "log_entry": Json({ - "etl": ETL_NAME, - "revision": REVISION, - "timestamp": datetime.now(timezone.utc), - }), - } - - with db.cursor() as cursor: - cursor.execute(""" - update receiving.enrollment - set processing_log = processing_log || %(log_entry)s - where enrollment_id = %(enrollment_id)s - """, data) - - -def assigned_sex(document: dict) -> Any: - """ - Response value of one of the two questions about assigned sex, or None if - neither question is present in the *document* responses. - """ - def first_or_none(items): - return items[0] if items else None - - try: - return first_or_none(response("AssignedSex", document)) - except NoSuchQuestionError: - try: - return first_or_none(response("AssignedSexAirport", document)) - except NoSuchQuestionError: - LOG.warning(f"No assigned sex response found in document {document['id']}") - return None - - -def response(question_id: str, document: dict) -> Any: - """ - Response value for *question_id* in the enrollment *document*. - - Returns a string, number, tuple of strings, or None. - - Raises a :class:`NoSuchQuestionError` if *question_id* is not found in the - responses contained by *document*. - - Raises a :class:`TooManyResponsesError` if the *question_id* is not unique - among responses contained by *document*. - """ - responses = [ - response - for response in document["responses"] - if response["question"]["token"] == question_id ] - - if not responses: - raise NoSuchQuestionError(f"No question with id/token '{question_id}' in document {document['id']}") - - if len(responses) > 1: - raise TooManyResponsesError(f"Question id/token '{question_id}' is not unique in responses of document {document['id']}") - - return decode_answer(responses[0]) - - -def decode_answer(response_data: dict) -> Any: - """ - Decode the answer described by *response_data*, a substructure of an - enrollment document. - - Returns a string, number, tuple of strings, or None. - """ - answer = response_data["answer"] - - if answer["type"] in ["String", "Number"]: - return answer["value"] - - elif answer["type"] == "Option": - chosen_options = map(int, answer["chosenOptions"]) - option_tokens = [ - option["token"] - for option in response_data["options"] ] - - return tuple( - option_tokens[chosen] - for chosen in chosen_options) - - elif answer["type"] == "Declined": - return None - - else: - raise ValueError(f"Unknown response answer type {answer['type']}") - - -class NoSuchQuestionError(ValueError): - """ - Raised by :function:`response` if its provided *question_id* is not found - in the set of responses. - """ - pass - -class TooManyResponsesError(ValueError): - """ - Raised by :function:`response` if its provided *question_id* is not unique - among the set of responses. - """ - pass From fa70aa4a89372c8f8f8dc16cbff8bf400cbeb91e Mon Sep 17 00:00:00 2001 From: Kairsten Fay Date: Fri, 26 Jun 2020 15:47:34 -0700 Subject: [PATCH 2/4] Delete DEVELOPMENT.md Not only is this document is fairly outdated at this point, but also, the more logical place for it to live is seattleflu/documentation. --- DEVELOPMENT.md | 69 -------------------------------------------------- 1 file changed, 69 deletions(-) delete mode 100644 DEVELOPMENT.md diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md deleted file mode 100644 index 3ae6fa6de..000000000 --- a/DEVELOPMENT.md +++ /dev/null @@ -1,69 +0,0 @@ -# Development -## ID3C Common Tasks - -Kairsten's notes. TODO: organize with README. -### Configuring your local development environment -#### Prerequisites -This tutorial assumes you have the following dependencies already installed. -* Git -* Python -* PostgreSQL - -1. Clone the git repo -```sh -git clone git@github.com:seattleflu/id3c.git -``` - -2. To fill the `seattleflu` database with the backup db copy, run -`pg_restore --data-only --single-transaction --schema receiving --table enrollment --dbname seattleflu seattleflu-production-.pgdb` - -3. To restore the presence/absence data on your local dev instance, you have two options. - 1. **Restore from a flat file backup**, e.g. `pg_restore --dbname seattleflu presence_absence.pgdb` - 2. **Restore from seattleflu-testing** - First, remove local records via - ```sh - psql seattleflu <<<"truncate receiving.presence_absence" - ``` - Then, run `pg_dump` against the testing database and re-fill your local database with the results. - - ```sh - pg_dump --data-only --table receiving.presence_absence --format custom service=seattleflu-testing | pg_restore --single-transaction --dbname seattleflu - ``` - -### Prevent wrapping within psql -Set pager environment variable to `less` and specify which `less` method to use. If there's less than a screen full of information, don't page it. -```sh -PAGER=less LESS=SFRXi psql seattleflu -``` -To save these settings, add the following lines to your `~/.psqlrc`. -```sql -\setenv PAGER less -\setenv LESS SRFXi -``` - -### Psql -Reset the processing log for all rows to be blank. When the enrollments pipeline is run, it will process all rows with blank processing logs for the current revision. These rows will be marked `for update`. -```sql -update receiving.enrollment set processing_log = '[]'; -``` - -### Running id3c -From within the `id3c` directory, run the following command to test your install. -```sh -PGDATABASE=seattleflu pipenv run ./bin/id3c etl enrollments --help -``` - -To process `enrollments`, run -```sh -PGDATABASE=seattleflu pipenv run ./bin/id3c etl enrollments --prompt -``` - -To process `clinical` run -```sh -PGDATABASE=seattleflu pipenv run ./bin/id3c etl clinical --prompt -``` - -If you don't have `LOG_LEVEL=debug` turned on, your logs should be available on your system via -```sh -grep seattleflu /var/log/syslog -``` From 209aa4252dbd69860aea230df2ff78a72b4e0b15 Mon Sep 17 00:00:00 2001 From: Kairsten Fay Date: Fri, 26 Jun 2020 15:57:46 -0700 Subject: [PATCH 3/4] Deprecate the kit ETL Deprecate the kit ETL(`id3c etl kit`). It was the original swab-n-send ETL, but we have REDCap for that now. --- lib/id3c/cli/command/etl/__init__.py | 1 - lib/id3c/cli/command/etl/kit.py | 539 --------------------------- 2 files changed, 540 deletions(-) delete mode 100644 lib/id3c/cli/command/etl/kit.py diff --git a/lib/id3c/cli/command/etl/__init__.py b/lib/id3c/cli/command/etl/__init__.py index 613b6b63c..84f147c7d 100644 --- a/lib/id3c/cli/command/etl/__init__.py +++ b/lib/id3c/cli/command/etl/__init__.py @@ -23,7 +23,6 @@ def etl(): __all__ = [ "manifest", "presence_absence", - "kit", "consensus_genome", "redcap_det", "fhir", diff --git a/lib/id3c/cli/command/etl/kit.py b/lib/id3c/cli/command/etl/kit.py deleted file mode 100644 index 842432b62..000000000 --- a/lib/id3c/cli/command/etl/kit.py +++ /dev/null @@ -1,539 +0,0 @@ -""" -Process both enrollment and manifest documents to extract kit related data -into the relational warehouse. - -The kit etl is completely dependent on the enrollments etl and the manifest etl. -It does not create encounters or samples, but must find them within the -relational warehouse. -""" -import click -import logging -from psycopg2 import sql -from datetime import datetime, timezone -from typing import Any, Optional, Tuple -from id3c.cli.command import with_database_session -from id3c.db import find_identifier -from id3c.db.session import DatabaseSession -from id3c.db.datatypes import Json -from id3c.db.types import KitRecord, SampleRecord -from . import etl, update_sample, find_sample_by_id - -LOG = logging.getLogger(__name__) - - -# The revision number and etl name are stored in the processing_log of each -# enrollment/manifest record when the record is processed or skipped by -# this ETL routine. The routine finds new-to-it records to process by looking -# for records lacking this etl revision number and etl name in their log. -# If a change to the ETL routine necessitates re-processing all enrollments, -# this revision number should be incremented. -# The etl name has been added to allow multiple etls to process the same -# receiving table. -ENROLLMENTS_REVISION = 1 -MANIFEST_REVISION = 1 -ETL_NAME = "kit" - -expected_identifier_sets = { - "kits": {"kits-fluathome.org"}, - "samples": {"samples", "collections-fluathome.org"}, - "test-strips": {"test-strips-fluathome.org"} -} - -@etl.group("kit", help = __doc__) -def kits(): - pass - -@kits.command("enrollments", help = __doc__) -@with_database_session - -def kit_enrollments(*, db: DatabaseSession): - LOG.debug(f"Starting the kit enrollments ETL routine, revision {ENROLLMENTS_REVISION}") - - expected_barcode_types = {"ScannedSelfSwab", "ManualSelfSwab"} - - LOG.debug("Fetching unprocessed enrollments") - enrollments = db.cursor("enrollments") - enrollments.execute(""" - select enrollment_id as id, document - from receiving.enrollment - where not processing_log @> %s - order by id - for update - """, (Json([{ "etl": ETL_NAME, "revision": ENROLLMENTS_REVISION }]),)) - - for enrollment in enrollments: - with db.savepoint(f"enrollment {enrollment.id}"): - LOG.info(f"Processing enrollment {enrollment.id}") - - # Find encounter that should have been created - # from this enrollment record through etl enrollments - encounter = find_encounter(db, enrollment.document["id"]) - - # Error out the kit etl process if no encounter found - # The kit etl process can try again starting with this record - # next time with the idea that the encounter will be - # created by then. - if not encounter: - raise EncounterNotFoundError(f"No encounter with identifier «{enrollment.document['id']}» found") - - # Skip and mark the enrollment document as processed if the - # encounter found is linked to a site that is not self-test - if encounter.site != "self-test": - LOG.debug(f"Found encounter {encounter.id} «{encounter.identifier}»" + - f"linked to site «{encounter.site}», not 'self-test'") - mark_enrollment_processed(db, enrollment.id) - continue - - for code in enrollment.document["sampleCodes"]: - barcode = code.get("code") - - # Kit must have a barcode - if not barcode: - LOG.warning(f"No barcode found in sampleCodes {code}") - continue - - # Barcode must be of expected barcode type - if code["type"] not in expected_barcode_types: - LOG.debug(f"Skipping barcode with type {code['type']}") - continue - - # Convert kit barcode to full identifier - kit_identifier = find_identifier(db, barcode) - - if not kit_identifier: - LOG.warning(f"Skipping kit with unknown barcode «{barcode}»") - continue - - if kit_identifier.set_name not in expected_identifier_sets["kits"]: - LOG.warning(f"Skipping kit with identifier found in " + - f"set «{kit_identifier.set_name}» not {expected_identifier_sets['kits']}") - continue - - details = { - "type": code["type"] - } - - kit, status = upsert_kit_with_encounter(db, - identifier = kit_identifier.uuid, - encounter_id = encounter.id, - additional_details = details) - - if status == "updated": - update_kit_samples(db, kit) - - mark_enrollment_processed(db, enrollment.id) - - LOG.info(f"Finished processing enrollment {enrollment.id}") - - -def find_encounter(db: DatabaseSession, identifier: str) -> Any: - """ - Given an *identifier* find the corresponding encounter within the - relational warehouse. - """ - LOG.debug(f"Looking up encounter with identifier «{identifier}»") - - encounter = db.fetch_row(""" - select encounter_id as id, - encounter.identifier as identifier, - site.identifier as site - from warehouse.encounter - join warehouse.site using (site_id) - where encounter.identifier = %s - """, (identifier,)) - - if not encounter: - return None - - LOG.info(f"Found encounter «{encounter.id}»") - return encounter - - -def upsert_kit_with_encounter(db: DatabaseSession, - identifier: str, - encounter_id: int, - additional_details: dict) -> Tuple[KitRecord, str]: - """ - Upsert kit by its *identifier* to include link to encounter. - - An existing kit has its *encounter_id* updated and the - provided *additional_details* are merged (at the top level only into) - the existing kit details, if any. - """ - LOG.debug(f"Upserting kit «{identifier}»") - - data = { - "identifier": identifier, - "encounter_id": encounter_id, - "additional_details": Json(additional_details) - } - - # Look for existing kit - kit = find_kit(db, identifier) - - # Nothing found → create - if not kit: - LOG.info("Creating new kit") - status = "created" - kit = db.fetch_row(""" - insert into warehouse.kit (identifier, encounter_id, details) - values(%(identifier)s, - %(encounter_id)s, - %(additional_details)s) - returning kit_id as id, identifier, encounter_id, null rdt_sample_id, null utm_sample_id - """, data) - - # Found kit → update - else: - status = "updated" - # Warn if kit is already linked to a different encounter! - if kit.encounter_id and kit.encounter_id != encounter_id: - LOG.warning(f"Kit «{kit.id}» already linked to another encounter «{kit.encounter_id}», linking with «{encounter_id}» instead") - - kit = db.fetch_row(""" - update warehouse.kit - set encounter_id = %(encounter_id)s, - details = coalesce(details, '{}') || %(additional_details)s - - where kit_id = %(kit_id)s - - returning kit_id as id, identifier, encounter_id, rdt_sample_id, utm_sample_id - """, { **data, "kit_id": kit.id }) - - assert kit.id, "Upsert affected no rows!" - - LOG.info(f"Upserted kit {kit.id} with identifier «{kit.identifier}» linked to encounter «{kit.encounter_id}»") - - return kit, status - - -def mark_enrollment_processed(db, enrollment_id: int) -> None: - LOG.debug(f"Marking enrollment {enrollment_id} as processed") - - data = { - "enrollment_id": enrollment_id, - "log_entry": Json({ - "revision": ENROLLMENTS_REVISION, - "etl": ETL_NAME, - "timestamp": datetime.now(timezone.utc), - }) - } - - with db.cursor() as cursor: - cursor.execute(""" - update receiving.enrollment - set processing_log = processing_log || %(log_entry)s - where enrollment_id = %(enrollment_id)s - """, data) - -@kits.command("manifest", help = __doc__) -@with_database_session - -def kit_manifests(*, db: DatabaseSession): - LOG.debug(f"Starting the kits manifests ETL routine, revision {MANIFEST_REVISION}") - - LOG.debug("Fetching unprocessed manifest records") - - manifest = db.cursor("manifest") - manifest.execute(""" - select manifest_id as id, document - from receiving.manifest - where not processing_log @> %s - order by id - for update - """, (Json([{ "etl": ETL_NAME, "revision": MANIFEST_REVISION }]),)) - - for manifest_record in manifest: - with db.savepoint(f"manifest record {manifest_record.id}"): - LOG.info(f"Processing record {manifest_record.id}") - - # Mark record as skipped - # if it does not contain a kit related sample - if "kit" not in manifest_record.document: - LOG.info(f"Skipping manifest record {manifest_record.id} without kit data") - mark_skipped(db, manifest_record.id) - continue - - sample_barcode = manifest_record.document.pop("sample") - sample_identifier = find_identifier(db, sample_barcode) - - # Mark record as skipped - # if it has an unknown sample barcode - if not sample_identifier: - LOG.warning(f"Skipping manifest record with unknown sample barcode «{sample_barcode}»") - mark_skipped(db, manifest_record.id) - continue - - # Mark record as skipped sample identifier set is unexpected - if sample_identifier.set_name not in expected_identifier_sets["samples"]: - LOG.warning(f"Skipping manifest record with sample identifier found in " + - f"set «{sample_identifier.set_name}», not {expected_identifier_sets['samples']}") - mark_skipped(db, manifest_record.id) - continue - - # Find sample that should have been created from this - # manifest record via etl manifest - sample = find_sample(db, sample_identifier.uuid) - - # Error out the kit etl process if no sample found - # The kit etl process can try again starting with this record - # next time with the idea that the sample will be - # created by then. - if not sample: - raise SampleNotFoundError(f"No sample with «{sample_identifier.uuid}» found") - - # Mark record as skipped if the sample does not have a - # sample type (utm or rdt) - if sample.type not in {"utm", "rdt"}: - LOG.info(f"Skipping manifest record {manifest_record.id} "+ - f"with unknown sample type {sample.type}") - mark_skipped(db, manifest_record.id) - continue - - kit_barcode = manifest_record.document.pop("kit") - kit_identifier = find_identifier(db, kit_barcode) - - # Mark record as skipped if it has an unknown kit barcode - if not kit_identifier: - LOG.warning(f"Skipping kit with unknown barcode «{kit_barcode}»") - mark_skipped(db, manifest_record.id) - continue - - # Mark record as skipped if kit identifier set is unexpected - if kit_identifier.set_name not in expected_identifier_sets["kits"]: - LOG.warning(f"Skipping kit with identifier found in " + - f"set «{kit_identifier.set_name}» not {expected_identifier_sets['kits']}") - mark_skipped(db, manifest_record.id) - continue - - # List of extra data not needed for kit record that can - # be removed before adding manifest document to kit details - extra_data = ["collection", "sample_type", - "aliquot_date", "aliquots", "racks"] - for key in extra_data: - manifest_record.document.pop(key, None) - - # Try to find identifier for the test-strip barcode for rdt samples - if sample.type == "rdt": - update_test_strip(db, manifest_record.document) - - kit, status = upsert_kit_with_sample(db, - identifier = kit_identifier.uuid, - sample = sample, - additional_details = manifest_record.document) - - if status == "updated": - update_sample(db, sample, kit.encounter_id) - - mark_loaded(db, manifest_record.id, status, kit.id) - - -def find_sample(db: DatabaseSession, identifier: str) -> Optional[SampleRecord]: - """ - Given an *identifier* find the corresponding sample within the - database. - """ - LOG.debug(f"Looking up sample with identifier «{identifier}»") - - sample = db.fetch_row(""" - select sample_id as id, - identifier, - encounter_id, - details ->> 'sample_type' as type - from warehouse.sample - where sample.identifier = %s - """, (identifier,)) - - if not sample: - return None - - LOG.info(f"Found sample «{sample.id}»") - return sample - - -def update_test_strip(db: DatabaseSession, document: dict): - """ - Find identifier that matches the test_strip barcode within *document*. - Updates *document* to have both the test_strip barcode and - the identifier if found. - """ - strip_barcode = document["test_strip"] - strip_identifier = find_identifier(db, strip_barcode) - - document["test_strip"] = { - "uuid": None, - "barcode": strip_barcode - } - - if not strip_identifier: - LOG.warning(f"Test strip has unknown barcode «{strip_barcode}»") - - elif strip_identifier.set_name not in expected_identifier_sets["test-strips"]: - LOG.warning(f"Test strip barcode found in unexpected identifier set «{strip_identifier.set_name}»") - - else: - document["test_strip"] = { - "uuid": strip_identifier.uuid, - "barcode": strip_identifier.barcode - } - - -def upsert_kit_with_sample(db: DatabaseSession, - identifier: str, - sample: SampleRecord, - additional_details: dict) -> Tuple[KitRecord, str]: - """ - Upsert kit by its *identifier* to include link to a sample. - - An existing kit has its *sample_id* updated and the provided - *additional_details* are merged (at the top level only into) - the existing kit details, if any. - """ - LOG.debug(f"Upserting kit «{identifier}»") - - data = { - "identifier": identifier, - "sample_id": sample.id, - "additional_details": Json(additional_details) - } - - if sample.type == 'utm': - sample_type = "utm_sample_id" - elif sample.type == 'rdt': - sample_type = "rdt_sample_id" - - # Look for existing kit - kit = find_kit(db, identifier) - - # Nothing found → create - if not kit: - LOG.info("Creating new kit") - status = "created" - kit = db.fetch_row(sql.SQL(""" - insert into warehouse.kit (identifier, {}, details) - values(%(identifier)s, - %(sample_id)s, - %(additional_details)s) - returning kit_id as id, - identifier, - encounter_id, - {} - """).format(sql.Identifier(sample_type), - sql.Identifier(sample_type)), data) - - # Found kit → update - else: - status = "updated" - kit_sample_id = getattr(kit, sample_type) - # Warn if kit is already linked to a different sample! - if (kit_sample_id and (sample.id != kit_sample_id)): - LOG.warning(f"Kit «{kit.id}» already linked to another " + - f"{sample_type} «{kit_sample_id}», linking with «{sample.id}» instead") - - kit = db.fetch_row(sql.SQL(""" - update warehouse.kit - set {} = %(sample_id)s, - details = coalesce(details, {}) || %(additional_details)s - - where kit_id = %(kit_id)s - - returning kit_id as id, - identifier, - encounter_id, - {} - """).format(sql.Identifier(sample_type), - sql.Literal(Json({})), - sql.Identifier(sample_type)), - { **data, "kit_id": kit.id }) - - assert kit.id, "Upsert affected no rows!" - - LOG.info(f"Upserted kit {kit.id} with identifier «{kit.identifier}» " + - f"linked to {sample_type} «{getattr(kit, sample_type)}»") - - return kit, status - - -def mark_loaded(db, manifest_id: int, status: str, kit_id: int) -> None: - LOG.debug(f"Marking kit sample manifest record {manifest_id} as loaded") - mark_manifest_processed(db, manifest_id, { "status": status, "kit_id": kit_id }) - - -def mark_skipped(db, manifest_id: int) -> None: - LOG.debug(f"Marking sample manifest record {manifest_id} as skipped") - mark_manifest_processed(db, manifest_id, { "status": "skipped" }) - - -def mark_manifest_processed(db, manifest_id: int, entry = {}) -> None: - LOG.debug(f"Marking manifest {manifest_id} as processed") - - data = { - "manifest_id": manifest_id, - "log_entry": Json({ - **entry, - "revision": MANIFEST_REVISION, - "etl": ETL_NAME, - "timestamp": datetime.now(timezone.utc) - }) - } - - with db.cursor() as cursor: - cursor.execute(""" - update receiving.manifest - set processing_log = processing_log || %(log_entry)s - where manifest_id = %(manifest_id)s - """, data) - - -def find_kit(db: DatabaseSession, identifier: str) -> KitRecord: - """ - Look for kit using *identifier* within the database - """ - kit: KitRecord = db.fetch_row(""" - select kit_id as id, identifier, encounter_id, rdt_sample_id, utm_sample_id - from warehouse.kit - where identifier = %s - for update - """, (identifier,)) - - return kit - - -def update_kit_samples(db: DatabaseSession, kit: KitRecord): - """ - After upserting kit, update the samples linked to the kit. - """ - if kit.rdt_sample_id: - - rdt_sample = find_sample_by_id(db, kit.rdt_sample_id) - - if rdt_sample: - update_sample(db, - sample = rdt_sample, - encounter_id = kit.encounter_id) - - if kit.utm_sample_id: - - utm_sample = find_sample_by_id(db, kit.utm_sample_id) - - if utm_sample: - - update_sample(db, - sample = utm_sample, - encounter_id = kit.encounter_id) - - -class EncounterNotFoundError(ValueError): - """ - Raised by the kit enrollments etl if it cannot find an encounter within - the relational warehouse using the provided *identifier* - """ - pass - - -class SampleNotFoundError(ValueError): - """ - Raised by the kit manifest etl if it cannot find a sample within - the relational warehouse using the provided *identifier* - """ - pass From 28ac563827bfb60024555bb53673c7dd79f5cdaf Mon Sep 17 00:00:00 2001 From: Kairsten Fay Date: Fri, 26 Jun 2020 16:06:24 -0700 Subject: [PATCH 4/4] api: Deprecate the /receiving/enrollment endpoint We no longer receive enrollment data from Audere. Delete this endpoint. --- lib/id3c/api/datastore.py | 21 --------------------- lib/id3c/api/routes.py | 22 ---------------------- lib/id3c/api/static/index.html | 11 +++++++---- 3 files changed, 7 insertions(+), 47 deletions(-) diff --git a/lib/id3c/api/datastore.py b/lib/id3c/api/datastore.py index f9889e64b..1fc82d793 100644 --- a/lib/id3c/api/datastore.py +++ b/lib/id3c/api/datastore.py @@ -51,27 +51,6 @@ def login(username: str, password: str) -> DatabaseSession: raise AuthenticationRequired() from None -@export -@catch_permission_denied -def store_enrollment(session: DatabaseSession, document: str) -> None: - """ - Store the given enrollment JSON *document* (a **string**) in the backing - database using *session*. - - Raises a :class:`BadRequestDatabaseError` exception if the given *document* - isn't valid and a :class:`Forbidden` exception if the database reports a - `permission denied` error. - """ - with session, session.cursor() as cursor: - try: - cursor.execute( - "INSERT INTO receiving.enrollment (document) VALUES (%s)", - (document,)) - - except (DataError, IntegrityError) as error: - raise BadRequestDatabaseError(error) from None - - @export @catch_permission_denied def store_presence_absence(session: DatabaseSession, document: str) -> None: diff --git a/lib/id3c/api/routes.py b/lib/id3c/api/routes.py index b582c482b..d0f13dc16 100644 --- a/lib/id3c/api/routes.py +++ b/lib/id3c/api/routes.py @@ -29,28 +29,6 @@ def index(): return send_file("static/index.html", "text/html; charset=UTF-8") -@api_v1.route("/receiving/enrollment", methods = ['POST']) -@api_unversioned.route("/enrollment", methods = ['POST']) -@content_types_accepted(["application/json"]) -@check_content_length -@authenticated_datastore_session_required -def receive_enrollment(*, session): - """ - Receive a new enrollment document. - - POST /enrollment with a JSON body. Note that we don't actually need to - parse the JSON body. The body is passed directly to the database which - will check its validity. - """ - document = request.get_data(as_text = True) - - LOG.debug(f"Received enrollment") - - datastore.store_enrollment(session, document) - - return "", 204 - - @api_v1.route("/receiving/presence-absence", methods = ['POST']) @api_unversioned.route("/presence-absence", methods = ['POST']) @content_types_accepted(["application/json"]) diff --git a/lib/id3c/api/static/index.html b/lib/id3c/api/static/index.html index 4d25a6b23..e27667684 100644 --- a/lib/id3c/api/static/index.html +++ b/lib/id3c/api/static/index.html @@ -36,10 +36,6 @@

Status codes

Routes

-

POST /v1/receiving/enrollment

-

Stores the request data as an enrollment document in a receiving area of - the study database. Accepts any JSON object. -

POST /v1/receiving/presence-absence

Stores the request data as presence/absence calls in a receiving area of the study database. Accepts any JSON object. @@ -113,5 +109,12 @@

POST /sequence-read-set

] } +

Deprecated Routes

+

+ These routes are no longer supported. +

+ +

POST /v1/receiving/enrollment

+