"""
generate_taskfile.py
====================
Helper functions for generating task files in JSON format, which describe the job to be done and maintain a journal of the executed actions.
"""
# Standard python includes
import json
import socket
import pprint
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from typing_extensions import Literal

# App-specific includes
import common.config as config
import common.monitor as monitor
from common.types import *
from common.constants import (
    mercure_defs,
    mercure_names,
    mercure_rule,
    mercure_options,
    mercure_actions,
)
# Create local logger instance
logger = config.get_logger()

def compose_task(
    task_id: str,
    uid: str,
    uid_type: Literal["series", "study"],
    triggered_rules: Dict[str, Literal[True]],
    applied_rule: str,
    tags_list: Dict[str, str],
    target: str,
) -> Task:
"""
Composes the JSON content that is written into a task file when submitting a job (for processing, dispatching, or both)
"""
logger.debug("Composing task.")
task = Task(
id=task_id,
        # Add general information about the job
        info=add_info(uid, uid_type, triggered_rules, applied_rule, tags_list),
        # Add dispatch information -- filled only if the job includes a dispatching step
        dispatch=add_dispatching(task_id, uid, applied_rule, tags_list, target) or cast(EmptyDict, {}),
        # Add processing information -- filled only if the job includes a processing step
        process=add_processing(uid, applied_rule, tags_list) or cast(EmptyDict, {}),
        # Add information about the study, including all collected series
        study=add_study(uid, uid_type, applied_rule, tags_list) or cast(EmptyDict, {}),
)
# task.dispatch = "foo"
logger.debug("Generated task:")
logger.debug(pprint.pformat(task.dict()))
return task
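
# Illustrative sketch (not part of the module): composing a series-level task for
# a hypothetical rule named "route_ct". All values shown are invented examples.
#
#   task = compose_task(
#       task_id="0123abcd",
#       uid="1.2.826.0.1.3680043.2.1125.1",
#       uid_type="series",
#       triggered_rules={"route_ct": True},
#       applied_rule="route_ct",
#       tags_list={"SeriesDescription": "AX T1", "PatientID": "12345"},
#       target="",
#   )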

def add_processing(
    uid: str, applied_rule: str, tags_list: Dict[str, str]
) -> Optional[Union[TaskProcessing, List[TaskProcessing]]]:
"""
Adds information about the desired processing step into the task file, which is evaluated by the processing module
"""
# If the applied_rule name is empty, don't add processing information (rules with processing steps always have applied_rule set)
if not applied_rule:
return None
applied_rule_info: Rule = config.mercure.rules[applied_rule]
logger.debug(f"Applied rule info: {applied_rule_info}")
if applied_rule_info.action not in (
mercure_actions.PROCESS,
mercure_actions.BOTH,
):
return None
# TODO: Revise this part. Needs to be prepared for sequential execution of modules
    # Get the name(s) of the module(s) that should be triggered
    module_names: List[str]
    if isinstance(applied_rule_info.processing_module, str):
        module_names = [applied_rule_info.processing_module]
    else:
        module_names = applied_rule_info.processing_module
logger.info(f"module: {module_names}")
process_infos = []
for i, module_name in enumerate(module_names):
# Get the configuration of this module
module_config = config.mercure.modules.get(module_name, None)
# Compose the processing settings that should be used (module level + rule level)
settings: Dict[str, Any] = {}
if module_config is not None:
settings.update(module_config.settings)
        rule_settings: List[Dict[str, Any]]
        if isinstance(applied_rule_info.processing_settings, list):
            rule_settings = applied_rule_info.processing_settings
        else:
            rule_settings = [applied_rule_info.processing_settings]
if i < len(rule_settings):
settings.update(rule_settings[i])
# Store in the target structure
process_info: TaskProcessing = TaskProcessing(
module_name=module_name,
module_config=module_config,
settings=settings,
retain_input_images=applied_rule_info.processing_retain_images,
)
process_infos.append(process_info)
if len(process_infos) > 1:
return process_infos
return process_infos[0]
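
# Settings-merge sketch (hypothetical values): if a module's configuration defines
# settings {"threshold": 0.5, "mode": "fast"} and the rule's processing_settings
# entry for the same step defines {"threshold": 0.8}, the resulting task step
# receives {"threshold": 0.8, "mode": "fast"} -- rule-level settings override the
# module-level defaults, as implemented by the settings.update() calls above.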

def add_study(
    uid: str, uid_type: Literal["series", "study"], applied_rule: str, tags_list: Dict[str, str]
) -> Optional[TaskStudy]:
"""
Adds study information into the task file. Returns nothing if the task is a series-level task
"""
# If the current task is a series task, then don't add study information
if uid_type == "series":
return None
study_info: TaskStudy = TaskStudy(
study_uid=uid,
complete_trigger=config.mercure.rules[applied_rule].study_trigger_condition,
complete_required_series=config.mercure.rules[applied_rule].study_trigger_series,
creation_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
last_receive_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
received_series=[tags_list.get("SeriesDescription", mercure_options.INVALID)],
received_series_uid=[tags_list.get("SeriesInstanceUID", mercure_options.INVALID)],
complete_force=False,
)
return study_info
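
# Sketch of the resulting study record (illustrative, assuming a rule configured
# with a study-completion trigger): the TaskStudy starts out with only the series
# that triggered the study task; update_study_task() below appends every further
# series, so that complete_trigger/complete_required_series can be evaluated
# against the accumulated received_series list.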

def add_dispatching(
    task_id: str, uid: str, applied_rule: str, tags_list: Dict[str, str], target: str
) -> Optional[TaskDispatch]:
"""
Adds information about the desired dispatching step into the task file, which is evaluated by the dispatcher. For series-level dispatching,
the target information is provided in string "target", as dispatch operations from multiple rules to the same target are combined (to avoid
double sending). In all other cases, the applied_rule is provided and the target information is taken from the rule definition.
"""
logger.debug("Maybe adding dispatching...")
perform_dispatch = False
if not applied_rule and not target:
# applied_rule and target should not be empty at the same time!
logger.warning(f"Applied_rule and target empty. Cannot add dispatch information for UID {uid}")
return None
target_used: str = target
# Check if a target string is provided (i.e., job is from series-level dispatching). If so, the images should be dispatched in any case
if target_used:
logger.debug(f"Adding dispatching info because series-level target {target} specified")
perform_dispatch = True
else:
# If no target string is provided, read the target defined in the provided applied rule
target_used = config.mercure.rules[applied_rule].get("target", "")
# Applied_rule involves dispatching and target has been set? Then go forward with dispatching
if (
config.mercure.rules[applied_rule].get(mercure_rule.ACTION, mercure_actions.PROCESS)
in (mercure_actions.ROUTE, mercure_actions.BOTH)
) and target_used:
logger.debug(f"Adding dispatching info because rule target {target} specified")
perform_dispatch = True
# If dispatching should not be performed, just return
if not perform_dispatch:
logger.debug("Not adding dispatch information.")
return None
# Check if the selected target actually exists in the configuration (could have been deleted by now)
if not config.mercure.targets.get(target_used, {}):
logger.error(f"Target {target_used} does not exist for UID {uid}", task_id) # handle_error
return None
# All looks good, fill the dispatching section and return it
return TaskDispatch(
target_name=target_used,
retries=None,
next_retry_at=None,
)
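
# Decision sketch: dispatch information is added in exactly two cases --
# (a) a series-level target string was passed in directly, or (b) the applied
# rule's action is "route" or "both" and the rule defines a target. For a
# hypothetical rule with action="route" and target="main_pacs", the returned
# value would be TaskDispatch(target_name="main_pacs", retries=None,
# next_retry_at=None).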

def add_info(
    uid: str,
    uid_type: Literal["series", "study"],
    triggered_rules: Dict[str, Literal[True]],
    applied_rule: str,
    tags_list: Dict[str, str],
) -> TaskInfo:
"""
Adds general information into the task file
"""
if applied_rule:
task_action = config.mercure.rules[applied_rule].get("action", "process")
else:
task_action = "route"
return TaskInfo(
action=task_action,
uid=uid,
uid_type=uid_type,
triggered_rules=triggered_rules,
applied_rule=applied_rule,
patient_name=tags_list.get("PatientName", mercure_options.MISSING),
mrn=tags_list.get("PatientID", mercure_options.MISSING),
acc=tags_list.get("AccessionNumber", mercure_options.MISSING),
sender_address=tags_list.get("SenderAddress", mercure_options.MISSING),
device_serial_number=tags_list.get("DeviceSerialNumber"),
mercure_version=mercure_defs.VERSION,
mercure_appliance=config.mercure.appliance_name,
mercure_server=socket.gethostname(),
)
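
# Fallback sketch (hypothetical values): called without an applied rule, e.g.
#   add_info("1.2.3", "series", {"rule_a": True}, "", {"PatientID": "12345"})
# the action defaults to "route", and every tag absent from tags_list (here
# PatientName, AccessionNumber, etc.) is recorded as mercure_options.MISSING.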

def create_series_task(
    task_id: str,
    folder_name: str,
    triggered_rules: Dict[str, Literal[True]],
    applied_rule: str,
    series_UID: str,
    tags_list: Dict[str, str],
    target: str,
) -> bool:
"""
    Writes a task file for the received series, containing all information needed by the processor and dispatcher, along with general job information
"""
# Compose the JSON content for the file
task = compose_task(task_id, series_UID, "series", triggered_rules, applied_rule, tags_list, target)
monitor.send_update_task(task)
task_filename = folder_name + mercure_names.TASKFILE
try:
with open(task_filename, "w") as task_file:
json.dump(task.dict(), task_file)
    except Exception:
logger.error(
f"Unable to create series task file {task_filename} with contents {task.dict()}", task.id
) # handle_error
return False
return True

def create_study_task(
    task_id: str,
    folder_name: str,
    triggered_rules: Dict[str, Literal[True]],
    applied_rule: str,
    study_UID: str,
    tags_list: Dict[str, str],
) -> bool:
"""
    Generates a task file with information about the study
"""
# Compose the JSON content for the file
task = compose_task(task_id, study_UID, "study", triggered_rules, applied_rule, tags_list, "")
monitor.send_update_task(task)
task_filename = folder_name + mercure_names.TASKFILE
logger.debug(f"Writing study task file {task_filename}")
try:
with open(task_filename, "w") as task_file:
json.dump(task.dict(), task_file)
    except Exception:
logger.error(f"Unable to create study task file {task_filename}", task.id) # handle_error
return False
return True

def update_study_task(
    task_id: str,
    folder_name: str,
    triggered_rules: Dict[str, Literal[True]],
    applied_rule: str,
    study_UID: str,
    tags_list: Dict[str, str],
) -> Tuple[bool, str]:
"""
    Updates the study task file with information from the most recently received series
"""
series_description = tags_list.get("SeriesDescription", mercure_options.INVALID)
series_uid = tags_list.get("SeriesInstanceUID", mercure_options.INVALID)
task_filename = folder_name + mercure_names.TASKFILE
# Load existing task file. Raise error if it does not exist
try:
with open(task_filename, "r") as task_file:
task: Task = Task(**json.load(task_file))
    except Exception:
logger.error(f"Unable to open study task file {task_filename}", task_id) # handle_error
return False, ""
# Ensure that the task file contains the study information
if not task.study:
logger.error(f"Study information missing in task file {task_filename}", task_id) # handle_error
return False, ""
study = cast(TaskStudy, task.study)
# Remember the time when the last series was received, as needed to determine completion on timeout
study.last_receive_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Remember all received series descriptions, as needed to determine completion on received series
    if study.received_series and isinstance(study.received_series, list):
        study.received_series.append(series_description)
    else:
        study.received_series = [series_description]
    # Also remember the received SeriesUIDs for information purposes
    if study.received_series_uid and isinstance(study.received_series_uid, list):
        study.received_series_uid.append(series_uid)
    else:
        study.received_series_uid = [series_uid]
    # Save the updated file back to disk
try:
with open(task_filename, "w") as task_file:
json.dump(task.dict(), task_file)
    except Exception:
logger.error(f"Unable to write task file {task_filename}", task.id) # handle_error
return False, ""
monitor.send_update_task(task)
return True, task.id
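
# Typical study-level lifecycle (sketch): when the first series of a study
# arrives, create_study_task() writes the task file into the study folder; every
# subsequently received series calls update_study_task(), which appends the new
# SeriesDescription and SeriesInstanceUID and refreshes last_receive_time, so
# that study completion can be detected either via the configured series list or
# via timeout.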