Package common

common.config

config.py

mercure’s configuration management, used by various mercure modules

common.config.check_folders() bool[source]

Checks if all required folders for handling the DICOM files exist.

common.config.mercure: Config
common.config.read_config() Config[source]

Reads the configuration settings (rules, targets, general settings) from the configuration file. The configuration will only be updated if the file has changed compared to the last function call. If the configuration file is locked by another process, an exception will be raised.

common.config.save_config() None[source]

Saves the current configuration to a file on disk. Raises an exception if the file has been locked by another process.

common.config.write_configfile(json_content) None[source]

Rewrites the config file using the JSON data passed as argument. Used by the config editor of the webgui.

common.constants

constants.py

mercure-wide constants, used for standardizing key names and extensions.

class common.constants.mercure_actions[source]

Bases: object

BOTH = 'both'
DISCARD = 'discard'
NOTIFICATION = 'notification'
PROCESS = 'process'
ROUTE = 'route'
class common.constants.mercure_config[source]

Bases: object

MODULES = 'modules'
RULES = 'rules'
TARGETS = 'targets'
class common.constants.mercure_defs[source]

Bases: object

SEPARATOR = '#'
VERSION = '0.2.1-beta.1'
class common.constants.mercure_events[source]

Bases: object

COMPLETION = 1
ERROR = 2
RECEPTION = 0
class common.constants.mercure_folders[source]

Bases: object

DISCARD = 'discard_folder'
ERROR = 'error_folder'
INCOMING = 'incoming_folder'
OUTGOING = 'outgoing_folder'
PROCESSING = 'processing_folder'
STUDIES = 'studies_folder'
SUCCESS = 'success_folder'
class common.constants.mercure_info[source]

Bases: object

ACC = 'acc'
ACTION = 'action'
MERCURE_APPLIANCE = 'mercure_appliance'
MERCURE_SERVER = 'mercure_server'
MERCURE_VERSION = 'mercure_version'
MRN = 'mrn'
TRIGGERED_RULES = 'triggered_rules'
UID = 'uid'
UID_TYPE = 'uid_type'
class common.constants.mercure_names[source]

Bases: object

DCM = '.dcm'
DCMFILTER = '*.dcm'
ERROR = '.error'
HALT = 'HALT'
LOCK = '.lock'
PROCESSING = '.processing'
RUNNING = '.running'
SENDLOG = 'sent.txt'
TAGS = '.tags'
TASKFILE = 'task.json'
class common.constants.mercure_options[source]

Bases: object

FALSE = 'False'
INVALID = '#@INVALID@#'
MISSING = 'MISSING'
NORMAL = 'normal'
OFFPEAK = 'offpeak'
SERIES = 'series'
STUDY = 'study'
TRUE = 'True'
URGENT = 'urgent'
class common.constants.mercure_rule[source]

Bases: object

ACTION = 'action'
ACTION_TRIGGER = 'action_trigger'
DISABLED = 'disabled'
FALLBACK = 'fallback'
NOTIFICATION_PAYLOAD = 'notification_payload'
NOTIFICATION_TRIGGER_COMPLETION = 'notification_trigger_completion'
NOTIFICATION_TRIGGER_ERROR = 'notification_trigger_error'
NOTIFICATION_TRIGGER_RECEPTION = 'notification_trigger_reception'
NOTIFICATION_WEBHOOK = 'notification_webhook'
PRIORITY = 'priority'
PROCESSING_MODULE = 'processing_module'
RULE = 'rule'
STUDY_TRIGGER = 'study_trigger'
STUDY_TRIGGER_CONDITION = 'study_trigger_condition'
STUDY_TRIGGER_CONDITION_RECEIVED_SERIES = 'received_series'
STUDY_TRIGGER_CONDITION_TIMEOUT = 'timeout'
TARGET = 'target'
class common.constants.mercure_sections[source]

Bases: object

DISPATCH = 'dispatch'
FILES = 'files'
INFO = 'info'
JOURNAL = 'journal'
NOTIFICATION = 'notification'
PROCESS = 'process'
STUDY = 'study'
class common.constants.mercure_study[source]

Bases: object

COMPLETE_FORCE = 'complete_force'
COMPLETE_REQUIRED_SERIES = 'complete_required_series'
COMPLETE_TRIGGER = 'complete_trigger'
CREATION_TIME = 'creation_time'
LAST_RECEIVE_TIME = 'last_receive_time'
RECEIVED_SERIES = 'received_series'
STUDY_UID = 'study_uid'

common.helper

helper.py

Various internal helper functions for mercure.

class common.helper.FileLock(path_for_lockfile: Path)[source]

Bases: object

Helper class that implements a file lock. The lock file is also removed by the destructor, so that no spurious lock files remain if exceptions are raised.

free() None[source]
class common.helper.RepeatedTimer(interval: float, function: Callable, exit_function: Callable, *args, **kwargs)[source]

Bases: object

Helper class for running a continuous timer that is suspended while the worker function is running

start() None[source]

Starts the timer for triggering the callback after the defined interval.

stop() None[source]

Stops the timer and executes the defined exit callback function.

common.helper.g_log(*args, **kwargs) None[source]

Sends diagnostic information to graphite (if configured).

common.helper.get_runner() str[source]

Returns the name of the mechanism that is used for running mercure in the current installation (systemd, docker, nomad).

common.helper.is_terminated() bool[source]

Checks if the process will terminate after the current task.

async common.helper.send_to_graphite(*args, **kwargs) None[source]

Wrapper for the asynchronous graphite call, used to avoid wait time in the main loop.

common.helper.trigger_terminate() None[source]

Signals that the processing loop should terminate after finishing the currently active task.

common.monitor

monitor.py

Helper functions and definitions for monitoring mercure’s operations via the bookkeeper module.

exception common.monitor.MonitorHTTPError(status_code: int, message: str)[source]

Bases: Exception

Exception raised when an HTTP error occurs.

common.monitor.configure(module, instance, address) None[source]

Configures the connection to the bookkeeper module. If not called, events will not be transmitted to the bookkeeper.

async common.monitor.do_post(endpoint, kwargs, catch_errors=False) None[source]
async common.monitor.find_tasks(search_term='') Any[source]
async common.monitor.get(endpoint: str, payload: Any = {}) Any[source]
async common.monitor.get_series(series_uid='') Any[source]
async common.monitor.get_task_events(task_id='') Any[source]
async common.monitor.get_task_info(task_id='') Any[source]
async common.monitor.get_tasks() Any[source]
async common.monitor.get_tests() Any[source]
common.monitor.post(endpoint: str, **kwargs) None[source]
common.monitor.send_event(event: m_events, severity: severity = severity.INFO, description: str = '') None[source]

Sends information about general mercure events to the bookkeeper (e.g., during module start).

common.monitor.send_register_series(tags: Dict[str, str]) None[source]

Registers a received series on the bookkeeper. This should be called when a series has been fully received and the DICOM tags have been parsed.

common.monitor.send_register_task(task_id: str, series_uid: str, parent_id: Optional[str] = None) None[source]

Registers a new task on the bookkeeper. This should be called whenever a new task has been created.

common.monitor.send_task_event(event: task_event, task_id, file_count, target, info) None[source]

Sends an event related to a specific series to the bookkeeper.

common.monitor.send_update_task(task: Task) None[source]

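Sends the updated information of an existing task to the bookkeeper. (Note: the original description duplicated the text of send_register_task; this wording is inferred from the function name and signature and should be confirmed against the implementation.)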

common.monitor.send_webgui_event(event: w_events, user: str, description='') None[source]

Sends information about an event on the webgui to the bookkeeper.

common.monitor.set_api_key() None[source]

common.notification

notification.py

Helper functions for triggering webhook calls.

common.notification.post(url: str, payload: Any) None[source]
common.notification.send_webhook(url, payload, event, rule_name, task_id='') None[source]

common.rule_evaluation

rule_evaluation.py

Helper functions for evaluating routing rules and study-completion conditions.

common.rule_evaluation.parse_completion_series(task_id: str, completion_str: str, received_series: list) bool[source]

Evaluates the configuration string defining which series are required, using the list of received series as input. Returns True if all required series have arrived, otherwise False.

common.rule_evaluation.parse_rule(rule: str, tags: Dict[str, str]) Union[Any, bool][source]

Parses the given rule, replaces all tag variables with values from the given tags dictionary, and evaluates the rule. If the rule is invalid, an exception will be raised.

common.rule_evaluation.replace_tags(rule: str, tags: Dict[str, str]) Any[source]

Replaces all tags with format @tagname@ in the given rule string with the corresponding values from the currently processed series (stored in the second argument).

common.rule_evaluation.test_completion_series(value: str) str[source]

Tests if the given string with the list of series required for study completion has a valid format. If so, the string "True" is returned; otherwise, the error description is returned.

common.rule_evaluation.test_rule(rule: str, tags: Dict[str, str]) str[source]

Tests the given rule for validity using the given tags dictionary. Similar to parse_rule, but returns more diagnostic output, formatted for the testing dialog. Also warns about invalid tags.

common.types

types.py

Definitions for using TypedDicts throughout mercure.

class common.types.Compat[source]

Bases: object

get(item, els=None) Any[source]
class common.types.Config(*, appliance_name: str, port: int, accept_compressed_images: str, incoming_folder: str, studies_folder: str, outgoing_folder: str, success_folder: str, error_folder: str, discard_folder: str, processing_folder: str, router_scan_interval: int, dispatcher_scan_interval: int, cleaner_scan_interval: int, retention: int, retry_delay: int, retry_max: int, series_complete_trigger: int, study_complete_trigger: int, study_forcecomplete_trigger: int, graphite_ip: str, graphite_port: int, bookkeeper: str, offpeak_start: str, offpeak_end: str, targets: Dict[str, Target], rules: Dict[str, Rule], modules: Dict[str, Module], process_runner: Literal['docker', 'nomad', ''] = '', bookkeeper_api_key: str = None, features: Dict[str, bool])[source]

Bases: BaseModel, Compat

accept_compressed_images: str
appliance_name: str
bookkeeper: str
bookkeeper_api_key: Optional[str]
cleaner_scan_interval: int
discard_folder: str
dispatcher_scan_interval: int
error_folder: str
features: Dict[str, bool]
graphite_ip: str
graphite_port: int
incoming_folder: str
modules: Dict[str, Module]
offpeak_end: str
offpeak_start: str
outgoing_folder: str
port: int
process_runner: Literal['docker', 'nomad', '']
processing_folder: str
retention: int
retry_delay: int
retry_max: int
router_scan_interval: int
rules: Dict[str, Rule]
series_complete_trigger: int
studies_folder: str
study_complete_trigger: int
study_forcecomplete_trigger: int
success_folder: str
targets: Dict[str, Target]
class common.types.DicomTarget(*, contact: str = '', comment: str = '', target_type: Literal['dicom'] = 'dicom', ip: str, port: str, aet_target: str, aet_source: str = '')[source]

Bases: Target

aet_source: Optional[str]
aet_target: str
ip: str
port: str
target_type: Literal['dicom']
class common.types.DicomWebTarget(*, contact: str = '', comment: str = '', target_type: Literal['dicomweb'] = 'dicomweb', url: str, qido_url_prefix: str = None, wado_url_prefix: str = None, stow_url_prefix: str = None, access_token: str = None, http_user: str = None, http_password: str = None)[source]

Bases: Target

access_token: Optional[str]
http_password: Optional[str]
http_user: Optional[str]
qido_url_prefix: Optional[str]
stow_url_prefix: Optional[str]
target_type: Literal['dicomweb']
url: str
wado_url_prefix: Optional[str]
class common.types.DummyTarget(*, contact: str = '', comment: str = '', target_type: Literal['dummy'] = 'dummy')[source]

Bases: Target

target_type: Literal['dummy']
class common.types.EmptyDict(_typename, _fields=None, /, **kwargs)[source]

Bases: dict

class common.types.Module(*, docker_tag: str = '', additional_volumes: str = '', environment: str = '', docker_arguments: str = '', settings: Dict[str, Any] = {}, contact: str = '', comment: str = '', constraints: str = '', resources: str = '')[source]

Bases: BaseModel, Compat

additional_volumes: Optional[str]
comment: Optional[str]
constraints: Optional[str]
contact: Optional[str]
docker_arguments: Optional[str]
docker_tag: Optional[str]
environment: Optional[str]
resources: Optional[str]
settings: Dict[str, Any]
class common.types.Rule(*, rule: str = 'False', target: str = '', disabled: Literal['True', 'False'] = 'False', fallback: Literal['True', 'False'] = 'False', contact: str = '', comment: str = '', tags: str = '', action: Literal['route', 'both', 'process', 'discard', 'notification'] = 'route', action_trigger: Literal['series', 'study'] = 'series', study_trigger_condition: Literal['timeout', 'received_series'] = 'timeout', study_trigger_series: str = '', priority: Literal['normal', 'urgent', 'offpeak'] = 'normal', processing_module: str = '', processing_settings: Dict[str, Any] = {}, processing_retain_images: Literal['True', 'False'] = 'False', notification_webhook: str = '', notification_payload: str = '', notification_trigger_reception: Literal['True', 'False'] = 'True', notification_trigger_completion: Literal['True', 'False'] = 'True', notification_trigger_error: Literal['True', 'False'] = 'True')[source]

Bases: BaseModel, Compat

action: Literal['route', 'both', 'process', 'discard', 'notification']
action_trigger: Literal['series', 'study']
comment: str
contact: str
disabled: Literal['True', 'False']
fallback: Literal['True', 'False']
notification_payload: str
notification_trigger_completion: Literal['True', 'False']
notification_trigger_error: Literal['True', 'False']
notification_trigger_reception: Literal['True', 'False']
notification_webhook: str
priority: Literal['normal', 'urgent', 'offpeak']
processing_module: str
processing_retain_images: Literal['True', 'False']
processing_settings: Dict[str, Any]
rule: str
study_trigger_condition: Literal['timeout', 'received_series']
study_trigger_series: str
tags: str
target: str
class common.types.S3Target(*, contact: str = '', comment: str = '', target_type: Literal['s3'] = 's3', region: str, bucket: str, prefix: str, access_key_id: str, secret_access_key: str)[source]

Bases: Target

access_key_id: str
bucket: str
prefix: str
region: str
secret_access_key: str
target_type: Literal['s3']
class common.types.SftpTarget(*, contact: str = '', comment: str = '', target_type: Literal['sftp'] = 'sftp', folder: str, user: str, host: str, password: str = None)[source]

Bases: Target

folder: str
host: str
password: Optional[str]
target_type: Literal['sftp']
user: str
class common.types.Target(*, contact: str = '', comment: str = '')[source]

Bases: BaseModel, Compat

comment: str
contact: Optional[str]
classmethod get_name() str[source]
classmethod validate(v)[source]

Parse the target as any of the known target types.

class common.types.Task(*, info: TaskInfo, id: str, dispatch: Union[TaskDispatch, EmptyDict] = {}, process: Union[TaskProcessing, EmptyDict] = {}, study: Union[TaskStudy, EmptyDict] = {}, nomad_info: Any = None)[source]

Bases: BaseModel, Compat

class Config[source]

Bases: object

extra = 'forbid'
dispatch: Union[TaskDispatch, EmptyDict]
id: str
info: TaskInfo
nomad_info: Optional[Any]
process: Union[TaskProcessing, EmptyDict]
study: Union[TaskStudy, EmptyDict]
class common.types.TaskDispatch(*, target_name: str, retries: int = 0, next_retry_at: float = 0, series_uid: str = None)[source]

Bases: BaseModel, Compat

next_retry_at: Optional[float]
retries: Optional[int]
series_uid: Optional[str]
target_name: str
class common.types.TaskHasStudy(*, info: TaskInfo, id: str, dispatch: Union[TaskDispatch, EmptyDict] = {}, process: Union[Module, EmptyDict] = {}, study: TaskStudy)[source]

Bases: BaseModel, Compat

dispatch: Union[TaskDispatch, EmptyDict]
id: str
info: TaskInfo
process: Union[Module, EmptyDict]
study: TaskStudy
class common.types.TaskInfo(*, action: Literal['route', 'both', 'process', 'discard', 'notification'], uid: str, uid_type: Literal['series', 'study'], triggered_rules: Union[Dict[str, Literal[True]], str], applied_rule: str = None, mrn: str, acc: str, mercure_version: str, mercure_appliance: str, mercure_server: str)[source]

Bases: BaseModel, Compat

acc: str
action: Literal['route', 'both', 'process', 'discard', 'notification']
applied_rule: Optional[str]
mercure_appliance: str
mercure_server: str
mercure_version: str
mrn: str
triggered_rules: Union[Dict[str, Literal[True]], str]
uid: str
uid_type: Literal['series', 'study']
class common.types.TaskProcessing(*, module_name: str, module_config: Module = None, settings: Dict[str, Any] = {}, retain_input_images: Literal['False', 'True'])[source]

Bases: BaseModel, Compat

module_config: Optional[Module]
module_name: str
retain_input_images: Literal['False', 'True']
settings: Dict[str, Any]
class common.types.TaskStudy(*, study_uid: str, complete_trigger: str = None, complete_required_series: str, creation_time: str, last_receive_time: str, received_series: List[str] = None, received_series_uid: List[str] = None, complete_force: Literal['True', 'False'])[source]

Bases: BaseModel, Compat

complete_force: Literal['True', 'False']
complete_required_series: str
complete_trigger: Optional[str]
creation_time: str
last_receive_time: str
received_series: Optional[List[str]]
received_series_uid: Optional[List[str]]
study_uid: str
class common.types.UnsetRule(_typename, _fields=None, /, **kwargs)[source]

Bases: dict

rule: str