Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 252 additions & 4 deletions pyispyb/core/modules/containers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import enum
from typing import Optional

from sqlalchemy import distinct, func
from sqlalchemy import distinct, func, and_
from sqlalchemy.orm import joinedload
from ispyb import models

from ...app.extensions.database.definitions import with_authorization
from ...app.extensions.database.utils import Paged, page, update_model, with_metadata
from ...app.extensions.database.utils import (
Paged,
page,
update_model,
with_metadata,
order,
)
from ...app.extensions.database.middleware import db
from ..schemas import containers as schema

Expand Down Expand Up @@ -34,10 +41,10 @@ def get_containers(
)

if containerId:
query = query.filter(models.Continer.containerId == containerId)
query = query.filter(models.Container.containerId == containerId)

if dewarId:
query = query.filter(models.Continer.dewarId == dewarId)
query = query.filter(models.Container.dewarId == dewarId)

if proteinId:
query = query.filter(models.Crystal.proteinId == proteinId)
Expand Down Expand Up @@ -78,3 +85,244 @@ def update_container(
db.session.commit()

return get_containers(containerId=containerId, skip=0, limit=1).first


def get_queued_containers(
skip: int,
limit: int,
proposal: Optional[str] = None,
beamLineName: Optional[str] = None,
containerId: Optional[int] = None,
containerQueueId: Optional[int] = None,
) -> Paged[models.ContainerQueue]:
metadata = {
"samples": func.count(
distinct(models.ContainerQueueSample.containerQueueSampleId)
)
}
query = (
db.session.query(models.ContainerQueue, *metadata.values())
.select_from(models.ContainerQueue)
.join(models.ContainerQueueSample)
.join(models.Container)
.options(joinedload(models.ContainerQueue.Container))
.join(models.Dewar)
.options(joinedload(models.ContainerQueue.Container, models.Container.Dewar))
.join(models.Shipping)
.options(
joinedload(
models.ContainerQueue.Container,
models.Container.Dewar,
models.Dewar.Shipping,
)
)
.join(models.Proposal, models.Proposal.proposalId == models.Shipping.proposalId)
.group_by(models.ContainerQueue.containerQueueId)
)

if containerQueueId:
query = query.filter(models.ContainerQueue.containerQueueId == containerQueueId)

if proposal:
query = query.filter(models.Proposal.proposal == proposal)

if beamLineName:
query = query.filter(models.Container.beamlineLocation == beamLineName)

if containerId:
query = query.filter(models.Container.containerId == containerId)

query = with_authorization(query, proposal)
total = query.count()
query = page(query, skip=skip, limit=limit)
results = with_metadata(query.all(), list(metadata.keys()))

return Paged(total=total, results=results, skip=skip, limit=limit)


def update_queued_container(
containerQueueId: int, containerQueue: schema.ContainerQueueUpdate
) -> models.ContainerQueue:
new_container_queue = get_queued_containers(
containerQueueId=containerQueueId, skip=0, limit=1
).first

if containerQueue.completed:
new_container_queue.completedTimeStamp = func.now()
db.session.commit()

return get_queued_containers(
containerQueueId=containerQueueId, skip=0, limit=1
).first


QUEUED_SAMPLE_STATUS_FILTERS = {
"Queued": func.count(distinct(models.DataCollection.dataCollectionId)) == 0,
"Completed": func.count(distinct(models.DataCollection.dataCollectionId)) > 0,
"Failed": func.sum(
func.IF(
and_(
models.DataCollection.runStatus.notlike("%success%"),
models.DataCollection.runStatus != None, # noqa
),
1,
0,
)
)
> 0,
}

QUEUED_SAMPLE_STATUS_ENUM = enum.Enum(
"QueuedSampleStatus", {k: k for k in QUEUED_SAMPLE_STATUS_FILTERS.keys()}
)

QUEUED_SAMPLE_ORDER_BY_MAP = {
"containerQueueSampleId": models.ContainerQueueSample.containerQueueSampleId,
"started": func.min(models.DataCollection.startTime),
"finished": func.max(models.DataCollection.endTime),
}


def get_queued_samples(
skip: int,
limit: int,
proposal: Optional[str] = None,
blSampleId: Optional[int] = None,
beamLineName: Optional[str] = None,
containerId: Optional[int] = None,
containerQueueSampleId: Optional[int] = None,
status: Optional[QUEUED_SAMPLE_STATUS_ENUM] = None,
sort_order: Optional[dict[str, str]] = None,
) -> Paged[models.ContainerQueueSample]:
metadata = {
"datacollections": func.group_concat(
distinct(
func.concat(
models.DataCollection.dataCollectionId,
":",
models.DataCollection.runStatus,
)
)
),
"dataCollectionGroupId": models.DataCollectionGroup.dataCollectionGroupId,
"sessionId": models.DataCollectionGroup.sessionId,
"proposal": models.Proposal.proposal,
"started": func.min(models.DataCollection.startTime),
"finished": func.max(models.DataCollection.endTime),
"types": func.group_concat(distinct(models.DataCollectionGroup.experimentType)),
}

query = (
db.session.query(models.ContainerQueueSample, *metadata.values())
.select_from(models.ContainerQueueSample)
.join(models.DiffractionPlan)
.options(
joinedload(
models.ContainerQueueSample.DiffractionPlan,
)
)
.outerjoin(
models.BLSubSample,
models.BLSubSample.blSubSampleId
== models.ContainerQueueSample.blSubSampleId,
)
.options(
joinedload(
models.ContainerQueueSample.BLSubSample,
)
)
.options(
joinedload(
models.ContainerQueueSample.BLSample,
)
)
.outerjoin(
models.BLSample,
models.BLSample.blSampleId == models.BLSubSample.blSampleId,
)
.options(
joinedload(
models.ContainerQueueSample.BLSubSample,
models.BLSubSample.BLSample,
)
)
.outerjoin(
models.DataCollection,
models.DataCollection.dataCollectionPlanId
== models.ContainerQueueSample.dataCollectionPlanId,
)
.outerjoin(models.DataCollectionGroup)
.join(models.ContainerQueue)
.join(models.Container)
.join(models.Dewar)
.join(models.Shipping)
.join(models.Proposal, models.Proposal.proposalId == models.Shipping.proposalId)
.group_by(models.ContainerQueueSample.containerQueueSampleId)
)

if containerQueueSampleId:
query = query.filter(
models.ContainerQueueSample.containerQueueSampleId == containerQueueSampleId
)

if proposal:
query = query.filter(models.Proposal.proposal == proposal)

if beamLineName:
query = query.filter(models.Container.beamlineLocation == beamLineName)

if containerId:
query = query.filter(models.Container.containerId == containerId)

if blSampleId:
query = query.filter(models.BLSample.blSampleId == blSampleId)

if status:
query = query.having(QUEUED_SAMPLE_STATUS_FILTERS[status.value])

if sort_order:
query = order(
query,
QUEUED_SAMPLE_ORDER_BY_MAP,
sort_order,
default={"order_by": "containerQueueSampleId", "order": "desc"},
)

query = with_authorization(query)
total = query.count()
query = page(query, skip=skip, limit=limit)
results = with_metadata(query.all(), list(metadata.keys()))
dc_keys = ["dataCollectionId", "runStatus"]

for result in results:
if result._metadata["types"]:
result._metadata["types"] = result._metadata["types"].split(",")
else:
result._metadata["types"] = []

if result._metadata["datacollections"]:
result._metadata["datacollections"] = [
{dc_keys[i]: value for i, value in enumerate(dc.split(":"))}
for dc in result._metadata["datacollections"].split(",")
]
else:
result._metadata["datacollections"] = []

return Paged(total=total, results=results, skip=skip, limit=limit)


def delete_queued_sample(containerQueueSampleId: int) -> None:
queued_sample = get_queued_samples(
containerQueueSampleId=containerQueueSampleId, skip=0, limit=1
).first

if len(queued_sample._metadata["datacollections"]) > 0:
raise RuntimeError(
"Queued sample has related data collections so cannot be removed"
)

if queued_sample.DiffractionPlan:
db.session.delete(queued_sample.DiffractionPlan)

db.session.delete(queued_sample)
db.session.commit()
75 changes: 74 additions & 1 deletion pyispyb/core/routes/containers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from typing import Optional

from fastapi import Depends, HTTPException, status
from ispyb import models

from ...dependencies import pagination
from ...dependencies import pagination, order_by_factory
from ...app.extensions.database.utils import Paged
from ...app.base import AuthenticatedAPIRouter
from ... import filters
Expand All @@ -17,6 +18,78 @@
router = AuthenticatedAPIRouter(prefix="/containers", tags=["Containers"])


@router.get("/queue", response_model=paginated(schema.ContainerQueue))
def get_queued_containers(
proposal: str = Depends(filters.proposal),
beamLineName: str = Depends(filters.beamLineName),
page: dict[str, int] = Depends(pagination),
) -> Paged[models.ContainerQueue]:
"""Get a list of queued containers"""
return crud.get_queued_containers(
proposal=proposal, beamLineName=beamLineName, **page
)


@router.patch("/queue/{containerQueueId}", response_model=schema.ContainerQueue)
def update_queued_container(
containerQueueId: int,
containerQueue: schema.ContainerQueueUpdate,
) -> models.ContainerQueue:
"""Update a queued container"""
try:
return crud.update_queued_container(containerQueueId, containerQueue)
except IndexError:
raise HTTPException(status_code=404, detail="Container queue not found")
except Exception:
logger.exception(
f"Could not update container queue `{containerQueueId}` with payload `{containerQueue}`"
)
raise HTTPException(status_code=400, detail="Could not update container queue")


QUEUED_SAMPLE_ORDER_BY = order_by_factory(
crud.QUEUED_SAMPLE_ORDER_BY_MAP, "QueuedSampleOrder"
)


@router.get("/queue/samples", response_model=paginated(schema.ContainerQueueSample))
def get_queued_subsamples(
proposal: str = Depends(filters.proposal),
blSampleId: int = Depends(filters.blSampleId),
containerId: int = Depends(filters.containerId),
beamLineName: str = Depends(filters.beamLineName),
status: Optional[crud.QUEUED_SAMPLE_STATUS_ENUM] = None,
sort_order: dict = Depends(QUEUED_SAMPLE_ORDER_BY),
page: dict[str, int] = Depends(pagination),
) -> Paged[models.ContainerQueueSample]:
"""Get a list of queued samples and sub samples"""
return crud.get_queued_samples(
proposal=proposal,
blSampleId=blSampleId,
containerId=containerId,
beamLineName=beamLineName,
status=status,
sort_order=sort_order,
**page,
)


@router.delete(
"/queue/samples/{containerQueueSampleId}", status_code=status.HTTP_204_NO_CONTENT
)
def delete_queued_sample(containerQueueSampleId: int) -> None:
"""Delete a queued sample"""
try:
crud.delete_queued_sample(containerQueueSampleId=containerQueueSampleId)
except IndexError:
raise HTTPException(status_code=404, detail="Queued sample not found")
except Exception as e:
logger.exception(f"Could not delete queued sample `{containerQueueSampleId}`")
raise HTTPException(
status_code=400, detail=f"Could not delete queued sample: {str(e)}"
)


@router.get("", response_model=paginated(schema.Container))
def get_containers(
proposal: str = Depends(filters.proposal),
Expand Down
Loading