Source code for webinterface.dashboards.query_routes

"""
query_routes.py
===============
"""

from datetime import datetime
from typing import Any, Dict, List

import common.config as config
# Starlette-related includes
import rq
from common.types import DicomTarget, DicomWebTarget, FolderTarget
from rq.job import Job
from starlette.authentication import requires
from starlette.responses import JSONResponse
# App-specific includes
from webinterface.common import redis, templates

from .common import JSONErrorResponse, router
from .query.jobs import CheckAccessionsTask, QueryPipeline

# Module-level logger shared by all route handlers in this file; obtained from common.config.
logger = config.get_logger()


@router.post("/query/retry_job")
@requires(["authenticated", "admin"], redirect="login")
async def post_retry_job(request):
    """
    Retries the query job whose id is passed in the ``id`` query parameter.

    Responds with a 404 JSON error when the QueryPipeline object for that id
    evaluates as falsy, with a 500 JSON error when ``retry()`` raises, and with
    an empty JSON object otherwise.
    """
    job_id = request.query_params['id']
    pipeline = QueryPipeline(job_id)
    if not pipeline:
        return JSONErrorResponse(f"Job with id {job_id} not found.", status_code=404)

    try:
        pipeline.retry()
    except Exception:
        # logger.exception already attaches the active traceback.
        logger.exception("Failed to retry job")
        return JSONErrorResponse("Failed to retry job", status_code=500)

    return JSONResponse({})
@router.post("/query/pause_job")
@requires(["authenticated", "admin"], redirect="login")
async def post_pause_job(request):
    """
    Pauses the query job whose id is passed in the ``id`` query parameter.

    Responses:
      * 404 - the QueryPipeline object for the id evaluates as falsy
      * 400 - the job reports itself as finished or failed
      * 500 - ``pause()`` raised an exception (logged with traceback)
      * 200 - ``{'status': 'success'}`` otherwise
    """
    job_id = request.query_params['id']
    pipeline = QueryPipeline(job_id)

    if not pipeline:
        return JSONErrorResponse('Job not found', status_code=404)

    # A job that has already ended (successfully or not) cannot be paused.
    already_ended = pipeline.is_finished or pipeline.is_failed
    if already_ended:
        return JSONErrorResponse('Job is already finished', status_code=400)

    try:
        pipeline.pause()
    except Exception:
        logger.exception(f"Failed to pause job {job_id}")
        return JSONErrorResponse('Failed to pause job', status_code=500)

    return JSONResponse({'status': 'success'}, status_code=200)
@router.post("/query/resume_job")
@requires(["authenticated", "admin"], redirect="login")
async def post_resume_job(request):
    """
    Resumes the query job whose id is passed in the ``id`` query parameter.

    Responses:
      * 404 - the QueryPipeline object for the id evaluates as falsy
      * 400 - the job reports itself as finished or failed
      * 500 - ``resume()`` raised an exception (logged with traceback)
      * 200 - ``{'status': 'success'}`` otherwise
    """
    job_id = request.query_params['id']
    pipeline = QueryPipeline(job_id)

    if pipeline:
        # Only jobs that have not ended yet can be resumed.
        if pipeline.is_finished or pipeline.is_failed:
            return JSONErrorResponse('Job is already finished', status_code=400)
        try:
            pipeline.resume()
        except Exception:
            logger.exception(f"Failed to resume job {job_id}")
            return JSONErrorResponse('Failed to resume job', status_code=500)
        return JSONResponse({'status': 'success'}, status_code=200)

    return JSONErrorResponse('Job not found', status_code=404)
@router.get("/query/job_info")
@requires(["authenticated", "admin"], redirect="login")
async def get_job_info(request):
    """
    Renders the detail fragment for the query job given by the ``id`` query parameter.

    Collects one summary dict per subjob whose meta ``type`` is ``get_accession``,
    orders the summaries by subjob creation time, and passes them together with the
    job itself to the ``dashboards/query_job_fragment.html`` template.
    Responds with a 404 JSON error when the QueryPipeline object evaluates as falsy.
    """
    pipeline = QueryPipeline(request.query_params['id'])
    if not pipeline:
        return JSONErrorResponse('Job not found', status_code=404)

    subjob_info: List[Dict[str, Any]] = []
    for sub in pipeline.get_subjobs():
        # Skip empty entries and every subjob that is not an accession retrieval.
        if not sub or sub.meta.get('type') != 'get_accession':
            continue

        ended = sub.ended_at
        entry = {
            'id': sub.get_id(),
            # ISO timestamp with the fractional-seconds part cut off; empty if not ended yet.
            'ended_at': ended.isoformat().split('.')[0] if ended else "",
            'created_at_dt': sub.created_at,
            'accession': sub.kwargs['accession'],
            'progress': sub.meta.get('progress'),
            'paused': sub.meta.get('paused', False),
            'status': sub.get_status()
        }
        # A canceled subjob that carries the paused flag is presented as "paused".
        if entry['paused'] and entry['status'] == 'canceled':
            entry['status'] = 'paused'
        subjob_info.append(entry)

    subjob_info.sort(key=lambda entry: entry['created_at_dt'])

    context = {"request": request, "job": pipeline, "subjob_info": subjob_info}
    return templates.TemplateResponse("dashboards/query_job_fragment.html", context)
