Package common
common.config
config.py
mercure’s configuration management, used by various mercure modules
- common.config.check_folders() bool [source]
Checks if all required folders for handling the DICOM files exist.
- common.config.read_config() Config [source]
Reads the configuration settings (rules, targets, general settings) from the configuration file. The configuration will only be updated if the file has changed compared the the last function call. If the configuration file is locked by another process, an exception will be raised.
- common.config.read_tagslist() None [source]
Reads the list of supported DICOM tags with example values, displayed the UI.
common.constants
constants.py
mercure-wide constants, used for standardizing key names and extensions.
- class common.constants.mercure_actions[source]
Bases:
object
- BOTH = 'both'
- DISCARD = 'discard'
- NOTIFICATION = 'notification'
- PROCESS = 'process'
- ROUTE = 'route'
- class common.constants.mercure_config[source]
Bases:
object
- MODULES = 'modules'
- RULES = 'rules'
- TARGETS = 'targets'
- class common.constants.mercure_defs[source]
Bases:
object
- SEPARATOR = '#'
- VERSION = '0.3.1-beta.1'
- class common.constants.mercure_events(value)[source]
Bases:
IntEnum
An enumeration.
- COMPLETED = 1
- ERROR = 2
- RECEIVED = 0
- class common.constants.mercure_folders[source]
Bases:
object
- DISCARD = 'discard_folder'
- ERROR = 'error_folder'
- INCOMING = 'incoming_folder'
- OUTGOING = 'outgoing_folder'
- PROCESSING = 'processing_folder'
- STUDIES = 'studies_folder'
- SUCCESS = 'success_folder'
- class common.constants.mercure_info[source]
Bases:
object
- ACC = 'acc'
- ACTION = 'action'
- MERCURE_APPLIANCE = 'mercure_appliance'
- MERCURE_SERVER = 'mercure_server'
- MERCURE_VERSION = 'mercure_version'
- MRN = 'mrn'
- TRIGGERED_RULES = 'triggered_rules'
- UID = 'uid'
- UID_TYPE = 'uid_type'
- class common.constants.mercure_names[source]
Bases:
object
- DCM = '.dcm'
- DCMFILTER = '*.dcm'
- ERROR = '.error'
- FORCE_COMPLETE = '.force-complete'
- HALT = 'HALT'
- LOCK = '.lock'
- PROCESSING = '.processing'
- RUNNING = '.running'
- SENDLOG = 'sent.txt'
- TAGS = '.tags'
- TASKFILE = 'task.json'
- class common.constants.mercure_options[source]
Bases:
object
- FALSE = 'False'
- INVALID = '#@INVALID@#'
- MISSING = 'MISSING'
- NORMAL = 'normal'
- OFFPEAK = 'offpeak'
- SERIES = 'series'
- STUDY = 'study'
- TRUE = 'True'
- URGENT = 'urgent'
- class common.constants.mercure_rule[source]
Bases:
object
- ACTION = 'action'
- ACTION_TRIGGER = 'action_trigger'
- DISABLED = 'disabled'
- FALLBACK = 'fallback'
- NOTIFICATION_PAYLOAD = 'notification_payload'
- NOTIFICATION_TRIGGER_COMPLETION = 'notification_trigger_completion'
- NOTIFICATION_TRIGGER_ERROR = 'notification_trigger_error'
- NOTIFICATION_TRIGGER_RECEPTION = 'notification_trigger_reception'
- NOTIFICATION_WEBHOOK = 'notification_webhook'
- PRIORITY = 'priority'
- PROCESSING_MODULE = 'processing_module'
- RULE = 'rule'
- STUDY_TRIGGER = 'study_trigger'
- STUDY_TRIGGER_CONDITION = 'study_trigger_condition'
- STUDY_TRIGGER_CONDITION_RECEIVED_SERIES = 'received_series'
- STUDY_TRIGGER_CONDITION_TIMEOUT = 'timeout'
- TARGET = 'target'
- class common.constants.mercure_sections[source]
Bases:
object
- DISPATCH = 'dispatch'
- FILES = 'files'
- INFO = 'info'
- JOURNAL = 'journal'
- NOTIFICATION = 'notification'
- PROCESS = 'process'
- STUDY = 'study'
- class common.constants.mercure_study[source]
Bases:
object
- COMPLETE_FORCE = 'complete_force'
- COMPLETE_REQUIRED_SERIES = 'complete_required_series'
- COMPLETE_TRIGGER = 'complete_trigger'
- CREATION_TIME = 'creation_time'
- LAST_RECEIVE_TIME = 'last_receive_time'
- RECEIVED_SERIES = 'received_series'
- STUDY_UID = 'study_uid'
common.helper
helper.py
Various internal helper functions for mercure.
- class common.helper.FileLock(path_for_lockfile: Path)[source]
Bases:
object
Helper class that implements a file lock. The lock file will be removed also from the destructor so that no spurious lock files remain if exceptions are raised.
- lockCreated = False
- class common.helper.RepeatedTimer(interval: float, function: Callable, exit_function: Callable, *args, **kwargs)[source]
Bases:
object
Helper class for running a continuous timer that is suspended while the worker function is running
- common.helper.get_runner() str [source]
Returns the name of the mechanism that is used for running mercure in the current installation (systemd, docker, nomad).
- common.helper.is_terminated() bool [source]
Checks if the process will terminate after the current task.
- common.helper.send_to_graphite(*args, **kwargs) None [source]
Wrapper for asynchronous graphite call to avoid wait time of main loop.
common.monitor
monitor.py
Helper functions and definitions for monitoring mercure’s operations via the bookkeeper module.
- exception common.monitor.MonitorHTTPError(status_code: int, message: str)[source]
Bases:
Exception
Exception raised when a HTTP error occurs.
- async common.monitor.async_send_task_event(event: task_event, task_id: str, file_count: int, target: str, info: str)[source]
- common.monitor.configure(module, instance, address) None [source]
Configures the connection to the bookkeeper module. If not called, events will not be transmitted to the bookkeeper.
- common.monitor.send_event(event: m_events, severity: severity = severity.INFO, description: str = '') None [source]
Sends information about general mercure events to the bookkeeper (e.g., during module start).
- common.monitor.send_processor_output(task: Task, task_processing: TaskProcessing, index: int, output: dict) None [source]
- common.monitor.send_register_series(tags: Dict[str, str]) None [source]
Registers a received series on the bookkeeper. This should be called when a series has been fully received and the DICOM tags have been parsed.
- common.monitor.send_register_task(task_id: str, series_uid: str, parent_id: Optional[str] = None) None [source]
Registers a new task on the bookkeeper. This should be called whenever a new task has been created.
- common.monitor.send_task_event(event: task_event, task_id: str, file_count: int, target: str, info: str) None [source]
Send an event related to a specific series to the bookkeeper.
- common.monitor.send_update_task(task: Task) None [source]
Registers a new task on the bookkeeper. This should be called whenever a new task has been created.
- common.monitor.send_webgui_event(event: w_events, user: str, description='') None [source]
Sends information about an event on the webgui to the bookkeeper.
common.notification
notification.py
Helper functions for triggering webhook calls.
- common.notification.parse_payload(payload: str, event: mercure_events, rule_name: str, task_id: str, details: str = '', context: dict = {}, *, task: Optional[Task] = None, tags_list: Optional[Dict[str, str]] = None) str [source]
- common.notification.send_email(address: str, payload: str, event: mercure_events, rule_name: str, rule_type: str) None [source]
- common.notification.send_email_helper(to: str, subject: str, content: str, rule_type='plain') None [source]
- common.notification.setup() bool [source]
Load the SSL certificate if it is configured (after the configuration has been read).
- common.notification.trigger_notification_for_rule(rule_name: str, task_id: str, event: mercure_events, *, task: Task, details: Optional[str] = '', send_always: bool = False)[source]
- common.notification.trigger_notification_for_rule(rule_name: str, task_id: str, event: mercure_events, *, tags_list: Dict[str, str], details: Optional[str] = '', send_always: bool = False)
common.rule_evaluation
rule_evaluation.py
Helper functions for evaluating routing rules and study-completion conditions.
- common.rule_evaluation.eval_rule(rule: str, tags: Dict[str, str]) Any [source]
Parses the given rule, replaces all tag variables with values from the given tags dictionary, and evaluates the rule. If the rule is invalid, an exception will be raised.
- common.rule_evaluation.parse_completion_series(task_id: str, completion_str: str, received_series: list) bool [source]
Evaluates the configuration string defining which series are required using the list of received series as input. Returns true if all required series have arrived, otherwise false is returned.
- common.rule_evaluation.parse_rule(rule: str, tags: Dict[str, str]) Tuple[bool, Optional[str], Optional[str]] [source]
common.types
types.py
Definitions for using TypedDicts throughout mercure.
- class common.types.Config(*, appliance_name: str, port: int, accept_compressed_images: bool, incoming_folder: str, studies_folder: str, outgoing_folder: str, success_folder: str, error_folder: str, discard_folder: str, processing_folder: str, router_scan_interval: int, dispatcher_scan_interval: int, cleaner_scan_interval: int, retention: int, emergency_clean_percentage: int, retry_delay: int, retry_max: int, series_complete_trigger: int, study_complete_trigger: int, study_forcecomplete_trigger: int, dicom_receiver: DicomReceiverConfig = DicomReceiverConfig(additional_tags={}), graphite_ip: str, graphite_port: int, influxdb_host: str, influxdb_token: str, influxdb_org: str, influxdb_bucket: str, bookkeeper: str, offpeak_start: str, offpeak_end: str, targets: Dict[str, Target], rules: Dict[str, Rule], modules: Dict[str, Module], process_runner: typing_extensions.Literal[docker, nomad] = '', processing_runtime: Optional[str] = None, bookkeeper_api_key: Optional[str] = None, features: Dict[str, bool], processing_logs: ProcessingLogsConfig = ProcessingLogsConfig(discard_logs=False, logs_file_store=None), email_notification_from: str = 'mercure@mercure.mercure', support_root_modules: Optional[bool] = False, webhook_certificate_location: Optional[str] = None)[source]
Bases:
BaseModel
,Compat
- accept_compressed_images: bool
- appliance_name: str
- bookkeeper: str
- bookkeeper_api_key: Optional[str]
- cleaner_scan_interval: int
- dicom_receiver: DicomReceiverConfig
- discard_folder: str
- dispatcher_scan_interval: int
- email_notification_from: str
- emergency_clean_percentage: int
- error_folder: str
- features: Dict[str, bool]
- graphite_ip: str
- graphite_port: int
- incoming_folder: str
- influxdb_bucket: str
- influxdb_host: str
- influxdb_org: str
- influxdb_token: str
- offpeak_end: str
- offpeak_start: str
- outgoing_folder: str
- port: int
- process_runner: typing_extensions.Literal[docker, nomad]
- processing_folder: str
- processing_logs: ProcessingLogsConfig
- processing_runtime: Optional[str]
- retention: int
- retry_delay: int
- retry_max: int
- router_scan_interval: int
- series_complete_trigger: int
- studies_folder: str
- study_complete_trigger: int
- study_forcecomplete_trigger: int
- success_folder: str
- support_root_modules: Optional[bool]
- webhook_certificate_location: Optional[str]
- class common.types.DicomReceiverConfig(*, additional_tags: Dict[str, str] = {})[source]
Bases:
BaseModel
- additional_tags: Dict[str, str]
- class common.types.DicomTLSTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dicomtls] = 'dicomtls', ip: str, port: str, aet_target: str, aet_source: Optional[str] = '', tls_key: str, tls_cert: str, ca_cert: str)[source]
Bases:
Target
- aet_source: Optional[str]
- aet_target: str
- ca_cert: str
- ip: str
- port: str
- target_type: typing_extensions.Literal[dicomtls]
- tls_cert: str
- tls_key: str
- class common.types.DicomTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dicom] = 'dicom', ip: str, port: str, aet_target: str, aet_source: Optional[str] = '')[source]
Bases:
Target
- aet_source: Optional[str]
- aet_target: str
- ip: str
- port: str
- target_type: typing_extensions.Literal[dicom]
- class common.types.DicomWebTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dicomweb] = 'dicomweb', url: str, qido_url_prefix: Optional[str] = None, wado_url_prefix: Optional[str] = None, stow_url_prefix: Optional[str] = None, access_token: Optional[str] = None, http_user: Optional[str] = None, http_password: Optional[str] = None)[source]
Bases:
Target
- access_token: Optional[str]
- http_password: Optional[str]
- http_user: Optional[str]
- qido_url_prefix: Optional[str]
- stow_url_prefix: Optional[str]
- target_type: typing_extensions.Literal[dicomweb]
- url: str
- wado_url_prefix: Optional[str]
- class common.types.DummyTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[dummy] = 'dummy')[source]
Bases:
Target
- target_type: typing_extensions.Literal[dummy]
- class common.types.FolderTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[folder] = 'folder', folder: str)[source]
Bases:
Target
- folder: str
- target_type: typing_extensions.Literal[folder]
- class common.types.Module(*, docker_tag: Optional[str] = '', additional_volumes: Optional[str] = '', environment: Optional[str] = '', docker_arguments: Optional[str] = '', settings: Dict[str, Any] = {}, contact: Optional[str] = '', comment: Optional[str] = '', constraints: Optional[str] = '', resources: Optional[str] = '', requires_root: Optional[bool] = False)[source]
Bases:
BaseModel
,Compat
- additional_volumes: Optional[str]
- comment: Optional[str]
- constraints: Optional[str]
- contact: Optional[str]
- docker_arguments: Optional[str]
- docker_tag: Optional[str]
- environment: Optional[str]
- requires_root: Optional[bool]
- resources: Optional[str]
- settings: Dict[str, Any]
- class common.types.ProcessingLogsConfig(*, discard_logs: bool = False, logs_file_store: Optional[str] = None)[source]
Bases:
BaseModel
- discard_logs: bool
- logs_file_store: Optional[str]
- class common.types.RsyncTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[rsync] = 'rsync', folder: str, user: str, host: str, password: Optional[str] = None, run_on_complete: bool = False)[source]
Bases:
Target
- folder: str
- host: str
- password: Optional[str]
- run_on_complete: bool
- target_type: typing_extensions.Literal[rsync]
- user: str
- class common.types.Rule(*, rule: str = 'False', target: str = '', disabled: bool = False, fallback: bool = False, contact: str = '', comment: str = '', tags: str = '', action: typing_extensions.Literal[route, both, process, discard, notification] = 'route', action_trigger: typing_extensions.Literal[series, study] = 'series', study_trigger_condition: typing_extensions.Literal[timeout, received_series] = 'timeout', study_trigger_series: str = '', priority: typing_extensions.Literal[normal, urgent, offpeak] = 'normal', processing_module: Union[str, List[str]] = '', processing_settings: Union[List[Dict[str, Any]], Dict[str, Any]] = {}, processing_retain_images: bool = False, notification_email: str = '', notification_webhook: str = '', notification_payload: str = '', notification_payload_body: str = '', notification_email_body: str = '', notification_email_type: str = 'plain', notification_trigger_reception: bool = True, notification_trigger_completion: bool = True, notification_trigger_completion_on_request: bool = False, notification_trigger_error: bool = True)[source]
Bases:
BaseModel
,Compat
- action: typing_extensions.Literal[route, both, process, discard, notification]
- action_trigger: typing_extensions.Literal[series, study]
- comment: str
- contact: str
- disabled: bool
- fallback: bool
- notification_email: str
- notification_email_body: str
- notification_email_type: str
- notification_payload: str
- notification_payload_body: str
- notification_trigger_completion: bool
- notification_trigger_completion_on_request: bool
- notification_trigger_error: bool
- notification_trigger_reception: bool
- notification_webhook: str
- priority: typing_extensions.Literal[normal, urgent, offpeak]
- processing_module: Union[str, List[str]]
- processing_retain_images: bool
- processing_settings: Union[List[Dict[str, Any]], Dict[str, Any]]
- rule: str
- study_trigger_condition: typing_extensions.Literal[timeout, received_series]
- study_trigger_series: str
- tags: str
- target: str
- class common.types.S3Target(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[s3] = 's3', region: str, bucket: str, prefix: str, access_key_id: str, secret_access_key: str)[source]
Bases:
Target
- access_key_id: str
- bucket: str
- prefix: str
- region: str
- secret_access_key: str
- target_type: typing_extensions.Literal[s3]
- class common.types.SftpTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[sftp] = 'sftp', folder: str, user: str, host: str, password: Optional[str] = None)[source]
Bases:
Target
- folder: str
- host: str
- password: Optional[str]
- target_type: typing_extensions.Literal[sftp]
- user: str
- class common.types.Target(*, contact: Optional[str] = '', comment: str = '')[source]
Bases:
BaseModel
,Compat
- comment: str
- contact: Optional[str]
- class common.types.Task(*, info: TaskInfo, id: str, dispatch: Union[TaskDispatch, EmptyDict] = {}, process: Union[TaskProcessing, EmptyDict, List[TaskProcessing]] = {}, study: Union[TaskStudy, EmptyDict] = {}, nomad_info: Optional[Any] = None)[source]
Bases:
BaseModel
,Compat
- dispatch: Union[TaskDispatch, EmptyDict]
- id: str
- nomad_info: Optional[Any]
- process: Union[TaskProcessing, EmptyDict, List[TaskProcessing]]
- class common.types.TaskDispatch(*, target_name: str, retries: Optional[int] = 0, next_retry_at: Optional[float] = 0, series_uid: Optional[str] = None)[source]
Bases:
BaseModel
,Compat
- next_retry_at: Optional[float]
- retries: Optional[int]
- series_uid: Optional[str]
- target_name: str
- class common.types.TaskHasStudy(*, info: TaskInfo, id: str, dispatch: Union[TaskDispatch, EmptyDict] = {}, process: Union[TaskProcessing, EmptyDict, List[TaskProcessing]] = {}, study: TaskStudy, nomad_info: Optional[Any] = None)[source]
Bases:
Task
- class common.types.TaskInfo(*, action: typing_extensions.Literal[route, both, process, discard, notification], uid: str, uid_type: typing_extensions.Literal[series, study], triggered_rules: Union[Dict[str, typing_extensions.Literal[True]], str], applied_rule: Optional[str] = None, patient_name: Optional[str] = None, mrn: str, acc: str, sender_address: str = 'MISSING', mercure_version: str, mercure_appliance: str, mercure_server: str, device_serial_number: Optional[str] = None)[source]
Bases:
BaseModel
,Compat
- acc: str
- action: typing_extensions.Literal[route, both, process, discard, notification]
- applied_rule: Optional[str]
- device_serial_number: Optional[str]
- mercure_appliance: str
- mercure_server: str
- mercure_version: str
- mrn: str
- patient_name: Optional[str]
- sender_address: str
- triggered_rules: Union[Dict[str, typing_extensions.Literal[True]], str]
- uid: str
- uid_type: typing_extensions.Literal[series, study]
- class common.types.TaskProcessing(*, module_name: str, module_config: Optional[Module] = None, settings: Dict[str, Any] = {}, retain_input_images: bool, output: Optional[Dict] = None)[source]
Bases:
BaseModel
,Compat
- module_name: str
- output: Optional[Dict]
- retain_input_images: bool
- settings: Dict[str, Any]
- class common.types.TaskStudy(*, study_uid: str, complete_trigger: Optional[str] = None, complete_required_series: str, creation_time: str, last_receive_time: str, received_series: Optional[List[str]] = None, received_series_uid: Optional[List[str]] = None, complete_force: bool = False)[source]
Bases:
BaseModel
,Compat
- complete_force: bool
- complete_required_series: str
- complete_trigger: Optional[str]
- creation_time: str
- last_receive_time: str
- received_series: Optional[List[str]]
- received_series_uid: Optional[List[str]]
- study_uid: str
- class common.types.UnsetRule(_typename, _fields=None, /, **kwargs)[source]
Bases:
TypedDict
- rule: str
- class common.types.XnatTarget(*, contact: Optional[str] = '', comment: str = '', target_type: typing_extensions.Literal[xnat] = 'xnat', project_id: str, host: str, user: str, password: str)[source]
Bases:
Target
- host: str
- password: str
- project_id: str
- target_type: typing_extensions.Literal[xnat]
- user: str