Source code for dispatch.target_types.base

"""
base.py
=======
"""

import subprocess
from dataclasses import dataclass
from pathlib import Path
from subprocess import CalledProcessError, check_output
from typing import Any, Dict, Generator, Generic, List, TypeVar

import common.config as config
from common.types import Task, TaskDispatch
from pydicom import Dataset
from pydicom.datadict import tag_for_keyword

logger = config.get_logger()

TargetTypeVar = TypeVar("TargetTypeVar")


[docs]@dataclass class ProgressInfo(): completed: int = 0 remaining: int = 0 progress: str = ""
[docs]class TargetHandler(Generic[TargetTypeVar]): test_template = "targets/base-test.html" can_pull = False def __init__(self): pass @property def info_short(self) -> str: return ""
[docs] def send_to_target( self, task_id: str, target: TargetTypeVar, dispatch_info: TaskDispatch, source_folder: Path, task: Task, ) -> str: return ""
[docs] class NoSuchTagException(Exception): pass
[docs] def get_from_target(self, target: TargetTypeVar, accession: str, search_filters: Dict[str, List[str]], destination_path: str) -> Generator[ProgressInfo, None, None]: raise Exception()
[docs] def find_from_target(self, target: TargetTypeVar, accession: str, search_filters: Dict[str, List[str]]) -> List[Dataset]: if self is TargetHandler: raise Exception("This method should be overridden by a subclass") for t in search_filters.keys(): if not tag_for_keyword(t): raise TargetHandler.NoSuchTagException(f"Invalid search filter: no such tag '{t}'") return []
[docs] def handle_error(self, e, command) -> None: pass
[docs] async def test_connection(self, target: TargetTypeVar, target_name: str) -> dict: return {}
[docs] def from_form(self, form: dict, factory: Any, current_target: TargetTypeVar) -> Any: return factory(**form)
[docs]class SubprocessTargetHandler(TargetHandler[TargetTypeVar]): def _create_command(self, target: TargetTypeVar, source_folder: Path, task: Task): return ("", {})
[docs] def send_to_target( self, task_id: str, target: TargetTypeVar, dispatch_info: TaskDispatch, source_folder: Path, task: Task, ) -> str: commands, opts = self._create_command(target, source_folder, task) if not isinstance(commands[0], list): commands = [commands] result = "" logger.info(f"Sending {source_folder} to target {dispatch_info.target_name}") for command in commands: try: logger.info(f"Running command {' '.join(command)}") output = check_output( command, encoding="utf-8", stderr=subprocess.STDOUT, **opts ) result += output logger.info(output) # return result # type: ignore # Mypy doesn't know that check_output returns a string here? except CalledProcessError as e: self.handle_error(e, command) raise return result
[docs] def handle_error(self, e: CalledProcessError, command) -> None: logger.error(e.output) logger.error(f"Failed. Command exited with value {e.returncode}: \n {command}")