@router.post("/query")
@requires(["authenticated", "admin"], redirect="login")
async def query_post_batch(request):
    """
    Starts a new query job for the given accession number(s) and DICOM node.

    Form fields read:
      * ``accession``          - required; split on "," before being passed on
      * ``dicom_node``         - required; name of a DicomTarget / DicomWebTarget
      * ``destination``        - optional; name of a FolderTarget
      * ``offpeak``            - optional flag; only its presence is checked
      * ``series_description`` - optional comma-separated filter values
      * ``study_description``  - optional comma-separated filter values
      * ``force_rule``         - optional; empty values are passed on as None

    Returns a JSON error response when validation fails or when creating the
    pipeline raises, and ``{"status": "success"}`` otherwise.
    """
    try:
        form = await request.form()
    except Exception:
        return JSONErrorResponse("Invalid form data.", status_code=400)

    accession = form.get("accession")
    if not accession:
        return JSONErrorResponse("Accession number is required.", status_code=400)

    node = config.mercure.targets.get(form.get("dicom_node"))
    if not node:
        return JSONErrorResponse(f"No such DICOM node {form.get('dicom_node')}.", status_code=404)
    if not isinstance(node, (DicomWebTarget, DicomTarget)):
        return JSONErrorResponse(f"Invalid DICOM node {form.get('dicom_node')}.", status_code=400)

    destination_name = form.get("destination")
    if not destination_name:
        destination_path = None
    else:
        destination = config.mercure.targets.get(destination_name)
        # The existence check has to come first: a missing target (None) also fails the
        # isinstance test, which previously made the "No such target" branch unreachable
        # and reported unknown names as "not a folder target".
        if not destination:
            return JSONErrorResponse(f"No such target '{destination_name}'.", status_code=400)
        if not isinstance(destination, FolderTarget):
            return JSONErrorResponse(f"Invalid destination '{destination_name}': not a folder target.", status_code=400)
        destination_path = destination.folder

    offpeak = 'offpeak' in form

    search_filters = {}
    if search_filter := form.get("series_description"):
        search_filters["SeriesDescription"] = [x.strip() for x in search_filter.split(",")]
    if search_filter := form.get("study_description"):
        search_filters["StudyDescription"] = [x.strip() for x in search_filter.split(",")]

    force_rule = form.get("force_rule") or None

    try:
        QueryPipeline.create(accession.split(","), search_filters, node, destination_path,
                             offpeak=offpeak, force_rule=force_rule)
    except Exception as e:
        logger.exception(f"Error creating query pipeline for accession {accession}.")
        return JSONErrorResponse(str(e))

    return JSONResponse({"status": "success"})
@router.get("/query/jobs")
@requires(["authenticated", "admin"], redirect="login")
async def query_jobs(request):
    """
    Returns a list of all query jobs.

    The response is a JSON object ``{"data": [...]}`` with one entry per job returned
    by ``QueryPipeline.get_all()``. Each entry holds the job id, a display status,
    the accession parameter, creation/enqueue times multiplied by 1000, the result
    (or the stored ``failed_reason`` for failed jobs), the job meta and a
    "completed / total" progress string.

    Jobs whose kwargs cannot be deserialized are listed with placeholder values.
    A 500 JSON error is returned when the job list itself cannot be retrieved.
    """
    tasks_info = []
    try:
        query_tasks = list(QueryPipeline.get_all())
    except Exception:
        logger.exception("Error retrieving query tasks.")
        return JSONErrorResponse("Error retrieving query tasks.", status_code=500)

    for task in query_tasks:
        # Accessing kwargs triggers deserialization; broken jobs are still listed.
        try:
            _ = task.kwargs
        except rq.exceptions.DeserializationError:
            tasks_info.append(dict(id=task.id, status="Deserialization Error", result="UNKNOWN",
                                   parameters={"accession": "UNKNOWN"}, progress="",
                                   meta=dict(type="UNKNOWN", offpeak=False)))
            continue

        # Use the plain job status. Appending anything to it (a leftover HTML probe string
        # used to be concatenated here) makes every status comparison below fail, so
        # neither the progress nor the derived statuses would ever be set.
        status = task.get_status()
        task_dict: Dict[str, Any] = dict(
            id=task.id,
            status=status,
            parameters=dict(accession=task.kwargs.get('accession', '')),
            created_at=1000 * datetime.timestamp(task.created_at) if task.created_at else "",
            enqueued_at=1000 * datetime.timestamp(task.enqueued_at) if task.enqueued_at else "",
            result=task.result if status != "failed" else task.meta.get("failed_reason", ""),
            meta=task.meta,
            progress="",
        )

        n_started = task.meta.get('started', 0)
        n_completed = task.meta.get('completed', 0)
        n_total = task.meta.get('total', 0)

        if task_dict["status"] == "finished":
            task_dict["progress"] = f"{n_total} / {n_total}"
        elif task_dict["status"] in ("deferred", "started", "paused", "canceled"):
            task_dict["progress"] = f"{n_completed} / {n_total}"

        # The paused flag in the job meta overrides the raw status of unfinished jobs.
        if task.meta.get('paused', False) and task_dict["status"] not in ("finished", "failed"):
            if n_started < n_completed:  # TODO: this does not work
                task_dict["status"] = "pausing"
            else:
                task_dict["status"] = "paused"

        # Refine the raw status of active jobs using the subjob counters.
        if task_dict["status"] in ("deferred", "started"):
            if n_started == 0:
                task_dict["status"] = "waiting"
            elif n_completed < n_total:
                task_dict["status"] = "running"
            elif n_completed == n_total:
                task_dict["status"] = "finishing"

        tasks_info.append(task_dict)

    return JSONResponse(dict(data=tasks_info))
