# Source code for dispatch.dispatcher

"""
dispatcher.py
=============
The dispatcher service of mercure that executes the DICOM transfer to the different targets.
"""

# Standard python includes
import asyncio
import json
import logging
import os
import signal
import sys
from datetime import datetime
from pathlib import Path
from typing import Literal

# App-specific includes
import common.config as config
import common.helper as helper
import common.influxdb
import common.monitor as monitor
import common.notification as notification
import graphyte
import hupper
from common.constants import mercure_defs, mercure_names
from common.types import Task
from dispatch.send import execute
from dispatch.status import is_ready_for_sending

# Create local logger instance
logger = config.get_logger()
# Timer object driving the service; stays None until main() creates the
# helper.AsyncTimer from the configured scan interval and dispatch().
main_loop = None  # type: helper.AsyncTimer  # type: ignore


# Path of the halt marker file inside the outgoing folder. Assigned in main();
# dispatch() stops sending while this file exists.
dispatcher_lockfile = None
# Tracks whether the halted state has already been logged, so that dispatch()
# writes "Dispatching halted" / "Dispatching resumed" only once per transition.
dispatcher_is_locked = False


async def terminate_process(signalNumber, frame) -> None:
    """Trigger the shutdown of the service.

    Logs and reports the shutdown request, stops the main timer if it is
    running, and flags the process as terminating via the helper module.

    Args:
        signalNumber: Signal that requested the shutdown (not used here).
        frame: Second handler argument passed by the caller (not used here).
    """
    helper.g_log("events.shutdown", 1)
    logger.info("Shutdown requested")
    monitor.send_event(monitor.m_events.SHUTDOWN_REQUEST, monitor.severity.INFO)
    # main_loop is a module-level global that is None until main() has created
    # the timer. Checking for the name in globals() would always succeed, so
    # test the value itself to avoid an AttributeError on None.
    if main_loop is not None and main_loop.is_running:
        main_loop.stop()
    helper.trigger_terminate()
def dispatch() -> None:
    """Main entry function: send all tasks in the outgoing folder that are ready.

    Reads the configuration, collects the task folders that are ready for
    sending, groups them by priority and hands them one by one to execute().
    Urgent and normal tasks are interleaved so that up to two urgent tasks are
    processed for every normal task. Tasks with priority "offpeak" are only
    processed during the configured off-peak time window.

    Processing stops early if the halt file exists in the outgoing folder or
    if termination of the service has been requested. Errors are logged and
    never propagated to the caller.
    """
    global dispatcher_lockfile
    global dispatcher_is_locked

    if helper.is_terminated():
        return

    helper.g_log("events.run", 1)

    try:
        config.read_config()
    except Exception:
        logger.exception("Unable to read configuration. Skipping processing.")
        monitor.send_event(
            monitor.m_events.CONFIG_UPDATE,
            monitor.severity.WARNING,
            "Unable to read configuration (possibly locked)",
        )
        return

    success_folder = Path(config.mercure.success_folder)
    error_folder = Path(config.mercure.error_folder)
    retry_max = config.mercure.retry_max
    retry_delay = config.mercure.retry_delay

    def get_priority(task_folder: Path) -> Literal['normal', 'urgent', 'offpeak']:
        """Return the dispatch priority of the task stored in task_folder.

        The priority of the applied rule is used if the task has one.
        Otherwise the highest priority among the triggered rules wins
        (urgent > normal > offpeak). Falls back to "normal" if nothing can
        be determined or an error occurs.
        """
        try:
            task_instance = Task.from_file(task_folder / mercure_names.TASKFILE)
            applied_rule = config.mercure.rules.get(task_instance.info.get("applied_rule"))
            if applied_rule is not None:
                return applied_rule.priority

            # The entry may be missing; treat that as "no triggered rules"
            triggered_rule_names = task_instance.info.get("triggered_rules") or ()
            # "offpeak" is only returned if there is at least one triggered
            # rule and every triggered rule is an off-peak rule
            all_offpeak = bool(triggered_rule_names)
            for rule_name in triggered_rule_names:
                current_priority = config.mercure.get("rules", {}).get(rule_name, {}).get("priority")
                if current_priority == "urgent":
                    # Nothing can outrank urgent, so stop searching
                    return "urgent"
                if current_priority != "offpeak":
                    # "normal" (or a missing/unknown value) outranks offpeak
                    all_offpeak = False
            return "offpeak" if all_offpeak else "normal"
        except Exception:
            logger.exception("Error while checking priority")
            return "normal"

    try:
        items = Path(config.mercure.outgoing_folder).iterdir()
        is_offpeak = helper._is_offpeak(
            config.mercure.offpeak_start, config.mercure.offpeak_end, datetime.now().time()
        )

        # Get the folders that are ready for dispatching
        valid_items = [item for item in items if item.is_dir() and is_ready_for_sending(item)]
        urgent_items, normal_items = [], []
        for item in valid_items:
            priority = get_priority(item)
            if priority == "urgent":
                urgent_items.append(item)
            elif priority == "normal" or (priority == "offpeak" and is_offpeak):
                # Off-peak tasks are queued with the normal tasks, but only
                # while the off-peak window is active
                normal_items.append(item)

        # Oldest folders first within each priority group
        sorted_urgent_items = sorted(urgent_items, key=os.path.getmtime)
        sorted_normal_items = sorted(normal_items, key=os.path.getmtime)

        counter = 0
        while sorted_urgent_items or sorted_normal_items:
            # Two out of three slots go to urgent tasks. If the normal queue is
            # empty, keep serving urgent tasks instead of popping from an
            # empty list (which would abort the whole run with an IndexError).
            take_urgent = bool(sorted_urgent_items) and (
                (counter % 3) < 2 or not sorted_normal_items
            )
            if take_urgent:
                entry = sorted_urgent_items.pop(0)
            else:
                entry = sorted_normal_items.pop(0)

            # First, check if dispatching might have been suspended via the UI
            if dispatcher_lockfile and dispatcher_lockfile.exists():
                if not dispatcher_is_locked:
                    dispatcher_is_locked = True
                    logger.info("Dispatching halted")
                break
            else:
                if dispatcher_is_locked:
                    dispatcher_is_locked = False
                    logger.info("Dispatching resumed")

            execute(Path(entry), success_folder, error_folder, retry_max, retry_delay)

            # If termination is requested, stop processing series after the
            # active one has been completed
            if helper.is_terminated():
                break

            counter += 1

    except Exception:
        logger.exception("Error while dispatching")
        return
