Source code for common.types

"""
types.py
========
Definitions for using TypedDicts throughout mercure.
"""

# Standard python includes
from typing import Any, Dict, List, Optional, Type, Union, cast
from typing_extensions import Literal, TypedDict
from pydantic import BaseModel
import typing

# TODO: Add description for the individual classes


[docs]class Compat:
[docs] def get(self, item, els=None) -> Any: return self.__dict__.get(item, els) or els
[docs]class EmptyDict(TypedDict): pass
[docs]class Target(BaseModel, Compat): contact: Optional[str] = "" comment: str = "" @classmethod def __get_validators__(cls): # one or more validators may be yielded which will be called in the # order to validate the input, each validator will receive as an input # the value returned from the previous validator yield cls.validate
[docs] @classmethod def validate(cls, v): """Parse the target as any of the known target types.""" subclass_dict: typing.Dict[str, Type[Target]] = {sbc.__name__: sbc for sbc in cls.__subclasses__()} for k in subclass_dict: try: return subclass_dict[k](**v) except: pass raise ValueError("Couldn't validate target as any of", list(subclass_dict.keys()))
[docs] @classmethod def get_name(cls) -> str: return cls.construct().target_type # type: ignore
[docs]class DicomTarget(Target): target_type: Literal["dicom"] = "dicom" ip: str port: str aet_target: str aet_source: Optional[str] = ""
[docs]class DicomTLSTarget(Target): target_type: Literal["dicomtls"] = "dicomtls" ip: str port: str aet_target: str aet_source: Optional[str] = "" tls_key: str tls_cert: str ca_cert: str
[docs]class SftpTarget(Target): target_type: Literal["sftp"] = "sftp" folder: str user: str host: str password: Optional[str]
[docs]class RsyncTarget(Target): target_type: Literal["rsync"] = "rsync" folder: str user: str host: str password: Optional[str] run_on_complete: bool = False
[docs]class XnatTarget(Target): target_type: Literal["xnat"] = "xnat" project_id: str host: str user: str password: str
[docs]class DicomWebTarget(Target): target_type: Literal["dicomweb"] = "dicomweb" url: str qido_url_prefix: Optional[str] wado_url_prefix: Optional[str] stow_url_prefix: Optional[str] access_token: Optional[str] http_user: Optional[str] http_password: Optional[str]
[docs]class S3Target(Target): target_type: Literal["s3"] = "s3" region: str bucket: str prefix: str access_key_id: str secret_access_key: str
[docs]class FolderTarget(Target): target_type: Literal["folder"] = "folder" folder: str
[docs]class DummyTarget(Target): target_type: Literal["dummy"] = "dummy"
[docs]class Module(BaseModel, Compat): docker_tag: Optional[str] = "" additional_volumes: Optional[str] = "" environment: Optional[str] = "" docker_arguments: Optional[str] = "" settings: Dict[str, Any] = {} contact: Optional[str] = "" comment: Optional[str] = "" constraints: Optional[str] = "" resources: Optional[str] = "" requires_root: Optional[bool] = False
[docs]class UnsetRule(TypedDict): rule: str
[docs]class Rule(BaseModel, Compat): rule: str = "False" target: str = "" disabled: bool = False fallback: bool = False contact: str = "" comment: str = "" tags: str = "" action: Literal["route", "both", "process", "discard", "notification"] = "route" action_trigger: Literal["series", "study"] = "series" study_trigger_condition: Literal["timeout", "received_series"] = "timeout" study_trigger_series: str = "" priority: Literal["normal", "urgent", "offpeak"] = "normal" processing_module: Union[str,List[str]] = "" processing_settings: Union[List[Dict[str, Any]],Dict[str, Any]] = {} processing_retain_images: bool = False notification_email: str = "" notification_webhook: str = "" notification_payload: str = "" notification_payload_body: str = "" notification_email_body: str = "" notification_email_type: str = "plain" notification_trigger_reception: bool = True notification_trigger_completion: bool = True notification_trigger_completion_on_request: bool = False notification_trigger_error: bool = True
[docs]class ProcessingLogsConfig(BaseModel): discard_logs: bool = False logs_file_store: Optional[str] = None
[docs]class DicomReceiverConfig(BaseModel): additional_tags: Dict[str,str] = {}
[docs]class Config(BaseModel, Compat): appliance_name: str port: int accept_compressed_images: bool incoming_folder: str studies_folder: str outgoing_folder: str success_folder: str error_folder: str discard_folder: str processing_folder: str router_scan_interval: int # in seconds dispatcher_scan_interval: int # in seconds cleaner_scan_interval: int # in seconds retention: int # in seconds (3 days) emergency_clean_percentage: int # in % of disk space retry_delay: int # in seconds (15 min) retry_max: int series_complete_trigger: int # in seconds study_complete_trigger: int # in seconds study_forcecomplete_trigger: int # in seconds dicom_receiver: DicomReceiverConfig = DicomReceiverConfig() graphite_ip: str graphite_port: int influxdb_host: str influxdb_token: str influxdb_org: str influxdb_bucket: str bookkeeper: str offpeak_start: str offpeak_end: str targets: Dict[str, Target] rules: Dict[str, Rule] modules: Dict[str, Module] process_runner: Literal["docker", "nomad", ""] = "" processing_runtime: Optional[str] = None bookkeeper_api_key: Optional[str] features: Dict[str, bool] processing_logs: ProcessingLogsConfig = ProcessingLogsConfig() email_notification_from: str = "mercure@mercure.mercure" support_root_modules: Optional[bool] = False webhook_certificate_location: Optional[str] = None
[docs]class TaskInfo(BaseModel, Compat): action: Literal["route", "both", "process", "discard", "notification"] uid: str uid_type: Literal["series", "study"] triggered_rules: Union[Dict[str, Literal[True]], str] applied_rule: Optional[str] patient_name: Optional[str] mrn: str acc: str sender_address: str = "MISSING" mercure_version: str mercure_appliance: str mercure_server: str device_serial_number: Optional[str] = None
[docs]class TaskDispatch(BaseModel, Compat): target_name: str retries: Optional[int] = 0 next_retry_at: Optional[float] = 0 series_uid: Optional[str]
[docs]class TaskStudy(BaseModel, Compat): study_uid: str complete_trigger: Optional[str] complete_required_series: str creation_time: str last_receive_time: str received_series: Optional[List[str]] received_series_uid: Optional[List[str]] complete_force: bool = False
[docs]class TaskProcessing(BaseModel, Compat): module_name: str module_config: Optional[Module] settings: Dict[str, Any] = {} retain_input_images: bool output: Optional[Dict]
# class PydanticFile(object): # def __init__(self, klass, file_name): # self.Klass = klass # self.file_name = file_name # def __enter__(self): # self.file = open(self.file_name, "r+") # self.dict_orig = json.load(self.file) # self.file.seek(0) # self.obj = self.Klass(**self.dict_orig) # return self.obj # def __exit__(self, type, value, traceback): # new_dict = self.obj.dict() # if new_dict != self.dict_orig: # json.dump(new_dict, self.file) # self.file.truncate() # self.file.close() # class ModelHasFile(object): # @classmethod # def file_editor(cls, file_name): # return PydanticFile(cls, file_name)
[docs]class Task(BaseModel, Compat): info: TaskInfo id: str dispatch: Union[TaskDispatch, EmptyDict] = cast(EmptyDict, {}) process: Union[TaskProcessing, EmptyDict,List[TaskProcessing]] = cast(EmptyDict, {}) study: Union[TaskStudy, EmptyDict] = cast(EmptyDict, {}) nomad_info: Optional[Any]
[docs] class Config: extra = "forbid"
[docs]class TaskHasStudy(Task): study: TaskStudy