@router.get("/query")
@requires(["authenticated", "admin"], redirect="login")
async def query(request):
    """
    Renders the query dashboard page (``dashboards/query.html``).

    The template context lists the names of all DICOM / DICOMweb targets whose
    direction is "pull" or "both", the names of all folder targets, and the names
    of all configured rules.
    """
    pullable_nodes = []
    folder_targets = []
    for target_name, target in config.mercure.targets.items():
        is_dicom = isinstance(target, (DicomTarget, DicomWebTarget))
        if is_dicom and target.direction in ("pull", "both"):
            pullable_nodes.append(target_name)
        if isinstance(target, FolderTarget):
            folder_targets.append(target_name)

    rule_names = [rule_name for rule_name, _ in config.mercure.rules.items()]

    return templates.TemplateResponse(
        "dashboards/query.html",
        {
            "request": request,
            "destination_folders": folder_targets,
            "dicom_nodes": pullable_nodes,
            "rules": rule_names,
            "page": "tools",
            "tab": "query",
        },
    )
@router.post("/query/check_accessions")
@requires(["authenticated", "admin"], redirect="login")
async def check_accessions(request):
    """
    Starts an accession check on a DICOM node, or polls the state of a running check.

    When the form contains ``job_id``, the corresponding job is looked up and its
    state is reported:
      * unknown job id -> 404 ``{"error": "Job not found"}``
      * failed         -> ``{"status": "failed", "info": <failed_reason or "Unknown error">}``
      * finished       -> ``{"status": "completed", "result": [...]}`` with a fixed
                          set of DICOM fields per result entry
      * otherwise      -> ``{"status": "pending", "job_id": ...}``

    Without ``job_id``, a new CheckAccessionsTask job is created and enqueued from the
    form fields ``dicom_node``, ``accessions`` (comma-separated) and the optional
    ``series_description`` / ``study_description`` filters, and
    ``{"status": "pending", "job_id": ...}`` is returned.
    """
    form = await request.form()
    job_id = form.get("job_id")

    if job_id:
        # Retrieve results for an existing job.
        # Job.fetch signals an unknown id by raising NoSuchJobError instead of returning
        # a falsy value, so the 404 has to be produced from the exception handler.
        try:
            job = Job.fetch(job_id, redis)
        except rq.exceptions.NoSuchJobError:
            return JSONResponse({"error": "Job not found"}, status_code=404)

        if job.is_failed:
            job.get_meta()
            logger.warning(job.meta)
            if failed_reason := job.meta.get("failed_reason"):
                return JSONResponse({"status": "failed", "info": failed_reason})
            else:
                return JSONResponse({"status": "failed", "info": "Unknown error"})
        elif job.is_finished:
            result_data = []
            for d in job.result:
                logger.info(d)
                result_data.append({x: d.get(x) for x in ["AccessionNumber", "PatientID", "StudyInstanceUID",
                                                          "SeriesInstanceUID", "StudyDescription",
                                                          "SeriesDescription", "NumberOfSeriesRelatedInstances"]})
            return JSONResponse({"status": "completed", "result": result_data})
        return JSONResponse({"status": "pending", "job_id": job.id})

    node_name = form.get("dicom_node")
    accessions = form.get("accessions", "").split(",")

    search_filters = {}
    if search_filter := form.get("series_description"):
        search_filters["SeriesDescription"] = [x.strip() for x in search_filter.split(",")]
    if search_filter := form.get("study_description"):
        search_filters["StudyDescription"] = [x.strip() for x in search_filter.split(",")]

    node = config.mercure.targets.get(node_name)
    if not isinstance(node, (DicomWebTarget, DicomTarget)):
        return JSONErrorResponse(f"Invalid DICOM node '{node_name}'.", status_code=400)

    try:
        job = CheckAccessionsTask().create_job(connection=redis, accessions=accessions, node=node,
                                               search_filters=search_filters)
        CheckAccessionsTask.queue(redis).enqueue_job(job)
    except Exception as e:
        logger.exception("Error during accessions check task creation")
        return JSONErrorResponse(str(e), status_code=500)

    return JSONResponse({"status": "pending", "job_id": job.id})