Source code for dispatch.target_types.builtin

"""
builtin.py
==========
"""

from pathlib import Path
from shlex import quote, split
from typing import Dict, Generator, List

import common.config as config
from common.constants import mercure_names
from common.types import DicomTarget, DicomTLSTarget, DummyTarget, SftpTarget, Task
from pydicom import Dataset
from webinterface.common import async_run
from webinterface.dicom_client import DicomClientCouldNotFind, SimpleDicomClient

from .base import ProgressInfo, SubprocessTargetHandler, TargetHandler
from .registry import handler_for

# Symbolic names for process exit codes, keyed by the numeric exit status.
# The handlers' handle_error() methods look up ``e.returncode`` in this table
# to turn a failed send into a readable error message.
# NOTE(review): the names appear to mirror the exit codes listed in the DCMTK
# ``dcmsend`` manual -- confirm against the installed DCMTK version.
DCMSEND_ERROR_CODES = {
    # Command line problems
    1: "EXITCODE_COMMANDLINE_SYNTAX_ERROR",
    # Input file problems
    21: "EXITCODE_NO_INPUT_FILES",
    22: "EXITCODE_INVALID_INPUT_FILE",
    23: "EXITCODE_NO_VALID_INPUT_FILES",
    # Output file problems
    43: "EXITCODE_CANNOT_WRITE_REPORT_FILE",
    # Network problems
    60: "EXITCODE_CANNOT_INITIALIZE_NETWORK",
    61: "EXITCODE_CANNOT_NEGOTIATE_ASSOCIATION",
    62: "EXITCODE_CANNOT_SEND_REQUEST",
    65: "EXITCODE_CANNOT_ADD_PRESENTATION_CONTEXT",
}
# Module-wide logger obtained from the shared configuration module.
logger = config.get_logger()


