"""
query.py
========
Entry functions of the bookkeeper for querying processing information.
"""
import ast
import datetime
import json
from pathlib import Path
# Standard python includes
from typing import Dict
# App-specific includes
import bookkeeping.database as db
import pydicom
import sqlalchemy
from bookkeeping.helper import CustomJSONResponse, json
from common import config
from decoRouter import Router as decoRouter
from pydicom.datadict import keyword_for_tag
from sqlalchemy import select
# Starlette-related includes
from starlette.applications import Starlette
from starlette.authentication import requires
from starlette.responses import JSONResponse
router = decoRouter()
logger = config.get_logger()
tz_conversion = ""

def set_timezone_conversion() -> None:
    """Builds the SQL fragment that converts timestamps from server time to local time."""
    global tz_conversion
    tz_conversion = ""
    if config.mercure.server_time != config.mercure.local_time:
        tz_conversion = f" AT TIME ZONE '{config.mercure.server_time}' AT TIME ZONE '{config.mercure.local_time}' "

###################################################################################
# Query endpoints
###################################################################################


@router.get("/series")
@requires("authenticated")
async def get_series(request) -> JSONResponse:
    """Endpoint for retrieving series in the database."""
    series_uid = request.query_params.get("series_uid", "")
    query = db.dicom_series.select()
    if series_uid:
        query = query.where(db.dicom_series.c.series_uid == series_uid)
    result = await db.database.fetch_all(query)
    series = [dict(row) for row in result]
    # Reduce each row to the columns needed by the UI
    for i, line in enumerate(series):
        series[i] = {
            k: line[k] for k in line if k in ("id", "time", "series_uid", "tag_seriesdescription", "tag_modality")
        }
    return CustomJSONResponse(series)
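
# Illustrative response (hypothetical values): GET /series?series_uid=1.2.3.4 returns
# [{"id": 1, "time": "2024-01-01T12:00:00", "series_uid": "1.2.3.4",
#   "tag_seriesdescription": "T1 AX", "tag_modality": "MR"}]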
[docs]@router.get("/tasks")
@requires("authenticated")
async def get_tasks(request) -> JSONResponse:
"""Endpoint for retrieving tasks in the database."""
query = (
sqlalchemy.select(
db.tasks_table.c.id, db.tasks_table.c.time, db.dicom_series.c.tag_seriesdescription, db.dicom_series.c.tag_modality
)
.where(db.tasks_table.c.parent_id.is_(None)) # only show tasks without parents
.join(
db.dicom_series,
# sqlalchemy.or_(
# (dicom_series.c.study_uid == tasks_table.c.study_uid),
(db.dicom_series.c.series_uid == db.tasks_table.c.series_uid),
# ),
isouter=True,
)
)
# query = sqlalchemy.text(
# """ select tasks.id as task_id, tasks.time, tasks.series_uid, tasks.study_uid,
# "tag_seriesdescription", "tag_modality" from tasks
# join dicom_series on tasks.study_uid = dicom_series.study_uid
# or tasks.series_uid = dicom_series.series_uid """
# )
results = await db.database.fetch_all(query)
return CustomJSONResponse(results)
[docs]@router.get("/tests")
@requires("authenticated")
async def get_test_task(request) -> JSONResponse:
query = db.tests_table.select().order_by(db.tests_table.c.time_begin.desc())
# query = (
# sqlalchemy.select(
# tasks_table.c.id, tasks_table.c.time, dicom_series.c.tag_seriesdescription, dicom_series.c.tag_modality
# )
# .join(
# dicom_series,
# sqlalchemy.or_(
# (dicom_series.c.study_uid == tasks_table.c.study_uid),
# (dicom_series.c.series_uid == tasks_table.c.series_uid),
# ),
# )
# .where(dicom_series.c.tag_seriesdescription == "self_test_series " + request.query_params.get("id", ""))
# )
result_rows = await db.database.fetch_all(query)
results = [dict(row) for row in result_rows]
for k in results:
if not k["time_end"]:
if k["time_begin"] < datetime.datetime.now() - datetime.timedelta(minutes=10):
k["status"] = "failed"
return CustomJSONResponse(results)
[docs]@router.get("/task-events")
@requires("authenticated")
async def get_task_events(request) -> JSONResponse:
"""Endpoint for getting all events related to one task."""
task_id = request.query_params.get("task_id", "")
subtask_query = sqlalchemy.select(db.tasks_table.c.id).where(db.tasks_table.c.parent_id == task_id)
# Note: The space at the end is needed for the case that there are no subtasks
subtask_ids_str = ""
for row in await db.database.fetch_all(subtask_query):
subtask_ids_str += f"'{row[0]}',"
subtask_ids_filter = ""
if subtask_ids_str:
subtask_ids_filter = "or task_events.task_id in (" + subtask_ids_str[:-1] + ")"
# Get all the task_events from task `task_id` or any of its subtasks
# subtask_ids = [row[0] for row in await database.fetch_all(subtask_query)]
# query = (
# task_events.select()
# .order_by(task_events.c.task_id, task_events.c.time)
# .where(sqlalchemy.or_(task_events.c.task_id == task_id, task_events.c.task_id.in_(subtask_ids)))
# )
query_string = f"""select *, time {tz_conversion} as local_time from task_events
where task_events.task_id = '{task_id}' {subtask_ids_filter}
order by task_events.task_id, task_events.time
"""
# print("SQL Query = " + query_string)
query = sqlalchemy.text(query_string)
results = await db.database.fetch_all(query)
return CustomJSONResponse(results)
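
# Illustrative SQL produced above (hypothetical subtask IDs, no timezone conversion configured):
#   select *, time  as local_time from task_events
#   where task_events.task_id = :task_id or task_events.task_id in ('sub-1','sub-2')
#   order by task_events.task_id, task_events.time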
[docs]@router.get("/dicom-files")
@requires("authenticated")
async def get_dicom_files(request) -> JSONResponse:
"""Endpoint for getting all events related to one series."""
series_uid = request.query_params.get("series_uid", "")
query = db.dicom_files.select().order_by(db.dicom_files.c.time)
if series_uid:
query = query.where(db.dicom_files.c.series_uid == series_uid)
results = await db.database.fetch_all(query)
return CustomJSONResponse(results)
[docs]@router.get("/task_process_logs")
@requires("authenticated")
async def get_task_process_logs(request) -> JSONResponse:
"""Endpoint for getting all processing logs related to one series."""
task_id = request.query_params.get("task_id", "")
subtask_query = (
db.tasks_table.select()
.order_by(db.tasks_table.c.id)
.where(sqlalchemy.or_(db.tasks_table.c.id == task_id, db.tasks_table.c.parent_id == task_id))
)
subtasks = await db.database.fetch_all(subtask_query)
subtask_ids = [row[0] for row in subtasks]
query = (db.processor_logs_table.select(db.processor_logs_table.c.task_id.in_(subtask_ids))
.order_by(db.processor_logs_table.c.id))
results = [dict(r) for r in await db.database.fetch_all(query)]
for result in results:
if result["logs"] is None:
if logs_folder := config.mercure.processing_logs.logs_file_store:
result["logs"] = (
Path(logs_folder) / result["task_id"] / f"{result['module_name']}.{result['id']}.txt"
).read_text(encoding="utf-8")
return CustomJSONResponse(results)
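
# Illustrative file layout assumed above (hypothetical values): with
# logs_file_store = "/opt/mercure/processing_logs", the output of log entry 7 produced
# by module "my-module" for task "abc" is read from
#   /opt/mercure/processing_logs/abc/my-module.7.txt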
[docs]@router.get("/task_process_results")
@requires("authenticated")
async def get_task_process_results(request) -> JSONResponse:
"""Endpoint for getting all processing results from a task."""
task_id = request.query_params.get("task_id", "")
query = (db.processor_outputs_table.select()
.where(db.processor_outputs_table.c.task_id == task_id)
.order_by(db.processor_outputs_table.c.id))
results = [dict(r) for r in await db.database.fetch_all(query)]
return CustomJSONResponse(results)
[docs]@router.get("/find_task")
@requires("authenticated")
async def find_task(request) -> JSONResponse:
# Extract DataTables parameters
draw = int(request.query_params.get("draw", "1"))
start = int(request.query_params.get("start", "0"))
length = int(request.query_params.get("length", "10"))
search_term = request.query_params.get("search[value]", "") # Global search value
study_filter = request.query_params.get("study_filter", "false")
# Extract ordering information
order_column_index = request.query_params.get("order[0][column]", "4") # Default to time column (index 4)
order_direction = request.query_params.get("order[0][dir]", "desc") # Default to descending

    # Map DataTables column index to database column
    column_mapping = {
        "0": "tag_accessionnumber",  # ACC
        "1": "tag_patientid",  # MRN
        "2": "parent_tasks.data->'info'->>'uid_type'",  # Scope
        "3": "7",  # Rule (ordinal position of the aggregated rule column in the SELECT list)
        "4": "parent_tasks.time"  # Time (also the default fallback)
    }
    order_column = column_mapping.get(order_column_index, column_mapping["4"])
    order_sql = f"{order_column} {order_direction.upper()}, parent_tasks.id {order_direction.upper()}"
having_term = (f"""HAVING (
(tag_accessionnumber ilike :search_term || '%')
or (tag_patientid ilike :search_term || '%')
or (tag_patientname ilike '%' || :search_term || '%')
or bool_or(child_tasks.data->'info'->>'applied_rule'::text ilike '%' || :search_term || '%')
or bool_or(
array(
select jsonb_object_keys(
child_tasks.data->'info'->'triggered_rules'
)
)::text ilike '%' || :search_term || '%'
)
)
""") if search_term else ""
study_filter_term = ""
if study_filter == "true":
study_filter_term = "and parent_tasks.study_uid is not null"

    # Count query (for recordsTotal and recordsFiltered)
    count_query_string = f"""
        with base as (
            SELECT
                parent_tasks.id AS task_id,
                tag_accessionnumber, tag_patientid, tag_patientname
            FROM
                tasks as parent_tasks
                LEFT JOIN dicom_series ON dicom_series.series_uid = parent_tasks.series_uid
                LEFT JOIN tasks as child_tasks ON (child_tasks.parent_id = parent_tasks.id)
            WHERE parent_tasks.parent_id is null {study_filter_term}
            GROUP BY 1,2,3,4
            {having_term}
        )
        SELECT
            COUNT(DISTINCT task_id) as total_count
        FROM base
        """

    # Main data query with pagination
    query_string = f"""
        SELECT
            tag_accessionnumber AS acc,
            tag_patientid AS mrn,
            tag_patientname AS name,
            parent_tasks.id AS task_id,
            parent_tasks.data->'info'->>'uid_type' AS scope,
            parent_tasks.time::timestamp AS time,
            STRING_AGG(
                case when coalesce(child_tasks.data->'info'->>'applied_rule', '') != ''
                     then array[child_tasks.data->'info'->>'applied_rule']::text
                     else array(select jsonb_object_keys((child_tasks.data->'info'->'triggered_rules')))::text
                end,
                ', ' ORDER BY child_tasks.id
            ) AS rule
        FROM
            tasks as parent_tasks
            LEFT JOIN dicom_series ON dicom_series.series_uid = parent_tasks.series_uid
            LEFT JOIN tasks as child_tasks ON (child_tasks.parent_id = parent_tasks.id)
        WHERE parent_tasks.parent_id is null {study_filter_term}
        GROUP BY
            tag_accessionnumber, tag_patientid, tag_patientname, parent_tasks.id, scope, parent_tasks.time
        {having_term}
        ORDER BY
            {order_sql}
        LIMIT :length OFFSET :start
        """

    # Get the record count; since the search filter (if any) is applied in both queries,
    # total and filtered counts are identical
    params = {"search_term": search_term} if search_term else {}
    count_result = await db.database.fetch_one(count_query_string, params)
    total_count = count_result["total_count"] if count_result else 0
    filtered_count = total_count

    # Execute main query with pagination parameters; a non-positive length
    # (DataTables sends -1 for "all") becomes LIMIT NULL, i.e. no limit
    params.update({"start": start if start is not None else 0, "length": length if length > 0 else None})
    result_rows = await db.database.fetch_all(query_string, params)
    results = [dict(row) for row in result_rows]

    # Format data for DataTables
    data = []
    for item in results:
        task_id = item["task_id"]
        time = item["time"]
        acc = item["acc"] or ""
        mrn = item["mrn"] or ""
        job_scope = "STUDY" if item.get("scope") == "study" else "SERIES"
        # if item.get("rule"):
        #     item["rule"] = item["rule"].strip()
        #     if item["rule"] == ",":
        #         item["rule"] = ""
        # rule_information = ""
        # if item.get("rule"):
        #     rule_information = item["rule"]
        # else:
        #     if item.get("triggered_rules"):
        #         try:
        #             json_data = json.loads("[" + item["triggered_rules"] + "]")
        #             for entry in json_data:
        #                 rule_information += ", ".join(list(entry.keys())) + ", "
        #             if rule_information:
        #                 rule_information = rule_information[:-2]
        #         except json.JSONDecodeError:
        #             rule_information = "ERROR"

        # Format row as a dict for DataTables; "rule" may be empty if the task has no child tasks
        data.append({
            "DT_RowId": f"task_{task_id}",  # Add DataTables row identifier
            "ACC": acc,
            "MRN": mrn,
            "Scope": job_scope,
            "Time": time.isoformat(timespec='seconds') if isinstance(time, datetime.datetime) else str(time),
            "Rule": (item.get("rule") or "").replace("{", "").replace("}", ""),
            "task_id": task_id  # Include task_id for actions/links
        })

    # Return response in DataTables expected format
    response = {
        "draw": draw,  # Echo back the draw parameter
        "recordsTotal": total_count,  # Total records before filtering
        "recordsFiltered": filtered_count,  # Total records after filtering
        "data": data  # The data to be displayed
    }
    return CustomJSONResponse(response)
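
# Illustrative round trip (hypothetical values): a request such as
#   /find_task?draw=2&start=0&length=10&search[value]=smith&order[0][column]=4&order[0][dir]=desc
# would return
#   {"draw": 2, "recordsTotal": 1, "recordsFiltered": 1,
#    "data": [{"DT_RowId": "task_abc", "ACC": "A123", "MRN": "4567", "Scope": "SERIES",
#              "Time": "2024-01-01T12:00:00", "Rule": "my_rule", "task_id": "abc"}]}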

def convert_key(tag_key):
    """Converts a DICOM tag string into its human-readable keyword where possible."""
    # Remove any leading/trailing parentheses
    tag_key = tag_key.strip('()')
    try:
        # Look up the human-readable keyword for the tag
        keyword = keyword_for_tag(tag_key)
        return keyword if keyword else tag_key
    except Exception:
        logger.exception(f"Error converting tag {tag_key} to keyword")
        return tag_key
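
# Illustrative usage (assuming pydicom resolves 8-digit hex tag strings):
#   convert_key("(00100010)") -> "PatientName"
#   convert_key("unknown")    -> "unknown"  (returned unchanged if no keyword is found)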

def dicom_to_readable_json(ds: pydicom.Dataset):
    """
    Converts a DICOM dataset to a human-readable, JSON-compatible dictionary.

    Args:
        ds (pydicom.Dataset): The dataset to convert.
    """
    try:
        result = json.dumps(ds, default=convert_to_serializable)
        return json.loads(result)
    except Exception as e:
        logger.exception(f"Error converting DICOM to readable JSON: {e}")
        return {}

def convert_to_serializable(obj):
    """
    Converts non-serializable objects to serializable types.
    """
    if isinstance(obj, pydicom.dataset.Dataset):
        # Key each element by its keyword, falling back to the "GGGG,EEEE" tag notation
        return {
            keyword_for_tag(el.tag) or el.tag.json_key[:4] + "," + el.tag.json_key[4:]: obj[el.tag]
            for el in obj.elements()
        }
    if isinstance(obj, pydicom.dataelem.DataElement):
        try:
            obj.maxBytesToDisplay = 500
            obj.descripWidth = 500
            # See if the representation of this element can be converted to JSON;
            # this will convert eg lists to python lists, numbers to python numbers, etc
            json.dumps(evaled := ast.literal_eval(obj.repval))
            return evaled
        except Exception:
            return obj.repval
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
[docs]@router.get("/get_task_info")
@requires("authenticated")
async def get_task_info(request) -> JSONResponse:
response: Dict = {}
task_id = request.query_params.get("task_id", "")
if not task_id:
return CustomJSONResponse(response)
# First, get general information about the series/study
query = (
select(db.dicom_series, db.tasks_table.c.data)
.select_from(db.tasks_table)
.join(db.dicom_series, db.dicom_series.c.series_uid == db.tasks_table.c.series_uid, isouter=True)
.where(
db.tasks_table.c.id == task_id,
db.tasks_table.c.parent_id.is_(None)
)
.limit(1)
)
result = await db.database.fetch_one(query)
# info_rows = await db.database.fetch_all(info_query)
if result:
result_dict = dict(result)
        # Map the database column names to DICOM-style labels for display
        rename = {
            "series_uid": "SeriesUID",
            "study_uid": "StudyUID",
            "tag_patientname": "PatientName",
            "tag_patientid": "PatientID",
            "tag_accessionnumber": "AccessionNumber",
            "tag_seriesnumber": "SeriesNumber",
            "tag_studyid": "StudyID",
            "tag_patientbirthdate": "PatientBirthDate",
            "tag_patientsex": "PatientSex",
            "tag_acquisitiondate": "AcquisitionDate",
            "tag_acquisitiontime": "AcquisitionTime",
            "tag_modality": "Modality",
            "tag_bodypartexamined": "BodyPartExamined",
            "tag_studydescription": "StudyDescription",
            "tag_seriesdescription": "SeriesDescription",
            "tag_protocolname": "ProtocolName",
            "tag_codevalue": "CodeValue",
            "tag_codemeaning": "CodeMeaning",
            "tag_sequencename": "SequenceName",
            "tag_scanningsequence": "ScanningSequence",
            "tag_sequencevariant": "SequenceVariant",
            "tag_slicethickness": "SliceThickness",
            "tag_contrastbolusagent": "ContrastBolusAgent",
            "tag_referringphysicianname": "ReferringPhysicianName",
            "tag_manufacturer": "Manufacturer",
            "tag_manufacturermodelname": "ManufacturerModelName",
            "tag_magneticfieldstrength": "MagneticFieldStrength",
            "tag_deviceserialnumber": "DeviceSerialNumber",
            "tag_softwareversions": "SoftwareVersions",
            "tag_stationname": "StationName",
        }
response["information"] = {
rename.get(x, x): result_dict.get(x)
for x in result_dict.keys() if x not in ('id', 'time', 'data')
}
try:
if 'data' in result_dict and isinstance(result_dict['data'], str):
data = json.loads(result_dict.get('data', '{}'))
if data is not None:
tags = dict(data).get("tags", None)
if tags is not None:
ds = pydicom.Dataset.from_json(tags)
response["sample_tags_received"] = dicom_to_readable_json(ds)
except:
logger.exception("Error parsing data")

    # Now, get the task data embedded into the task or its subtasks
    query = (
        db.tasks_table.select()
        .order_by(db.tasks_table.c.id)
        .where(sqlalchemy.or_(db.tasks_table.c.id == task_id, db.tasks_table.c.parent_id == task_id))
    )
    result_rows = await db.database.fetch_all(query)
    results = [dict(row) for row in result_rows]
    for item in results:
        # Compute the response key unconditionally so that the sample-tag lookup
        # below always refers to the current task rather than a previous one
        task_key = "task " + item["id"]
        if item["data"] and set(item["data"].keys()) != {"id", "tags"}:
            response[task_key] = item["data"]

        # Locate the task folder in either the success or the error folder
        task_folder = None
        for k in [Path(config.mercure.success_folder), Path(config.mercure.error_folder)]:
            if (found_folder := k / item["id"]).exists():
                task_folder = found_folder
                break
        else:
            continue

        try:
            # Extract the tags of a sample DICOM file from the task folder
            sample_file = next(task_folder.rglob("*.dcm"))
            tags = dicom_to_readable_json(pydicom.dcmread(sample_file, stop_before_pixels=True))
            if task_key not in response:
                response[task_key] = {}
            response[task_key]["sample_tags_result"] = tags
        except (StopIteration, json.JSONDecodeError):
            pass
    return CustomJSONResponse(response)
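
# Illustrative response shape (hypothetical task ID and values):
#   {"information": {"PatientName": "DOE^JOHN", "Modality": "MR", ...},
#    "sample_tags_received": {...},
#    "task abc": {..., "sample_tags_result": {...}}}
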
query_app = Starlette(routes=router)