Source code for common.notification

"""
notification.py
===============
Helper functions for triggering webhook calls.
"""

import asyncio
# Standard python includes
import json
import smtplib
import ssl
import traceback
import typing
from email.message import EmailMessage
from typing import Any, Dict, List, Optional

import aiohttp
import common.config as config
import jinja2.utils
from common import monitor
# App-specific includes
from common.constants import mercure_events
from common.types import Rule, Task, TaskProcessing
from jinja2 import Template

from .helper import loop

# Create local logger instance
logger = config.get_logger()

ssl_context = ssl.create_default_context()


[docs]def setup() -> bool: """Load the SSL certificate if it is configured (after the configuration has been read).""" global ssl_context if config.mercure.webhook_certificate_location: ssl_context.load_verify_locations(config.mercure.webhook_certificate_location) return True
[docs]def post(url: str, payload: Any) -> None: async def do_post(url, payload) -> None: async with aiohttp.ClientSession() as session: try: async with session.post(url, json=payload, ssl=ssl_context) as resp: if resp.status not in (200, 204): logger.warning( f"Webhook notification failed {url}, status: {resp.status}" ) logger.warning(payload) # logger.warning(f"{await resp.text()}") except Exception as e: logger.warning(f"Webhook notification failed {url}, exception: {e}") logger.warning(traceback.format_exc()) asyncio.ensure_future(do_post(url, payload), loop=loop)
[docs]def parse_payload( payload: str, event: mercure_events, rule_name: str, task_id: str, details: str = "", context: dict = {}, *, task: Optional[Task] = None, tags_list: Optional[Dict[str, str]] = None, ) -> str: payload_parsed = payload payload_parsed = payload_parsed.replace("@rule@", rule_name) payload_parsed = payload_parsed.replace("@task_id@", task_id) payload_parsed = payload_parsed.replace("@event@", event.name) if task is not None: context["DeviceSerialNumber"] = task.info.device_serial_number elif tags_list is not None: context["DeviceSerialNumber"] = tags_list.get("DeviceSerialNumber") context = { **dict(rule=rule_name, task_id=task_id, event=event.name, details=details), **context, } return Template(payload_parsed).render(context)
[docs]def send_webhook(url: str, payload: str) -> None: if not url: return # Replace macros in payload try: payload_data = json.loads("{" + payload + "}") post(url, payload_data) # response = requests.post( # url, data=json.dumps(payload_data), headers={"Content-type": "application/json"} # ) # if (response.status_code != 200) and (response.status_code != 204): # logger.error(f"ERROR: Webhook notification failed (status code {response.status_code})") # logger.error(f"ERROR: {response.text}") except Exception: logger.error("ERROR: Webhook notification failed") logger.error(traceback.format_exc()) return
[docs]def send_email(address: str, payload: str, event: mercure_events, rule_name: str, rule_type: str) -> None: if not address: return subject = f"Rule {rule_name}: {event.name}" try: send_email_helper(address, subject, payload, rule_type) except Exception: logger.exception("ERROR: Email notification failed")
[docs]def send_email_helper(to: str, subject: str, content: str, rule_type="plain") -> None: # Create a text/plain message msg = EmailMessage() msg['Subject'] = f'[Mercure] {subject}' msg['From'] = config.mercure.email_notification_from msg['To'] = to msg.set_content(content, subtype=rule_type) # Send the message via our own SMTP server. s = smtplib.SMTP('localhost') try: s.send_message(msg) finally: s.quit()
[docs]def get_task_requested_notification(task: Task) -> bool: process_infos = task.process if not isinstance(process_infos, (TaskProcessing, List)): return False for process in (process_infos if isinstance(process_infos, List) else [process_infos]): if not process.output: continue if (notification_info := process.output.get("__mercure_notification")) and notification_info.get("requested"): return True return False
[docs]def get_task_custom_notification(task: Task) -> Optional[str]: results = [] process_infos = task.process if not isinstance(process_infos, (TaskProcessing, List)): return None for process in (process_infos if isinstance(process_infos, List) else [process_infos]): if not process.output: continue if (notification_info := process.output.get("__mercure_notification")) and (text := notification_info.get("text")): results.append((process.module_name, text)) if not results: return None str_results = [f"{module_name}: {text}" for module_name, text in results] return "\n".join(str_results)
@typing.overload def trigger_notification_for_rule( rule_name: str, task_id: str, event: mercure_events, *, task: Task, details: Optional[str] = "", send_always: bool = False, ): ... @typing.overload def trigger_notification_for_rule( rule_name: str, task_id: str, event: mercure_events, *, tags_list: Dict[str, str], details: Optional[str] = "", send_always: bool = False, ): ...
[docs]def trigger_notification_for_rule( rule_name: str, task_id: str, event: mercure_events, *, task: Optional[Task] = None, tags_list: Optional[Dict[str, str]] = {}, details: Optional[str] = "", send_always: bool = False, ): # logger.warning(f"TRIGGER NOTIFICATION FOR RULE {rule_name} {event=} {details=}\n {task=}") details = details if details is not None else "" current_rule = config.mercure.rules.get(rule_name) # Check if the rule is available if not current_rule or not isinstance(current_rule, Rule): logger.error( f"Rule {rule_name} does not exist in mercure configuration", task_id ) # handle_error return False do_send = send_always # default false # Now fire the webhook if configured if ( event == mercure_events.RECEIVED and current_rule.notification_trigger_reception is True ): do_send = True elif ( event == mercure_events.COMPLETED and current_rule.notification_trigger_completion is True ): do_send = True elif ( event == mercure_events.ERROR and current_rule.notification_trigger_error is True ): do_send = True if not do_send: return False webhook_url = current_rule.get("notification_webhook") if webhook_url: body = current_rule.get("notification_payload_body", "") phi_data = dict( acc="UNKNOWN", mrn="UNKNOWN", patient_name="UNKNOWN", ) if task and config.mercure.phi_notifications: phi_data = dict( acc=task.info.acc, mrn=task.info.mrn, patient_name=task.info.patient_name or "", ) context = dict(body=jinja2.utils.htmlsafe_json_dumps( # type: ignore parse_payload(body, event, rule_name, task_id, details, phi_data, task=task, tags_list=tags_list))[1:-1]) context.update(phi_data) webhook_payload = parse_payload( current_rule.get("notification_payload", ""), event, rule_name, task_id, details, context, task=task, tags_list=tags_list, ) send_webhook(webhook_url, webhook_payload) monitor.send_task_event( monitor.task_event.NOTIFICATION, task_id, 0, webhook_url, "Announced " + event.name, ) email_addresses = current_rule.get("notification_email") if not email_addresses: return True if task and config.mercure.phi_notifications: context = dict( acc=task.info.acc, mrn=task.info.mrn, patient_name=task.info.patient_name, ) else: context = {} email_payload = parse_payload( current_rule.get("notification_email_body", ""), event, rule_name, task_id, details, context, task=task, tags_list=tags_list, ) for email_address in email_addresses.split(", "): send_email( email_address, email_payload, event, rule_name, current_rule.get("notification_email_type", "plain"), ) monitor.send_task_event( monitor.task_event.NOTIFICATION, task_id, 0, email_address, "Announced " + event.name, ) return True