Package common

common.config

config.py

mercure’s configuration management, used by various mercure modules

common.config.check_folders() bool[source]

Checks if all required folders for handling the DICOM files exist.

common.config.mercure: Config
common.config.read_config() Config[source]

Reads the configuration settings (rules, targets, general settings) from the configuration file. The configuration will only be updated if the file has changed compared to the last function call. If the configuration file is locked by another process, an exception will be raised.

common.config.read_tagslist() None[source]

Reads the list of supported DICOM tags with example values, displayed in the UI.

common.config.save_config() None[source]

Saves the current configuration in a file on the disk. Raises an exception if the file has been locked by another process.

common.config.write_configfile(json_content) None[source]

Rewrites the config file using the JSON data passed as argument. Used by the config editor of the webgui.

common.constants

constants.py

mercure-wide constants, used for standardizing key names and extensions.

class common.constants.mercure_actions[source]

Bases: object

BOTH = 'both'
DISCARD = 'discard'
NOTIFICATION = 'notification'
PROCESS = 'process'
ROUTE = 'route'
class common.constants.mercure_config[source]

Bases: object

MODULES = 'modules'
RULES = 'rules'
TARGETS = 'targets'
class common.constants.mercure_defs[source]

Bases: object

SEPARATOR = '#'
VERSION = '0.3.1-beta.1'
class common.constants.mercure_events(value)[source]

Bases: IntEnum

An enumeration.

COMPLETED = 1
ERROR = 2
RECEIVED = 0
class common.constants.mercure_folders[source]

Bases: object

DISCARD = 'discard_folder'
ERROR = 'error_folder'
INCOMING = 'incoming_folder'
OUTGOING = 'outgoing_folder'
PROCESSING = 'processing_folder'
STUDIES = 'studies_folder'
SUCCESS = 'success_folder'
class common.constants.mercure_info[source]

Bases: object

ACC = 'acc'
ACTION = 'action'
MERCURE_APPLIANCE = 'mercure_appliance'
MERCURE_SERVER = 'mercure_server'
MERCURE_VERSION = 'mercure_version'
MRN = 'mrn'
TRIGGERED_RULES = 'triggered_rules'
UID = 'uid'
UID_TYPE = 'uid_type'
class common.constants.mercure_names[source]

Bases: object

DCM = '.dcm'
DCMFILTER = '*.dcm'
ERROR = '.error'
FORCE_COMPLETE = '.force-complete'
HALT = 'HALT'
LOCK = '.lock'
PROCESSING = '.processing'
RUNNING = '.running'
SENDLOG = 'sent.txt'
TAGS = '.tags'
TASKFILE = 'task.json'
class common.constants.mercure_options[source]

Bases: object

FALSE = 'False'
INVALID = '#@INVALID@#'
MISSING = 'MISSING'
NORMAL = 'normal'
OFFPEAK = 'offpeak'
SERIES = 'series'
STUDY = 'study'
TRUE = 'True'
URGENT = 'urgent'
class common.constants.mercure_rule[source]

Bases: object

ACTION = 'action'
ACTION_TRIGGER = 'action_trigger'
DISABLED = 'disabled'
FALLBACK = 'fallback'
NOTIFICATION_PAYLOAD = 'notification_payload'
NOTIFICATION_TRIGGER_COMPLETION = 'notification_trigger_completion'
NOTIFICATION_TRIGGER_ERROR = 'notification_trigger_error'
NOTIFICATION_TRIGGER_RECEPTION = 'notification_trigger_reception'
NOTIFICATION_WEBHOOK = 'notification_webhook'
PRIORITY = 'priority'
PROCESSING_MODULE = 'processing_module'
RULE = 'rule'
STUDY_TRIGGER = 'study_trigger'
STUDY_TRIGGER_CONDITION = 'study_trigger_condition'
STUDY_TRIGGER_CONDITION_RECEIVED_SERIES = 'received_series'
STUDY_TRIGGER_CONDITION_TIMEOUT = 'timeout'
TARGET = 'target'
class common.constants.mercure_sections[source]

Bases: object

DISPATCH = 'dispatch'
FILES = 'files'
INFO = 'info'
JOURNAL = 'journal'
NOTIFICATION = 'notification'
PROCESS = 'process'
STUDY = 'study'
class common.constants.mercure_study[source]

Bases: object

COMPLETE_FORCE = 'complete_force'
COMPLETE_REQUIRED_SERIES = 'complete_required_series'
COMPLETE_TRIGGER = 'complete_trigger'
CREATION_TIME = 'creation_time'
LAST_RECEIVE_TIME = 'last_receive_time'
RECEIVED_SERIES = 'received_series'
STUDY_UID = 'study_uid'

