"""
generate_taskfile.py
====================
Helper functions for generating task files in json format,
which describe the job to be done and maintain a journal of the executed actions.
"""
# Standard python includes
import json
import pprint
import socket
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast
# App-specific includes
import common.config as config
import common.monitor as monitor
from common.constants import mercure_actions, mercure_defs, mercure_names, mercure_options, mercure_rule
from common.helper import get_now_str
from common.types import EmptyDict, Rule, Task, TaskDispatch, TaskDispatchStatus, TaskInfo, TaskProcessing, TaskStudy
from typing_extensions import Literal
# Create local logger instance
logger = config.get_logger()
[docs]def compose_task(
task_id: str,
uid: str,
uid_type: Literal["series", "study"],
triggered_rules: Dict[str, Literal[True]],
applied_rule: str,
tags_list: Dict[str, str],
target: str,
) -> Task:
"""
Composes the JSON content that is written into a task file when submitting a job (for processing, dispatching, or both)
"""
logger.debug("Composing task.")
task = Task(
id=task_id,
# Add general information about the job
info=add_info(uid, uid_type, triggered_rules, applied_rule, tags_list),
# Add dispatch information -- completed only if the job includes a dispatching step
dispatch=add_dispatching(task_id, uid, applied_rule, target) or cast(EmptyDict, {}),
# Add processing information -- completed only if the job includes a processing step
process=add_processing(applied_rule) or cast(EmptyDict, {}),
# Add information about the study, included all collected series
study=add_study(uid, uid_type, applied_rule, tags_list) or cast(EmptyDict, {}),
)
# task.dispatch = "foo"
logger.debug("Generated task:")
logger.debug(pprint.pformat(task.dict()))
return task
[docs]def add_processing(applied_rule: str) -> Optional[Union[TaskProcessing, List[TaskProcessing]]]:
"""
Adds information about the desired processing step into the task file,
which is evaluated by the processing module
"""
# If the applied_rule name is empty, don't add processing information
# (rules with processing steps always have applied_rule set)
if not applied_rule:
return None
applied_rule_info: Rule = config.mercure.rules[applied_rule]
logger.debug(f"Applied rule info: {applied_rule_info}")
if applied_rule_info.action not in (
mercure_actions.PROCESS,
mercure_actions.BOTH,
):
return None
# TODO: Revise this part. Needs to be prepared for sequential execution of modules
# Get the name of the module that should be triggered
module_names: List[str] = []
if isinstance(applied_rule_info.processing_module, str):
module_names = [applied_rule_info.processing_module]
else:
module_names = applied_rule_info.processing_module
logger.info(f"module: {module_names}")
process_infos = []
for i, module_name in enumerate(module_names):
# Get the configuration of this module
if module_name not in config.mercure.modules:
logger.warning(f"Module {module_name} not found in configuration modules ({list(config.mercure.modules.keys())})")
module_config = config.mercure.modules.get(module_name, None)
# Compose the processing settings that should be used (module level + rule level)
settings: Dict[str, Any] = {}
if module_config is not None:
settings.update(module_config.settings)
if isinstance(applied_rule_info.processing_settings, list):
if i < len(applied_rule_info.processing_settings):
settings.update(applied_rule_info.processing_settings[i])
else:
settings.update(applied_rule_info.processing_settings)
# Store in the target structure
process_info: TaskProcessing = TaskProcessing(
module_name=module_name,
module_config=module_config,
settings=settings,
retain_input_images=applied_rule_info.processing_retain_images,
)
process_infos.append(process_info)
if len(process_infos) > 1:
return process_infos
return process_infos[0]
[docs]def add_study(
uid: str, uid_type: Literal["series", "study"], applied_rule: str, tags_list: Dict[str, str]
) -> Optional[TaskStudy]:
"""
Adds study information into the task file. Returns nothing if the task is a series-level task
"""
# If the current task is a series task, then don't add study information
if uid_type == "series":
return None
study_info: TaskStudy = TaskStudy(
study_uid=uid,
complete_trigger=config.mercure.rules[applied_rule].study_trigger_condition,
complete_required_series=config.mercure.rules[applied_rule].study_trigger_series,
creation_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
last_receive_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
received_series=[tags_list.get("SeriesDescription", mercure_options.INVALID)],
received_series_uid=[tags_list.get("SeriesInstanceUID", mercure_options.INVALID)],
complete_force=False,
complete_force_action=config.mercure.rules[applied_rule].study_force_completion_action,
)
return study_info
[docs]def add_dispatching(
task_id: str, uid: str, applied_rule: str, target: Union[str, List[str]]
) -> Optional[TaskDispatch]:
"""
Adds information about the desired dispatching step into the task file, which is evaluated by the dispatcher.
For series-level dispatching, the target information is provided in string "target", as dispatch operations
from multiple rules to the same target are combined (to avoid double sending). In all other cases, the
applied_rule is provided and the target information is taken from the rule definition.
"""
logger.debug("Maybe adding dispatching...")
perform_dispatch = False
if not applied_rule and not target:
# applied_rule and target should not be empty at the same time!
logger.warning(f"Applied_rule and target empty. Cannot add dispatch information for UID {uid}")
return None
targets_used: List[str] = []
if isinstance(target, str):
# Only insert string into list if it is not empty
if target:
targets_used = [target]
else:
targets_used = target
# Check if a target string is provided (i.e., job is from series-level dispatching).
# If so, the images should be dispatched in any case
if len(targets_used) > 0:
logger.debug(f"Adding dispatching info because series-level target {target} specified")
perform_dispatch = True
else:
# If no target string is provided, read the target defined in the provided applied rule
rule_target = config.mercure.rules[applied_rule].target
if isinstance(rule_target, str):
targets_used = [rule_target]
else:
targets_used = rule_target
# Applied_rule involves dispatching and target has been set? Then go forward with dispatching
if (
config.mercure.rules[applied_rule].get(mercure_rule.ACTION, mercure_actions.PROCESS)
in (mercure_actions.ROUTE, mercure_actions.BOTH)
) and len(targets_used) > 0:
logger.debug(f"Adding dispatching info because rule specified target {target}")
perform_dispatch = True
# If dispatching should not be performed, just return
if not perform_dispatch:
logger.debug("Not adding dispatch information.")
return None
target_status: Dict[str, TaskDispatchStatus] = {}
current_time = get_now_str()
for target_item in targets_used:
# logger.info(f"Adding target {target_item} to dispatching info")
# Check if all selected targets actually exist in the configuration (could have been deleted by now)
if not config.mercure.targets.get(target_item, {}):
logger.error(f"Target {target_item} does not exist for UID {uid}", task_id) # handle_error
return None
target_status[target_item] = TaskDispatchStatus(state="waiting", time=current_time)
# All looks good, fill the dispatching section and return it
return TaskDispatch(
target_name=targets_used,
status=target_status,
retries=None,
next_retry_at=None,
)
[docs]def add_info(
uid: str,
uid_type: Literal["series", "study"],
triggered_rules: Dict[str, Literal[True]],
applied_rule: str,
tags_list: Dict[str, str],
) -> TaskInfo:
"""
Adds general information into the task file
"""
if applied_rule:
task_action = config.mercure.rules[applied_rule].get("action", "process")
else:
task_action = "route"
return TaskInfo(
action=task_action,
uid=uid,
uid_type=uid_type,
triggered_rules=triggered_rules,
applied_rule=applied_rule,
patient_name=tags_list.get("PatientName", mercure_options.MISSING),
mrn=tags_list.get("PatientID", mercure_options.MISSING),
acc=tags_list.get("AccessionNumber", mercure_options.MISSING),
sender_address=tags_list.get("SenderAddress", mercure_options.MISSING),
sender_aet=tags_list.get("SenderAET", mercure_options.MISSING),
receiver_aet=tags_list.get("ReceiverAET", mercure_options.MISSING),
device_serial_number=tags_list.get("DeviceSerialNumber"),
mercure_version=mercure_defs.VERSION,
mercure_appliance=config.mercure.appliance_name,
mercure_server=socket.gethostname(),
)
[docs]def create_series_task(
task_id: str,
folder_name: Path,
triggered_rules: Dict[str, Literal[True]],
applied_rule: str,
series_UID: str,
tags_list: Dict[str, str],
target: str,
) -> bool:
"""
Writes a task file for the received series, containing all information needed by the processor and dispatcher.
Additional information is written into the file as well
"""
# Compose the JSON content for the file
task = compose_task(task_id, series_UID, "series", triggered_rules, applied_rule, tags_list, target)
monitor.send_update_task(task)
task_filename = folder_name / mercure_names.TASKFILE
try:
with open(task_filename, "w") as task_file:
json.dump(task.dict(), task_file)
except Exception:
logger.error(
f"Unable to create series task file {task_filename} with contents {task.dict()}", task.id
) # handle_error
return False
return True
[docs]def create_study_task(
task_id: str,
target_folder: Path,
triggered_rules: Dict[str, Literal[True]],
applied_rule: str,
study_UID: str,
tags_list: Dict[str, str],
) -> bool:
"""
Generate task file with information on the study
"""
# Compose the JSON content for the file
task = compose_task(task_id, study_UID, "study", triggered_rules, applied_rule, tags_list, "")
monitor.send_update_task(task)
task_filename = target_folder / mercure_names.TASKFILE
logger.debug(f"Writing study task file {task_filename}")
try:
task.to_file(task_filename)
except Exception:
logger.error(f"Unable to create study task file {task_filename}", task.id) # handle_error
return False
return True
[docs]def update_study_task(
task_id: str,
folder: Path,
triggered_rules: Dict[str, Literal[True]],
applied_rule: str,
study_UID: str,
tags_list: Dict[str, str],
) -> Tuple[bool, str]:
"""
Update the study task file with information from the latest received series
"""
series_description = tags_list.get("SeriesDescription", mercure_options.INVALID)
series_uid = tags_list.get("SeriesInstanceUID", mercure_options.INVALID)
task_filename = folder / mercure_names.TASKFILE
# Load existing task file. Raise error if it does not exist
try:
task = Task.from_file(task_filename)
except Exception:
logger.error(f"Unable to open study task file {task_filename}", task_id) # handle_error
return False, ""
# Ensure that the task file contains the study information
if not task.study:
logger.error(f"Study information missing in task file {task_filename}", task_id) # handle_error
return False, ""
study = cast(TaskStudy, task.study)
# Remember the time when the last series was received, as needed to determine completion on timeout
study.last_receive_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Remember all received series descriptions, as needed to determine completion on received series
if study.received_series and (isinstance(study.received_series, list)):
study.received_series.append(series_description)
else:
study.received_series = [series_description]
# Also remember the received SeriesUIDs for information purpose
if study.received_series_uid and (isinstance(study.received_series_uid, list)):
study.received_series_uid.append(series_uid)
else:
study.received_series_uid = [series_uid]
# Safe the updated file back to disk
try:
task.to_file(task_filename)
except Exception:
logger.exception(f"Unable to write task file {task_filename}", task.id) # handle_error
return False, ""
monitor.send_update_task(task)
return True, task.id