"""
monitor.py
==========
Helper functions and definitions for monitoring mercure's operations via the bookkeeper module.
"""
# Standard python includes
import asyncio
import asyncio.exceptions
import datetime
import os
import time
from json import JSONDecodeError
from typing import Any, Dict, Optional
import aiohttp
import daiquiri
from common.event_types import m_events, severity, task_event, w_events
# App-specific includes
from common.types import Task, TaskProcessing
# Create local logger instance
logger = daiquiri.getLogger("monitor") # log_helpers.get_logger("monitor", True)
api_key: Optional[str] = None
sender_name = ""
bookkeeper_address = ""
loop: Optional[asyncio.AbstractEventLoop] = None
class MonitorHTTPError(Exception):
    """Exception raised when an HTTP error occurs while querying the bookkeeper.

    Attributes:
        status_code: HTTP status code returned by the bookkeeper.
        message: Human-readable description of the error.
    """
    def __init__(self, status_code: int, message: str):
        # Initialize the Exception base class so that str(e) and e.args carry
        # the message (the original left them empty, hiding the error text from
        # any generic exception logging).
        super().__init__(message)
        self.status_code = status_code
        self.message = message
        logger.debug("HTTP error: %s", message)
def set_api_key() -> None:
    """Load the bookkeeper API key from the mercure configuration (only once)."""
    global api_key

    if api_key is not None:
        # Key already loaded; nothing to do.
        return

    from common.config import read_config

    try:
        api_key = read_config().bookkeeper_api_key
    except (ResourceWarning, FileNotFoundError):
        # Monitoring is optional: without a key, events are silently skipped later.
        logger.warning("No API key found. No bookkeeper events will be transmitted.")
    return
async def do_post(endpoint, kwargs, catch_errors=False) -> None:
    """POST the given request kwargs to a bookkeeper endpoint.

    Connection and timeout failures are always logged; they are re-raised
    unless catch_errors is True. Non-200 responses only produce a warning.
    """
    if api_key is None:
        return

    logger.debug(f"Posting to {endpoint}: {kwargs}")
    auth_header = {"Authorization": f"Token {api_key}"}
    # No overall deadline, but cap connect/read phases at 120 s each.
    timeouts = aiohttp.ClientTimeout(total=None, connect=120, sock_connect=120, sock_read=120)
    url = bookkeeper_address + "/" + endpoint
    try:
        async with aiohttp.ClientSession(headers=auth_header, timeout=timeouts) as session:
            async with session.post(url, **kwargs) as resp:
                logger.debug(f"Response from {endpoint}: {resp.status}")
                if resp.status != 200:
                    logger.warning(
                        f"Failed POST request {kwargs} to bookkeeper endpoint {endpoint}: status: {resp.status}"
                    )
    except aiohttp.client.ClientError as e:
        logger.error(f"Failed POST request to {bookkeeper_address}/{endpoint}: {e}")
        if not catch_errors:
            raise
    except asyncio.TimeoutError as e:
        logger.error(f"Failed POST request to {bookkeeper_address}/{endpoint} with timeout: {e}")
        if not catch_errors:
            raise
def post(endpoint: str, **kwargs) -> None:
    """Fire-and-forget POST to the bookkeeper (errors are swallowed by do_post)."""
    if api_key is None or not bookkeeper_address:
        return None
    # create_task requires a running event loop; during boot there might not be one running yet.
    asyncio.ensure_future(do_post(endpoint, kwargs, True), loop=loop)
async def async_post(endpoint: str, **kwargs):
    """Awaitable POST to the bookkeeper; no-op when monitoring is not configured."""
    if api_key is None or not bookkeeper_address:
        return None
    return await do_post(endpoint, kwargs, True)
async def get(endpoint: str, payload: Any = None) -> Any:
    """GET from a bookkeeper endpoint and return the decoded JSON response.

    Args:
        endpoint: Path below the bookkeeper address to query.
        payload: Optional dict of query parameters (defaults to none).

    Raises:
        MonitorHTTPError: If the bookkeeper responds with a non-200 status.
    """
    if api_key is None:
        return
    # Avoid a mutable default argument; an empty dict sends no query parameters.
    params = {} if payload is None else payload
    async with aiohttp.ClientSession(headers={"Authorization": f"Token {api_key}"}) as session:
        async with session.get(bookkeeper_address + "/" + endpoint, params=params) as resp:
            if resp.status != 200:
                logger.error(f"Failed GET request to bookkeeper endpoint {endpoint}: status: {resp.status}")
                # Non-JSON bodies cannot carry a structured error; report raw text.
                if resp.content_type != "application/json":
                    raise MonitorHTTPError(resp.status, await resp.text())
                try:
                    err_json = await resp.json()
                except JSONDecodeError:
                    raise MonitorHTTPError(resp.status, await resp.text())
                # Prefer the server-provided "error" field when present.
                try:
                    raise MonitorHTTPError(resp.status, str(err_json["error"]))
                except KeyError:
                    raise MonitorHTTPError(resp.status, "Unknown error")
            return await resp.json()