common.helper

helper.py

Various internal helper functions for mercure.

class common.helper.AsyncTimer(interval: int, func)[source]

Bases: object

run_until_complete(loop=None) None[source]
start() None[source]
stop() None[source]

Signals the timer to stop after the current run has finished.

class common.helper.FileLock(path_for_lockfile: Path)[source]

Bases: object

Helper class that implements a file lock. The lock file is also removed by the destructor so that no spurious lock files remain if exceptions are raised.

free() None[source]
lockCreated = False
class common.helper.RepeatedTimer(interval: float, function: Callable, exit_function: Callable, *args, **kwargs)[source]

Bases: object

Helper class for running a continuous timer that is suspended while the worker function is running

start() None[source]

Starts the timer for triggering the callback after the defined interval.

stop() None[source]

Stops the timer and executes the defined exit callback function.

common.helper.g_log(*args, **kwargs) None[source]
common.helper.get_runner() str[source]

Returns the name of the mechanism that is used for running mercure in the current installation (systemd, docker, nomad).

common.helper.is_terminated() bool[source]

Checks if the process will terminate after the current task.

common.helper.send_to_graphite(*args, **kwargs) None[source]

Wrapper for asynchronous graphite call to avoid wait time of main loop.

common.helper.send_to_influxdb(*args, **kwargs) None[source]

Wrapper for asynchronous influxdb call to avoid wait time of main loop.

common.helper.trigger_terminate() None[source]

Signals that the processing loop should terminate after finishing the currently active task.

common.monitor

monitor.py

Helper functions and definitions for monitoring mercure’s operations via the bookkeeper module.

exception common.monitor.MonitorHTTPError(status_code: int, message: str)[source]

Bases: Exception

Exception raised when an HTTP error occurs.

async common.monitor.async_post(endpoint: str, **kwargs)[source]
async common.monitor.async_send_task_event(event: task_event, task_id: str, file_count: int, target: str, info: str)[source]
common.monitor.configure(module, instance, address) None[source]

Configures the connection to the bookkeeper module. If not called, events will not be transmitted to the bookkeeper.

async common.monitor.do_post(endpoint, kwargs, catch_errors=False) None[source]
async common.monitor.find_tasks(search_term='') Any[source]
async common.monitor.get(endpoint: str, payload: Any = {}) Any[source]
async common.monitor.get_series(series_uid='') Any[source]
async common.monitor.get_task_events(task_id='') Any[source]
async common.monitor.get_task_info(task_id='') Any[source]
async common.monitor.get_tasks() Any[source]
async common.monitor.get_tests() Any[source]
common.monitor.post(endpoint: str, **kwargs) None[source]
common.monitor.send_event(event: m_events, severity: severity = severity.INFO, description: str = '') None[source]

Sends information about general mercure events to the bookkeeper (e.g., during module start).

common.monitor.send_process_logs(task_id: str, module_name: str, logs: str) None[source]
common.monitor.send_processor_output(task: Task, task_processing: TaskProcessing, index: int, output: dict) None[source]
common.monitor.send_register_series(tags: Dict[str, str]) None[source]

Registers a received series on the bookkeeper. This should be called when a series has been fully received and the DICOM tags have been parsed.

common.monitor.send_register_task(task_id: str, series_uid: str, parent_id: Optional[str] = None) None[source]

Registers a new task on the bookkeeper. This should be called whenever a new task has been created.

common.monitor.send_task_event(event: task_event, task_id: str, file_count: int, target: str, info: str) None[source]

Send an event related to a specific series to the bookkeeper.

common.monitor.send_update_task(task: Task) None[source]

Sends the updated information of a task to the bookkeeper. This should be called whenever a task that has already been registered has changed.

common.monitor.send_webgui_event(event: w_events, user: str, description='') None[source]

Sends information about an event on the webgui to the bookkeeper.

common.monitor.set_api_key() None[source]
common.monitor.task_event_payload(event: task_event, task_id: str, file_count: int, target, info)[source]
async common.monitor.task_process_logs(task_id='') Any[source]

common.notification

notification.py

Helper functions for triggering webhook calls.

common.notification.get_task_custom_notification(task: Task) Optional[str][source]
common.notification.get_task_requested_notification(task: Task) bool[source]
common.notification.parse_payload(payload: str, event: mercure_events, rule_name: str, task_id: str, details: str = '', context: dict = {}, *, task: Optional[Task] = None, tags_list: Optional[Dict[str, str]] = None) str[source]
common.notification.post(url: str, payload: Any) None[source]
common.notification.send_email(address: str, payload: str, event: mercure_events, rule_name: str, rule_type: str) None[source]
common.notification.send_email_helper(to: str, subject: str, content: str, rule_type='plain') None[source]
common.notification.send_webhook(url: str, payload: str) None[source]
common.notification.setup() bool[source]

