Source code for processor

"""
processor.py
============
mercure's processor that executes processing modules on DICOM series filtered for processing.
"""

# Standard python includes
import base64
import asyncio
import shutil
import signal
import os
import sys
import json
from typing import Dict
import threading
import graphyte
import daiquiri
import nomad
from pathlib import Path
import hupper

# App-specific includes
import common.helper as helper
import common.config as config
import common.monitor as monitor
from common.constants import mercure_defs, mercure_names, mercure_events
from process.status import is_ready_for_processing
from process.process_series import (
    process_series,
    move_results,
    trigger_notification,
    push_input_task,
    push_input_images,
    handle_processor_output,
)
from common.types import Task, TaskProcessing
import common.influxdb
import common.notification as notification


# Create local logger instance
logger = config.get_logger()
# Timer object that periodically triggers run_processor(); assigned in main()
processing_loop = None  # type: helper.AsyncTimer # type: ignore


# Path of the halt marker file (assigned in main()) and the last observed
# halt state, used by search_folder() to log halt/resume transitions only once
processor_lockfile = None
processor_is_locked = False

# Best-effort creation of the Nomad client. When it cannot be created,
# nomad_connection stays None.
# NOTE(review): the host address is hard-coded -- presumably the Docker bridge
# gateway; confirm before changing.
try:
    nomad_connection = nomad.Nomad(host="172.17.0.1", timeout=5)  # type: ignore
    logger.info("Connected to Nomad")
except Exception:
    # Catch Exception only, so that SystemExit / KeyboardInterrupt raised
    # during import are not swallowed.
    nomad_connection = None
    logger.debug("Unable to create Nomad connection")