@handler_for(DicomTarget)
class DicomTargetHandler(SubprocessTargetHandler[DicomTarget]):
    """Target handler for plain DICOM nodes.

    Outgoing series are sent by running ``dcmsend`` as a subprocess; queries
    and retrievals go through ``SimpleDicomClient``.
    """

    view_template = "targets/dicom.html"
    edit_template = "targets/dicom-edit.html"
    test_template = "targets/dicom-test.html"
    icon = "fa-database"
    display_name = "DICOM"
    can_pull = True

    def _create_command(self, target: DicomTarget, source_folder: Path, task: Task):
        """Build the ``dcmsend`` invocation for sending *source_folder*.

        Args:
            target: DICOM target configuration (ip, port, AE titles, flags).
            source_folder: Folder containing the ``*.dcm`` files to send.
            task: Task being dispatched; its ``info`` supplies the sender
                address and AE titles when the target asks for them.

        Returns:
            A ``(command, options)`` tuple where ``command`` is the argument
            list and ``options`` is an empty dict of extra subprocess options.
        """
        target_ip = target.ip
        if target_ip == "sender":
            # If results should be looped back to the original sender of the task,
            # insert the IP/address obtained from the DICOM receiver
            target_ip = task.info.sender_address
        if not target_ip:
            # If no target ip has been provided, insert a value that allows
            # identifying the issue
            target_ip = "target_missing"

        target_port = target.port or 104
        target_aet_target = target.aet_target or ""
        target_aet_source = target.aet_source or ""
        if target.pass_sender_aet:
            target_aet_source = task.info.sender_aet
        if target.pass_receiver_aet:
            target_aet_target = task.info.receiver_aet

        dcmsend_status_file = str(Path(source_folder) / mercure_names.SENDLOG)

        # The argument list is assembled directly instead of formatting one
        # string and splitting it: values containing whitespace (AE titles,
        # paths) stay a single argument.
        command = ["dcmsend", str(target_ip), str(target_port), "+sd", str(source_folder)]
        # An empty AE title is left out entirely. Interpolating an empty value
        # used to make "-aet" consume the following option as its argument.
        if target_aet_source != "":
            command += ["-aet", str(target_aet_source)]
        if target_aet_target != "":
            command += ["-aec", str(target_aet_target)]
        command += ["-nuc", "+sp", "*.dcm", "-to", "60", "+crf", dcmsend_status_file]
        return command, {}

    def find_from_target(
        self, target: DicomTarget, accession: str, search_filters: Dict[str, List[str]]
    ) -> List[Dataset]:
        """Query *target* for *accession* and return the matching datasets.

        Returns an empty list when the client raises ``DicomClientCouldNotFind``.
        """
        c = SimpleDicomClient(target.ip, target.port, target.aet_target, target.aet_source, None)
        try:
            return c.findscu(accession, search_filters)
        except DicomClientCouldNotFind:
            return []

    def get_from_target(
        self, target: DicomTarget, accession: str, search_filters: Dict[str, List[str]], destination_path: str
    ) -> Generator[ProgressInfo, None, None]:
        """Retrieve *accession* from *target* into *destination_path*.

        Yields one ``ProgressInfo`` (completed, remaining, "completed / total")
        for every identifier produced by the client's ``getscu`` call.
        """
        config.read_config()
        c = SimpleDicomClient(target.ip, target.port, target.aet_target, target.aet_source, destination_path)
        for identifier in c.getscu(accession, search_filters):
            completed = identifier.NumberOfCompletedSuboperations
            remaining = identifier.NumberOfRemainingSuboperations
            progress = f"{completed} / {completed + remaining}"
            yield ProgressInfo(completed, remaining, progress)

    def handle_error(self, e, command):
        """Log a failed ``dcmsend`` run and re-raise it as ``RuntimeError``.

        Args:
            e: The failure; its ``returncode`` is translated via
                ``DCMSEND_ERROR_CODES``.
            command: The command that failed (used for logging only).

        Raises:
            RuntimeError: Always, carrying the symbolic exit code name or,
                for codes not in the table, the numeric exit code.
        """
        # Fall back to the numeric code so unknown failures do not surface as "None".
        dcmsend_error_message = DCMSEND_ERROR_CODES.get(
            e.returncode, f"unknown dcmsend exit code {e.returncode}"
        )
        logger.exception(f"Failed command:\n {command} \nbecause of {dcmsend_error_message}")
        raise RuntimeError(f"{dcmsend_error_message}")

    async def test_connection(self, target: DicomTarget, target_name: str):
        """Check reachability of *target* with ``ping`` and a DICOM C-ECHO.

        No test is run when the ip or port is missing, or when the target is
        the ``"sender"`` loopback placeholder (there is no fixed address then).

        Returns:
            Dict with the boolean keys ``"ping"``, ``"c-echo"`` and
            ``"loopback_mode"``.
        """
        target_ip = target.ip or ""
        target_port = target.port or ""
        target_aec = target.aet_target or "ANY-SCP"
        target_aet = target.aet_source or "ECHOSCU"
        loopback_mode = target_ip == "sender"
        ping_response = False
        cecho_response = False

        logger.info(f"Testing target {target_name}")

        if target_ip and target_port and not loopback_mode:
            # Every interpolated value is quoted so that whitespace or shell
            # metacharacters in the configuration cannot split or alter the
            # command string. Plain values are unchanged by quote().
            ip_arg = quote(str(target_ip))
            ping_result, *_ = await async_run(f"ping -w 1 -c 1 {ip_arg}")
            ping_response = ping_result == 0

            # The C-ECHO runs regardless of the ping result.
            cecho_result, *_ = await async_run(
                f"echoscu -to 2 -aec {quote(str(target_aec))} -aet {quote(str(target_aet))}"
                f" {ip_arg} {quote(str(target_port))}"
            )
            cecho_response = cecho_result == 0

        return {"ping": ping_response, "c-echo": cecho_response, "loopback_mode": loopback_mode}