Load the SSL certificate if it is configured (after the configuration has been read).

common.notification.trigger_notification_for_rule(rule_name: str, task_id: str, event: mercure_events, *, task: Task, details: Optional[str] = '', send_always: bool = False)[source]
common.notification.trigger_notification_for_rule(rule_name: str, task_id: str, event: mercure_events, *, tags_list: Dict[str, str], details: Optional[str] = '', send_always: bool = False)

common.rule_evaluation

rule_evaluation.py

Helper functions for evaluating routing rules and study-completion conditions.

common.rule_evaluation.eval_rule(rule: str, tags: Dict[str, str]) Any[source]

Parses the given rule, replaces all tag variables with values from the given tags dictionary, and evaluates the rule. If the rule is invalid, an exception will be raised.

common.rule_evaluation.parse_completion_series(task_id: str, completion_str: str, received_series: list) bool[source]

Evaluates the configuration string defining which series are required using the list of received series as input. Returns true if all required series have arrived, otherwise false is returned.

common.rule_evaluation.parse_rule(rule: str, tags: Dict[str, str]) Tuple[bool, Optional[str], Optional[str]][source]
common.rule_evaluation.replace_tags(rule: str, tags: Dict[str, str]) Any[source]

Replaces all tags with format @tagname@ in the given rule string with the corresponding values from the currently processed series (stored in the second argument).

common.rule_evaluation.test_completion_series(value: str) str[source]

Tests if the given string with the list of series required for study completion has a valid format. If so, True is returned as a string; otherwise, the error description is returned.

common.tagslist

tagslist.py

Helper functions for displaying a list of DICOM tags available for routing in the graphical user interface of mercure.

common.types

types.py

Definitions for using TypedDicts throughout mercure.

class common.types.Compat[source]

Bases: object

get(item, els=None) Any[source]
class common.types.Config(*, appliance_name: str, port: int, accept_compressed_images: bool, incoming_folder: str, studies_folder: str, outgoing_folder: str, success_folder: str, error_folder: str, discard_folder: str, processing_folder: str, router_scan_interval: int, dispatcher_scan_interval: int, cleaner_scan_interval: int, retention: int, emergency_clean_percentage: int, retry_delay: int, retry_max: int, series_complete_trigger: int, study_complete_trigger: int, study_forcecomplete_trigger: int, dicom_receiver: DicomReceiverConfig = DicomReceiverConfig(additional_tags={}), graphite_ip: str, graphite_port: int, influxdb_host: str, influxdb_token: str, influxdb_org: str, influxdb_bucket: str, bookkeeper: str, offpeak_start: str, offpeak_end: str, targets: Dict[str, Target], rules: Dict[str, Rule], modules: Dict[str, Module], process_runner: typing_extensions.Literal[docker, nomad] = '', processing_runtime: Optional[str] = None, bookkeeper_api_key: Optional[str] = None, features: Dict[str, bool], processing_logs: ProcessingLogsConfig = ProcessingLogsConfig(discard_logs=False, logs_file_store=None), email_notification_from: str = 'mercure@mercure.mercure', support_root_modules: Optional[bool] = False, webhook_certificate_location: Optional[str] = None)[source]

Bases: BaseModel, Compat

accept_compressed_images: bool
appliance_name: str
bookkeeper: str
bookkeeper_api_key: Optional[str]
cleaner_scan_interval: int
dicom_receiver: DicomReceiverConfig
discard_folder: str
dispatcher_scan_interval: int
email_notification_from: str
emergency_clean_percentage: int
error_folder: str
features: Dict[str, bool]
graphite_ip: str
graphite_port: int
incoming_folder: str
influxdb_bucket: str
influxdb_host: str
influxdb_org: str
influxdb_token: str
modules: Dict[str, Module]
offpeak_end: str
offpeak_start: str
outgoing_folder: str
port: int
process_runner: typing_extensions.Literal[docker, nomad]
processing_folder: str
processing_logs: ProcessingLogsConfig
processing_runtime: Optional[str]
retention: int
retry_delay: int
retry_max: int
router_scan_interval: int
rules: Dict[str, Rule]
series_complete_trigger: int
studies_folder: str
study_complete_trigger: int
study_forcecomplete_trigger: int
success_folder: str
support_root_modules: Optional[bool]
targets: Dict[str, Target]
webhook_certificate_location: Optional[str]
class common.types.DicomReceiverConfig(*, additional_tags: Dict[str, str] = {})[source]

