"""
notification.py
===============
Helper functions for triggering webhook calls.
"""
import asyncio
# Standard python includes
import json
import smtplib
import ssl
import traceback
import typing
from email.message import EmailMessage
from typing import Any, Dict, List, Optional
import aiohttp
import common.config as config
import jinja2.utils
from common import monitor
# App-specific includes
from common.constants import mercure_events
from common.types import Rule, Task, TaskProcessing
from jinja2 import Template
from .helper import loop
# Create local logger instance
logger = config.get_logger()
ssl_context = ssl.create_default_context()
[docs]def setup() -> bool:
"""Load the SSL certificate if it is configured (after the configuration has been read)."""
global ssl_context
if config.mercure.webhook_certificate_location:
ssl_context.load_verify_locations(config.mercure.webhook_certificate_location)
return True
[docs]def post(url: str, payload: Any) -> None:
async def do_post(url, payload) -> None:
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, json=payload, ssl=ssl_context) as resp:
if resp.status not in (200, 204):
logger.warning(
f"Webhook notification failed {url}, status: {resp.status}"
)
logger.warning(payload)
# logger.warning(f"{await resp.text()}")
except Exception as e:
logger.warning(f"Webhook notification failed {url}, exception: {e}")
logger.warning(traceback.format_exc())
asyncio.ensure_future(do_post(url, payload), loop=loop)
[docs]def parse_payload(
payload: str,
event: mercure_events,
rule_name: str,
task_id: str,
details: str = "",
context: dict = {},
*,
task: Optional[Task] = None,
tags_list: Optional[Dict[str, str]] = None,
) -> str:
payload_parsed = payload
payload_parsed = payload_parsed.replace("@rule@", rule_name)
payload_parsed = payload_parsed.replace("@task_id@", task_id)
payload_parsed = payload_parsed.replace("@event@", event.name)
if task is not None:
context["DeviceSerialNumber"] = task.info.device_serial_number
elif tags_list is not None:
context["DeviceSerialNumber"] = tags_list.get("DeviceSerialNumber")
context = {
**dict(rule=rule_name, task_id=task_id, event=event.name, details=details),
**context,
}
return Template(payload_parsed).render(context)
[docs]def send_webhook(url: str, payload: str) -> None:
if not url:
return
# Replace macros in payload
try:
payload_data = json.loads("{" + payload + "}")
post(url, payload_data)
# response = requests.post(
# url, data=json.dumps(payload_data), headers={"Content-type": "application/json"}
# )
# if (response.status_code != 200) and (response.status_code != 204):
# logger.error(f"ERROR: Webhook notification failed (status code {response.status_code})")
# logger.error(f"ERROR: {response.text}")
except Exception:
logger.error("ERROR: Webhook notification failed")
logger.error(traceback.format_exc())
return
[docs]def send_email(address: str, payload: str, event: mercure_events, rule_name: str, rule_type: str) -> None:
if not address:
return
subject = f"Rule {rule_name}: {event.name}"
try:
send_email_helper(address, subject, payload, rule_type)
except Exception:
logger.exception("ERROR: Email notification failed")
[docs]def send_email_helper(to: str, subject: str, content: str, rule_type="plain") -> None:
# Create a text/plain message
msg = EmailMessage()
msg['Subject'] = f'[Mercure] {subject}'
msg['From'] = config.mercure.email_notification_from
msg['To'] = to
msg.set_content(content, subtype=rule_type)
# Send the message via our own SMTP server.
s = smtplib.SMTP('localhost')
try:
s.send_message(msg)
finally:
s.quit()
[docs]def get_task_requested_notification(task: Task) -> bool:
process_infos = task.process
if not isinstance(process_infos, (TaskProcessing, List)):
return False
for process in (process_infos if isinstance(process_infos, List) else [process_infos]):
if not process.output:
continue
if (notification_info := process.output.get("__mercure_notification")) and notification_info.get("requested"):
return True
return False
[docs]def get_task_custom_notification(task: Task) -> Optional[str]:
results = []
process_infos = task.process
if not isinstance(process_infos, (TaskProcessing, List)):
return None
for process in (process_infos if isinstance(process_infos, List) else [process_infos]):
if not process.output:
continue
if (notification_info := process.output.get("__mercure_notification")) and (text := notification_info.get("text")):
results.append((process.module_name, text))
if not results:
return None
str_results = [f"{module_name}: {text}" for module_name, text in results]
return "\n".join(str_results)
@typing.overload
def trigger_notification_for_rule(
rule_name: str,
task_id: str,
event: mercure_events,
*,
task: Task,
details: Optional[str] = "",
send_always: bool = False,
):
...
@typing.overload
def trigger_notification_for_rule(
rule_name: str,
task_id: str,
event: mercure_events,
*,
tags_list: Dict[str, str],
details: Optional[str] = "",
send_always: bool = False,
):
...
[docs]def trigger_notification_for_rule(
rule_name: str,
task_id: str,
event: mercure_events,
*,
task: Optional[Task] = None,
tags_list: Optional[Dict[str, str]] = {},
details: Optional[str] = "",
send_always: bool = False,
):
# logger.warning(f"TRIGGER NOTIFICATION FOR RULE {rule_name} {event=} {details=}\n {task=}")
details = details if details is not None else ""
current_rule = config.mercure.rules.get(rule_name)
# Check if the rule is available
if not current_rule or not isinstance(current_rule, Rule):
logger.error(
f"Rule {rule_name} does not exist in mercure configuration", task_id
) # handle_error
return False
do_send = send_always # default false
# Now fire the webhook if configured
if (
event == mercure_events.RECEIVED
and current_rule.notification_trigger_reception is True
):
do_send = True
elif (
event == mercure_events.COMPLETED
and current_rule.notification_trigger_completion is True
):
do_send = True
elif (
event == mercure_events.ERROR
and current_rule.notification_trigger_error is True
):
do_send = True
if not do_send:
return False
webhook_url = current_rule.get("notification_webhook")
if webhook_url:
body = current_rule.get("notification_payload_body", "")
phi_data = dict(
acc="UNKNOWN",
mrn="UNKNOWN",
patient_name="UNKNOWN",
)
if task and config.mercure.phi_notifications:
phi_data = dict(
acc=task.info.acc,
mrn=task.info.mrn,
patient_name=task.info.patient_name or "",
)
context = dict(body=jinja2.utils.htmlsafe_json_dumps( # type: ignore
parse_payload(body, event, rule_name, task_id, details, phi_data, task=task, tags_list=tags_list))[1:-1])
context.update(phi_data)
webhook_payload = parse_payload(
current_rule.get("notification_payload", ""),
event,
rule_name,
task_id,
details,
context,
task=task,
tags_list=tags_list,
)
send_webhook(webhook_url, webhook_payload)
monitor.send_task_event(
monitor.task_event.NOTIFICATION,
task_id,
0,
webhook_url,
"Announced " + event.name,
)
email_addresses = current_rule.get("notification_email")
if not email_addresses:
return True
if task and config.mercure.phi_notifications:
context = dict(
acc=task.info.acc,
mrn=task.info.mrn,
patient_name=task.info.patient_name,
)
else:
context = {}
email_payload = parse_payload(
current_rule.get("notification_email_body", ""),
event,
rule_name,
task_id,
details,
context,
task=task,
tags_list=tags_list,
)
for email_address in email_addresses.split(", "):
send_email(
email_address,
email_payload,
event,
rule_name,
current_rule.get("notification_email_type", "plain"),
)
monitor.send_task_event(
monitor.task_event.NOTIFICATION,
task_id,
0,
email_address,
"Announced " + event.name,
)
return True