Source code for dispatcher

"""
dispatcher.py
=============
The dispatcher service of mercure that executes the DICOM transfer to the different targets.
"""

# Standard python includes
import asyncio
import logging
import os
import signal
import sys
from pathlib import Path
import daiquiri
import graphyte
import hupper

# App-specific includes
import common.config as config
import common.helper as helper
import common.monitor as monitor
from common.constants import mercure_names
from dispatch.status import is_ready_for_sending
from dispatch.send import execute
from common.constants import mercure_defs
import common.influxdb
import common.notification as notification


# Create local logger instance
logger = config.get_logger()
main_loop = None  # type: helper.AsyncTimer # type: ignore


dispatcher_lockfile = None
dispatcher_is_locked = False


[docs]async def terminate_process(signalNumber, frame) -> None: """Triggers the shutdown of the service.""" helper.g_log("events.shutdown", 1) logger.info("Shutdown requested") monitor.send_event(monitor.m_events.SHUTDOWN_REQUEST, monitor.severity.INFO) # Note: main_loop can be read here because it has been declared as global variable if "main_loop" in globals() and main_loop.is_running: main_loop.stop() helper.trigger_terminate()
[docs]def dispatch() -> None: global dispatcher_lockfile global dispatcher_is_locked """Main entry function.""" if helper.is_terminated(): return helper.g_log("events.run", 1) try: config.read_config() except Exception: logger.exception("Unable to read configuration. Skipping processing.") monitor.send_event( monitor.m_events.CONFIG_UPDATE, monitor.severity.WARNING, "Unable to read configuration (possibly locked)", ) return success_folder = Path(config.mercure.success_folder) error_folder = Path(config.mercure.error_folder) retry_max = config.mercure.retry_max retry_delay = config.mercure.retry_delay try: # Obtain a sorted folder list, so that the oldest DICOMs get dispatched first items = sorted(Path(config.mercure.outgoing_folder).iterdir(), key=os.path.getmtime) for entry in items: # First, check if dispatching might have been suspended via the UI if dispatcher_lockfile and dispatcher_lockfile.exists(): if not dispatcher_is_locked: dispatcher_is_locked = True logger.info(f"Dispatching halted") break else: if dispatcher_is_locked: dispatcher_is_locked = False logger.info("Dispatching resumed") # Now process the folders that are ready for dispatching if entry.is_dir() and is_ready_for_sending(entry): execute(Path(entry), success_folder, error_folder, retry_max, retry_delay) # If termination is requested, stop processing series after the # active one has been completed if helper.is_terminated(): break except: return
[docs]def exit_dispatcher(args) -> None: """Stop the asyncio event loop.""" helper.loop.call_soon_threadsafe(helper.loop.stop)
[docs]def main(args=sys.argv[1:]) -> None: global dispatcher_lockfile if "--reload" in args or os.getenv("MERCURE_ENV", "PROD").lower() == "dev": # start_reloader will only return in a monitored subprocess reloader = hupper.start_reloader("dispatcher.main") logger.info("") logger.info(f"mercure DICOM Dispatcher ver {mercure_defs.VERSION}") logger.info("--------------------------------------------") logger.info("") # Register system signals to be caught signals = (signal.SIGTERM, signal.SIGINT) for s in signals: helper.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(terminate_process(s, helper.loop))) instance_name = "main" if len(sys.argv) > 1: instance_name = sys.argv[1] try: config.read_config() except Exception: logger.exception("Cannot start service. Going down.") sys.exit(1) appliance_name = config.mercure.appliance_name logger.info(f"Appliance name = {appliance_name}") logger.info(f"Instance name = {instance_name}") logger.info(f"Instance PID = {os.getpid()}") logger.info(sys.version) notification.setup() monitor.configure("dispatcher", instance_name, config.mercure.bookkeeper) monitor.send_event(monitor.m_events.BOOT, monitor.severity.INFO, f"PID = {os.getpid()}") if len(config.mercure.graphite_ip) > 0: logging.info(f"Sending events to graphite server: {config.mercure.graphite_ip}") graphite_prefix = "mercure." + appliance_name + ".dispatcher." + instance_name graphyte.init( config.mercure.graphite_ip, config.mercure.graphite_port, prefix=graphite_prefix, ) if len(config.mercure.influxdb_host) > 0: logger.info(f"Sending events to influxdb server: {config.mercure.influxdb_host}") common.influxdb.init( config.mercure.influxdb_host, config.mercure.influxdb_token, config.mercure.influxdb_org, config.mercure.influxdb_bucket, "mercure." + appliance_name + ".dispatcher." + instance_name ) logger.info(f"Dispatching folder: {config.mercure.outgoing_folder}") dispatcher_lockfile = Path(config.mercure.outgoing_folder + "/" + mercure_names.HALT) global main_loop main_loop = helper.AsyncTimer(config.mercure.dispatcher_scan_interval, dispatch) helper.g_log("events.boot", 1) try: # Start the asyncio event loop for asynchronous function calls main_loop.run_until_complete(helper.loop) # Process will exit here once the asyncio loop has been stopped monitor.send_event(monitor.m_events.SHUTDOWN, monitor.severity.INFO) except Exception as e: # Process will exit here once the asyncio loop has been stopped monitor.send_event(monitor.m_events.SHUTDOWN, monitor.severity.ERROR, str(e)) finally: # Finish all asyncio tasks that might be still pending remaining_tasks = helper.asyncio.all_tasks(helper.loop) # type: ignore[attr-defined] if remaining_tasks: helper.loop.run_until_complete(helper.asyncio.gather(*remaining_tasks)) logging.info("Going down now")
if __name__ == "__main__": main()