async def search_folder(counter) -> bool:
    """Scans the processing folder, finalizes finished Nomad jobs and processes one pending task.

    The function performs three steps:

    1. Every sub-folder of the processing folder is inspected. Folders that are
       ready for processing are collected. Folders that contain both a
       ".processing" marker and a "nomad_job.json" file are looked up in Nomad;
       if the job status is "dead", its logs are fetched and the folder is
       queued for finalization.
    2. Each finished Nomad task is finalized (task file patched, results moved,
       folder removed, events and notifications sent).
    3. Unless processing has been halted via the lock file, the first pending
       task folder is passed to process_series().

    Args:
        counter: Number of consecutive invocations by the caller. Currently unused.

    Returns:
        True if a task has been processed successfully, so that the caller should
        scan the folder again right away. False if processing is halted, no task
        was found, or processing the task raised an exception.
    """
    # Only processor_is_locked is rebound here; the other module-level names
    # (processor_lockfile, nomad_connection) are only read.
    global processor_is_locked

    helper.g_log("events.run", 1)

    # Maps the path of each folder that is ready for processing to its modification time
    tasks: Dict[str, float] = {}

    complete = []
    for entry in os.scandir(config.mercure.processing_folder):
        logger.debug(f"Scanning folder {entry.name}")
        if not entry.is_dir():
            continue

        if is_ready_for_processing(entry.path):
            logger.debug(f"{entry.name} ready for processing")
            tasks[entry.path] = entry.stat().st_mtime
            continue

        # Some tasks are actually currently processing
        entry_path = Path(entry.path)
        job_file = entry_path / "nomad_job.json"
        if not ((entry_path / ".processing").exists() and job_file.exists()):
            continue

        logger.debug(f"{entry.name} currently processing")
        with open(job_file, "r") as f:
            job_id = json.load(f).get("DispatchedJobID")
        logger.debug(f"Job id: {job_id}")
        # NOTE(review): nomad_connection is None when the client could not be
        # created at import time; this call raises AttributeError in that case.
        job_info = nomad_connection.job.get_job(job_id)
        status = job_info.get("Status")
        if status != "dead":
            logger.debug(f"Status: {status}")
            continue

        logger.debug(f"{entry.name} is complete")
        logs = []
        try:
            allocations = nomad_connection.job.get_allocations(job_id)
            alloc = allocations[-1]["ID"]
            logger.debug("========== logs ==========")
            for stream_name in ("stdout", "stderr"):
                result = nomad_connection.client.stream_logs.stream(alloc, "process", stream_name)
                if len(result):
                    data = json.loads(result).get("Data")
                    result = base64.b64decode(data).decode(encoding="utf-8")
                    result = f"{stream_name}:\n" + result
                    logs.append(result)
                    logger.info(result)
        except Exception:
            # Log retrieval is best effort. Catch Exception only, so that task
            # cancellation (asyncio.CancelledError) and interpreter shutdown are
            # not swallowed.
            logger.exception("Failed to retrieve process logs.")

        if not config.mercure.processing_logs.discard_logs:
            task_path = entry_path / "in" / "task.json"
            task = Task(**json.loads(task_path.read_text()))
            assert isinstance(task.process, TaskProcessing)
            monitor.send_process_logs(task.id, task.process.module_name, "\n".join(logs))
        complete.append(dict(path=entry_path))

    # Move complete tasks
    for c in complete:
        p_folder = c["path"]
        in_folder = p_folder / "in"
        out_folder = p_folder / "out"

        logger.debug(f"Complete task: {p_folder.name}")
        job_info = json.loads((p_folder / "nomad_job.json").read_text())

        # Move task.json over to the output directory if it wasn't moved by the processing module
        push_input_task(in_folder, out_folder)

        # Patch the nomad info into the task file. The task is serialized before
        # the file is opened for writing, so that a serialization error does not
        # leave a truncated task.json behind.
        task_path = out_folder / "task.json"
        task = Task(**json.loads(task_path.read_text()))
        task.nomad_info = job_info
        serialized_task = json.dumps(task.dict())
        with task_path.open("w") as f:
            f.write(serialized_task)

        # Copy input images if configured in rule
        task_processing = task.process[0] if isinstance(task.process, list) else task.process
        if not task_processing:
            continue
        # Explicit comparison kept on purpose: only a value equal to True enables the copy
        if task_processing.retain_input_images == True:  # noqa: E712
            push_input_images(task.id, in_folder, out_folder)

        # Remember the number of DCM files in the output folder (for logging purpose)
        file_count_complete = len(list(Path(out_folder).glob(mercure_names.DCMFILTER)))

        handle_processor_output(task, task_processing, 0, p_folder)

        # If the only file is task.json, the processing failed
        if [p.name for p in out_folder.rglob("*")] == ["task.json"]:
            logger.error("Processing failed", task.id)
            move_results(task.id, p_folder, None, False, False)
            trigger_notification(task, mercure_events.ERROR)
            continue

        needs_dispatching = bool(task.get("dispatch"))
        move_results(task.id, p_folder, None, True, needs_dispatching)
        shutil.rmtree(in_folder)
        (p_folder / "nomad_job.json").unlink()
        (p_folder / ".processing").unlink()
        p_folder.rmdir()
        monitor.send_task_event(
            monitor.task_event.PROCESS_COMPLETE, task.id, file_count_complete, "", "Processing complete"
        )
        # If dispatching not needed, then trigger the completion notification (for Nomad)
        if not needs_dispatching:
            trigger_notification(task, mercure_events.COMPLETED)
            monitor.send_task_event(monitor.task_event.COMPLETE, task.id, 0, "", "Task complete")

    # Check if processing has been suspended via the UI
    if processor_lockfile and processor_lockfile.exists():
        if not processor_is_locked:
            processor_is_locked = True
            logger.info("Processing halted")
        return False
    if processor_is_locked:
        processor_is_locked = False
        logger.info("Processing resumed")

    # Return if no tasks have been found
    if not tasks:
        return False

    # NOTE(review): this sorts by folder path; the collected modification times
    # are not used for ordering.
    sorted_tasks = sorted(tasks)
    # TODO: Add priority sorting. However, do not honor the priority flag for, e.g., every third run
    #       so that stagnation of cases is avoided

    # Only process one case at a time because the processing might take a while and
    # another instance might have processed the other entries already. So the folder
    # needs to be refreshed each time
    task_folder = Path(sorted_tasks[0])

    try:
        await process_series(task_folder)
        # Return true, so that the parent function will trigger another search of the folder
        return True
    except Exception:
        # Try to find the task ID for the error report, preferring the output folder
        for candidate in (task_folder / "out" / "task.json", task_folder / "in" / "task.json"):
            try:
                with open(candidate) as task_file:
                    task_id = json.load(task_file)["id"]
                logger.error("Exception while processing", task_id)  # handle_error
                break
            except Exception:
                # Task file missing or unreadable: try the next candidate
                continue
        else:
            logger.error("Exception while processing", None)  # handle_error
        return False
async def run_processor() -> None:
    """Main processing function: reloads the configuration and scans the
    processing folder repeatedly for as long as search_folder() reports work."""
    if helper.is_terminated():
        return

    # Refresh the configuration; skip this run if that is not possible
    try:
        config.read_config()
    except Exception:
        logger.warning(  # handle_error
            "Unable to update configuration. Skipping processing",
            None,
            event_type=monitor.m_events.CONFIG_UPDATE,
        )
        return

    iteration = 0
    while True:
        found_work = await search_folder(iteration)
        if not found_work:
            break
        iteration += 1
        # If termination is requested, stop processing series after the active one has been completed
        if helper.is_terminated():
            break
def exit_processor() -> None:
    """Callback for process termination: schedules a thread-safe stop of the
    asyncio event loop held by the helper module."""
    event_loop = helper.loop
    event_loop.call_soon_threadsafe(event_loop.stop)
