Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
688 changes: 0 additions & 688 deletions app/ms1/.~c9_invoke_7YosxX.py

This file was deleted.

1,207 changes: 0 additions & 1,207 deletions app/ms1/.~c9_invoke_x896LQ.py

This file was deleted.

145 changes: 123 additions & 22 deletions app/ms1/nta_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ def __init__(
self.qnta_samples = None
self.val_df = val_df
self.spectra_inputs = ms2_inputs

# Build workflow steps dynamically based on parameters
self.workflow_steps = self._build_workflow_steps()
self.current_step_index = 0
self.current_step_progress = 0 # 0-100 within current step
self.total_workflow_weight = sum(step["weight"] for step in self.workflow_steps)
self.qnta_dfs_out = None
self.ests_out = [None, None]
self.qnta_occ_input = None
Expand Down Expand Up @@ -215,6 +221,45 @@ def __init__(
self.aq_plots = []
self.ecdf_plots = []

def _build_workflow_steps(self):
steps = [
{"name": "Processing", "weight": 2},
{"name": "Check for existence of required columns", "weight": 3},
{"name": "Flagging duplicates", "weight": 8},
{"name": "Calculating statistics", "weight": 10},
{"name": "Create heatmap", "weight": 5},
{"name": "Checking tracers", "weight": 7},
{"name": "Create scatterplot", "weight": 5},
]

# Add optional qNTA step if enabled
if self.parameters and self.parameters.get("do_qnta", [None, "no"])[1] == "yes":
steps.append({"name": "Performing qNTA", "weight": 12})

steps.extend([
{"name": "Cleaning features", "weight": 8},
{"name": "Merge detection counts onto tracers", "weight": 3},
{"name": "Combining modes", "weight": 5},
])

# Add optional database search steps if enabled
if self.parameters and self.parameters.get("search_dsstox", [None, "no"])[1] == "yes":
steps.append({"name": "Searching dsstox database", "weight": 20})

if self.parameters and self.parameters.get("search_hcd", [None, "no"])[1] == "yes":
steps.append({"name": "Searching Cheminformatics Hazard Module database", "weight": 10})

# Add optional MS2 step if enabled
if self.parameters and self.parameters.get("do_ms2", [None, "no"])[1] == "yes":
steps.append({"name": "Performing MS2", "weight": 15})

steps.extend([
{"name": "Storing data", "weight": 5},
{"name": "Displaying results", "weight": 2}
])

return steps

def log_memory_usage(self, step_name):
"""Logs the current memory usage."""
process = psutil.Process()
Expand Down Expand Up @@ -561,44 +606,100 @@ def create_run_sequence_sheets(self):

return

def set_status(self, status, create=False):
def set_status(self, status, create=False, step_progress=0):
"""
Accepts a string (e.g., "Processing", or "Step Completed") and, if create is TRUE,
post status to the logger with the Job ID and timestamp.

Args:
status (string)
create (Boolean)
step_progress (int): Progress within current step (0-100)
Notes:
In a future version, we would like to display status on the UI
Enhanced with step-based progress tracking
Returns:
None
"""
# Find step index and update progress tracking
step_index = next((i for i, step in enumerate(self.workflow_steps)
if step["name"] == status), self.current_step_index)

self.current_step_index = step_index
self.current_step_progress = step_progress

# Calculate overall percentage
total_weight = sum(step["weight"] for step in self.workflow_steps)

completed_weight = 0
current_step_weight = 0

for i, step in enumerate(self.workflow_steps):
if step["name"] == status:
current_step_weight = step["weight"]
break
completed_weight += step["weight"]

overall_percentage = ((completed_weight + (current_step_weight * step_progress / 100))
/ total_weight * 100) if total_weight > 0 else 0

posts = self.mongo.posts
# Get time stamp
time_stamp = datetime.utcnow()
# Generate ID with status
post_id = self.jobid + "_" + "status"

base_data = {
"_id": post_id,
"status": status,
"current_step_index": step_index,
"current_step_progress": step_progress,
"overall_percentage": f"{overall_percentage:.1f}",
"total_steps": len(self.workflow_steps),
"workflow_steps": [step["name"] for step in self.workflow_steps],
}

# If create, post update
if create:
posts.update_one(
{"_id": post_id},
{
"$set": {
"_id": post_id,
"date": time_stamp,
"status": status,
"error_info": "",
}
},
upsert=True,
)
else:
posts.update_one(
{"_id": post_id},
{"$set": {"_id": post_id, "status": status}},
upsert=True,
)
base_data.update({
"date": time_stamp,
"error_info": "",
})

posts.update_one(
{"_id": post_id},
{"$set": base_data},
upsert=True,
)

def update_step_progress(self, progress_percentage):
"""Update progress within the current step (0-100)"""
self.current_step_progress = progress_percentage

# Calculate overall percentage
total_weight = sum(step["weight"] for step in self.workflow_steps)

completed_weight = 0
current_step_weight = 0

for step in self.workflow_steps:
if step["name"] == self.step:
current_step_weight = step["weight"]
break
completed_weight += step["weight"]

overall_percentage = ((completed_weight + (current_step_weight * progress_percentage / 100))
/ total_weight * 100) if total_weight > 0 else 0

posts = self.mongo.posts
post_id = self.jobid + "_" + "status"
posts.update_one(
{"_id": post_id},
{"$set": {
"current_step_progress": progress_percentage,
"overall_percentage": f"{overall_percentage:.1f}",
}},
upsert=True,
)

def set_except_message(self, e):
"""
Expand Down Expand Up @@ -1654,7 +1755,7 @@ def perform_MS2(self):
self.spectra_inputs[0],
self.search_results,
mode="pos",
parameters=parameters,
parameters=self.parameters,
)
# Run execute function
ms2_pos.execute()
Expand All @@ -1675,7 +1776,7 @@ def perform_MS2(self):
self.spectra_inputs[1],
self.search_results,
mode="neg",
parameters=parameters,
parameters=self.parameters,
)
# Run execute function
ms2_neg.execute()
Expand Down
Loading
Loading