"""
helper.py
=========
Various internal helper functions for mercure.
"""
# Standard python includes
import asyncio
import inspect
import os
import re
import threading
from datetime import datetime
from datetime import time as _time
from pathlib import Path
from typing import Callable, Optional, Tuple
import common.influxdb
import dateutil
import graphyte
# Global variable to broadcast when the process should terminate
terminate = False
loop = asyncio.get_event_loop()
[docs]def validate_folders(config) -> Tuple[bool, str]:
for folder in (config.incoming_folder, config.studies_folder, config.outgoing_folder,
config.success_folder, config.error_folder, config.discard_folder,
config.processing_folder, config.jobs_folder):
if not Path(folder).is_dir():
try:
Path(folder).mkdir(parents=False)
print(f"Created directory: {folder}")
except Exception:
return False, f"Folder {folder} does not exist."
if not os.access(folder, os.R_OK | os.W_OK):
return False, f"No read/write access to {folder}"
return True, ""
[docs]def localize_log_timestamps(logstring: str, config) -> str:
if config.mercure.local_time == "UTC":
return logstring
try:
local_tz = dateutil.tz.gettz(config.mercure.local_time) # type: ignore
except Exception:
return logstring
timestamp_pattern = re.compile(r'^(\S+)(.*?)$', re.MULTILINE)
def replace_timestamp(match):
timestamp, rest_of_line = match.groups()
try:
parsed_dt = dateutil.parser.isoparse(timestamp) # type: ignore
dt_localtime: datetime = parsed_dt.astimezone(local_tz)
localized_timestamp = dt_localtime.isoformat(timespec='seconds')
return f"{localized_timestamp}{rest_of_line}"
except Exception:
return match.group(0) # Return the original line if parsing fails
return timestamp_pattern.sub(replace_timestamp, logstring)
[docs]def get_now_str() -> str:
"""Returns the current time as string with mercure-wide formatting"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
[docs]def get_runner() -> str:
"""Returns the name of the mechanism that is used for running mercure
in the current installation (systemd, docker, nomad)."""
return os.getenv("MERCURE_RUNNER", "systemd")
[docs]def trigger_terminate() -> None:
"""Trigger that the processing loop should terminate after finishing the currently active task."""
global terminate
terminate = True
[docs]def is_terminated() -> bool:
"""Checks if the process will terminate after the current task."""
return terminate
[docs]def send_to_graphite(*args, **kwargs) -> None:
"""Wrapper for asynchronous graphite call to avoid wait time of main loop."""
if graphyte.default_sender is None:
return
graphyte.default_sender.send(*args, **kwargs)
[docs]def send_to_influxdb(*args, **kwargs) -> None:
"""Wrapper for asynchronous influxdb call to avoid wait time of main loop."""
if common.influxdb.default_sender is None:
return
common.influxdb.default_sender.send(*args, **kwargs)
[docs]def g_log(*args, **kwargs) -> None:
global loop
"""Sends diagnostic information to graphite (if configured)."""
try:
loop = asyncio.get_running_loop()
loop.call_soon(send_to_graphite, *args, **kwargs)
loop.call_soon(send_to_influxdb, *args, **kwargs)
except Exception:
send_to_graphite(*args, **kwargs)
send_to_influxdb(*args, **kwargs)
def _is_offpeak(offpeak_start: str, offpeak_end: str, current_time: _time) -> bool:
"""Check if the provided time is within the offpeak time range."""
try:
start_time = datetime.strptime(offpeak_start, "%H:%M").time()
end_time = datetime.strptime(offpeak_end, "%H:%M").time()
except Exception:
print(f"Unable to parse offpeak time: {offpeak_start}, {offpeak_end}", None) # handle_error
return True
if start_time < end_time:
return current_time >= start_time and current_time <= end_time
# End time is after midnight
return current_time >= start_time or current_time <= end_time
[docs]class AsyncTimer(object):
def __init__(self, interval: int, func):
self.func = func
self.time = interval
self.is_running = False
self._task: Optional[asyncio.Task] = None
[docs] def start(self) -> None:
if not self.is_running:
self.is_running = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())
[docs] def stop(self) -> None:
"""Signal to stop after the current"""
self.is_running = False
async def _run(self) -> None:
global terminate
while self.is_running:
await asyncio.sleep(self.time)
if terminate:
self.stop()
if not self.is_running:
break
if inspect.isawaitable(res := self.func()):
await res
[docs] def run_until_complete(self, loop=None) -> None:
self.start()
if not self._task:
raise Exception("Unexpected error: AsyncTimer._task is None")
loop = loop or asyncio.get_event_loop()
loop.run_until_complete(self._task)
[docs]class RepeatedTimer(object):
"""
Helper class for running a continuous timer that is suspended
while the worker function is running
"""
_timer: Optional[threading.Timer]
def __init__(self, interval: float, function: Callable, exit_function: Callable, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.exit_function = exit_function
self.args = args
self.kwargs = kwargs
self.is_running = False
def _run(self) -> None:
"""Callback function for the timer event. Will execute the defined function and restart
the timer after completion, unless the eventloop has been asked to shut down."""
global terminate
self.is_running = False
self.function(*self.args, **self.kwargs)
if not terminate:
self.start()
else:
self.exit_function(*self.args, **self.kwargs)
[docs] def start(self) -> None:
"""Starts the timer for triggering the calllback after the defined interval."""
if not self.is_running:
self._timer = threading.Timer(self.interval, self._run)
assert self._timer is not None
self._timer.start()
self.is_running = True
[docs] def stop(self) -> None:
"""Stops the timer and executes the defined exit callback function."""
if self.is_running:
assert self._timer is not None
self._timer.cancel()
self.is_running = False
self.exit_function(*self.args, **self.kwargs)
[docs]class FileLock:
"""Helper class that implements a file lock. The lock file will be removed also from the destructor so that
no spurious lock files remain if exceptions are raised."""
lockCreated = False
def __init__(self, path_for_lockfile: Path):
self.lockfile = path_for_lockfile
# TODO: Handle case if lock file cannot be created
self.lockfile.touch(exist_ok=False)
self.lockCreated = True
# Destructor to ensure that the lock file gets deleted
# if the calling function is left somewhere as result
# of an unhandled exception
def __del__(self) -> None:
self.free()
[docs] def free(self) -> None:
if self.lockCreated:
try:
self.lockfile.unlink()
except FileNotFoundError:
# Lock file was already removed by someone else
pass
self.lockCreated = False