Service dispatcher

dispatcher.py

The dispatcher service of mercure that executes the DICOM transfer to the different targets.

dispatch.dispatcher.dispatch() None[source]
dispatch.dispatcher.exit_dispatcher(args) None[source]

Stop the asyncio event loop.

dispatch.dispatcher.main(args=['-b', 'html', '-d', '_build/doctrees', '.', '_build/html']) None[source]
async dispatch.dispatcher.terminate_process(signalNumber, frame) None[source]

Triggers the shutdown of the service.

dispatch.process_dcmsend_result.create_arg_parser() ArgumentParser[source]

Creates and returns the ArgumentParser object.

dispatch.process_dcmsend_result.parse(result_file) Dict[source]

Parses the dcmsend result file and returns a python dictionary.

dispatch.retry.increase_retry(source_folder, retry_max, retry_delay) bool[source]

Increases the retries counter and set the wait counter to a new time in the future. :return True if increase has been successful or False if maximum retries has been reached

dispatch.retry.update_dispatch_status(source_folder: Path, status: Union[Dict[str, TaskDispatchStatus], EmptyDict]) bool[source]

send.py

The functions for sending DICOM series to target destinations.

dispatch.send.update_fail_stage(source_folder: Path, fail_stage: FailStage) bool[source]

status.py

Helper functions for dispatching processed cases

dispatch.status.is_ready_for_sending(folder) Optional[Task][source]

Checks if a case in the outgoing folder is ready for sending by the dispatcher.

No lock file (.lock) should be in sending folder and no error file (.error), if there is one copy/move is not done yet. Also at least some dicom files should be there for sending. Also checks for a task.json file and if it is valid.

dispatch.status.is_target_json_valid(folder) Optional[Task][source]

Checks if the task.json file exists and is valid. Returns the content of the file (or None if the file is invalid)

Target types

base.py

class dispatch.target_types.base.ProgressInfo(completed: int = 0, remaining: int = 0, progress: str = '')[source]

Bases: object

completed: int = 0
progress: str = ''
remaining: int = 0
class dispatch.target_types.base.SubprocessTargetHandler[source]

Bases: TargetHandler[TargetTypeVar]

handle_error(e: CalledProcessError, command) None[source]
send_to_target(task_id: str, target: TargetTypeVar, dispatch_info: TaskDispatch, source_folder: Path, task: Task) str[source]
class dispatch.target_types.base.TargetHandler[source]

Bases: Generic[TargetTypeVar]

exception NoSuchTagException[source]

Bases: Exception

can_pull = False
find_from_target(target: TargetTypeVar, accession: str, search_filters: Dict[str, List[str]]) List[Dataset][source]
from_form(form: dict, factory: Any, current_target: TargetTypeVar) Any[source]
get_from_target(target: TargetTypeVar, accession: str, search_filters: Dict[str, List[str]], destination_path: str) Generator[ProgressInfo, None, None][source]
handle_error(e, command) None[source]
property info_short: str
send_to_target(task_id: str, target: TargetTypeVar, dispatch_info: TaskDispatch, source_folder: Path, task: Task) str[source]
async test_connection(target: TargetTypeVar, target_name: str) dict[source]
test_template = 'targets/base-test.html'

builtin.py

class dispatch.target_types.builtin.DicomTLSTargetHandler[source]

Bases: SubprocessTargetHandler[DicomTLSTarget]

display_name = 'DICOM+TLS'
edit_template = 'targets/dicom-tls-edit.html'
handle_error(e, command)[source]
icon = 'fa-database'
async test_connection(target: DicomTLSTarget, target_name: str)[source]
test_template = 'targets/dicom-test.html'
view_template = 'targets/dicom-tls.html'
class dispatch.target_types.builtin.DicomTargetHandler[source]

Bases: SubprocessTargetHandler[DicomTarget]

can_pull = True
display_name = 'DICOM'
edit_template = 'targets/dicom-edit.html'
find_from_target(target: DicomTarget, accession: str, search_filters: Dict[str, List[str]]) List[Dataset][source]
get_from_target(target: DicomTarget, accession: str, search_filters: Dict[str, List[str]], destination_path: str) Generator[ProgressInfo, None, None][source]
handle_error(e, command)[source]
icon = 'fa-database'
async test_connection(target: DicomTarget, target_name: str)[source]
test_template = 'targets/dicom-test.html'
view_template = 'targets/dicom.html'
class dispatch.target_types.builtin.DummyTargetHandler[source]

Bases: TargetHandler

display_name = 'Dummy'
edit_template = 'targets/dummy-edit.html'
icon = 'fa-flask'
view_template = 'targets/dummy.html'
class dispatch.target_types.builtin.SftpTargetHandler[source]

