Source code for common.notification

"""
notification.py
===============
Helper functions for triggering webhook calls.
"""

# Standard python includes
import json
import ssl
from typing import Any, Dict, List, Optional, TYPE_CHECKING
import typing
import aiohttp
import daiquiri
import json
import asyncio
import traceback

import jinja2.utils

from common import monitor
from common.types import EmptyDict, Rule, Task, TaskProcessing
from .helper import loop
import common.config as config
from jinja2 import Template

# App-specific includes
from common.constants import (
    mercure_events,
)


# Create local logger instance
logger = config.get_logger()

ssl_context = ssl.create_default_context()


[docs]def setup() -> bool: """Load the SSL certificate if it is configured (after the configuration has been read).""" global ssl_context if config.mercure.webhook_certificate_location: ssl_context.load_verify_locations(config.mercure.webhook_certificate_location) return True
[docs]def post(url: str, payload: Any) -> None: async def do_post(url, payload) -> None: async with aiohttp.ClientSession() as session: try: async with session.post(url, json=payload, ssl=ssl_context) as resp: if resp.status not in (200, 204): logger.warning( f"Webhook notification failed {url}, status: {resp.status}" ) logger.warning(payload) # logger.warning(f"{await resp.text()}") except Exception as e: logger.warning(f"Webhook notification failed {url}, exception: {e}") logger.warning(traceback.format_exc()) asyncio.ensure_future(do_post(url, payload), loop=loop)
[docs]def parse_payload( payload: str, event: mercure_events, rule_name: str, task_id: str, details: str = "", context: dict = {}, *, task: Optional[Task] = None, tags_list: Optional[Dict[str, str]] = None, ) -> str: payload_parsed = payload payload_parsed = payload_parsed.replace("@rule@", rule_name) payload_parsed = payload_parsed.replace("@task_id@", task_id) payload_parsed = payload_parsed.replace("@event@", event.name) if task is not None: context["DeviceSerialNumber"] = task.info.device_serial_number elif tags_list is not None: context["DeviceSerialNumber"] = tags_list.get("DeviceSerialNumber") context = { **dict(rule=rule_name, task_id=task_id, event=event.name, details=details), **context, } return Template(payload_parsed).render(context)
[docs]def send_webhook(url:str, payload: str) -> None: if not url: return # Replace macros in payload try: payload_data = json.loads("{" + payload + "}") post(url, payload_data) # response = requests.post( # url, data=json.dumps(payload_data), headers={"Content-type": "application/json"} # ) # if (response.status_code != 200) and (response.status_code != 204): # logger.error(f"ERROR: Webhook notification failed (status code {response.status_code})") # logger.error(f"ERROR: {response.text}") except: logger.error(f"ERROR: Webhook notification failed") logger.error(traceback.format_exc()) return
import smtplib from email.message import EmailMessage
[docs]def send_email(address: str, payload: str, event: mercure_events, rule_name: str, rule_type:str) -> None: if not address: return subject = f"Rule {rule_name}: {event.name}" try: send_email_helper(address, subject, payload, rule_type) except: logger.exception(f"ERROR: Email notification failed")
[docs]def send_email_helper(to:str, subject:str, content:str, rule_type="plain") -> None: # Create a text/plain message msg = EmailMessage() msg['Subject'] = f'[Mercure] {subject}' msg['From'] = config.mercure.email_notification_from msg['To'] = to msg.set_content(content, subtype=rule_type) # Send the message via our own SMTP server. s = smtplib.SMTP('localhost') try: s.send_message(msg) finally: s.quit()
[docs]def get_task_requested_notification(task:Task) -> bool: process_infos = task.process if not isinstance(process_infos,(TaskProcessing,List)): return False for process in (process_infos if isinstance(process_infos,List) else [process_infos]): if not process.output: continue if (notification_info := process.output.get("__mercure_notification")) and notification_info.get("requested"): return True return False
[docs]def get_task_custom_notification(task:Task) -> Optional[str]: results = [] process_infos = task.process if not isinstance(process_infos,(TaskProcessing,List)): return None for process in (process_infos if isinstance(process_infos,List) else [process_infos]): if not process.output: continue if (notification_info := process.output.get("__mercure_notification")) and (text:= notification_info.get("text")): results.append((process.module_name,text)) if not results: return None str_results = [ f"{module_name}: {text}" for module_name, text in results] return "\n".join(str_results)
@typing.overload def trigger_notification_for_rule( rule_name: str, task_id: str, event: mercure_events, *, task: Task, details: Optional[str] = "", send_always: bool=False, ): ... @typing.overload def trigger_notification_for_rule( rule_name: str, task_id: str, event: mercure_events, *, tags_list: Dict[str, str], details: Optional[str] = "", send_always: bool = False, ): ...
[docs]def trigger_notification_for_rule( rule_name: str, task_id: str, event: mercure_events, *, task: Optional[Task] = None, tags_list: Optional[Dict[str, str]] = {}, details: Optional[str] = "", send_always: bool = False, ): # logger.warning(f"TRIGGER NOTIFICATION FOR RULE {rule_name} {event=} {details=}\n {task=}") details = details if details is not None else "" current_rule = config.mercure.rules.get(rule_name) # Check if the rule is available if not current_rule or not isinstance(current_rule, Rule): logger.error( f"Rule {rule_name} does not exist in mercure configuration", task_id ) # handle_error return False do_send = send_always # default false # Now fire the webhook if configured if ( event == mercure_events.RECEIVED and current_rule.notification_trigger_reception == True ): do_send = True elif ( event == mercure_events.COMPLETED and current_rule.notification_trigger_completion == True ): do_send = True elif ( event == mercure_events.ERROR and current_rule.notification_trigger_error == True ): do_send = True if not do_send: return False webhook_url = current_rule.get("notification_webhook") if webhook_url: body = current_rule.get("notification_payload_body", "") context = dict(body=jinja2.utils.htmlsafe_json_dumps(parse_payload(body, event, rule_name, task_id, details, {}, task=task, tags_list=tags_list))[1:-1]) # type: ignore webhook_payload = parse_payload( current_rule.get("notification_payload", ""), event, rule_name, task_id, details, context, task=task, tags_list=tags_list, ) send_webhook(webhook_url, webhook_payload) monitor.send_task_event( monitor.task_event.NOTIFICATION, task_id, 0, webhook_url, "Announced " + event.name, ) email_addresses = current_rule.get("notification_email") if not email_addresses: return True if task: context = dict( acc=task.info.acc, mrn=task.info.mrn, patient_name=task.info.patient_name, ) else: context = {} email_payload = parse_payload( current_rule.get("notification_email_body", ""), event, rule_name, task_id, details, context, task=task, tags_list=tags_list, ) for email_address in email_addresses.split(", "): send_email( email_address, email_payload, event, rule_name, current_rule.get("notification_email_type", "plain"), ) monitor.send_task_event( monitor.task_event.NOTIFICATION, task_id, 0, email_address, "Announced " + event.name, ) return True