@handler_for(DicomTLSTarget)
class DicomTLSTargetHandler(SubprocessTargetHandler[DicomTLSTarget]):
    """Target handler for DICOM nodes reached over TLS.

    Outgoing series are sent by running ``storescu`` with TLS options as a
    subprocess.
    """

    view_template = "targets/dicom-tls.html"
    edit_template = "targets/dicom-tls-edit.html"
    test_template = "targets/dicom-test.html"
    icon = "fa-database"
    display_name = "DICOM+TLS"

    def _create_command(self, target: DicomTLSTarget, source_folder: Path, task: Task):
        """Build the TLS-enabled ``storescu`` invocation for *source_folder*.

        Args:
            target: TLS target configuration (ip, port, AE titles, key,
                certificate and CA certificate).
            source_folder: Folder containing the ``*.dcm`` files to send.
            task: Task being dispatched; its ``info`` supplies the AE titles
                when the target asks for them to be passed through.

        Returns:
            A ``(command, options)`` tuple where ``command`` is the argument
            list and ``options`` is an empty dict of extra subprocess options.
        """
        target_ip = target.ip
        target_port = target.port or 104
        target_aet_target = target.aet_target or ""
        target_aet_source = target.aet_source or ""
        if target.pass_sender_aet:
            target_aet_source = task.info.sender_aet
        if target.pass_receiver_aet:
            target_aet_target = task.info.receiver_aet

        # The argument list is assembled directly instead of formatting one
        # string and splitting it: values containing whitespace (AE titles,
        # certificate paths, folders) stay a single argument.
        command = [
            "storescu",
            "+tls", str(target.tls_key), str(target.tls_cert),
            "+cf", str(target.ca_cert),
            str(target_ip), str(target_port),
            "+sd", str(source_folder),
        ]
        # An empty AE title is left out entirely. Interpolating an empty value
        # used to make "-aet" consume the following option as its argument.
        if target_aet_source != "":
            command += ["-aet", str(target_aet_source)]
        if target_aet_target != "":
            command += ["-aec", str(target_aet_target)]
        command += ["+sp", "*.dcm", "-to", "60"]
        return command, {}

    def handle_error(self, e, command):
        """Log a failed send and re-raise it as ``RuntimeError``.

        Args:
            e: The failure; its ``returncode`` is translated via
                ``DCMSEND_ERROR_CODES``.
            command: The command that failed (used for logging only).

        Raises:
            RuntimeError: Always, carrying the symbolic exit code name or,
                for codes not in the table, the numeric exit code.
        """
        # NOTE(review): the lookup table is named after dcmsend while this
        # handler runs storescu -- confirm that the codes apply to both tools.
        # Fall back to the numeric code so unknown failures do not surface as "None".
        dcmsend_error_message = DCMSEND_ERROR_CODES.get(
            e.returncode, f"unknown storescu exit code {e.returncode}"
        )
        logger.exception(f"Failed command:\n {command} \nbecause of {dcmsend_error_message}")
        raise RuntimeError(f"{dcmsend_error_message}")

    async def test_connection(self, target: DicomTLSTarget, target_name: str):
        """Check reachability of *target* with ``ping`` and a C-ECHO over TLS.

        No test is run when the ip or port is missing.

        Returns:
            Dict with the boolean keys ``"ping"`` and ``"c-echo"``.
        """
        target_ip = target.ip or ""
        target_port = target.port or ""
        target_aec = target.aet_target or "ANY-SCP"
        target_aet = target.aet_source or "ECHOSCU"
        ping_response = False
        cecho_response = False

        logger.info(f"Testing TLS target {target_name}")

        if target_ip and target_port:
            # Every interpolated value is quoted so that whitespace or shell
            # metacharacters in the configuration cannot split or alter the
            # command string. Plain values are unchanged by quote().
            ip_arg = quote(str(target_ip))
            ping_result, *_ = await async_run(f"ping -w 1 -c 1 {ip_arg}")
            ping_response = ping_result == 0

            cecho_command = (
                f"echoscu -to 2 -aec {quote(str(target_aec))} -aet {quote(str(target_aet))} {ip_arg}"
                f" {quote(str(target_port))} +tls {quote(str(target.tls_key))} {quote(str(target.tls_cert))}"
                f" +cf {quote(str(target.ca_cert))}"
            )
            logger.info("Running %s", cecho_command)
            # The C-ECHO runs regardless of the ping result.
            cecho_result, *_ = await async_run(cecho_command)
            cecho_response = cecho_result == 0

        return {"ping": ping_response, "c-echo": cecho_response}