Bases: BaseModel

additional_tags: Dict[str, str]
class common.types.DicomTLSTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dicomtls] = 'dicomtls', ip: str, port: str, aet_target: str, aet_source: Optional[str] = '', tls_key: str, tls_cert: str, ca_cert: str)[source]

Bases: Target

aet_source: Optional[str]
aet_target: str
ca_cert: str
ip: str
port: str
target_type: typing_extensions.Literal[dicomtls]
tls_cert: str
tls_key: str
class common.types.DicomTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dicom] = 'dicom', ip: str, port: str, aet_target: str, aet_source: Optional[str] = '')[source]

Bases: Target

aet_source: Optional[str]
aet_target: str
ip: str
port: str
target_type: typing_extensions.Literal[dicom]
class common.types.DicomWebTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dicomweb] = 'dicomweb', url: str, qido_url_prefix: Optional[str] = None, wado_url_prefix: Optional[str] = None, stow_url_prefix: Optional[str] = None, access_token: Optional[str] = None, http_user: Optional[str] = None, http_password: Optional[str] = None)[source]

Bases: Target

access_token: Optional[str]
http_password: Optional[str]
http_user: Optional[str]
qido_url_prefix: Optional[str]
stow_url_prefix: Optional[str]
target_type: typing_extensions.Literal[dicomweb]
url: str
wado_url_prefix: Optional[str]
class common.types.DummyTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dummy] = 'dummy')[source]

Bases: Target

target_type: typing_extensions.Literal[dummy]
class common.types.EmptyDict(_typename, _fields=None, /, **kwargs)[source]

Bases: TypedDict

class common.types.FolderTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[folder] = 'folder', folder: str)[source]

Bases: Target

folder: str
target_type: typing_extensions.Literal[folder]
class common.types.Module(*, docker_tag: Optional[str] = '', additional_volumes: Optional[str] = '', environment: Optional[str] = '', docker_arguments: Optional[str] = '', settings: Dict[str, Any] = {}, contact: Optional[str] = '', comment: Optional[str] = '', constraints: Optional[str] = '', resources: Optional[str] = '', requires_root: Optional[bool] = False)[source]

Bases: BaseModel, Compat

additional_volumes: Optional[str]
comment: Optional[str]
constraints: Optional[str]
contact: Optional[str]
docker_arguments: Optional[str]
docker_tag: Optional[str]
environment: Optional[str]
requires_root: Optional[bool]
resources: Optional[str]
settings: Dict[str, Any]
class common.types.ProcessingLogsConfig(*, discard_logs: bool = False, logs_file_store: Optional[str] = None)[source]

Bases: BaseModel

discard_logs: bool
logs_file_store: Optional[str]
class common.types.RsyncTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[rsync] = 'rsync', folder: str, user: str, host: str, password: Optional[str] = None, run_on_complete: bool = False)[source]

Bases: Target

folder: str
host: str
password: Optional[str]
run_on_complete: bool
target_type: typing_extensions.Literal[rsync]
user: str
class common.types.Rule(*, rule: str = 'False', target: str = '', disabled: bool = False, fallback: bool = False, contact: str = '', comment: str = '', tags: str = '', action: typing_extensions.Literal[route, both, process, discard, notification] = 'route', action_trigger: typing_extensions.Literal[series, study] = 'series', study_trigger_condition: typing_extensions.Literal[timeout, received_series] = 'timeout', study_trigger_series: str = '', priority: typing_extensions.Literal[normal, urgent, offpeak] = 'normal', processing_module: Union[str, List[str]] = '', processing_settings: Union[List[Dict[str, Any]], Dict[str, Any]] = {}, processing_retain_images: bool = False, notification_email: str = '', notification_webhook: str = '', notification_payload: str = '', notification_payload_body: str = '', notification_email_body: str = '', notification_email_type: str = 'plain', notification_trigger_reception: bool = True, notification_trigger_completion: bool = True, notification_trigger_completion_on_request: bool = False, notification_trigger_error: bool = True)[source]

Bases: BaseModel, Compat

action: typing_extensions.Literal[route, both, process, discard, notification]
action_trigger: typing_extensions.Literal[series, study]
comment: str
contact: str
disabled: bool
fallback: bool
notification_email: str
notification_email_body: str
notification_email_type: str
notification_payload: str
notification_payload_body: str
notification_trigger_completion: bool
notification_trigger_completion_on_request: bool
notification_trigger_error: bool
notification_trigger_reception: bool
notification_webhook: str
priority: typing_extensions.Literal[normal, urgent, offpeak]
processing_module: Union[str, List[str]]
processing_retain_images: bool
processing_settings: Union[List[Dict[str, Any]], Dict[str, Any]]
rule: str
study_trigger_condition: typing_extensions.Literal[timeout, received_series]
study_trigger_series: str
tags: str
target: str
class common.types.S3Target(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[s3] = 's3', region: str, bucket: str, prefix: str, access_key_id: str, secret_access_key: str)[source]

