"""
route_studies.py
================
Provides functions for routing and processing of studies (consisting of multiple series).
"""
# Standard python includes
import os
from pathlib import Path
from typing import Dict, Optional, Union
import uuid
import json
import shutil
import daiquiri
from datetime import datetime, timedelta

# App-specific includes
import common.config as config
import common.rule_evaluation as rule_evaluation
import common.monitor as monitor
import common.notification as notification
import common.helper as helper
import common.log_helpers as log_helpers
from common.types import Rule, Task, TaskHasStudy, TaskInfo
from common.constants import (
mercure_defs,
mercure_names,
mercure_actions,
mercure_rule,
mercure_config,
mercure_options,
mercure_folders,
mercure_sections,
mercure_study,
mercure_info,
mercure_events,
)

# Create local logger instance
logger = config.get_logger()


def route_studies(pending_series: Dict[str, float]) -> None:
"""
    Searches for completed studies and initiates their routing
"""
# TODO: Handle studies that exceed the "force completion" timeout in the "CONDITION_RECEIVED_SERIES" mode
studies_ready = {}
with os.scandir(config.mercure.studies_folder) as it:
for entry in it:
if entry.is_dir() and not is_study_locked(entry.path) and is_study_complete(entry.path, pending_series):
modificationTime = entry.stat().st_mtime
studies_ready[entry.name] = modificationTime
logger.debug(f"Studies ready for processing: {studies_ready}")
# Process all complete studies
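    # Note: sorted() iterates the dictionary keys, i.e., studies are processed in
    # alphabetical order of their folder names (the collected modification times are
    # currently not used for ordering)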
for dir_entry in sorted(studies_ready):
study_success = False
try:
study_success = route_study(dir_entry)
except Exception:
error_message = f"Problems while processing study {dir_entry}"
logger.exception(error_message)
# TODO: Add study events to bookkeeper
# monitor.send_series_event(monitor.task_event.ERROR, entry, 0, "", "Exception while processing")
monitor.send_event(
monitor.m_events.PROCESSING,
monitor.severity.ERROR,
error_message,
)
if not study_success:
# Move the study to the error folder to avoid repeated processing
push_studylevel_error(dir_entry)
# If termination is requested, stop processing after the active study has been completed
if helper.is_terminated():
return


def is_study_locked(folder: str) -> bool:
"""
Returns true if the given folder is locked, i.e. if another process is already working on the study
"""
path = Path(folder)
folder_status = (
(path / mercure_names.LOCK).exists()
or (path / mercure_names.PROCESSING).exists()
or len(list(path.glob(mercure_names.DCMFILTER))) == 0
)
return folder_status


def is_study_complete(folder: str, pending_series: Dict[str, float]) -> bool:
"""
    Returns true if the study in the given folder is ready for processing, i.e. if the completeness criteria of the triggered rule have been met
"""
try:
logger.debug(f"Checking completeness of study {folder}, with pending series: {pending_series}")
# Read stored task file to determine completeness criteria
with open(Path(folder) / mercure_names.TASKFILE, "r") as json_file:
task: TaskHasStudy = TaskHasStudy(**json.load(json_file))
        # Check if processing of the study has been enforced (e.g., via UI selection)
        if task.study.complete_force == True:
return True
if (Path(folder) / mercure_names.FORCE_COMPLETE).exists():
task.study.complete_force = True
with open(Path(folder) / mercure_names.TASKFILE, "w") as json_file:
json.dump(task.dict(), json_file)
return True
study = task.study
        # Determine which trigger condition completes the study
complete_trigger = study.complete_trigger
if not complete_trigger:
logger.error(f"Missing trigger condition in task file in study folder {folder}", task.id) # handle_error
return False
complete_required_series = study.get("complete_required_series", "")
# If trigger condition is received series but list of required series is missing, then switch to timeout mode instead
if (complete_trigger == mercure_rule.STUDY_TRIGGER_CONDITION_RECEIVED_SERIES) and (
not complete_required_series
):
complete_trigger = mercure_rule.STUDY_TRIGGER_CONDITION_TIMEOUT
logger.warning( # handle_error
f"Missing series for trigger condition in study folder {folder}. Using timeout instead", task.id
)
# Check for trigger condition
if complete_trigger == mercure_rule.STUDY_TRIGGER_CONDITION_TIMEOUT:
return check_study_timeout(task, pending_series)
elif complete_trigger == mercure_rule.STUDY_TRIGGER_CONDITION_RECEIVED_SERIES:
return check_study_series(task, complete_required_series)
else:
logger.error(f"Invalid trigger condition in task file in study folder {folder}", task.id) # handle_error
return False
except Exception:
logger.error(f"Invalid task file in study folder {folder}", task.id) # handle_error
return False


def check_study_timeout(task: TaskHasStudy, pending_series: Dict[str, float]) -> bool:
"""
Checks if the duration since the last series of the study was received exceeds the study completion timeout
"""
logger.debug("Checking study timeout")
study = task.study
last_received_string = study.last_receive_time
logger.debug(f"Last received time: {last_received_string}, {datetime.now()}")
if not last_received_string:
return False
last_receive_time = datetime.strptime(last_received_string, "%Y-%m-%d %H:%M:%S")
if datetime.now() > last_receive_time + timedelta(seconds=config.mercure.study_complete_trigger):
        # Check if there is a pending series on this study. If so, we need to wait
        # for it to time out before we can complete the study
for series_uid in pending_series.keys():
example_file = next(Path(config.mercure.incoming_folder).glob(f"{series_uid}*.tags"))
tags_list = json.loads(example_file.read_text())
if tags_list["StudyInstanceUID"] == study.study_uid:
logger.debug(f"Timeout met, but found a pending series ({series_uid}) in study {study.study_uid}")
return False
return True
else:
return False


def check_study_series(task: TaskHasStudy, required_series: str) -> bool:
"""
Checks if all series required for study completion have been received
"""
received_series = []
# Fetch the list of received series descriptions from the task file
if (task.study.received_series) and (isinstance(task.study.received_series, list)):
received_series = task.study.received_series
# Check if the completion criteria is fulfilled
return rule_evaluation.parse_completion_series(task.id, required_series, received_series)


@log_helpers.clear_task_decorator
def route_study(study: str) -> bool:
"""
    Processes the study in the folder 'study'. Loads the task file and delegates the action to helper functions
"""
logger.debug(f"Route_study {study}")
study_folder = config.mercure.studies_folder + "/" + study
if is_study_locked(study_folder):
# If the study folder has been locked in the meantime, then skip and proceed with the next one
return True
# Create lock file in the study folder and prevent other instances from working on this study
lock_file = Path(study_folder + "/" + study + mercure_names.LOCK)
if lock_file.exists():
return True
try:
lock = helper.FileLock(lock_file)
except:
# Can't create lock file, so something must be seriously wrong
try:
with open(Path(study_folder) / mercure_names.TASKFILE, "r") as json_file:
task: Task = Task(**json.load(json_file))
logger.error(f"Unable to create study lock file {lock_file}", task.id) # handle_error
except:
logger.error(f"Unable to create study lock file {lock_file}", None) # handle_error
return False
try:
# Read stored task file to determine completeness criteria
with open(Path(study_folder) / mercure_names.TASKFILE, "r") as json_file:
task = Task(**json.load(json_file))
except Exception:
try:
with open(Path(study_folder) / mercure_names.TASKFILE, "r") as json_file:
logger.error(
f"Invalid task file in study folder {study_folder}", json.load(json_file)["id"]
) # handle_error
except:
logger.error(f"Invalid task file in study folder {study_folder}", None) # handle_error
return False
logger.setTask(task.id)
action_result = True
info: TaskInfo = task.info
action = info.get("action", "")
if not action:
logger.error(f"Missing action in study folder {study_folder}", task.id) # handle_error
return False
# TODO: Clean folder for duplicate DICOMs (i.e., if series have been sent twice -- check by instance UID)
if action == mercure_actions.NOTIFICATION:
action_result = push_studylevel_notification(study, task)
elif action == mercure_actions.ROUTE:
action_result = push_studylevel_dispatch(study, task)
elif action == mercure_actions.PROCESS or action == mercure_actions.BOTH:
action_result = push_studylevel_processing(study, task)
else:
# This point should not be reached (discard actions should be handled on the series level)
logger.error(f"Invalid task action in study folder {study_folder}", task.id) # handle_error
return False
if not action_result:
logger.error(f"Error during processing of study {study}", task.id) # handle_error
return False
if not remove_study_folder(task.id, study, lock):
logger.error(f"Error removing folder of study {study}", task.id) # handle_error
return False
return True


def push_studylevel_dispatch(study: str, task: Task) -> bool:
"""
    Pushes the study folder to the dispatcher, including the generated task file containing the destination information
"""
trigger_studylevel_notification(study, task, mercure_events.RECEIVED)
return move_study_folder(task.id, study, "OUTGOING")


def push_studylevel_processing(study: str, task: Task) -> bool:
"""
Pushes the study folder to the processor, including the generated task file containing the processing instructions
"""
trigger_studylevel_notification(study, task, mercure_events.RECEIVED)
return move_study_folder(task.id, study, "PROCESSING")


def push_studylevel_notification(study: str, task: Task) -> bool:
"""
Executes the study-level reception notification
"""
trigger_studylevel_notification(study, task, mercure_events.RECEIVED)
trigger_studylevel_notification(study, task, mercure_events.COMPLETED)
move_study_folder(task.id, study, "SUCCESS")
return True


def push_studylevel_error(study: str) -> None:
"""
Pushes the study folder to the error folder after unsuccessful processing
"""
study_folder = config.mercure.studies_folder + "/" + study
lock_file = Path(study_folder + "/" + study + mercure_names.LOCK)
if lock_file.exists():
# Study normally shouldn't be locked at this point, but since it is, just exit and wait.
# Might require manual intervention if a former process terminated without removing the lock file
return
try:
lock = helper.FileLock(lock_file)
except:
# Can't create lock file, so something must be seriously wrong
logger.error(f"Unable to lock study for removal {lock_file}") # handle_error
return
if not move_study_folder(None, study, "ERROR"):
# At this point, we can only wait for manual intervention
logger.error(f"Unable to move study to ERROR folder {lock_file}") # handle_error
return
if not remove_study_folder(None, study, lock):
logger.error(f"Unable to delete study folder {lock_file}") # handle_error
return


def move_study_folder(task_id: Union[str, None], study: str, destination: str) -> bool:
"""
Moves the study subfolder to the specified destination with proper locking of the folders
"""
logger.debug(f"Move_study_folder {study} to {destination}")
source_folder = config.mercure.studies_folder + "/" + study
destination_folder = config.mercure.discard_folder
if destination == "PROCESSING":
destination_folder = config.mercure.processing_folder
elif destination == "SUCCESS":
destination_folder = config.mercure.success_folder
elif destination == "ERROR":
destination_folder = config.mercure.error_folder
elif destination == "OUTGOING":
destination_folder = config.mercure.outgoing_folder
else:
logger.error(f"Unknown destination {destination} requested for {study}", task_id) # handle_error
return False
# Create unique name of destination folder
destination_folder += "/" + str(uuid.uuid1())
    # Create the destination folder and validate that it has been created
try:
os.mkdir(destination_folder)
except Exception:
logger.error(f"Unable to create study destination folder {destination_folder}", task_id) # handle_error
return False
if not Path(destination_folder).exists():
logger.error(f"Creating study destination folder not possible {destination_folder}", task_id) # handle_error
return False
    # Create lock file in the destination folder (to prevent any other module from working
    # on the folder). Note that the source folder has already been locked in the parent function.
lock_file = Path(destination_folder) / mercure_names.LOCK
try:
lock = helper.FileLock(lock_file)
except:
# Can't create lock file, so something must be seriously wrong
logger.error(f"Unable to create lock file {destination_folder}/{mercure_names.LOCK}", task_id) # handle_error
return False
# Move all files except the lock file
    # FIXME: if we don't use a list instead of an iterator, in testing we get an error
    # from pyfakefs about the iterator changing during the iteration
for entry in list(os.scandir(source_folder)):
# Move all files but exclude the lock file in the source folder
if not entry.name.endswith(mercure_names.LOCK):
try:
shutil.move(source_folder + "/" + entry.name, destination_folder + "/" + entry.name)
except Exception:
logger.error( # handle_error
f"Problem while pushing file {entry} from {source_folder} to {destination_folder}", task_id
)
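                # Note: a failed move of an individual file is logged but does not abort
                # the transfer of the remaining files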
# Remove the lock file in the target folder. Would happen automatically when leaving the function,
# but better to do explicitly with error handling
try:
lock.free()
except:
# Can't delete lock file, so something must be seriously wrong
logger.error(f"Unable to remove lock file {lock_file}", task_id) # handle_error
return False
return True


def remove_study_folder(task_id: Union[str, None], study: str, lock: helper.FileLock) -> bool:
"""
Removes a study folder containing nothing but the lock file (called during cleanup after all files have
been moved somewhere else already)
"""
study_folder = config.mercure.studies_folder + "/" + study
# Remove the lock file
try:
lock.free()
except:
# Can't delete lock file, so something must be seriously wrong
logger.error(f"Unable to remove lock file while removing study folder {study}", task_id) # handle_error
return False
# Remove the empty study folder
try:
shutil.rmtree(study_folder)
    except Exception:
logger.error(f"Unable to delete study folder {study_folder}", task_id) # handle_error
return True


def trigger_studylevel_notification(study: str, task: Task, event: mercure_events) -> bool:
    """
    Triggers the notification of the rule that has been applied to the study for the given event
    """
# Check if the applied_rule is available
current_rule = task.info.applied_rule
if not current_rule:
logger.error(f"Missing applied_rule in task file in study {study}", task.id) # handle_error
return False
    notification.trigger_notification_for_rule(current_rule, task.id, event, task=task)
return True
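

# Illustrative call pattern (sketch only, not part of this module): the router's main loop
# is expected to call route_studies() periodically after routing of individual series, e.g.
#
#     pending_series: Dict[str, float] = {}  # filled while individual series are being routed
#     route_studies(pending_series)
#
# The actual call site and the construction of pending_series live outside this module.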