async def terminate_process(signalNumber, loop) -> None:
    """Triggers the shutdown of the service.

    Logs and reports the shutdown request, stops the processing timer if it is
    running, and sets the termination flag via helper.trigger_terminate().

    Args:
        signalNumber: Signal that requested the shutdown. Not used by this function.
        loop: Event loop the handler was registered on. Not used by this function.
    """
    helper.g_log("events.shutdown", 1)
    logger.info("Shutdown requested")
    monitor.send_event(monitor.m_events.SHUTDOWN_REQUEST, monitor.severity.INFO)
    # processing_loop is a module-level variable that stays None until main()
    # has created the timer, so test for None instead of for the name's existence.
    if processing_loop is not None and processing_loop.is_running:
        processing_loop.stop()
    helper.trigger_terminate()
def main(args=sys.argv[1:]) -> None:
    """Entry point of the processor service.

    Optionally starts the hupper auto-reloader, registers the signal handlers,
    reads the configuration, sets up notification and monitoring, and then runs
    the periodic processing timer until the event loop is stopped.

    Args:
        args: Command-line arguments without the program name. "--reload"
            enables the auto-reloader; the first other argument, if present,
            is used as the instance name (default: "main").
    """
    global processor_lockfile
    global processing_loop

    if "--reload" in args or os.getenv("MERCURE_ENV", "PROD").lower() == "dev":
        # start_reloader will only return in a monitored subprocess
        hupper.start_reloader("processor.main")
        import logging

        logging.getLogger("watchdog").setLevel(logging.WARNING)

    logger.info("")
    logger.info(f"mercure DICOM Processor ver {mercure_defs.VERSION}")
    logger.info("--------------------------------------------")
    logger.info("")

    # Register system signals to be caught
    signals = (signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # s is bound as a default argument so that each handler keeps its own signal
        helper.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(terminate_process(s, helper.loop)))

    # Take the instance name from the arguments passed to this function, and do
    # not mistake the "--reload" switch for an instance name.
    name_args = [arg for arg in args if arg != "--reload"]
    instance_name = name_args[0] if name_args else "main"

    # Read the configuration file and terminate if it cannot be read
    try:
        config.read_config()
    except Exception:
        logger.exception("Cannot start service. Going down.")
        sys.exit(1)

    appliance_name = config.mercure.appliance_name

    logger.info(f"Appliance name = {appliance_name}")
    logger.info(f"Instance name = {instance_name}")
    logger.info(f"Instance PID = {os.getpid()}")
    logger.info(f"Thread ID = {threading.get_native_id()}")
    logger.info(sys.version)

    notification.setup()
    monitor.configure("processor", instance_name, config.mercure.bookkeeper)
    monitor.send_event(monitor.m_events.BOOT, monitor.severity.INFO, f"PID = {os.getpid()}")

    if len(config.mercure.graphite_ip) > 0:
        logger.info(f"Sending events to graphite server: {config.mercure.graphite_ip}")
        graphite_prefix = "mercure." + appliance_name + ".processor." + instance_name
        graphyte.init(config.mercure.graphite_ip, config.mercure.graphite_port, prefix=graphite_prefix)

    if len(config.mercure.influxdb_host) > 0:
        logger.info(f"Sending events to influxdb server: {config.mercure.influxdb_host}")
        common.influxdb.init(
            config.mercure.influxdb_host,
            config.mercure.influxdb_token,
            config.mercure.influxdb_org,
            config.mercure.influxdb_bucket,
            "mercure." + appliance_name + ".processor." + instance_name,
        )

    logger.info(f"Processing folder: {config.mercure.processing_folder}")
    processor_lockfile = Path(config.mercure.processing_folder) / mercure_names.HALT

    # Start the timer that will periodically trigger the scan of the incoming folder
    processing_loop = helper.AsyncTimer(config.mercure.dispatcher_scan_interval, run_processor)  # , exit_processor)

    helper.g_log("events.boot", 1)

    try:
        processing_loop.run_until_complete(helper.loop)
        # Process will exit here once the asyncio loop has been stopped
        monitor.send_event(monitor.m_events.SHUTDOWN, monitor.severity.INFO)
    except Exception as e:
        monitor.send_event(monitor.m_events.SHUTDOWN, monitor.severity.ERROR, str(e))
    finally:
        # Finish all asyncio tasks that might be still pending
        remaining_tasks = helper.asyncio.all_tasks(helper.loop)  # type: ignore[attr-defined]
        if remaining_tasks:
            helper.loop.run_until_complete(helper.asyncio.gather(*remaining_tasks))

        logger.info("Going down now")
# Run the service when the module is executed as a script
if __name__ == "__main__":
    main()