Source code for routing.route_series

"""
route_series.py
===============
Provides functions for routing and processing of series. For study-level processing, series are pushed into study folders.
"""

# Standard python includes
import json
import os
import shutil
import typing
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# App-specific includes
import common.config as config
import common.helper as helper
import common.log_helpers as log_helpers
import common.monitor as monitor
import common.notification as notification
import common.rule_evaluation as rule_evaluation
from common.constants import mercure_actions, mercure_defs, mercure_events, mercure_names, mercure_options, mercure_rule
from common.types import Rule
from pydicom import dcmread
from routing.common import generate_task_id
from routing.generate_taskfile import create_series_task, create_study_task, update_study_task
from typing_extensions import Literal

# Create local logger instance
logger = config.get_logger()


@log_helpers.clear_task_decorator
def route_series(task_id: str, series_UID: str, files: typing.List[Path] = []) -> None:
    """
    Processes the series with the given series UID from the incoming folder.
    """
    logger.setTask(task_id)
    monitor.send_register_task(task_id, series_UID)
    base_dir = Path(config.mercure.incoming_folder) / series_UID
    lock_file = base_dir / mercure_names.LOCK
    if lock_file.exists():
        # Series is locked, so another instance might be working on it
        return

    # Create lock file in the incoming folder and prevent other instances from working on this series
    try:
        lock = helper.FileLock(lock_file)
    except FileNotFoundError:
        # Series likely already processed by other instance of router
        logger.debug("Series {} is already moved, skipping".format(series_UID))
        return
    except FileExistsError:
        # Series likely already processed by other instance of router
        logger.debug("Series {} is locked, skipping".format(series_UID))
        return
    except Exception:
        # Can't create lock file, so something must be seriously wrong
        logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
        return

    logger.info(f"Evaluating series {series_UID}")
    fileList = []
    seriesPrefix = series_UID + mercure_defs.SEPARATOR

    if not files:
        # Collect all files belonging to the series
        for entry in os.scandir(base_dir):
            if entry.name.endswith(mercure_names.TAGS) and entry.name.startswith(seriesPrefix) and not entry.is_dir():
                stemName = entry.name[:-5]
                fileList.append(stemName)
        logger.debug(f"Found files: {fileList}")
    else:
        fileList = [str(f.with_suffix("")) for f in files]

    logger.info("DICOM files found: " + str(len(fileList)))
    if not len(fileList):
        logger.error(f"No tags files found for series {series_UID}", task_id)  # handle_error
        lock.free()
        return

    if config.mercure.store_sample_dicom_tags:
        example_dcm: Optional[Path] = base_dir / (fileList[0] + ".dcm")
        if not example_dcm.exists():
            example_dcm = base_dir / (fileList[0])
            if not example_dcm.exists():
                example_dcm = None
        if example_dcm:
            try:
                monitor.send_update_task_tags(task_id, dcmread(example_dcm, stop_before_pixels=True).to_json_dict())
            except Exception:
                logger.exception("Error reading example DICOM file", task_id)

    # Use the tags file from the first slice for evaluating the routing rules
    tagsMasterFile = base_dir / (fileList[0] + mercure_names.TAGS)
    if not tagsMasterFile.exists():
        logger.error(f"Missing file! {tagsMasterFile.name}", task_id)  # handle_error
        lock.free()
        return

    tagsList_encoding_error = False
    try:
        tagsList: Dict[str, str] = {}
        try:
            with open(tagsMasterFile, "r", encoding="utf-8", errors="strict") as json_file:
                tagsList = json.load(json_file)
        except UnicodeDecodeError:
            with open(tagsMasterFile, "r", encoding="utf-8", errors="surrogateescape") as json_file:
                tagsList = json.load(json_file)
                tagsList_encoding_error = True

    except Exception:
        logger.exception(f"Invalid tag for series {series_UID}", task_id)  # handle_error
        lock.free()
        return

    monitor.send_register_series(tagsList)

    # Now test the routing rules and evaluate which rules have been triggered. If one of the
    # triggered rules enforces discarding, discard_series will contain the name of that rule.
    triggered_rules, discard_series = get_triggered_rules(task_id, tagsList)

    monitor.send_task_event(
        monitor.task_event.REGISTER, task_id, len(fileList), ", ".join(triggered_rules), "Registered series"
    )

    if (len(triggered_rules) == 0) or (discard_series):
        # If no routing rule has triggered or discarding has been enforced, discard the series
        push_series_complete(task_id, fileList, series_UID, "DISCARD", discard_series, False,
                             tagsList_encoding_error=tagsList_encoding_error)
    else:
        # File handling strategy: if only one rule triggered, move the files (faster than copying);
        # if multiple rules triggered, copy the files
        push_series_studylevel(task_id, triggered_rules, fileList, series_UID, tagsList)
        push_series_serieslevel(task_id, triggered_rules, fileList, series_UID, tagsList)

        # If more than one rule has triggered, the series files need to be removed
        # TODO: This can probably be avoided since the files are now contained in a separate folder
        #       for each series, so that files will be removed automatically by the rmtree call
        if len(triggered_rules) > 1:
            remove_series(task_id, fileList, series_UID)

    try:
        lock.free()
    except Exception:
        logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
        return
    shutil.rmtree(base_dir)


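# Illustrative only: the router's dispatcher loop (not shown in this module) is
# expected to call route_series roughly as sketched below once a series in the
# incoming folder is considered complete; the series UID is hypothetical.
#
#   task_id = generate_task_id()
#   route_series(task_id, "1.2.826.0.1.3680043.2.1125.1")
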
def get_triggered_rules(
    task_id: str, tagList: Dict[str, str]
) -> Tuple[Dict[str, Literal[True]], Union[Any, Literal[""]]]:
    """
    Evaluates the routing rules and returns the dictionary of triggered rules, together with the
    name of the discard rule if discarding has been requested.
    """
    triggered_rules: Dict[str, Literal[True]] = {}
    discard_rule = ""
    fallback_rule = ""

    if "mercureForceRule" in tagList:
        force_rule = tagList["mercureForceRule"]
        if force_rule not in config.mercure.rules:
            logger.error(f"Invalid force rule {force_rule} for task {task_id}", task_id)
            return {}, ""
        triggered_rules[force_rule] = True
    else:
        # Iterate over all defined processing rules
        for current_rule in config.mercure.rules:
            try:
                rule: Rule = config.mercure.rules[current_rule]

                # Check if the current rule has been disabled
                if rule.disabled is True:
                    continue

                # If the current rule is flagged as fallback rule, remember the name
                # (to avoid having to iterate over the rules again)
                if rule.fallback is True:
                    fallback_rule = current_rule

                # Check if the current rule is triggered for the provided tag set
                if rule_evaluation.parse_rule(rule.get("rule", "False"), tagList)[0]:
                    triggered_rules[current_rule] = True
                    if rule.get(mercure_rule.ACTION, "") == mercure_actions.DISCARD:
                        discard_rule = current_rule
                        # If the triggered rule's action is to discard, stop further iteration over the rules
                        break

            except Exception:
                logger.error(f"Invalid rule found: {current_rule}", task_id)  # handle_error
                continue

        # If no rule has triggered but a fallback rule exists, then apply this rule
        if (len(triggered_rules) == 0) and (fallback_rule):
            triggered_rules[fallback_rule] = True
            if config.mercure.rules[fallback_rule].get(mercure_rule.ACTION, "") == mercure_actions.DISCARD:
                discard_rule = fallback_rule

    logger.info("Triggered rules:")
    logger.info(triggered_rules)
    return triggered_rules, discard_rule

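# Illustrative only: assuming a rule syntax in which @Modality@ refers to the value
# of the DICOM tag, a configuration with the rules
#   {"route_ct": Rule(rule="@Modality@ == 'CT'", action="route"),
#    "discard_mr": Rule(rule="@Modality@ == 'MR'", action="discard")}
# would yield ({"route_ct": True}, "") for the tag set {"Modality": "CT"} and
# ({"discard_mr": True}, "discard_mr") for {"Modality": "MR"}.
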
def push_series_complete(
    task_id: str,
    file_list: List[str],
    series_UID: str,
    destination: str,
    discard_rule: str,
    copy_files: bool,
    *,
    tagsList_encoding_error=False,
) -> None:
    """
    Moves all files of the series into either the "discard" or "success" folder, both of which
    are periodically cleared.
    """
    # Define the source and target folder. Use the task ID as name for the target folder in the
    # discard or success directory to avoid collisions
    if destination == "DISCARD":
        destination_path = Path(config.mercure.discard_folder) / task_id
    else:
        destination_path = Path(config.mercure.success_folder) / task_id

    # Create subfolder in the discard directory and validate that it has been created
    try:
        destination_path.mkdir()
    except Exception:
        logger.error(f"Unable to create outgoing folder {destination_path}", task_id)  # handle_error
        return

    if not destination_path.exists():
        logger.error(f"Creating discard folder not possible {destination_path}", task_id)  # handle_error
        return

    # Create lock file in the destination folder (to prevent the cleaner module from working on
    # the folder). Note that the DICOM series in the incoming folder has already been locked
    # in the parent function.
    lock_file = destination_path / mercure_names.LOCK
    try:
        lock = helper.FileLock(lock_file)
    except Exception:
        # Can't create lock file, so something must be seriously wrong
        logger.error(f"Unable to create lock file {destination_path}/{mercure_names.LOCK}", task_id)  # handle_error
        return

    if destination == "DISCARD":
        if discard_rule:
            info_text = "Discard by rule " + discard_rule
        else:
            info_text = "Discard by default."
        if tagsList_encoding_error:
            info_text += (" Decoding error detected: some tags were not properly decoded,"
                          " likely due to a malformed DICOM file."
                          " The expected rule may therefore not have been triggered.")
        logger.warning(info_text)
        monitor.send_task_event(monitor.task_event.DISCARD, task_id, len(file_list), discard_rule or "", info_text)

    if not push_files(task_id, series_UID, file_list, destination_path, copy_files):
        logger.error("Problem while moving completed files", task_id)  # handle_error

    operation_name = "MOVE"
    if copy_files:
        operation_name = "COPY"
    monitor.send_task_event(monitor.task_event.MOVE, task_id, len(file_list), str(destination_path), operation_name)

    try:
        lock.free()
    except Exception:
        # Can't delete lock file, so something must be seriously wrong
        logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
        return

def push_series_studylevel(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> None:
    """
    Prepares study-level routing for the current series.
    """
    # Move the series into an individual study-level folder for every rule
    for current_rule in triggered_rules:
        if config.mercure.rules[current_rule].get("action_trigger", "series") == mercure_options.STUDY:
            first_series = False

            # Check if a folder exists for buffering the series until study completion. If not, create it
            study_UID = tags_list["StudyInstanceUID"]
            target_folder = Path(config.mercure.studies_folder) / (study_UID + mercure_defs.SEPARATOR + current_rule)

            if not target_folder.exists():
                try:
                    target_folder.mkdir()
                    first_series = True
                except Exception:
                    logger.error(f"Unable to create study folder {target_folder}", task_id)  # handle_error
                    continue

            lock_file = target_folder / mercure_names.LOCK
            try:
                lock = helper.FileLock(lock_file)
            except Exception:
                # Can't create lock file, so something must be seriously wrong
                logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
                continue

            if first_series:
                # Create task file with information on the completion criteria
                new_task_id = generate_task_id()
                result = create_study_task(new_task_id, target_folder, triggered_rules, current_rule,
                                           study_UID, tags_list)
                monitor.send_task_event(monitor.task_event.ASSIGN, task_id, len(file_list), current_rule,
                                        "Created study task")
                monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id,
                                        current_rule)
                monitor.send_task_event(monitor.task_event.ASSIGN, new_task_id, len(file_list), task_id,
                                        "Added series to study")
            else:
                # Add data from the latest series to the task file
                result, new_task_id = update_study_task(task_id, target_folder, triggered_rules, current_rule,
                                                        study_UID, tags_list)
                monitor.send_task_event(monitor.task_event.ASSIGN, task_id, len(file_list), current_rule,
                                        "Added to study task")
                monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id,
                                        current_rule)
                monitor.send_task_event(monitor.task_event.ASSIGN, new_task_id, len(file_list), task_id,
                                        "Added series to study")
            if not result:
                logger.error("Problem assigning series to study", task_id)

            # Copy (or move) the files into the study folder
            push_files(task_id, series_UID, file_list, target_folder, (len(triggered_rules) > 1))
            lock.free()

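# Illustrative only: for StudyInstanceUID "1.2.3" (hypothetical) and a triggered
# study-level rule "study_rule", the series files would be buffered under
# <studies_folder>/"1.2.3" + mercure_defs.SEPARATOR + "study_rule" until the
# completion criteria stored in the study task file are met.
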
def push_series_serieslevel(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> None:
    """
    Prepares all series-level routings for the current series.
    """
    push_serieslevel_routing(task_id, triggered_rules, file_list, series_UID, tags_list)
    push_serieslevel_processing(task_id, triggered_rules, file_list, series_UID, tags_list)
    push_serieslevel_notification(task_id, triggered_rules, file_list, series_UID, tags_list)

def push_serieslevel_routing(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> None:
    selected_targets: Dict[str, List[str]] = {}
    # Collect the dispatch-only targets so that a series is not sent twice to the same target
    # when multiple triggered rules route to it (note: this only makes sense for routing-only
    # series tasks, as study-level rules might have different completion criteria and tasks involving
    # processing steps might create different results, so they cannot be pooled)
    for current_rule in triggered_rules:
        rule_definition = config.mercure.rules[current_rule]
        if (
            rule_definition.get("action_trigger", "series") == mercure_options.SERIES
            and rule_definition.get("action") == mercure_actions.ROUTE
        ):
            targets = []
            if rule_definition.get("target"):
                if isinstance(rule_definition.get("target"), str):
                    # If the target is a string, only add it if it is not empty
                    if rule_definition.get("target"):
                        targets.append(rule_definition.get("target"))
                else:
                    targets = rule_definition.get("target")
            for target in targets:
                if not selected_targets.get(target):
                    selected_targets[target] = [current_rule]
                else:
                    selected_targets[target].append(current_rule)
            trigger_serieslevel_notification(current_rule, tags_list, mercure_events.RECEIVED, task_id)
    push_serieslevel_outgoing(task_id, triggered_rules, file_list, series_UID, tags_list, selected_targets)

def push_serieslevel_processing(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> bool:
    # Rules with action "processing" or "processing & routing" need to be processed separately
    # (because the processing step can create varying results).
    # Thus, loop over all series-level rules that have triggered.
    for current_rule in triggered_rules:
        if config.mercure.rules[current_rule].get("action_trigger", mercure_options.SERIES) == mercure_options.SERIES:
            if (config.mercure.rules[current_rule].get("action", "") == mercure_actions.PROCESS) or (
                config.mercure.rules[current_rule].get("action", "") == mercure_actions.BOTH
            ):
                # Determine if the files should be copied or moved. If only one rule triggered, files can
                # safely be moved. Otherwise, the files are copied here and removed at the end
                copy_files = True
                if len(triggered_rules) == 1:
                    copy_files = False

                new_task_id = generate_task_id()

                folder_name = config.mercure.processing_folder + "/" + new_task_id
                target_folder = Path(folder_name)

                # Create processing folder
                try:
                    os.mkdir(folder_name)
                except Exception:
                    logger.error(f"Unable to create outgoing folder {folder_name}", task_id)  # handle_error
                    return False

                if not Path(folder_name).exists():
                    logger.error(f"Creating folder not possible {folder_name}", task_id)  # handle_error
                    return False

                # Lock the case
                lock_file = Path(folder_name) / mercure_names.LOCK
                try:
                    lock = helper.FileLock(lock_file)
                except Exception:
                    # Can't create lock file, so something must be seriously wrong
                    logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
                    return False

                monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id,
                                        current_rule)

                # Generate task file with processing information
                if create_series_task(new_task_id, target_folder, triggered_rules, current_rule,
                                      series_UID, tags_list, ""):
                    monitor.send_register_task(new_task_id, series_UID, task_id)
                else:
                    return False

                if not push_files(task_id, series_UID, file_list, target_folder, copy_files):
                    logger.error(f"Unable to push files into processing folder {target_folder}",
                                 task_id)  # handle_error
                    return False

                try:
                    lock.free()
                except Exception:
                    # Can't delete lock file, so something must be seriously wrong
                    logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
                    return False

                trigger_serieslevel_notification(current_rule, tags_list, mercure_events.RECEIVED, task_id)
    return True

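# Illustrative only: a triggered series-level rule with action "process" (hypothetical
# name "ct_ai") would stage the series under <processing_folder>/<new_task_id> together
# with a task file; a separate processing module is then expected to pick up the folder
# from there.
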
def push_serieslevel_notification(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> bool:
    notification_rules_count = 0

    for current_rule in triggered_rules:
        if config.mercure.rules[current_rule].get("action_trigger", mercure_options.SERIES) == mercure_options.SERIES:
            if config.mercure.rules[current_rule].get("action", "") == mercure_actions.NOTIFICATION:
                trigger_serieslevel_notification(current_rule, tags_list, mercure_events.RECEIVED, task_id)
                trigger_serieslevel_notification(current_rule, tags_list, mercure_events.COMPLETED, task_id)
                notification_rules_count += 1

    # If the current rule is "notification-only" and this is the only rule that has been
    # triggered, then discard the files so that they end up in the discard folder. If more
    # than one rule has triggered, the parent function will remove the files from the incoming
    # folder. However, if multiple rules have triggered and all such rules are notifications,
    # make a copy of the files into the discard folder, so that the files can be recovered
    if notification_rules_count > 0:
        if (len(triggered_rules) == 1) or (len(triggered_rules) == notification_rules_count):
            push_series_complete(task_id, file_list, series_UID, "SUCCESS", "", len(triggered_rules) > 1)

    return True

def push_serieslevel_outgoing(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
    selected_targets: Dict[str, List[str]],
) -> None:
    """
    Moves the DICOM files of the series to a separate subfolder for each target in the outgoing folder.
    """
    source_folder = Path(config.mercure.incoming_folder) / series_UID

    # Determine if the files should be copied or moved. If only one rule triggered, files can
    # safely be moved. Otherwise, the files are copied here and removed at the end
    move_operation = False
    if len(triggered_rules) == 1:
        move_operation = True

    for i, target in enumerate(selected_targets):
        if target not in config.mercure.targets:
            logger.error(f"Invalid target selected {target}", task_id)  # handle_error
            # TODO: Better error handling!
            continue

        new_task_id = generate_task_id()
        folder_name = config.mercure.outgoing_folder + "/" + new_task_id
        target_folder = Path(folder_name)

        try:
            os.mkdir(folder_name)
        except Exception:
            logger.error(f"Unable to create outgoing folder {folder_name}", task_id)  # handle_error
            return

        if not Path(folder_name).exists():
            logger.error(f"Creating folder not possible {folder_name}", task_id)  # handle_error
            return

        lock_file = Path(folder_name) / mercure_names.LOCK
        try:
            lock = helper.FileLock(lock_file)
        except Exception:
            logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
            return

        # Collect the rules that triggered the dispatching to the current target
        target_rules: Dict[str, Literal[True]] = {}
        for rule in selected_targets[target]:
            target_rules[rule] = True

        # Generate task file with dispatch information
        if create_series_task(new_task_id, target_folder, target_rules, "", series_UID, tags_list, target):
            monitor.send_register_task(new_task_id, series_UID, task_id)
        else:
            continue

        monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id,
                                ", ".join(selected_targets[target]))

        operation: Callable
        is_operation_move = False
        if move_operation:
            # If one rule routes to multiple targets, move the files only for the last target
            if i == len(selected_targets) - 1:
                operation = shutil.move
                is_operation_move = True
            else:
                operation = shutil.copy
        else:
            operation = shutil.copy

        for entry in file_list:
            try:
                operation(source_folder / (entry + mercure_names.DCM), target_folder / (entry + mercure_names.DCM))
                operation(source_folder / (entry + mercure_names.TAGS), target_folder / (entry + mercure_names.TAGS))
            except Exception:
                logger.error(  # handle_error
                    (f"Problem while pushing file to outgoing [{entry}]\n"
                     f"Source folder {source_folder}\nTarget folder {target_folder}"),
                    task_id,
                )
                raise

        if is_operation_move:
            monitor.send_task_event(monitor.task_event.MOVE, task_id, len(file_list), str(target_folder),
                                    "Moved files")
        else:
            monitor.send_task_event(monitor.task_event.COPY, task_id, len(file_list), str(target_folder),
                                    "Copied files")

        try:
            lock.free()
        except Exception:
            # Can't delete lock file, so something must be seriously wrong
            logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
            return

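# Illustrative only: if a single rule routes to the (hypothetical) targets "pacs_a"
# and "pacs_b", move_operation is True, so the loop copies the files for "pacs_a"
# and moves them for the final target "pacs_b"; no cleanup pass is needed afterwards.
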
def push_files(task_id: str, series_uid: str, file_list: List[str], target_folder: Path, copy_files: bool) -> bool:
    """
    Copies or moves the given files to the target path. If copy_files is True, files are copied, otherwise moved.
    Note that this function does not create a lock file (this needs to be done by the calling function).
    """
    operation: Callable
    if copy_files is False:
        operation = shutil.move
    else:
        operation = shutil.copy

    source_folder = Path(config.mercure.incoming_folder) / series_uid

    for entry in file_list:
        try:
            operation(source_folder / (entry + mercure_names.DCM), target_folder / (entry + mercure_names.DCM))
            operation(source_folder / (entry + mercure_names.TAGS), target_folder / (entry + mercure_names.TAGS))
            logger.debug(f"Pushed {source_folder / (entry + mercure_names.DCM)}")
        except Exception:
            logger.error(  # handle_error
                f"Problem while pushing file to outgoing {entry}\n"
                f"Source folder {source_folder}\nTarget folder {target_folder}",
                task_id,
            )
            return False

    if copy_files is False:
        monitor.send_task_event(monitor.task_event.MOVE, task_id, len(file_list), str(target_folder), "Moved files")
    else:
        monitor.send_task_event(monitor.task_event.COPY, task_id, len(file_list), str(target_folder), "Copied files")

    return True

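# Illustrative only: for a file stem such as "series_uid#slice001" (hypothetical),
# push_files transfers both "series_uid#slice001.dcm" and "series_uid#slice001.tags"
# from the incoming series folder into target_folder, moving when copy_files is False
# and copying otherwise.
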
def remove_series(task_id: str, file_list: List[str], series_UID: str) -> bool:
    """
    Deletes the given files from the incoming folder.
    """
    source_folder = Path(config.mercure.incoming_folder) / series_UID
    for entry in file_list:
        try:
            (source_folder / (entry + mercure_names.TAGS)).unlink()
            (source_folder / (entry + mercure_names.DCM)).unlink()
        except Exception:
            logger.error(f"Error while removing file {entry}", task_id)  # handle_error
            return False
    monitor.send_task_event(monitor.task_event.REMOVE, task_id, len(file_list), "", "Removed duplicate files")
    return True

def route_error_files() -> None:
    """
    Looks for error files, moves these files and the corresponding DICOM files to the error folder,
    and sends an alert to the bookkeeper instance.
    """
    error_files_found = 0

    errors_folder = Path(config.mercure.incoming_folder) / "error"
    entries: List[os.DirEntry] = []
    if errors_folder.is_dir():
        entries += list(os.scandir(errors_folder))
    entries += list(os.scandir(config.mercure.incoming_folder))

    for entry in entries:
        if not entry.name.endswith(mercure_names.ERROR) or entry.is_dir():
            continue
        # Check if a lock file exists. If not, create one.
        lock_file = str(entry.path) + mercure_names.LOCK
        if os.path.exists(lock_file):
            continue
        try:
            lock = helper.FileLock(Path(lock_file))
        except Exception:
            continue

        logger.error(f"Found incoming error file {entry.name}")
        error_files_found += 1

        move_error_to = config.mercure.error_folder + "/" + entry.name
        logger.error(f"Moving {entry.name} to {move_error_to}")
        shutil.move(entry.path, move_error_to)

        dicom_file = Path(entry.path).with_suffix(".dcm")
        dicom_file_b = Path(entry.path).with_suffix("")
        for f in [dicom_file, dicom_file_b]:
            if f.exists():
                move_to = config.mercure.error_folder + "/" + f.name
                logger.info(f"Moving {f.name} to {move_to}")
                shutil.move(str(f), move_to)
        lock.free()

    if error_files_found > 0:
        monitor.send_event(
            monitor.m_events.PROCESSING, monitor.severity.ERROR, f"Error parsing {error_files_found} incoming files"
        )
    return

def trigger_serieslevel_notification(
    current_rule: str, tags_list: Dict[str, str], event: mercure_events, task_id: str
) -> None:
    notification.trigger_notification_for_rule(
        current_rule,
        task_id,
        event,
        tags_list=tags_list,
    )