Bases: SubprocessTargetHandler[SftpTarget]

display_name = 'SFTP'
edit_template = 'targets/sftp-edit.html'
icon = 'fa-server'
async test_connection(target: SftpTarget, target_name: str)[source]
test_template = 'targets/sftp-test.html'
view_template = 'targets/sftp.html'

dicomweb.py

class dispatch.target_types.dicomweb.DicomWebTargetHandler[source]

Bases: TargetHandler[DicomWebTarget]

can_pull = True
create_client(target: DicomWebTarget) Union[DICOMfileClient, DICOMwebClient][source]
display_name = 'DICOMweb'
edit_template = 'targets/dicomweb-edit.html'
find_from_target(target: DicomWebTarget, accession: str, search_filters: Dict[str, List[str]] = {}) List[Dataset][source]
from_form(form: dict, factory, current_target) DicomWebTarget[source]
get_from_target(target: DicomWebTarget, accession, search_filters, destination_path: str) Generator[ProgressInfo, None, None][source]
icon = 'fa-share-alt'
send_to_target(task_id: str, target: DicomWebTarget, dispatch_info: TaskDispatch, source_folder: Path, task: Task) str[source]
async test_connection(target: DicomWebTarget, target_name: str)[source]
view_template = 'targets/dicomweb.html'

folder.py

class dispatch.target_types.folder.FolderTargetTargetHandler[source]

Bases: TargetHandler[FolderTarget]

display_name = 'Folder'
edit_template = 'targets/folder-edit.html'
from_form(form: dict, factory, current_target: FolderTarget) FolderTarget[source]
icon = 'fa-folder'
send_to_target(task_id: str, target: FolderTarget, dispatch_info: TaskDispatch, source_folder: Path, task: Task) str[source]
async test_connection(target: FolderTarget, target_name: str)[source]
view_template = 'targets/folder.html'

registry.py

dispatch.target_types.registry.get_handler(target: Union[Target, Type[Target], str]) TargetHandler[source]
dispatch.target_types.registry.handler_for(target_type: Type[Target]) Callable[[Type[TargetHandler]], Type[TargetHandler]][source]
dispatch.target_types.registry.target_types() KeysView[Type[Target]][source]
dispatch.target_types.registry.type_from_name(name) Type[Target][source]

rsync.py

class dispatch.target_types.rsync.RsyncTargetHandler[source]

Bases: SubprocessTargetHandler[RsyncTarget]

display_name = 'rsync'
edit_template = 'targets/rsync-edit.html'
get_commands(target) Any[source]
icon = 'fa-server'
async test_connection(target: RsyncTarget, target_name: str)[source]
test_template = 'targets/rsync-test.html'
view_template = 'targets/rsync.html'

s3.py

class dispatch.target_types.s3.S3TargetHandler[source]

Bases: TargetHandler[S3Target]

create_client(target: S3Target)[source]
display_name = 'S3'
edit_template = 'targets/s3-edit.html'
from_form(form: dict, factory, current_target: S3Target) S3Target[source]
icon = 'fa-cloud'
send_to_target(task_id: str, target: S3Target, dispatch_info: TaskDispatch, source_folder: Path, task: Task) str[source]
async test_connection(target: S3Target, target_name: str)[source]
view_template = 'targets/s3.html'

xnat.py

class dispatch.target_types.xnat.InterfaceManager(server, user, password)[source]

Bases: object

Manager for pyxnat.Interface that enables the use of the with python context. Using the InterfaceManager along the with python context avoids having to call the disconnect() method after each connection. Methods - open(): to use along the with context, yields an instance of pyxnat.Interface and disconnects automatically

when the context ends.

  • open_persistent(): returns a persistent instance of pyxnat.Interface. User must use the disconnect() method

    when the use of the session is finished.

```python interface = InterfaceManager(server=’www.myxnat.org’, user=’user’, password=’password’) with interface.open() as session:

session.get(…)

session = interface.open_persistent() session.get(…) session.disconnect() ```

open()[source]
open_persistent()[source]
class dispatch.target_types.xnat.XnatTargetHandler[source]

Bases: TargetHandler[XnatTarget]

display_name = 'XNAT'
edit_template = 'targets/xnat-edit.html'
handle_error(e, command) None[source]
icon = 'fa-hdd'
send_to_target(task_id: str, target: XnatTarget, dispatch_info: TaskDispatch, source_folder: Path, task: Task) str[source]
async test_connection(target: XnatTarget, target_name: str)[source]
test_template = 'targets/xnat-test.html'
view_template = 'targets/xnat.html'
dispatch.target_types.xnat.get_domain(url: str) str[source]