From 8ed53a4fddfd09d7eae5ca09ef78b326e885daba Mon Sep 17 00:00:00 2001 From: Stuart Fisher Date: Fri, 27 Jan 2023 16:18:23 +0100 Subject: [PATCH 1/2] add queued samples routes --- pyispyb/core/modules/containers.py | 256 ++++++++++++++++++++++++++++- pyispyb/core/routes/containers.py | 75 ++++++++- pyispyb/core/schemas/containers.py | 90 +++++++++- 3 files changed, 415 insertions(+), 6 deletions(-) diff --git a/pyispyb/core/modules/containers.py b/pyispyb/core/modules/containers.py index 194c7fae..a452c3af 100644 --- a/pyispyb/core/modules/containers.py +++ b/pyispyb/core/modules/containers.py @@ -1,11 +1,18 @@ +import enum from typing import Optional -from sqlalchemy import distinct, func +from sqlalchemy import distinct, func, and_ from sqlalchemy.orm import joinedload from ispyb import models from ...app.extensions.database.definitions import with_authorization -from ...app.extensions.database.utils import Paged, page, update_model, with_metadata +from ...app.extensions.database.utils import ( + Paged, + page, + update_model, + with_metadata, + order, +) from ...app.extensions.database.middleware import db from ..schemas import containers as schema @@ -34,10 +41,10 @@ def get_containers( ) if containerId: - query = query.filter(models.Continer.containerId == containerId) + query = query.filter(models.Container.containerId == containerId) if dewarId: - query = query.filter(models.Continer.dewarId == dewarId) + query = query.filter(models.Container.dewarId == dewarId) if proteinId: query = query.filter(models.Crystal.proteinId == proteinId) @@ -78,3 +85,244 @@ def update_container( db.session.commit() return get_containers(containerId=containerId, skip=0, limit=1).first + + +def get_queued_containers( + skip: int, + limit: int, + proposal: Optional[str] = None, + beamLineName: Optional[str] = None, + containerId: Optional[int] = None, + containerQueueId: Optional[int] = None, +) -> Paged[models.ContainerQueue]: + metadata = { + "samples": func.count( + distinct(models.ContainerQueueSample.containerQueueSampleId) + ) + } + query = ( + db.session.query(models.ContainerQueue, *metadata.values()) + .select_from(models.ContainerQueue) + .join(models.ContainerQueueSample) + .join(models.Container) + .options(joinedload(models.ContainerQueue.Container)) + .join(models.Dewar) + .options(joinedload(models.ContainerQueue.Container, models.Container.Dewar)) + .join(models.Shipping) + .options( + joinedload( + models.ContainerQueue.Container, + models.Container.Dewar, + models.Dewar.Shipping, + ) + ) + .join(models.Proposal, models.Proposal.proposalId == models.Shipping.proposalId) + .group_by(models.ContainerQueue.containerQueueId) + ) + + if containerQueueId: + query = query.filter(models.ContainerQueue.containerQueueId == containerQueueId) + + if proposal: + query = query.filter(models.Proposal.proposal == proposal) + + if beamLineName: + query = query.filter(models.Container.beamlineLocation == beamLineName) + + if containerId: + query = query.filter(models.Container.containerId == containerId) + + query = with_authorization(query, proposal) + total = query.count() + query = page(query, skip=skip, limit=limit) + results = with_metadata(query.all(), list(metadata.keys())) + + return Paged(total=total, results=results, skip=skip, limit=limit) + + +def update_queued_container( + containerQueueId: int, containerQueue: schema.ContainerQueueUpdate +) -> models.ContainerQueue: + new_container_queue = get_queued_containers( + containerQueueId=containerQueueId, skip=0, limit=1 + ).first + + if containerQueue.completed: + new_container_queue.completedTimeStamp = func.now() + db.session.commit() + + return get_queued_containers( + containerQueueId=containerQueueId, skip=0, limit=1 + ).first + + +QUEUED_SAMPLE_STATUS_FILTERS = { + "Queued": func.count(distinct(models.DataCollection.dataCollectionId)) == 0, + "Completed": func.count(distinct(models.DataCollection.dataCollectionId)) > 0, + "Failed": func.sum( + func.IF( + and_( + models.DataCollection.runStatus.notlike("%success%"), + models.DataCollection.runStatus != None, # noqa + ), + 1, + 0, + ) + ) + > 0, +} + +QUEUED_SAMPLE_STATUS_ENUM = enum.Enum( + "QueuedSampleStatus", {k: k for k in QUEUED_SAMPLE_STATUS_FILTERS.keys()} +) + +QUEUED_SAMPLE_ORDER_BY_MAP = { + "containerQueueSampleId": models.ContainerQueueSample.containerQueueSampleId, + "started": func.min(models.DataCollection.startTime), + "finished": func.max(models.DataCollection.endTime), +} + + +def get_queued_samples( + skip: int, + limit: int, + proposal: Optional[str] = None, + blSampleId: Optional[int] = None, + beamLineName: Optional[str] = None, + containerId: Optional[int] = None, + containerQueueSampleId: Optional[int] = None, + status: Optional[QUEUED_SAMPLE_STATUS_ENUM] = None, + sort_order: Optional[dict[str, str]] = None, +) -> Paged[models.ContainerQueueSample]: + metadata = { + "datacollections": func.group_concat( + distinct( + func.concat( + models.DataCollection.dataCollectionId, + ":", + models.DataCollection.runStatus, + ) + ) + ), + "dataCollectionGroupId": models.DataCollectionGroup.dataCollectionGroupId, + "sessionId": models.DataCollectionGroup.sessionId, + "proposal": models.Proposal.proposal, + "started": func.min(models.DataCollection.startTime), + "finished": func.max(models.DataCollection.endTime), + "types": func.group_concat(distinct(models.DataCollectionGroup.experimentType)), + } + + query = ( + db.session.query(models.ContainerQueueSample, *metadata.values()) + .select_from(models.ContainerQueueSample) + .join(models.DiffractionPlan) + .options( + joinedload( + models.ContainerQueueSample.DiffractionPlan, + ) + ) + .outerjoin( + models.BLSubSample, + models.BLSubSample.blSubSampleId + == models.ContainerQueueSample.blSubSampleId, + ) + .options( + joinedload( + models.ContainerQueueSample.BLSubSample, + ) + ) + .options( + joinedload( + models.ContainerQueueSample.BLSample, + ) + ) + .outerjoin( + models.BLSample, + models.BLSample.blSampleId == models.BLSubSample.blSampleId, + ) + .options( + joinedload( + models.ContainerQueueSample.BLSubSample, + models.BLSubSample.BLSample, + ) + ) + .outerjoin( + models.DataCollection, + models.DataCollection.dataCollectionPlanId + == models.ContainerQueueSample.dataCollectionPlanId, + ) + .outerjoin(models.DataCollectionGroup) + .join(models.ContainerQueue) + .join(models.Container) + .join(models.Dewar) + .join(models.Shipping) + .join(models.Proposal, models.Proposal.proposalId == models.Shipping.proposalId) + .group_by(models.ContainerQueueSample.containerQueueSampleId) + ) + + if containerQueueSampleId: + query = query.filter( + models.ContainerQueueSample.containerQueueSampleId == containerQueueSampleId + ) + + if proposal: + query = query.filter(models.Proposal.proposal == proposal) + + if beamLineName: + query = query.filter(models.Container.beamlineLocation == beamLineName) + + if containerId: + query = query.filter(models.Container.containerId == containerId) + + if blSampleId: + query = query.filter(models.BLSample.blSampleId == blSampleId) + + if status: + query = query.having(QUEUED_SAMPLE_STATUS_FILTERS[status.value]) + + if sort_order: + query = order( + query, + QUEUED_SAMPLE_ORDER_BY_MAP, + sort_order, + default={"order_by": "containerQueueSampleId", "order": "desc"}, + ) + + query = with_authorization(query, proposal) + total = query.count() + query = page(query, skip=skip, limit=limit) + results = with_metadata(query.all(), list(metadata.keys())) + dc_keys = ["dataCollectionId", "runStatus"] + + for result in results: + if result._metadata["types"]: + result._metadata["types"] = result._metadata["types"].split(",") + else: + result._metadata["types"] = [] + + if result._metadata["datacollections"]: + result._metadata["datacollections"] = [ + {dc_keys[i]: value for i, value in enumerate(dc.split(":"))} + for dc in result._metadata["datacollections"].split(",") + ] + else: + result._metadata["datacollections"] = [] + + return Paged(total=total, results=results, skip=skip, limit=limit) + + +def delete_queued_sample(containerQueueSampleId: int) -> None: + queued_sample = get_queued_samples( + containerQueueSampleId=containerQueueSampleId, skip=0, limit=1 + ).first + + if len(queued_sample._metadata["datacollections"]) > 0: + raise RuntimeError( + "Queued sample has related data collections so cannot be removed" + ) + + if queued_sample.DiffractionPlan: + db.session.delete(queued_sample.DiffractionPlan) + + db.session.delete(queued_sample) + db.session.commit() diff --git a/pyispyb/core/routes/containers.py b/pyispyb/core/routes/containers.py index c8cd878d..6358ec40 100644 --- a/pyispyb/core/routes/containers.py +++ b/pyispyb/core/routes/containers.py @@ -1,9 +1,10 @@ import logging +from typing import Optional from fastapi import Depends, HTTPException, status from ispyb import models -from ...dependencies import pagination +from ...dependencies import pagination, order_by_factory from ...app.extensions.database.utils import Paged from ...app.base import AuthenticatedAPIRouter from ... import filters @@ -17,6 +18,78 @@ router = AuthenticatedAPIRouter(prefix="/containers", tags=["Containers"]) +@router.get("/queue", response_model=paginated(schema.ContainerQueue)) +def get_queued_containers( + proposal: str = Depends(filters.proposal), + beamLineName: str = Depends(filters.beamLineName), + page: dict[str, int] = Depends(pagination), +) -> Paged[models.ContainerQueue]: + """Get a list of queued containers""" + return crud.get_queued_containers( + proposal=proposal, beamLineName=beamLineName, **page + ) + + +@router.patch("/queue/{containerQueueId}", response_model=schema.ContainerQueue) +def update_queued_container( + containerQueueId: int, + containerQueue: schema.ContainerQueueUpdate, +) -> models.ContainerQueue: + """Update a queued container""" + try: + return crud.update_queued_container(containerQueueId, containerQueue) + except IndexError: + raise HTTPException(status_code=404, detail="Container queue not found") + except Exception: + logger.exception( + f"Could not update container queue `{containerQueueId}` with payload `{containerQueue}`" + ) + raise HTTPException(status_code=400, detail="Could not update container queue") + + +QUEUED_SAMPLE_ORDER_BY = order_by_factory( + crud.QUEUED_SAMPLE_ORDER_BY_MAP, "QueuedSampleOrder" +) + + +@router.get("/queue/samples", response_model=paginated(schema.ContainerQueueSample)) +def get_queued_subsamples( + proposal: str = Depends(filters.proposal), + blSampleId: int = Depends(filters.blSampleId), + containerId: int = Depends(filters.containerId), + beamLineName: str = Depends(filters.beamLineName), + status: Optional[crud.QUEUED_SAMPLE_STATUS_ENUM] = None, + sort_order: dict = Depends(QUEUED_SAMPLE_ORDER_BY), + page: dict[str, int] = Depends(pagination), +) -> Paged[models.ContainerQueueSample]: + """Get a list of queued samples and sub samples""" + return crud.get_queued_samples( + proposal=proposal, + blSampleId=blSampleId, + containerId=containerId, + beamLineName=beamLineName, + status=status, + sort_order=sort_order, + **page, + ) + + +@router.delete( + "/queue/samples/{containerQueueSampleId}", status_code=status.HTTP_204_NO_CONTENT +) +def delete_queued_sample(containerQueueSampleId: int) -> None: + """Delete a queued sample""" + try: + crud.delete_queued_sample(containerQueueSampleId=containerQueueSampleId) + except IndexError: + raise HTTPException(status_code=404, detail="Queued sample not found") + except Exception as e: + logger.exception(f"Could not delete queued sample `{containerQueueSampleId}`") + raise HTTPException( + status_code=400, detail=f"Could not delete queued sample: {str(e)}" + ) + + @router.get("", response_model=paginated(schema.Container)) def get_containers( proposal: str = Depends(filters.proposal), diff --git a/pyispyb/core/schemas/containers.py b/pyispyb/core/schemas/containers.py index e56daab9..24808c1d 100644 --- a/pyispyb/core/schemas/containers.py +++ b/pyispyb/core/schemas/containers.py @@ -1,4 +1,6 @@ -from typing import Optional +import datetime +from typing import Any, Optional + from pydantic import BaseModel, Field from ispyb import models @@ -37,3 +39,89 @@ class Container(ContainerCreate): class Config: orm_mode = True + + +class ContainerQueueMetaData(BaseModel): + samples: Optional[int] = Field(description="Number of samples queued") + + +class ContainerQueue(BaseModel): + containerQueueId: int + createdTimeStamp: Optional[datetime.datetime] + completedTimeStamp: Optional[datetime.datetime] + + Container: Container + + metadata: ContainerQueueMetaData = Field(alias="_metadata") + + class Config: + orm_mode = True + + +class ContainerQueueUpdate(BaseModel): + completed: Optional[bool] + + +class ContainerQueueDataCollection(BaseModel): + dataCollectionId: int + runStatus: Optional[str] + + +class ContainerQueueSampleMetaData(BaseModel): + datacollections: list[ContainerQueueDataCollection] = Field( + description="Related data collections" + ) + dataCollectionGroupId: Optional[int] = Field( + description="Related dataCollectionGroupId" + ) + sessionId: Optional[int] = Field(description="Related sessionId") + proposal: Optional[str] = Field(description="Related proposal") + started: Optional[datetime.datetime] = Field( + description="Time first datacollection started" + ) + finished: Optional[datetime.datetime] = Field( + description="Time last datacollection ended" + ) + types: Optional[list[str]] = Field(description="Types of data collections") + + +class ContainerQueueBLSample(BaseModel): + blSampleId: int + name: str + + class Config: + orm_mode = True + + +class ContainerQueueBLSubSample(BaseModel): + type: str + + BLSample: ContainerQueueBLSample + + class Config: + orm_mode = True + + +class ContainerQueueDiffractionPlan(BaseModel): + diffractionPlanId: int + recordTimeStamp: datetime.datetime + scanParameters: Any # Optional[dict[str, Any]] + monoBandwidth: Optional[int] + + class Config: + orm_mode = True + + +class ContainerQueueSample(BaseModel): + containerQueueSampleId: int + dataCollectionPlanId: int + blSubSampleId: Optional[int] + + BLSample: Optional[ContainerQueueBLSample] + BLSubSample: Optional[ContainerQueueBLSubSample] + DiffractionPlan: Optional[ContainerQueueDiffractionPlan] + + metadata: ContainerQueueSampleMetaData = Field(alias="_metadata") + + class Config: + orm_mode = True From 36e4576f56b258a6e03932c5422c859396ce60fa Mon Sep 17 00:00:00 2001 From: Stuart Fisher Date: Wed, 8 Feb 2023 11:54:00 +0100 Subject: [PATCH 2/2] correct auth --- pyispyb/core/modules/containers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyispyb/core/modules/containers.py b/pyispyb/core/modules/containers.py index a452c3af..4f8d1faf 100644 --- a/pyispyb/core/modules/containers.py +++ b/pyispyb/core/modules/containers.py @@ -288,7 +288,7 @@ def get_queued_samples( default={"order_by": "containerQueueSampleId", "order": "desc"}, ) - query = with_authorization(query, proposal) + query = with_authorization(query) total = query.count() query = page(query, skip=skip, limit=limit) results = with_metadata(query.all(), list(metadata.keys()))