Bases: Target

access_key_id: str
bucket: str
prefix: str
region: str
secret_access_key: str
target_type: typing_extensions.Literal[s3]
class common.types.SftpTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[sftp] = 'sftp', folder: str, user: str, host: str, password: Optional[str] = None)[source]

Bases: Target

folder: str
host: str
password: Optional[str]
target_type: typing_extensions.Literal[sftp]
user: str
class common.types.Target(*, contact: Optional[str] = '', comment: str = '')[source]

Bases: BaseModel, Compat

comment: str
contact: Optional[str]
classmethod get_name() str[source]
classmethod validate(v)[source]

Parse the target as any of the known target types.

class common.types.Task(*, info: TaskInfo, id: str, dispatch: Union[TaskDispatch, EmptyDict] = {}, process: Union[TaskProcessing, EmptyDict, List[TaskProcessing]] = {}, study: Union[TaskStudy, EmptyDict] = {}, nomad_info: Optional[Any] = None)[source]

Bases: BaseModel, Compat

class Config[source]

Bases: object

extra = 'forbid'
dispatch: Union[TaskDispatch, EmptyDict]
id: str
info: TaskInfo
nomad_info: Optional[Any]
process: Union[TaskProcessing, EmptyDict, List[TaskProcessing]]
study: Union[TaskStudy, EmptyDict]
class common.types.TaskDispatch(*, target_name: str, retries: Optional[int] = 0, next_retry_at: Optional[float] = 0, series_uid: Optional[str] = None)[source]

Bases: BaseModel, Compat

next_retry_at: Optional[float]
retries: Optional[int]
series_uid: Optional[str]
target_name: str
class common.types.TaskHasStudy(*, info: TaskInfo, id: str, dispatch: Union[TaskDispatch, EmptyDict] = {}, process: Union[TaskProcessing, EmptyDict, List[TaskProcessing]] = {}, study: TaskStudy, nomad_info: Optional[Any] = None)[source]

Bases: Task

study: TaskStudy
class common.types.TaskInfo(*, action: typing_extensions.Literal[route, both, process, discard, notification], uid: str, uid_type: typing_extensions.Literal[series, study], triggered_rules: Union[Dict[str, typing_extensions.Literal[True]], str], applied_rule: Optional[str] = None, patient_name: Optional[str] = None, mrn: str, acc: str, sender_address: str = 'MISSING', mercure_version: str, mercure_appliance: str, mercure_server: str, device_serial_number: Optional[str] = None)[source]

Bases: BaseModel, Compat

acc: str
action: typing_extensions.Literal[route, both, process, discard, notification]
applied_rule: Optional[str]
device_serial_number: Optional[str]
mercure_appliance: str
mercure_server: str
mercure_version: str
mrn: str
patient_name: Optional[str]
sender_address: str
triggered_rules: Union[Dict[str, typing_extensions.Literal[True]], str]
uid: str
uid_type: typing_extensions.Literal[series, study]
class common.types.TaskProcessing(*, module_name: str, module_config: Optional[Module] = None, settings: Dict[str, Any] = {}, retain_input_images: bool, output: Optional[Dict] = None)[source]

Bases: BaseModel, Compat

module_config: Optional[Module]
module_name: str
output: Optional[Dict]
retain_input_images: bool
settings: Dict[str, Any]
class common.types.TaskStudy(*, study_uid: str, complete_trigger: Optional[str] = None, complete_required_series: str, creation_time: str, last_receive_time: str, received_series: Optional[List[str]] = None, received_series_uid: Optional[List[str]] = None, complete_force: bool = False)[source]

Bases: BaseModel, Compat

complete_force: bool
complete_required_series: str
complete_trigger: Optional[str]
creation_time: str
last_receive_time: str
received_series: Optional[List[str]]
received_series_uid: Optional[List[str]]
study_uid: str
class common.types.UnsetRule(_typename, _fields=None, /, **kwargs)[source]

Bases: TypedDict

rule: str
class common.types.XnatTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[xnat] = 'xnat', project_id: str, host: str, user: str, password: str)[source]

Bases: Target

host: str
password: str
project_id: str
target_type: typing_extensions.Literal[xnat]
user: str