Source code for routing.route_series

"""
route_series.py
===============
Provides functions for routing and processing of series. For study-level processing, series are pushed into study folders.
"""

# Standard python includes
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, Union
from typing_extensions import Literal
import json
import shutil

# App-specific includes
import common.config as config
import common.rule_evaluation as rule_evaluation
import common.monitor as monitor
import common.helper as helper
import common.notification as notification
import common.log_helpers as log_helpers
from common.types import Rule
from common.constants import (
    mercure_defs,
    mercure_names,
    mercure_actions,
    mercure_rule,
    mercure_options,
    mercure_events,
)
from routing.generate_taskfile import create_series_task, create_study_task, update_study_task
from routing.common import generate_task_id

# Create local logger instance
logger = config.get_logger()


@log_helpers.clear_task_decorator
def route_series(task_id: str, series_UID: str) -> None:
    """
    Processes the series with the given series UID from the incoming folder.
    """
    logger.setTask(task_id)
    monitor.send_register_task(task_id, series_UID)

    lock_file = Path(config.mercure.incoming_folder + "/" + str(series_UID) + mercure_names.LOCK)
    if lock_file.exists():
        # Series is locked, so another instance might be working on it
        return

    # Create lock file in the incoming folder and prevent other instances from working on this series
    try:
        lock = helper.FileLock(lock_file)
    except FileExistsError:
        # The series has likely already been processed by another instance of the router
        return
    except Exception:
        # Can't create lock file, so something must be seriously wrong
        logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
        return
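
    # Illustrative sketch of the lock-file pattern relied on here (helper.FileLock is
    # project code; the exact behavior is an assumption based on how it is used): the
    # lock is a file created exclusively, so a second creation attempt fails fast.
    #
    #     fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)  # FileExistsError if taken
    #     os.close(fd)
    #     ...                                                            # critical section
    #     os.unlink(lock_file)                                           # lock.free()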

    logger.info(f"Processing series {series_UID}")
    fileList = []
    seriesPrefix = series_UID + mercure_defs.SEPARATOR

    # Collect all files belonging to the series
    for entry in os.scandir(config.mercure.incoming_folder):
        if entry.name.endswith(mercure_names.TAGS) and entry.name.startswith(seriesPrefix) and not entry.is_dir():
            stemName = entry.name[:-5]
            fileList.append(stemName)
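
    # For illustration: entry.name[:-5] strips the ".tags" extension. Assuming
    # mercure_defs.SEPARATOR is "#" and mercure_names.TAGS is ".tags" (the actual
    # values live in common.constants), a file named
    # "1.2.826.0.1.3680043.2#slice001.tags" yields the stem
    # "1.2.826.0.1.3680043.2#slice001", to which ".dcm"/".tags" are appended later.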

    logger.info("DICOM files found: " + str(len(fileList)))
    if not len(fileList):
        logger.error(f"No tags files found for series {series_UID}", task_id)  # handle_error
        return

    # Use the tags file from the first slice for evaluating the routing rules
    tagsMasterFile = Path(config.mercure.incoming_folder + "/" + fileList[0] + mercure_names.TAGS)
    if not tagsMasterFile.exists():
        logger.error(f"Missing file! {tagsMasterFile.name}", task_id)  # handle_error
        return

    tagsList_encoding_error = False
    try:
        tagsList: Dict[str, str] = {}
        try:
            with open(tagsMasterFile, "r", encoding="utf-8", errors="strict") as json_file:
                tagsList = json.load(json_file)
        except UnicodeDecodeError:
            with open(tagsMasterFile, "r", encoding="utf-8", errors="surrogateescape") as json_file:
                tagsList = json.load(json_file)
                tagsList_encoding_error = True

    except Exception:
        logger.exception(f"Invalid tag for series {series_UID}", task_id)  # handle_error
        return
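
    # For illustration, a tags file is a flat JSON dictionary of DICOM attributes,
    # roughly of the form (hypothetical values):
    #
    #     {"PatientName": "DOE^JOHN", "Modality": "MR",
    #      "SeriesInstanceUID": "1.2.826.0.1.3680043.2",
    #      "StudyInstanceUID": "1.2.826.0.1.3680043.1",
    #      "SeriesDescription": "T1 AXIAL"}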

    monitor.send_register_series(tagsList)

    # Now test the routing rules and evaluate which ones have been triggered. If one of the
    # triggered rules enforces discarding, discard_series will contain the name of that rule.
    triggered_rules, discard_series = get_triggered_rules(task_id, tagsList)

    monitor.send_task_event(
        monitor.task_event.REGISTER, task_id, len(fileList), ", ".join(triggered_rules), "Registered series"
    )

    if (len(triggered_rules) == 0) or discard_series:
        # If no routing rule has triggered or discarding has been enforced, discard the series
        push_series_complete(
            task_id, fileList, series_UID, "DISCARD", discard_series, False,
            tagsList_encoding_error=tagsList_encoding_error,
        )
    else:
        # File handling strategy: if only one rule has triggered, the files are moved (faster
        # than copying); if multiple rules have triggered, the files are copied into each
        # destination and removed from the incoming folder afterwards
        push_series_studylevel(task_id, triggered_rules, fileList, series_UID, tagsList)
        push_series_serieslevel(task_id, triggered_rules, fileList, series_UID, tagsList)

        # If more than one rule has triggered, the series files need to be removed
        if len(triggered_rules) > 1:
            remove_series(task_id, fileList)
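
        # For illustration of the fan-out above (rule names are hypothetical): if a
        # routing rule "route_a" and a processing rule "process_b" both trigger, the
        # series is copied into one outgoing and one processing folder, and the
        # incoming copies are then deleted by remove_series().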

    try:
        lock.free()
    except Exception:
        logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
        return


def get_triggered_rules(
    task_id: str, tagList: Dict[str, str]
) -> Tuple[Dict[str, Literal[True]], Union[Any, Literal[""]]]:
    """
    Evaluates the routing rules and returns a dictionary with the triggered rules.
    """
    triggered_rules: Dict[str, Literal[True]] = {}
    discard_rule = ""
    fallback_rule = ""

    # Iterate over all defined processing rules
    for current_rule in config.mercure.rules:
        try:
            rule: Rule = config.mercure.rules[current_rule]

            # Check if the current rule has been disabled
            if rule.disabled == True:
                continue

            # If the current rule is flagged as fallback rule, remember its name
            # (to avoid having to iterate over the rules again)
            if rule.fallback == True:
                fallback_rule = current_rule

            # Check if the current rule is triggered for the provided tag set
            if rule_evaluation.parse_rule(rule.get("rule", "False"), tagList)[0]:
                triggered_rules[current_rule] = True
                if rule.get(mercure_rule.ACTION, "") == mercure_actions.DISCARD:
                    discard_rule = current_rule
                    # If the triggered rule's action is to discard, stop the iteration over the rules
                    break
        except Exception:
            logger.error(f"Invalid rule found: {current_rule}", task_id)  # handle_error
            continue

    # If no rule has triggered but a fallback rule exists, apply that rule
    if (len(triggered_rules) == 0) and fallback_rule:
        triggered_rules[fallback_rule] = True
        if config.mercure.rules[fallback_rule].get(mercure_rule.ACTION, "") == mercure_actions.DISCARD:
            discard_rule = fallback_rule

    logger.info("Triggered rules:")
    logger.info(triggered_rules)
    return triggered_rules, discard_rule

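# Minimal usage sketch for get_triggered_rules (tag values and rule names are
# hypothetical; the rule set itself is read from config.mercure.rules):
#
#     tags = {"Modality": "MR", "SeriesDescription": "T1 AXIAL"}
#     triggered, discard = get_triggered_rules("task-0001", tags)
#     # e.g. triggered == {"route_mr": True}, discard == "" (or the name of a discard rule)
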
def push_series_complete(
    task_id: str,
    file_list: List[str],
    series_UID: str,
    destination: str,
    discard_rule: str,
    copy_files: bool,
    *,
    tagsList_encoding_error: bool = False,
) -> None:
    """
    Moves all files of the series into either the "discard" or the "success" folder, both of
    which are periodically cleared.
    """
    # Define the target folder. Use the task ID as the name of the target folder in the
    # discard or success directory to avoid collisions
    if destination == "DISCARD":
        destination_path = config.mercure.discard_folder + "/" + task_id
    else:
        destination_path = config.mercure.success_folder + "/" + task_id

    # Create the subfolder in the target directory and validate that it has been created
    try:
        os.mkdir(destination_path)
    except Exception:
        logger.error(f"Unable to create outgoing folder {destination_path}", task_id)  # handle_error
        return

    if not Path(destination_path).exists():
        logger.error(f"Creating discard folder not possible {destination_path}", task_id)  # handle_error
        return

    # Create a lock file in the destination folder (to prevent the cleaner module from working
    # on the folder). Note that the DICOM series in the incoming folder has already been locked
    # in the parent function.
    lock_file = Path(destination_path) / mercure_names.LOCK
    try:
        lock = helper.FileLock(lock_file)
    except Exception:
        # Can't create lock file, so something must be seriously wrong
        logger.error(f"Unable to create lock file {destination_path}/{mercure_names.LOCK}", task_id)  # handle_error
        return

    if destination == "DISCARD":
        if discard_rule:
            info_text = "Discard by rule " + discard_rule
        else:
            info_text = "Discard by default."
        if tagsList_encoding_error:
            info_text += (
                " Decoding error detected: some tags were not properly decoded, likely due to a"
                " malformed DICOM file. The expected rule may therefore not have been triggered."
            )
        logger.warning(info_text)
        monitor.send_task_event(monitor.task_event.DISCARD, task_id, len(file_list), discard_rule or "", info_text)

    if not push_files(task_id, file_list, destination_path, copy_files):
        logger.error("Problem while moving completed files", task_id)  # handle_error

    operation_name = "MOVE"
    if copy_files:
        operation_name = "COPY"
    monitor.send_task_event(monitor.task_event.MOVE, task_id, len(file_list), destination_path, operation_name)

    try:
        lock.free()
    except Exception:
        # Can't delete lock file, so something must be seriously wrong
        logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
        return

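# For illustration (names are hypothetical): discarding a two-slice series under
# task ID "abc123" produces
#
#     <discard_folder>/abc123/<series_UID>#slice001.dcm / .tags
#     <discard_folder>/abc123/<series_UID>#slice002.dcm / .tags
#
# with a transient .lock file that is removed once all files have been moved.
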
def push_series_studylevel(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> None:
    """
    Prepares study-level routing for the current series.
    """
    # Move the series into an individual study-level folder for every rule
    for current_rule in triggered_rules:
        if config.mercure.rules[current_rule].get("action_trigger", "series") == mercure_options.STUDY:
            first_series = False

            # Check if a folder exists for buffering series until study completion. If not, create it
            study_UID = tags_list["StudyInstanceUID"]
            folder_name = config.mercure.studies_folder + "/" + study_UID + mercure_defs.SEPARATOR + current_rule
            target_folder = folder_name + "/"

            if not os.path.exists(folder_name):
                try:
                    os.mkdir(folder_name)
                    first_series = True
                except Exception:
                    logger.error(f"Unable to create study folder {folder_name}", task_id)  # handle_error
                    continue

            lock_file = Path(folder_name) / mercure_names.LOCK
            try:
                lock = helper.FileLock(lock_file)
            except Exception:
                # Can't create lock file, so something must be seriously wrong
                logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
                continue

            if first_series:
                # Create the task file with information on the completion criteria
                new_task_id = generate_task_id()
                result = create_study_task(new_task_id, target_folder, triggered_rules, current_rule, study_UID, tags_list)
                monitor.send_task_event(monitor.task_event.ASSIGN, task_id, len(file_list), current_rule, "Created study task")
                monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id, current_rule)
                monitor.send_task_event(monitor.task_event.ASSIGN, new_task_id, len(file_list), task_id, "Added series to study")
            else:
                # Add data from the latest series to the task file
                result, new_task_id = update_study_task(task_id, target_folder, triggered_rules, current_rule, study_UID, tags_list)
                monitor.send_task_event(monitor.task_event.ASSIGN, task_id, len(file_list), current_rule, "Added to study task")
                monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id, current_rule)
                monitor.send_task_event(monitor.task_event.ASSIGN, new_task_id, len(file_list), task_id, "Added series to study")

            if not result:
                logger.error("Problem assigning series to study", task_id)  # handle_error

            # Copy (or move) the files into the study folder
            push_files(task_id, file_list, folder_name, (len(triggered_rules) > 1))
            lock.free()

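# For illustration: assuming mercure_defs.SEPARATOR is "#", a study with
# StudyInstanceUID "1.2.3.4" matched by rule "route_study" is buffered under
#
#     <studies_folder>/1.2.3.4#route_study/
#
# The first series to arrive creates the folder and the study task file; later
# series of the same study merely append themselves via update_study_task().
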
def push_series_serieslevel(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> None:
    """
    Prepares all series-level routings for the current series.
    """
    push_serieslevel_routing(task_id, triggered_rules, file_list, series_UID, tags_list)
    push_serieslevel_processing(task_id, triggered_rules, file_list, series_UID, tags_list)
    push_serieslevel_notification(task_id, triggered_rules, file_list, series_UID, tags_list)

def push_serieslevel_routing(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> None:
    selected_targets: Dict[str, List[str]] = {}

    # Collect the dispatch-only targets, so that a series is not sent twice to the same
    # target when multiple triggered rules dispatch to it (note: this only makes sense for
    # routing-only series tasks, as study-level rules might have different completion
    # criteria, and tasks involving processing steps might create different results, so
    # that they cannot be pooled)
    for current_rule in triggered_rules:
        rule_definition = config.mercure.rules[current_rule]
        if (
            rule_definition.get("action_trigger", "series") == mercure_options.SERIES
            and rule_definition.get("action") == mercure_actions.ROUTE
        ):
            target = rule_definition.get("target")
            if target:
                if not selected_targets.get(target):
                    selected_targets[target] = [current_rule]
                else:
                    selected_targets[target].append(current_rule)
            trigger_serieslevel_notification(current_rule, tags_list, mercure_events.RECEIVED, task_id)

    push_serieslevel_outgoing(task_id, triggered_rules, file_list, series_UID, tags_list, selected_targets)

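# Sketch of the target de-duplication above (rule and target names are
# hypothetical): if rules "route_a" and "route_b" both dispatch to target
# "pacs1", the series is staged only once for that target:
#
#     selected_targets == {"pacs1": ["route_a", "route_b"]}
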
def push_serieslevel_processing(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> bool:
    # Rules with action "processing" or "processing & routing" need to be processed separately
    # (because the processing step can create varying results). Thus, loop over all series-level
    # rules that have triggered.
    for current_rule in triggered_rules:
        if config.mercure.rules[current_rule].get("action_trigger", mercure_options.SERIES) == mercure_options.SERIES:
            if (config.mercure.rules[current_rule].get("action", "") == mercure_actions.PROCESS) or (
                config.mercure.rules[current_rule].get("action", "") == mercure_actions.BOTH
            ):
                # Determine if the files should be copied or moved. If only one rule has
                # triggered, the files can safely be moved. Otherwise, they are copied here
                # and removed from the incoming folder at the end.
                copy_files = True
                if len(triggered_rules) == 1:
                    copy_files = False

                new_task_id = generate_task_id()

                folder_name = config.mercure.processing_folder + "/" + new_task_id
                target_folder = folder_name + "/"

                # Create the processing folder
                try:
                    os.mkdir(folder_name)
                except Exception:
                    logger.error(f"Unable to create outgoing folder {folder_name}", task_id)  # handle_error
                    return False

                if not Path(folder_name).exists():
                    logger.error(f"Creating folder not possible {folder_name}", task_id)  # handle_error
                    return False

                # Lock the case
                lock_file = Path(folder_name) / mercure_names.LOCK
                try:
                    lock = helper.FileLock(lock_file)
                except Exception:
                    # Can't create lock file, so something must be seriously wrong
                    logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
                    return False

                monitor.send_task_event(monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id, current_rule)

                # Generate the task file with processing information
                if create_series_task(new_task_id, target_folder, triggered_rules, current_rule, series_UID, tags_list, ""):
                    monitor.send_register_task(new_task_id, series_UID, task_id)
                else:
                    return False

                if not push_files(task_id, file_list, target_folder, copy_files):
                    logger.error(f"Unable to push files into processing folder {target_folder}", task_id)  # handle_error
                    return False

                try:
                    lock.free()
                except Exception:
                    # Can't delete lock file, so something must be seriously wrong
                    logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
                    return False

                trigger_serieslevel_notification(current_rule, tags_list, mercure_events.RECEIVED, task_id)
    return True

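# For illustration (IDs are hypothetical): each triggered processing rule stages
# the series into its own folder, since processing may transform the files:
#
#     <processing_folder>/<new_task_id>/.lock          (released once staging is done)
#     <processing_folder>/<new_task_id>/<series_UID>#slice001.dcm / .tags
#     ... plus a task file written by create_series_task()
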
def push_serieslevel_notification(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
) -> bool:
    notification_rules_count = 0

    for current_rule in triggered_rules:
        if config.mercure.rules[current_rule].get("action_trigger", mercure_options.SERIES) == mercure_options.SERIES:
            if config.mercure.rules[current_rule].get("action", "") == mercure_actions.NOTIFICATION:
                trigger_serieslevel_notification(current_rule, tags_list, mercure_events.RECEIVED, task_id)
                trigger_serieslevel_notification(current_rule, tags_list, mercure_events.COMPLETED, task_id)
                notification_rules_count += 1

    # If the current rule is "notification-only" and it is the only rule that has been triggered,
    # discard the files so that they end up in the discard folder. If more than one rule has
    # triggered, the parent function will remove the files from the incoming folder. However, if
    # multiple rules have triggered and all of them are notification rules, copy the files into
    # the discard folder so that they can still be recovered
    if notification_rules_count > 0:
        if (len(triggered_rules) == 1) or (len(triggered_rules) == notification_rules_count):
            push_series_complete(task_id, file_list, series_UID, "SUCCESS", "", len(triggered_rules) > 1)

    return True

def push_serieslevel_outgoing(
    task_id: str,
    triggered_rules: Dict[str, Literal[True]],
    file_list: List[str],
    series_UID: str,
    tags_list: Dict[str, str],
    selected_targets: Dict[str, List[str]],
) -> None:
    """
    Moves the DICOM files of the series to a separate subfolder for each target in the outgoing folder.
    """
    source_folder = config.mercure.incoming_folder + "/"

    # Determine if the files should be copied or moved. If only one rule has triggered, the
    # files can safely be moved. Otherwise, they are copied here and removed at the end.
    move_operation = False
    if len(triggered_rules) == 1:
        move_operation = True

    for target in selected_targets:
        if target not in config.mercure.targets:
            logger.error(f"Invalid target selected {target}", task_id)  # handle_error
            continue

        new_task_id = generate_task_id()
        folder_name = config.mercure.outgoing_folder + "/" + new_task_id
        target_folder = folder_name + "/"

        try:
            os.mkdir(folder_name)
        except Exception:
            logger.error(f"Unable to create outgoing folder {folder_name}", task_id)  # handle_error
            return

        if not Path(folder_name).exists():
            logger.error(f"Creating folder not possible {folder_name}", task_id)  # handle_error
            return

        lock_file = Path(folder_name) / mercure_names.LOCK
        try:
            lock = helper.FileLock(lock_file)
        except Exception:
            logger.error(f"Unable to create lock file {lock_file}", task_id)  # handle_error
            return

        # Collect the rules that triggered the dispatching to the current target
        target_rules: Dict[str, Literal[True]] = {}
        for rule in selected_targets[target]:
            target_rules[rule] = True

        # Generate the task file with dispatch information
        if create_series_task(new_task_id, target_folder, target_rules, "", series_UID, tags_list, target):
            monitor.send_register_task(new_task_id, series_UID, task_id)
        else:
            continue

        monitor.send_task_event(
            monitor.task_event.DELEGATE, task_id, len(file_list), new_task_id, ", ".join(selected_targets[target])
        )

        operation: Callable
        if move_operation:
            operation = shutil.move
        else:
            operation = shutil.copy

        for entry in file_list:
            try:
                operation(source_folder + entry + mercure_names.DCM, target_folder + entry + mercure_names.DCM)
                operation(source_folder + entry + mercure_names.TAGS, target_folder + entry + mercure_names.TAGS)
            except Exception:
                logger.error(  # handle_error
                    f"Problem while pushing file to outgoing {entry}\nSource folder {source_folder}\nTarget folder {target_folder}",
                    task_id,
                )

        if move_operation:
            monitor.send_task_event(monitor.task_event.MOVE, task_id, len(file_list), target_folder, "Moved files")
        else:
            monitor.send_task_event(monitor.task_event.COPY, task_id, len(file_list), target_folder, "Copied files")

        try:
            lock.free()
        except Exception:
            # Can't delete lock file, so something must be seriously wrong
            logger.error(f"Unable to remove lock file {lock_file}", task_id)  # handle_error
            return

def push_files(task_id: str, file_list: List[str], target_path: str, copy_files: bool) -> bool:
    """
    Copies or moves the given files to the target path. If copy_files is True, the files are
    copied; otherwise they are moved. Note that this function does not create a lock file
    (this needs to be done by the calling function).
    """
    operation: Callable
    if not copy_files:
        operation = shutil.move
    else:
        operation = shutil.copy

    source_folder = config.mercure.incoming_folder + "/"
    target_folder = target_path + "/"

    for entry in file_list:
        try:
            operation(source_folder + entry + mercure_names.DCM, target_folder + entry + mercure_names.DCM)
            operation(source_folder + entry + mercure_names.TAGS, target_folder + entry + mercure_names.TAGS)
        except Exception:
            logger.error(  # handle_error
                f"Problem while pushing file to outgoing {entry}\nSource folder {source_folder}\nTarget folder {target_folder}",
                task_id,
            )
            return False

    if not copy_files:
        monitor.send_task_event(monitor.task_event.MOVE, task_id, len(file_list), target_path, "Moved files")
    else:
        monitor.send_task_event(monitor.task_event.COPY, task_id, len(file_list), target_path, "Copied files")

    return True

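# Usage sketch for push_files (paths are hypothetical). Moving is chosen when only
# a single rule has triggered, because the incoming copy is no longer needed:
#
#     ok = push_files("task-0001", ["1.2.3#slice001"], "/opt/mercure/data/outgoing/abc123", copy_files=False)
#     # moves 1.2.3#slice001.dcm and 1.2.3#slice001.tags out of the incoming folder
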
def remove_series(task_id: str, file_list: List[str]) -> bool:
    """
    Deletes the given files from the incoming folder.
    """
    source_folder = config.mercure.incoming_folder + "/"
    for entry in file_list:
        try:
            os.remove(source_folder + entry + mercure_names.TAGS)
            os.remove(source_folder + entry + mercure_names.DCM)
        except Exception:
            logger.error(f"Error while removing file {entry}", task_id)  # handle_error
            return False
    monitor.send_task_event(monitor.task_event.REMOVE, task_id, len(file_list), "", "Removed duplicate files")
    return True

def route_error_files() -> None:
    """
    Looks for error files, moves these files and the corresponding DICOM files to the error
    folder, and sends an alert to the bookkeeper instance.
    """
    error_files_found = 0

    for entry in os.scandir(config.mercure.incoming_folder):
        if entry.name.endswith(mercure_names.ERROR) and not entry.is_dir():
            # Check if a lock file exists. If not, create one.
            lock_file = Path(config.mercure.incoming_folder + "/" + entry.name + mercure_names.LOCK)
            if lock_file.exists():
                continue
            try:
                lock = helper.FileLock(lock_file)
            except Exception:
                continue

            logger.error(f"Found incoming error file {entry.name}")
            error_files_found += 1

            shutil.move(
                config.mercure.incoming_folder + "/" + entry.name,
                config.mercure.error_folder + "/" + entry.name,
            )

            # Strip the error extension to obtain the name of the corresponding DICOM file
            dicom_filename = entry.name[:-6]
            dicom_file = Path(config.mercure.incoming_folder + "/" + dicom_filename)
            if dicom_file.exists():
                shutil.move(
                    config.mercure.incoming_folder + "/" + dicom_filename,
                    config.mercure.error_folder + "/" + dicom_filename,
                )

            lock.free()

    if error_files_found > 0:
        monitor.send_event(
            monitor.m_events.PROCESSING, monitor.severity.ERROR, f"Error parsing {error_files_found} incoming files"
        )
    return

def trigger_serieslevel_notification(
    current_rule: str, tags_list: Dict[str, str], event: mercure_events, task_id: str
) -> None:
    notification.trigger_notification_for_rule(
        current_rule,
        task_id,
        event,
        tags_list=tags_list,
    )