def exit_dispatcher(args) -> None:
    """Ask the asyncio event loop held by the helper module to stop.

    The stop request is scheduled through call_soon_threadsafe.

    Args:
        args: Not used.
    """
    event_loop = helper.loop
    event_loop.call_soon_threadsafe(event_loop.stop)
def main(args=sys.argv[1:]) -> None:
    """Start the dispatcher service and run it until shutdown is requested.

    Registers the signal handlers, reads the configuration, sets up
    notification, monitoring and the optional graphite/influxdb reporting,
    and then runs the timer that periodically calls dispatch(). Exits the
    process with status 1 if the configuration cannot be read at startup.

    Args:
        args: Command-line arguments without the program name. If "--reload"
            is contained (or the environment variable MERCURE_ENV is "dev"),
            the service is started under the hupper reloader.
    """
    global dispatcher_lockfile
    global main_loop

    if "--reload" in args or os.getenv("MERCURE_ENV", "PROD").lower() == "dev":
        # start_reloader will only return in a monitored subprocess
        hupper.start_reloader("dispatcher.main")

    logger.info("")
    logger.info(f"mercure DICOM Dispatcher ver {mercure_defs.VERSION}")
    logger.info("--------------------------------------------")
    logger.info("")

    # Register system signals to be caught
    signals = (signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # Bind s as a default argument so each handler keeps its own signal
        helper.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(terminate_process(s, helper.loop)))

    # NOTE(review): the instance name is read from sys.argv rather than from
    # args, so a leading "--reload" would be used as instance name -- confirm
    # whether that is intended.
    instance_name = "main"
    if len(sys.argv) > 1:
        instance_name = sys.argv[1]

    try:
        config.read_config()
    except Exception:
        logger.exception("Cannot start service. Going down.")
        sys.exit(1)

    appliance_name = config.mercure.appliance_name

    logger.info(f"Appliance name = {appliance_name}")
    logger.info(f"Instance name = {instance_name}")
    logger.info(f"Instance PID = {os.getpid()}")
    logger.info(sys.version)

    notification.setup()
    monitor.configure("dispatcher", instance_name, config.mercure.bookkeeper)
    monitor.send_event(monitor.m_events.BOOT, monitor.severity.INFO, f"PID = {os.getpid()}")

    if config.mercure.graphite_ip:
        # Use the module logger (not the root logging module) like everywhere
        # else in this file
        logger.info(f"Sending events to graphite server: {config.mercure.graphite_ip}")
        graphite_prefix = "mercure." + appliance_name + ".dispatcher." + instance_name
        graphyte.init(
            config.mercure.graphite_ip,
            config.mercure.graphite_port,
            prefix=graphite_prefix,
        )

    if config.mercure.influxdb_host:
        logger.info(f"Sending events to influxdb server: {config.mercure.influxdb_host}")
        common.influxdb.init(
            config.mercure.influxdb_host,
            config.mercure.influxdb_token,
            config.mercure.influxdb_org,
            config.mercure.influxdb_bucket,
            "mercure." + appliance_name + ".dispatcher." + instance_name,
        )

    logger.info(f"Dispatching folder: {config.mercure.outgoing_folder}")
    # Presence of this file in the outgoing folder suspends dispatching
    dispatcher_lockfile = Path(config.mercure.outgoing_folder) / mercure_names.HALT

    main_loop = helper.AsyncTimer(config.mercure.dispatcher_scan_interval, dispatch)

    helper.g_log("events.boot", 1)

    try:
        # Start the asyncio event loop for asynchronous function calls
        main_loop.run_until_complete(helper.loop)
        # Process will exit here once the asyncio loop has been stopped
        monitor.send_event(monitor.m_events.SHUTDOWN, monitor.severity.INFO)
    except Exception as e:
        # The loop ended with an error; report the shutdown as an error event
        monitor.send_event(monitor.m_events.SHUTDOWN, monitor.severity.ERROR, str(e))
    finally:
        # Finish all asyncio tasks that might be still pending
        remaining_tasks = helper.asyncio.all_tasks(helper.loop)  # type: ignore[attr-defined]
        if remaining_tasks:
            helper.loop.run_until_complete(helper.asyncio.gather(*remaining_tasks))

        logger.info("Going down now")