Package common


mercure’s configuration management, used by various mercure modules

common.config.check_folders() bool[source]

Checks if all required folders for handling the DICOM files exist.

common.config.get_loglevel() int[source]
common.config.get_runner() str[source]
common.config.mercure: common.types.Config
common.config.read_config() common.types.Config[source]

Reads the configuration settings (rules, targets, general settings) from the configuration file. The configuration is only updated if the file has changed since the last function call. If the configuration file is locked by another process, an exception is raised.
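
The reload-on-change behavior described above can be sketched as follows. This is a minimal illustration, not mercure's implementation: the class name CachedConfigReader, the use of the file modification time as the change signal, the lock-file naming, and the RuntimeError are all assumptions made for the example.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


class CachedConfigReader:
    """Hypothetical helper: re-parses a JSON file only when its mtime changes."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._last_mtime_ns: Optional[int] = None
        self._cached: Dict[str, Any] = {}
        self.reload_count = 0  # only here to make the caching observable

    def read(self) -> Dict[str, Any]:
        # Assumed convention: a sibling file with suffix ".lock" marks the
        # configuration as locked by another process.
        if self.path.with_suffix(".lock").exists():
            raise RuntimeError(f"{self.path} is locked by another process")

        mtime_ns = self.path.stat().st_mtime_ns
        if mtime_ns != self._last_mtime_ns:
            # The file changed since the last call (or this is the first call).
            self._cached = json.loads(self.path.read_text())
            self._last_mtime_ns = mtime_ns
            self.reload_count += 1
        return self._cached
```

Calling read() repeatedly on an unchanged file returns the cached dictionary without touching the parser, which keeps polling loops cheap.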

common.config.save_config() None[source]

Saves the current configuration to a file on disk. Raises an exception if the file has been locked by another process.

common.config.write_configfile(json_content) None[source]

Rewrites the config file using the JSON data passed as an argument. Used by the config editor of the webgui.


mercure-wide constants, used for standardizing key names and extensions.

class common.constants.mercure_actions[source]

Bases: object

BOTH = 'both'
DISCARD = 'discard'
NOTIFICATION = 'notification'
PROCESS = 'process'
ROUTE = 'route'
class common.constants.mercure_config[source]

Bases: object

MODULES = 'modules'
RULES = 'rules'
TARGETS = 'targets'
class common.constants.mercure_defs[source]

Bases: object

VERSION = '0.2a'
class common.constants.mercure_events[source]

Bases: object

class common.constants.mercure_folders[source]

Bases: object

DISCARD = 'discard_folder'
ERROR = 'error_folder'
INCOMING = 'incoming_folder'
OUTGOING = 'outgoing_folder'
PROCESSING = 'processing_folder'
STUDIES = 'studies_folder'
SUCCESS = 'success_folder'
class common.constants.mercure_info[source]

Bases: object

ACC = 'acc'
ACTION = 'action'
MERCURE_APPLIANCE = 'mercure_appliance'
MERCURE_SERVER = 'mercure_server'
MERCURE_VERSION = 'mercure_version'
MRN = 'mrn'
TRIGGERED_RULES = 'triggered_rules'
UID = 'uid'
UID_TYPE = 'uid_type'
class common.constants.mercure_names[source]

Bases: object

DCM = '.dcm'
DCMFILTER = '*.dcm'
ERROR = '.error'
LOCK = '.lock'
PROCESSING = '.processing'
RUNNING = '.running'
SENDLOG = 'sent.txt'
TAGS = '.tags'
TASKFILE = 'task.json'
class common.constants.mercure_options[source]

Bases: object

FALSE = 'False'
NORMAL = 'normal'
OFFPEAK = 'offpeak'
SERIES = 'series'
STUDY = 'study'
TRUE = 'True'
URGENT = 'urgent'
class common.constants.mercure_rule[source]

Bases: object

ACTION = 'action'
ACTION_TRIGGER = 'action_trigger'
DISABLED = 'disabled'
FALLBACK = 'fallback'
NOTIFICATION_PAYLOAD = 'notification_payload'
NOTIFICATION_TRIGGER_COMPLETION = 'notification_trigger_completion'
NOTIFICATION_TRIGGER_ERROR = 'notification_trigger_error'
NOTIFICATION_TRIGGER_RECEPTION = 'notification_trigger_reception'
NOTIFICATION_WEBHOOK = 'notification_webhook'
PRIORITY = 'priority'
PROCESSING_MODULE = 'processing_module'
RULE = 'rule'
STUDY_TRIGGER = 'study_trigger'
STUDY_TRIGGER_CONDITION = 'study_trigger_condition'
TARGET = 'target'
class common.constants.mercure_sections[source]

Bases: object

DISPATCH = 'dispatch'
FILES = 'files'
INFO = 'info'
JOURNAL = 'journal'
NOTIFICATION = 'notification'
PROCESS = 'process'
STUDY = 'study'
class common.constants.mercure_study[source]

Bases: object

COMPLETE_FORCE = 'complete_force'
COMPLETE_REQUIRED_SERIES = 'complete_required_series'
COMPLETE_TRIGGER = 'complete_trigger'
CREATION_TIME = 'creation_time'
LAST_RECEIVE_TIME = 'last_receive_time'
RECEIVED_SERIES = 'received_series'
STUDY_UID = 'study_uid'


Various internal helper functions for mercure.

class common.helper.FileLock(path_for_lockfile: pathlib.Path)[source]

Bases: object

Helper class that implements a file lock. The lock file is also removed by the destructor, so that no stale lock files remain if an exception is raised.
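
A lock of this kind can be sketched as below. The class name SketchFileLock and its internals are assumptions for illustration; only the constructor argument and the free() method mirror the signature documented here.

```python
from pathlib import Path


class SketchFileLock:
    """Hypothetical stand-in: holds a lock file until free() or destruction."""

    def __init__(self, path_for_lockfile: Path) -> None:
        self.lockfile = path_for_lockfile
        # Set before touching the file, so __del__ stays safe if creation fails.
        self.locked = False
        # exist_ok=False raises FileExistsError if someone else holds the lock.
        self.lockfile.touch(exist_ok=False)
        self.locked = True

    def free(self) -> None:
        """Releases the lock by deleting the lock file (idempotent)."""
        if self.locked:
            self.lockfile.unlink(missing_ok=True)
            self.locked = False

    def __del__(self) -> None:
        # Safety net: remove the lock file if free() was never called,
        # for example because an exception unwound the caller.
        self.free()
```

Because a failed constructor leaves locked set to False, an instance that could not acquire the lock never deletes a lock file owned by another holder.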

free() None[source]
class common.helper.RepeatedTimer(interval: float, function: Callable, exit_function: Callable, *args, **kwargs)[source]

Bases: object

Helper class for running a continuous timer that is suspended while the worker function is running.

start() None[source]

Starts the timer for triggering the callback after the defined interval.

stop() None[source]

Stops the timer and executes the defined exit callback function.
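
One way to obtain a timer that pauses while the worker runs is to re-arm a one-shot threading.Timer only after the worker returns. The sketch below follows the documented constructor signature, but the class name SketchRepeatedTimer and all internals are assumptions, not mercure's code.

```python
import threading
from typing import Any, Callable, Optional


class SketchRepeatedTimer:
    """Hypothetical stand-in: calls `function` every `interval` seconds,
    measuring the interval from the end of the previous call."""

    def __init__(self, interval: float, function: Callable,
                 exit_function: Callable, *args: Any, **kwargs: Any) -> None:
        self.interval = interval
        self.function = function
        self.exit_function = exit_function
        self.args = args
        self.kwargs = kwargs
        self._timer: Optional[threading.Timer] = None
        self._running = False
        self._lock = threading.Lock()

    def _arm(self) -> None:
        # One-shot timer; it is re-created after each worker run.
        self._timer = threading.Timer(self.interval, self._run)
        self._timer.daemon = True
        self._timer.start()

    def _run(self) -> None:
        # No timer is pending here, so the worker can never overlap itself.
        self.function(*self.args, **self.kwargs)
        with self._lock:
            if self._running:
                self._arm()

    def start(self) -> None:
        with self._lock:
            if not self._running:
                self._running = True
                self._arm()

    def stop(self) -> None:
        with self._lock:
            self._running = False
            if self._timer is not None:
                self._timer.cancel()
        self.exit_function()
```

In this sketch, stop() cancels a pending timer but does not interrupt a worker call that is already in progress.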

common.helper.g_log(*args, **kwargs) None[source]

Sends diagnostic information to graphite (if configured).

common.helper.is_terminated() bool[source]

Checks if the process will terminate after the current task.

async common.helper.send_to_graphite(*args, **kwargs) None[source]

Wrapper for asynchronous graphite call to avoid wait time of main loop.

common.helper.trigger_terminate() None[source]

Signals that the processing loop should terminate after finishing the currently active task.
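
The interplay of is_terminated() and trigger_terminate() amounts to a cooperative shutdown flag. The sketch below uses local stand-ins with the same names plus a hypothetical run_loop function; it only illustrates the pattern and makes no claim about how mercure stores the flag.

```python
from typing import Callable, Iterable, List

# Module-level flag; local stand-in for whatever state the real helper keeps.
_terminate_requested = False


def trigger_terminate() -> None:
    """Requests that the loop stops once the current task is done."""
    global _terminate_requested
    _terminate_requested = True


def is_terminated() -> bool:
    """Reports whether a stop has been requested."""
    return _terminate_requested


def run_loop(tasks: Iterable[str], handle: Callable[[str], None]) -> List[str]:
    """Hypothetical processing loop: checks the flag only between tasks."""
    completed = []
    for task in tasks:
        if is_terminated():
            break
        handle(task)  # the active task always runs to completion
        completed.append(task)
    return completed
```

Checking the flag only between tasks means a termination request never leaves a task half-processed.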


Helper functions and definitions for monitoring mercure’s operations via the bookkeeper module.

common.monitor.configure(module, instance, address) None[source]

Configures the connection to the bookkeeper module. If not called, events will not be transmitted to the bookkeeper.

class common.monitor.m_events[source]

Bases: object

Event types for general mercure monitoring.

class common.monitor.s_events[source]

Bases: object

Event types for monitoring everything related to one specific series.

common.monitor.send_event(event, severity=0, description: str = '') None[source]

Sends information about general mercure events to the bookkeeper (e.g., during module start).

common.monitor.send_register_series(tags: Dict[str, str]) None[source]

Registers a received series on the bookkeeper. This should be called when a series has been fully received and the DICOM tags have been parsed.

common.monitor.send_series_event(event, series_uid, file_count, target, info) None[source]

Sends an event related to a specific series to the bookkeeper.

common.monitor.send_webgui_event(event, user, description='') None[source]

Sends information about an event on the webgui to the bookkeeper.

class common.monitor.severity[source]

Bases: object

Severity level associated with mercure events.

INFO = 0
class common.monitor.w_events[source]

Bases: object

Event types for monitoring the webgui activity.



Helper functions for triggering webhook calls.

common.notification.send_webhook(url, payload, event) None[source]


Helper functions for evaluating routing rules and study-completion conditions.

common.rule_evaluation.parse_completion_series(completion_str: str, received_series: list) bool[source]

Evaluates the configuration string that defines which series are required, using the list of received series as input. Returns True if all required series have arrived, otherwise False.

common.rule_evaluation.parse_rule(rule: str, tags: Dict[str, str]) Union[Any, bool][source]

Parses the given rule, replaces all tag variables with values from the given tags dictionary, and evaluates the rule. If the rule is invalid, an exception will be raised.

common.rule_evaluation.replace_tags(rule: str, tags: Dict[str, str]) Any[source]

Replaces all tags with format @tagname@ in the given rule string with the corresponding values from the currently processed series (stored in the second argument).
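
The @tagname@ substitution can be sketched with a regular expression. The function name replace_tags_sketch is hypothetical, and two behaviors are assumptions for this example rather than documented facts: values are inserted as quoted string literals, and unknown tags are left untouched.

```python
import re
from typing import Dict

# Matches placeholders such as @Modality@ or @SeriesDescription@.
TAG_PATTERN = re.compile(r"@(\w+)@")


def replace_tags_sketch(rule: str, tags: Dict[str, str]) -> str:
    """Substitutes each @tagname@ placeholder with the quoted tag value."""

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in tags:
            # Assumption: leave unknown placeholders as they are.
            return match.group(0)
        # repr() yields a quoted, escaped string literal.
        return repr(tags[name])

    return TAG_PATTERN.sub(substitute, rule)


print(replace_tags_sketch("@Modality@ == 'MR'", {"Modality": "MR"}))
# prints: 'MR' == 'MR'
```

Quoting the values with repr() keeps tag contents that contain quotes or backslashes from changing the structure of the resulting expression.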

common.rule_evaluation.test_completion_series(value: str) str[source]

Tests whether the given string, which lists the series required for study completion, has a valid format. If so, True is returned as a string; otherwise, the error description is returned.

common.rule_evaluation.test_rule(rule: str, tags: Dict[str, str]) str[source]

Tests the given rule for validity using the given tags dictionary. Similar to parse_rule, but returns more detailed diagnostic output formatted for the testing dialog. Also warns about invalid tags.


Definitions for using TypedDicts throughout mercure.

class common.types.Compat[source]

Bases: object

get(item, els=None) Any[source]
class common.types.Config(*, appliance_name: str, port: int, incoming_folder: str, studies_folder: str, outgoing_folder: str, success_folder: str, error_folder: str, discard_folder: str, processing_folder: str, router_scan_interval: int, dispatcher_scan_interval: int, cleaner_scan_interval: int, retention: int, retry_delay: int, retry_max: int, series_complete_trigger: int, study_complete_trigger: int, study_forcecomplete_trigger: int, graphite_ip: str, graphite_port: int, bookkeeper: str, offpeak_start: str, offpeak_end: str, targets: Dict[str, Union[common.types.DicomTarget, common.types.SftpTarget]], rules: Dict[str, common.types.Rule], modules: Dict[str, common.types.Module], process_runner: typing_extensions.Literal['docker', 'nomad', ''] = '')[source]

Bases: pydantic.main.BaseModel, common.types.Compat

appliance_name: str
bookkeeper: str
cleaner_scan_interval: int
discard_folder: str
dispatcher_scan_interval: int
error_folder: str
graphite_ip: str
graphite_port: int
incoming_folder: str
modules: Dict[str, common.types.Module]
offpeak_end: str
offpeak_start: str
outgoing_folder: str
port: int
process_runner: typing_extensions.Literal['docker', 'nomad', '']
processing_folder: str
retention: int
retry_delay: int
retry_max: int
router_scan_interval: int
rules: Dict[str, common.types.Rule]
series_complete_trigger: int
studies_folder: str
study_complete_trigger: int
study_forcecomplete_trigger: int
success_folder: str
targets: Dict[str, Union[common.types.DicomTarget, common.types.SftpTarget]]
class common.types.DicomTarget(*, contact: str = '', comment: str = '', target_type: typing_extensions.Literal['dicom'] = 'dicom', ip: str, port: str, aet_target: str, aet_source: str = '')[source]

Bases: common.types.Target

aet_source: Optional[str]
aet_target: str
ip: str
port: str
target_type: typing_extensions.Literal['dicom']
class common.types.EmptyDict(**kwargs)[source]

Bases: dict

class common.types.Module(*, docker_tag: str = '', additional_volumes: str = '', environment: str = '', docker_arguments: str = '', settings: Dict[str, Any] = {}, contact: str = '', comment: str = '', constraints: str = '', resources: str = '')[source]

Bases: pydantic.main.BaseModel, common.types.Compat

additional_volumes: Optional[str]
comment: Optional[str]
constraints: Optional[str]
contact: Optional[str]
docker_arguments: Optional[str]
docker_tag: Optional[str]
environment: Optional[str]
resources: Optional[str]
settings: Dict[str, Any]
class common.types.Rule(*, rule: str = 'False', target: str = '', disabled: typing_extensions.Literal['True', 'False'] = 'False', fallback: typing_extensions.Literal['True', 'False'] = 'False', contact: str = '', comment: str = '', tags: str = '', action: typing_extensions.Literal['route', 'both', 'process', 'discard', 'notification'] = 'route', action_trigger: typing_extensions.Literal['series', 'study'] = 'series', study_trigger_condition: typing_extensions.Literal['timeout', 'received_series'] = 'timeout', study_trigger_series: str = '', priority: typing_extensions.Literal['normal', 'urgent', 'offpeak'] = 'normal', processing_module: str = '', processing_settings: Dict[str, Any] = {}, notification_webhook: str = '', notification_payload: str = '', notification_trigger_reception: typing_extensions.Literal['True', 'False'] = 'False', notification_trigger_completion: typing_extensions.Literal['True', 'False'] = 'False', notification_trigger_error: typing_extensions.Literal['True', 'False'] = 'False')[source]

Bases: pydantic.main.BaseModel, common.types.Compat

action: typing_extensions.Literal['route', 'both', 'process', 'discard', 'notification']
action_trigger: typing_extensions.Literal['series', 'study']
comment: str
contact: str
disabled: typing_extensions.Literal['True', 'False']
fallback: typing_extensions.Literal['True', 'False']
notification_payload: str
notification_trigger_completion: typing_extensions.Literal['True', 'False']
notification_trigger_error: typing_extensions.Literal['True', 'False']
notification_trigger_reception: typing_extensions.Literal['True', 'False']
notification_webhook: str
priority: typing_extensions.Literal['normal', 'urgent', 'offpeak']
processing_module: str
processing_settings: Dict[str, Any]
rule: str
study_trigger_condition: typing_extensions.Literal['timeout', 'received_series']
study_trigger_series: str
tags: str
target: str
class common.types.SftpTarget(*, contact: str = '', comment: str = '', target_type: typing_extensions.Literal['sftp'] = 'sftp', folder: str, user: str, host: str, password: str = None)[source]

Bases: common.types.Target

folder: str
host: str
password: Optional[str]
target_type: typing_extensions.Literal['sftp']
user: str
class common.types.Target(*, contact: str = '', comment: str = '')[source]

Bases: pydantic.main.BaseModel, common.types.Compat

comment: str
contact: Optional[str]
class common.types.Task(*, info: common.types.TaskInfo, dispatch: Union[common.types.TaskDispatch, common.types.EmptyDict] = {}, process: Union[common.types.TaskProcessing, common.types.EmptyDict] = {}, study: Union[common.types.TaskStudy, common.types.EmptyDict] = {}, nomad_info: Any = None)[source]

Bases: pydantic.main.BaseModel, common.types.Compat

class Config[source]

Bases: object

extra = 'forbid'
dispatch: Union[common.types.TaskDispatch, common.types.EmptyDict]
info: common.types.TaskInfo
nomad_info: Optional[Any]
process: Union[common.types.TaskProcessing, common.types.EmptyDict]
study: Union[common.types.TaskStudy, common.types.EmptyDict]
class common.types.TaskDispatch(*, target_name: str = None, target: Union[common.types.DicomTarget, common.types.SftpTarget], retries: int = None, next_retry_at: float = None, series_uid: str = None)[source]

Bases: pydantic.main.BaseModel, common.types.Compat

next_retry_at: Optional[float]
retries: Optional[int]
series_uid: Optional[str]
target: Union[common.types.DicomTarget, common.types.SftpTarget]
target_name: Optional[str]
class common.types.TaskHasStudy(*, info: common.types.TaskInfo, dispatch: Union[common.types.TaskDispatch, common.types.EmptyDict] = {}, process: Union[common.types.Module, common.types.EmptyDict] = {}, study: common.types.TaskStudy)[source]

Bases: pydantic.main.BaseModel, common.types.Compat

dispatch: Union[common.types.TaskDispatch, common.types.EmptyDict]
info: common.types.TaskInfo
process: Union[common.types.Module, common.types.EmptyDict]
study: common.types.TaskStudy
class common.types.TaskInfo(*, action: typing_extensions.Literal['route', 'both', 'process', 'discard', 'notification'], uid: str, uid_type: typing_extensions.Literal['series', 'study'], triggered_rules: Union[Dict[str, typing_extensions.Literal[True]], str], applied_rule: str = None, mrn: str, acc: str, mercure_version: str, mercure_appliance: str, mercure_server: str)[source]

Bases: pydantic.main.BaseModel, common.types.Compat

acc: str
action: typing_extensions.Literal['route', 'both', 'process', 'discard', 'notification']
applied_rule: Optional[str]
mercure_appliance: str
mercure_server: str
mercure_version: str
mrn: str
triggered_rules: Union[Dict[str, typing_extensions.Literal[True]], str]
uid: str
uid_type: typing_extensions.Literal['series', 'study']
class common.types.TaskProcessing(*, module_name: str = None, module_config: common.types.Module = None, settings: Dict[str, Any] = {})[source]

Bases: pydantic.main.BaseModel, common.types.Compat

module_config: Optional[common.types.Module]
module_name: Optional[str]
settings: Dict[str, Any]
class common.types.TaskStudy(*, study_uid: str, complete_trigger: str = None, complete_required_series: str, creation_time: str, last_receive_time: str, received_series: List = None, complete_force: typing_extensions.Literal['True', 'False'])[source]

Bases: pydantic.main.BaseModel, common.types.Compat

complete_force: typing_extensions.Literal['True', 'False']
complete_required_series: str
complete_trigger: Optional[str]
creation_time: str
last_receive_time: str
received_series: Optional[List]
study_uid: str
class common.types.UnsetRule(**kwargs)[source]

Bases: dict

rule: str