def send_event(event: m_events, severity: severity = severity.INFO, description: str = "") -> None:
    """Sends information about general mercure events to the bookkeeper (e.g., during module start)."""
    logger.debug(
        f'Monitor (mercure-event): level {severity.value} {event}: "{description}"'
    )
    if not bookkeeper_address:
        return
    post(
        "mercure-event",
        data={
            "sender": sender_name,
            "event": event.value,
            "severity": severity.value,
            "description": description,
        },
    )
def send_webgui_event(event: w_events, user: str, description="") -> None:
    """Sends information about an event on the webgui to the bookkeeper."""
    if not bookkeeper_address:
        return
    post(
        "webgui-event",
        data={
            "sender": sender_name,
            "event": event.value,
            "user": user,
            "description": description,
        },
    )
def send_register_series(tags: Dict[str, str]) -> None:
    """Registers a received series on the bookkeeper. Call this once a series
    has been fully received and its DICOM tags have been parsed."""
    series_uid = tags.get('series_uid', None)
    logger.debug(f"Monitor (register-series): series_uid={series_uid}")
    post("register-series", data=tags)
def send_register_task(task_id: str, series_uid: str, parent_id: Optional[str] = None) -> None:
    """Registers a new task on the bookkeeper. This should be called whenever a new task has been created."""
    payload = {"id": task_id, "series_uid": series_uid, "parent_id": parent_id}
    post("register-task", json=payload)
def send_update_task(task: Task) -> None:
    """Sends the current content of a task to the bookkeeper so its stored record is updated."""
    payload = task.dict()
    logger.debug(f"Monitor (update-task): task.id={payload['id']} ")
    post("update-task", json=payload)
def send_processor_output(task: Task, task_processing: TaskProcessing, index: int, output: dict) -> None:
    """Stores the output produced by a processing module run for the given task."""
    payload = {
        "task_id": task.id,
        "task_acc": task.info.acc,
        "task_mrn": task.info.mrn,
        "module": task_processing.module_name,
        "index": index,
        "settings": task_processing.settings,
        "output": output,
    }
    post("store-processor-output", json=payload)
def task_event_payload(event: task_event, task_id: str, file_count: int, target, info):
    """Builds the payload dict for a task-event POST to the bookkeeper."""
    # NOTE(review): "timestamp" uses time.monotonic() (relative to an arbitrary
    # epoch) while "time" carries the wall-clock moment — confirm the bookkeeper
    # expects exactly this pairing before changing either field.
    payload = {
        "sender": sender_name,
        "event": event.value,
        "file_count": file_count,
        "target": target,
        "info": info,
        "task_id": task_id,
    }
    payload["timestamp"] = time.monotonic()
    payload["time"] = datetime.datetime.now()
    return payload
def send_task_event(event: task_event, task_id: str, file_count: int, target: str, info: str) -> None:
    """Send an event related to a specific series to the bookkeeper."""
    logger.debug(f"Monitor (task-event): event={event} task_id={task_id} info={info}")
    payload = task_event_payload(event, task_id, file_count, target, info)
    post("task-event", data=payload)
async def async_send_task_event(event: task_event, task_id: str, file_count: int, target: str, info: str):
    """Awaitable variant of send_task_event; returns the result of the POST."""
    logger.debug(f"Monitor (task-event): event={event} task_id={task_id} info={info}")
    payload = task_event_payload(event, task_id, file_count, target, info)
    return await async_post("task-event", data=payload)
def send_process_logs(task_id: str, module_name: str, logs: str) -> None:
    """Sends the captured log output of a processing-module run to the bookkeeper."""
    logger.debug(f"Monitor (processor-logs): task_id={task_id}")
    post(
        "processor-logs",
        data={
            "sender": sender_name,
            "task_id": task_id,
            "module_name": module_name,
            "time": datetime.datetime.now(),
            "logs": logs,
        },
    )
async def get_task_events(task_id="") -> Any:
    """Fetches all events recorded for the given task from the bookkeeper."""
    params = {"task_id": task_id}
    return await get("query/task-events", params)
async def get_series(series_uid="") -> Any:
    """Fetches series information for the given series UID from the bookkeeper."""
    params = {"series_uid": series_uid}
    return await get("query/series", params)
async def get_tasks() -> Any:
    """Fetches the list of all tasks from the bookkeeper."""
    return await get("query/tasks")
async def get_tests() -> Any:
    """Fetches the recorded self-test results from the bookkeeper."""
    return await get("query/tests")
async def find_tasks(request) -> Any:
    """Searches tasks using the query parameters of the incoming request."""
    params = dict(request.query_params.items())
    return await get("query/find_task", params)
async def task_process_logs(task_id="") -> Any:
    """Fetches the processing-module logs recorded for the given task."""
    params = {"task_id": task_id}
    return await get("query/task_process_logs", params)
async def task_process_results(task_id="") -> Any:
    """Fetches the processing-module results recorded for the given task."""
    params = {"task_id": task_id}
    return await get("query/task_process_results", params)
async def get_task_info(task_id="") -> Any:
    """Fetches detailed information about the given task from the bookkeeper."""
    params = {"task_id": task_id}
    return await get("query/get_task_info", params)