Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
Change Log
==========

3.14.0
======
* Add `purge_meta_workflow_run` function to `wrangler_utils` to delete a MetaWorkflowRun and its associated files.
* Add support for new statuses


3.13.0
======
* Add `conversion-bam-to-cram` MetaWorkflowRun creation function for SMaHT
Expand Down
54 changes: 49 additions & 5 deletions magma_smaht/commands/wrangler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,8 @@ def merge_qc_items(file_accessions, mode, auth_env):
@click.option(
"-r",
"--release",
default=False,
is_flag=True,
show_default=True,
help="Release the remaining QC item",
type=str,
help="If set, release the remaining QC item with the given status",
)
@click.option(
"-e",
Expand All @@ -235,7 +233,7 @@ def replace_qc_item(file_accession, keep_index, release, auth_env):
Replace the QC item of a file with the one at the given index.
If a file has multiple QC items, this command will remove all but the one with given index.
Can be useful if QC has been rerun and the old QC item is no longer needed. This function also
releases the remaining QC item if the release flag is set.
releases the remaining QC item if the release argument is set.
"""
smaht_key = get_auth_key(auth_env)
wrangler_utils.replace_qc_item(file_accession, keep_index, release, smaht_key)
Expand Down Expand Up @@ -395,6 +393,52 @@ def purge_fileset(
)


@cli.command()
@click.help_option("--help", "-h")
@click.option(
    "-m",
    "--mwfr-accession",
    required=True,
    type=str,
    help="MetaWorkflowRun accession",
)
@click.option(
    "-d",
    "--dry-run",
    default=False,
    is_flag=True,
    show_default=True,
    help="Dry run",
)
@click.option(
    "-y",
    "--assume-yes",
    default=False,
    is_flag=True,
    show_default=True,
    help="Assume yes to all prompts",
)
@click.option(
    "-e",
    "--auth-env",
    required=True,
    type=str,
    help="Name of environment in smaht-keys file",
)
def purge_meta_workflow_run(mwfr_accession, dry_run, assume_yes, auth_env):
    """
    Delete all files in a MetaWorkflowRun, delete MetaWorkflowRun itself.
    Use with caution!
    """
    # Resolve credentials once, then delegate the actual work to wrangler_utils.
    smaht_key = get_auth_key(auth_env)
    wrangler_utils.purge_meta_workflow_run(
        mwfr_accession,
        dry_run,
        assume_yes,
        smaht_key,
    )


@cli.command()
@click.help_option("--help", "-h")
@click.option(
Expand Down
7 changes: 7 additions & 0 deletions magma_smaht/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@
DISPLAY_TITLE = "display_title"
ALIASES = "aliases"
UPLOADED = "uploaded"
RELEASED = "released"
OPEN = "open"
OPEN_NETWORK = "open-network"
OPEN_EARLY = "open-early"
PROTECTED = "protected"
PROTECTED_NETWORK = "protected-network"
PROTECTED_EARLY = "protected-early"
DELETED = "deleted"
STATUS = "status"
FIRST_STRANDED = "First Stranded"
Expand Down
40 changes: 38 additions & 2 deletions magma_smaht/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
################################################
# Libraries
################################################
import pprint
import functools
import json, uuid
from pathlib import Path
from typing import Any, Dict, Sequence
Expand Down Expand Up @@ -314,12 +316,42 @@ def get_donors_from_mwfr(mwfr, smaht_key):
print(f"Can't get donor from sample source {sample_source['uuid']}")
continue

donors_list = get_all_donors("object", smaht_key)
donors_from_portal = {}
for donor in donors_list:
donors_from_portal[donor[UUID]] = donor

donors = []
donor_ids = list(set(donor_ids))
for donor_id in donor_ids:
donor = get_item(donor_id, smaht_key, frame='object')
donors.append(donor)
donors.append(donors_from_portal[donor_id])
return donors

def get_all_donors(frame, smaht_key):
    """Get all donors in the portal (cached version)

    Args:
        frame (str): Frame type for the request
        smaht_key (dict): SMaHT key

    Returns:
        list: List of donor items from portal
    """
    # lru_cache requires hashable arguments, so the key dict is first
    # serialized to a canonical JSON string before calling the cached helper.
    return _get_all_donors_cached(frame, _serialize_key(smaht_key))

@functools.lru_cache(maxsize=128)
def _get_all_donors_cached(frame, serialized_key):
    """Internal cached worker behind `get_all_donors`.

    Operates on hashable parameters only: the SMaHT key arrives as a
    canonical JSON string (see `_serialize_key`) and is deserialized here.

    Args:
        frame (str): Frame type for the per-donor requests
        serialized_key (str): JSON-serialized SMaHT key

    Returns:
        list: List of donor items from portal
    """
    smaht_key = json.loads(serialized_key)

    # NOTE(review): limit=2000 silently truncates if the portal ever holds
    # more donors than that — confirm this bound is safe.
    query = "/search/?type=Donor&field=uuid&limit=2000"
    search_results = ff_utils.search_metadata(query, key=smaht_key)

    # The search returns only uuids; fetch each donor in the requested frame.
    return [
        get_item(donor[UUID], smaht_key, frame=frame) for donor in search_results
    ]


Expand Down Expand Up @@ -550,3 +582,7 @@ def get_item_es(identifier, key, frame="raw"):
return ff_utils.get_metadata(
identifier, add_on=f"frame={frame}", key=key
)

def _serialize_key(key_dict):
"""Convert dictionary key to a hashable string for caching."""
return json.dumps(key_dict, sort_keys=True)
109 changes: 102 additions & 7 deletions magma_smaht/wrangler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
get_latest_somalier_run_for_donor,
generate_input_structure,
mwfr_from_input,
get_all_donors
)

from magma_smaht.constants import (
Expand All @@ -36,6 +37,13 @@
TAGS,
STATUS,
DELETED,
RELEASED,
OPEN,
OPEN_NETWORK,
OPEN_EARLY,
PROTECTED,
PROTECTED_NETWORK,
PROTECTED_EARLY,
FAILED_JOBS,
)

Expand All @@ -46,6 +54,8 @@

SUPPORTED_MWF = [MWF_NAME_CRAM_TO_FASTQ_PAIRED_END, WF_BAM_TO_FASTQ_PAIRED_END]

RELEASED_STATUSES = [RELEASED, OPEN, OPEN_NETWORK, OPEN_EARLY, PROTECTED, PROTECTED_NETWORK, PROTECTED_EARLY]


def associate_conversion_output_with_fileset(
mwfr_identifier: str, smaht_key: dict
Expand Down Expand Up @@ -290,7 +300,7 @@ def merge_qc_items(file_accession: str, mode: str, smaht_key: dict):


def replace_qc_item(
file_accession: str, keep_index: int, release: bool, smaht_key: dict
file_accession: str, keep_index: int, release: str, smaht_key: dict
):
file = ff_utils.get_metadata(file_accession, smaht_key)
file_uuid = file["uuid"]
Expand All @@ -312,7 +322,7 @@ def replace_qc_item(
zip_url = qm_item["url"]
zip_uuid = zip_url.split("/")[3]
try:
patch_body = {"status": "released"}
patch_body = {"status": release}
ff_utils.patch_metadata(patch_body, obj_id=qm_uuid_to_keep, key=smaht_key)
ff_utils.patch_metadata(patch_body, obj_id=zip_uuid, key=smaht_key)
print(f"QC item {qm_item[ACCESSION]} released.")
Expand Down Expand Up @@ -424,18 +434,46 @@ def sample_identity_check_status(num_files: int, smaht_key: dict):
"&sequencing_center.display_title=NYGC+GCC"
"&sequencing_center.display_title=BCM+GCC"
"&status=uploaded&status=released"
"&status=open&status=open-network&status=open-early"
"&status=protected&status=protected-network&status=protected-early"
"&file_format.display_title=bam"
"&file_format.display_title=cram"
"&output_status=Final Output"
"&quality_metrics.display_title%21=No+value"
f"&limit={num_files}"
"&sort=date_created"
"&meta_workflow_run_inputs.meta_workflow.name%21=sample_identity_check"
"&description%21=Annotated FLNC output BAM" # Exclude Kinnex FLNC BAMs as they don't show high relatedness values
)
output_files = ff_utils.search_metadata(f"/search/{search_filter}", key=smaht_key)

# Get all donors at once. That's faster than looking them up one by one
donors_list = get_all_donors("object", smaht_key)
donors = {}
for donor in donors_list:
donors[donor[UUID]] = donor

status = {}
for output in output_files:
print(f"Checking file {output['display_title']}")

# if the file is released, the donor is just on the item
if output.get("status") in RELEASED_STATUSES:
donors_from_file = output.get("donors", [])
if len(donors_from_file) != 1:
print(
f"Warning: Expected 1 donor but found {len(donors_from_file)} for file {output['accession']}"
)
continue
donor = donors_from_file[0]
donor_uuid = donor[UUID]

if donor_uuid not in status:
status[donor_uuid] = []
status[donor_uuid].append(output[ACCESSION])
continue


mwfrs = output.get("meta_workflow_run_outputs")
if not mwfrs:
print(
Expand All @@ -445,10 +483,10 @@ def sample_identity_check_status(num_files: int, smaht_key: dict):
mwfr = mwfrs[0]
mwfr = get_item(mwfr[UUID], smaht_key, frame="embedded")

# Only consider files that are outputs for alignment workflows
if "Alignment" not in mwfr["meta_workflow"]["category"]:
# Only consider files that are outputs for alignment workflows or bam2cram conversions
if "Alignment" not in mwfr["meta_workflow"]["category"] and mwfr["meta_workflow"]["name"] != "bam_to_cram":
print(
f"Warning: File {output['accession']} is not result of an alignment MWF. Skipping."
f"Warning: File {output['accession']} is not result of an alignment or bam2cram MWF. Skipping."
)
continue

Expand All @@ -466,7 +504,6 @@ def sample_identity_check_status(num_files: int, smaht_key: dict):
continue
donor = donors_from_mwf[0]
donor_uuid = donor[UUID]
donors[donor_uuid] = donor

if donor_uuid not in status:
status[donor_uuid] = []
Expand Down Expand Up @@ -578,6 +615,64 @@ def purge_fileset(
print(f"Error deleting item {item_uuid}: {str(e)}")


def purge_meta_workflow_run(
    mwfr_accession: str,
    dry_run: bool,
    assume_yes: bool,
    smaht_key: dict,
):
    """Delete all files in a MetaWorkflowRun, delete MetaWorkflowRun itself.

    Deletion is a soft delete: every collected item is PATCHed to status
    "deleted" rather than removed from the database.

    Args:
        mwfr_accession (str): MetaWorkflowRun accession
        dry_run (bool): If True, do not delete files but print what would be deleted
        assume_yes (bool): If True, skip the interactive confirmation prompt
        smaht_key (dict): SMaHT key
    """
    items_to_delete = []

    print(f"Deleting MetaWorkflowRun {mwfr_accession}:")
    mwfr_item = get_item(mwfr_accession, smaht_key, frame="embedded")
    print(f" - MetaWorkflowRun {mwfr_accession}")
    items_to_delete.append(mwfr_item[UUID])

    # Collect every WorkflowRun of the MWFR plus each not-yet-deleted output file.
    workflow_runs = mwfr_item.get("workflow_runs", [])
    for wfr in workflow_runs:
        wfr_display_title = wfr["workflow_run"][DISPLAY_TITLE]
        wfr_uuid = wfr["workflow_run"][UUID]
        print(f" - WorkflowRun {wfr_display_title}")
        items_to_delete.append(wfr_uuid)
        outputs = wfr.get("output", [])
        for output in outputs:
            file = output.get("file", {})
            if file and file.get(STATUS) != DELETED:
                print(f" - File {file[DISPLAY_TITLE]}")
                items_to_delete.append(file[UUID])

    if dry_run:
        print("\nDRY RUN: Nothing deleted.")
        return

    confirm = (
        "y"
        if assume_yes
        else input("Are you sure you want to continue? [y/N]: ").strip().lower()
    )
    if confirm != "y":
        print(f"Aborted deletion of MetaWorkflowRun {mwfr_accession} and its files.")
        return

    for item_uuid in items_to_delete:
        try:
            ff_utils.patch_metadata(
                {STATUS: DELETED}, obj_id=item_uuid, key=smaht_key
            )
            print(f"Deleted item {item_uuid}.")
        except Exception as e:
            print(f"Error deleting item {item_uuid}: {str(e)}")


def print_error_and_exit(error):
    """Print *error* and terminate the process with a non-zero exit status.

    Uses sys.exit(1) rather than the site-provided exit() builtin: exit()
    is intended for interactive sessions and terminates with status 0,
    which would make failures look like successes to calling scripts.
    """
    import sys  # local import: the file's import block is outside this chunk

    print(error)
    sys.exit(1)
Expand All @@ -591,7 +686,7 @@ def set_property(uuid: str, prop_key: str, prop_value: Any, smaht_key: Dict[str,
print(f"Set item {uuid} property {prop_key} to {prop_value}.")
except Exception as e:
raise Exception(f"Item could not be PATCHed: {str(e)}")


def remove_property(uuid: str, property: str, smaht_key: Dict[str, Any]):
""" "Removes a property from an item."""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "magma-suite"
version = "3.13.0"
version = "3.14.0"
description = "Collection of tools to manage meta-workflows automation."
authors = ["Michele Berselli <berselli.michele@gmail.com>", "Doug Rioux", "Soo Lee", "CGAP team"]
license = "MIT"
Expand Down