Source code for dispatch.target_types.xnat

"""
xnat.py
=======
"""

import datetime
import glob
import os
import tempfile
import zipfile
from contextlib import contextmanager
from pathlib import Path
from urllib.parse import urlparse

import aiohttp
import common.config as config
import pyxnat
from common.types import Task, TaskDispatch, XnatTarget
from pydicom import dcmread
from webinterface.common import async_run

from .base import TargetHandler
from .registry import handler_for

logger = config.get_logger()


[docs]def get_domain(url: str) -> str: parsed_url = urlparse(url) if parsed_url.scheme and parsed_url.netloc: return parsed_url.netloc else: return ""
[docs]@handler_for(XnatTarget) class XnatTargetHandler(TargetHandler[XnatTarget]): view_template = "targets/xnat.html" edit_template = "targets/xnat-edit.html" test_template = "targets/xnat-test.html" icon = "fa-hdd" display_name = "XNAT"
[docs] def send_to_target( self, task_id: str, target: XnatTarget, dispatch_info: TaskDispatch, source_folder: Path, task: Task ) -> str: try: _send_dicom_to_xnat( target=target, folder=source_folder, dispatch_info=dispatch_info ) except ConnectionError as e: self.handle_error(e, "") raise return ""
[docs] def handle_error(self, e, command) -> None: logger.error(e)
[docs] async def test_connection(self, target: XnatTarget, target_name: str): url = f"{target.host}/data/auth" async with aiohttp.ClientSession() as session: ping_ok = False if target.host: ping_result, *_ = await async_run( f"ping -w 1 -c 1 {get_domain(target.host)}" ) if ping_result == 0: ping_ok = True try: async with session.get( url, auth=aiohttp.BasicAuth(target.user, target.password) ) as resp: response_ok = resp.status == 200 text = await resp.text() if not response_ok else "" return dict(ping=ping_ok, loggedin=response_ok, err=text) except Exception as e: return dict(ping=ping_ok, loggedin=False, err=str(e))
def _send_dicom_to_xnat(target: XnatTarget, dispatch_info: TaskDispatch, folder: Path): logger.info( f"Connecting to {dispatch_info.target_name}({target.host}) XNAT server..." ) with InterfaceManager(server=target.host, user=target.user, # type: ignore password=target.password).open() as session: # type: ignore project_id = target.project_id dicom_file_path = glob.glob(os.path.join(folder, "*.dcm"))[0] dcmFile = dcmread(dicom_file_path, stop_before_pixels=True) subject_id = f"{dcmFile.PatientID}" # TODO make experiment_id more generic. experiment_id = f"{subject_id}_{datetime.datetime.strptime(dcmFile.StudyDate, '%Y%m%d').strftime('%Y-%m-%d')}" logger.info(f"Uploading {folder} to {dispatch_info.target_name} ...") _upload_dicom_session_to_xnat( session=session, project_id=project_id, subject_id=subject_id, experiment_label=experiment_id, dicom_path=folder, overwrite_dicom=True, ) def _upload_dicom_session_to_xnat( session: pyxnat.Interface, project_id, subject_id, experiment_label, dicom_path, overwrite_dicom=True, ): """ Uploads the dicoms from the given path to an XNAT server using the Image Session Import Service API. If the dicom_path contains more than one scan, all will be uploaded to the session. :param session: a pyxnat.Interface instance :param project_id: (str) XNAT's project ID or label :param subject_id: (str) XNAT's subject ID or label :param experiment_label: (str) XNAT's experiment label or ID :param dicom_path: path to directory containing dicom scan(s) :param scan_type: the value for the xnat:mrScanData/type field :param overwrite_dicom: if True, it will delete any existing Scan with same ID before uploading :return: a list of the uploaded scan_uris """ # Create a zip file with the dicom_path in a temporary directory with tempfile.TemporaryDirectory() as tmp_path: zip_filepath = os.path.join(tmp_path, "dicom.zip") # rename *.dcm files of dicom_path incrementaly and zip them with zipfile.ZipFile(zip_filepath, "w") as zip_file: i = 0 for root, dirs, files in os.walk(dicom_path): for file in files: if file.endswith(".dcm"): zip_file.write(os.path.join(root, file), f"{i}.dcm") i += 1 # Upload zip file to XNAT server using the Image Session Import API with open(zip_filepath, "rb") as data: resp = session.post( uri="/data/services/import", params={ "PROJECT_ID": project_id, "SUBJECT_ID": subject_id, "EXPT_LABEL": experiment_label, "rename": "true", "overwrite": "delete" if overwrite_dicom else "append", "inbody": "true", }, headers={"Content-Type": "application/zip"}, data=data, ) if resp.status_code != 200: raise ConnectionError( f"Response not 200 OK while uploading DICOM with Image Session Import Service API. " f"Response code: {resp.status_code} " f"Response: {resp}" )
[docs]class InterfaceManager(object): """Manager for `pyxnat.Interface` that enables the use of the `with` python context. Using the InterfaceManager along the `with` python context avoids having to call the `disconnect()` method after each connection. **Methods** - `open()`: to use along the `with` context, yields an instance of `pyxnat.Interface` and disconnects automatically when the context ends. - `open_persistent()`: returns a persistent instance of `pyxnat.Interface`. User must use the `disconnect()` method when the use of the session is finished. ```python interface = InterfaceManager(server='www.myxnat.org', user='user', password='password') with interface.open() as session: session.get(...) session = interface.open_persistent() session.get(...) session.disconnect() ``` """ def __init__(self, server, user, password): self.host = server self.user = user self.psswd = password
[docs] @contextmanager def open(self): try: sess = pyxnat.Interface( server=self.host, user=self.user, password=self.psswd ) yield sess finally: sess.disconnect()
[docs] def open_persistent(self): return pyxnat.Interface(server=self.host, user=self.user, password=self.psswd)