@handler_for(SftpTarget)
class SftpTargetHandler(SubprocessTargetHandler[SftpTarget]):
    """Target handler that uploads a series folder to an SFTP server.

    The transfer is driven by the ``sftp`` command line client (wrapped in
    ``sshpass`` when a password is configured), executed through ``/bin/bash``.
    """

    view_template = "targets/sftp.html"
    edit_template = "targets/sftp-edit.html"
    test_template = "targets/sftp-test.html"
    icon = "fa-server"
    display_name = "SFTP"

    def _create_command(self, target: SftpTarget, source_folder: Path, task: Task):
        """Build the shell script that uploads *source_folder* via ``sftp``.

        The script feeds a batch of sftp commands through a here-document:
        create the remote folder, upload the local folder recursively, then
        upload an empty ``.complete`` marker file into the remote folder.

        Args:
            target: SFTP target configuration (user, host, folder, password).
            source_folder: Local folder to upload.
            task: Task being dispatched (not used by this handler).

        Returns:
            A ``(command, options)`` tuple. ``command`` is a one-element list
            holding the complete shell script; ``options`` requests execution
            through ``/bin/bash``.
        """
        remote_folder = f"{target.folder}/{source_folder.stem}"
        batch_lines = [
            f'mkdir "{remote_folder}"',
            f'put -f -r "{source_folder}"',
            # A leading "!" makes sftp run the command locally.
            '!touch "/tmp/.complete"',
            f'put -f "/tmp/.complete" "{remote_folder}/.complete"',
        ]
        # NOTE(review): StrictHostKeyChecking=no turns off host key verification.
        command = (
            "sftp -o StrictHostKeyChecking=no "
            + quote(f"{target.user}@{target.host}:{target.folder}")
            # The here-document needs real line breaks, and its terminator
            # must start a line.
            + " <<- EOF\n"
            + "\n".join(batch_lines)
            + "\nEOF"
        )
        if target.password:
            # Quoted so that special characters in the password reach sshpass intact.
            command = f"sshpass -p {quote(str(target.password))} " + command
        # With shell=True only the first list element is interpreted as the
        # shell script; further elements would become arguments of the shell
        # itself. Tokenising the script also discards the line breaks the
        # here-document depends on. The script is therefore returned whole.
        return [command], dict(shell=True, executable="/bin/bash")

    async def test_connection(self, target: SftpTarget, target_name: str):
        """Check reachability of *target* with ``ping`` and a trial sftp login.

        Returns:
            Dict with ``ping`` (bool), ``loggedin`` (bool) and ``err`` (the
            decoded stderr of the login attempt when it failed, else ``""``).
        """
        ping_result, *_ = await async_run(f"ping -w 1 -c 1 {quote(str(target.host))}")
        ping_response = ping_result == 0

        # The empty here-string makes sftp exit right after connecting.
        command = (
            "sftp -o StrictHostKeyChecking=no "
            + quote(f"{target.user}@{target.host}:{target.folder}")
            + ' <<< ""'
        )
        loggable_command = command
        if target.password:
            # Never write the clear-text password to the log.
            loggable_command = "sshpass -p ******** " + command
            command = f"sshpass -p {quote(str(target.password))} " + command
        logger.debug(loggable_command)

        result, stdout, stderr = await async_run(command, shell=True, executable="/bin/bash")
        response = result == 0
        return dict(
            ping=ping_response,
            loggedin=response,
            # errors="replace" keeps undecodable bytes from raising here.
            err=stderr.decode("utf-8", errors="replace") if not response else "",
        )
@handler_for(DummyTarget)
class DummyTargetHandler(TargetHandler):
    """Handler registered for ``DummyTarget``.

    It defines only presentation attributes (name, icon, templates) and
    overrides no behaviour of ``TargetHandler``.
    """

    # Label and icon for this target type
    display_name = "Dummy"
    icon = "fa-flask"

    # Templates for viewing and editing a target of this type
    view_template = "targets/dummy.html"
    edit_template = "targets/dummy-edit.html"