"""
database.py
===========
Database functions needed for the bookkeeper service.
"""
import bookkeeping.config as bk_config
# App-specific includes
import common.monitor as monitor
import databases
# Standard python includes
import sqlalchemy
from common import config
from sqlalchemy.sql import func
# Starlette-related includes
logger = config.get_logger()
###################################################################################
# Definition of database tables
###################################################################################
database: databases.Database
metadata: sqlalchemy.MetaData
mercure_events: sqlalchemy.Table
webgui_events: sqlalchemy.Table
dicom_files: sqlalchemy.Table
dicom_series: sqlalchemy.Table
task_events: sqlalchemy.Table
file_events: sqlalchemy.Table
dicom_series_map: sqlalchemy.Table
series_sequence_data: sqlalchemy.Table
tasks_table: sqlalchemy.Table
tests_table: sqlalchemy.Table
processor_logs_table: sqlalchemy.Table
processor_outputs_table: sqlalchemy.Table
[docs]def init_database(url=None, schema=None) -> databases.Database:
global database, metadata, mercure_events, webgui_events, dicom_files, dicom_series, task_events
global file_events, dicom_series_map, series_sequence_data, tasks_table, tests_table
global processor_logs_table, processor_outputs_table
database = databases.Database(url or bk_config.DATABASE_URL)
metadata = sqlalchemy.MetaData(schema=(schema or bk_config.DATABASE_SCHEMA))
# SQLite does not support JSONB natively, so we use TEXT instead
JSONB = sqlalchemy.types.Text() if 'sqlite://' in (url or bk_config.DATABASE_URL) else sqlalchemy.dialects.postgresql.JSONB
mercure_events = sqlalchemy.Table(
"mercure_events",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("time", sqlalchemy.DateTime),
sqlalchemy.Column("sender", sqlalchemy.String, default="Unknown"),
sqlalchemy.Column("event", sqlalchemy.String, default=monitor.m_events.UNKNOWN),
sqlalchemy.Column("severity", sqlalchemy.Integer, default=monitor.severity.INFO),
sqlalchemy.Column("description", sqlalchemy.String, default=""),
)
webgui_events = sqlalchemy.Table(
"webgui_events",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("time", sqlalchemy.DateTime),
sqlalchemy.Column("sender", sqlalchemy.String, default="Unknown"),
sqlalchemy.Column("event", sqlalchemy.String, default=monitor.w_events.UNKNOWN),
sqlalchemy.Column("user", sqlalchemy.String, default=""),
sqlalchemy.Column("description", sqlalchemy.String, default=""),
)
dicom_files = sqlalchemy.Table(
"dicom_files",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("time", sqlalchemy.DateTime),
sqlalchemy.Column("filename", sqlalchemy.String),
sqlalchemy.Column("file_uid", sqlalchemy.String),
sqlalchemy.Column("series_uid", sqlalchemy.String),
)
dicom_series = sqlalchemy.Table(
"dicom_series",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("time", sqlalchemy.DateTime),
sqlalchemy.Column("series_uid", sqlalchemy.String, unique=True),
sqlalchemy.Column("study_uid", sqlalchemy.String),
sqlalchemy.Column("tag_patientname", sqlalchemy.String),
sqlalchemy.Column("tag_patientid", sqlalchemy.String),
sqlalchemy.Column("tag_accessionnumber", sqlalchemy.String),
sqlalchemy.Column("tag_seriesnumber", sqlalchemy.String),
sqlalchemy.Column("tag_studyid", sqlalchemy.String),
sqlalchemy.Column("tag_patientbirthdate", sqlalchemy.String),
sqlalchemy.Column("tag_patientsex", sqlalchemy.String),
sqlalchemy.Column("tag_acquisitiondate", sqlalchemy.String),
sqlalchemy.Column("tag_acquisitiontime", sqlalchemy.String),
sqlalchemy.Column("tag_modality", sqlalchemy.String),
sqlalchemy.Column("tag_bodypartexamined", sqlalchemy.String),
sqlalchemy.Column("tag_studydescription", sqlalchemy.String),
sqlalchemy.Column("tag_seriesdescription", sqlalchemy.String),
sqlalchemy.Column("tag_protocolname", sqlalchemy.String),
sqlalchemy.Column("tag_codevalue", sqlalchemy.String),
sqlalchemy.Column("tag_codemeaning", sqlalchemy.String),
sqlalchemy.Column("tag_sequencename", sqlalchemy.String),
sqlalchemy.Column("tag_scanningsequence", sqlalchemy.String),
sqlalchemy.Column("tag_sequencevariant", sqlalchemy.String),
sqlalchemy.Column("tag_slicethickness", sqlalchemy.String),
sqlalchemy.Column("tag_contrastbolusagent", sqlalchemy.String),
sqlalchemy.Column("tag_referringphysicianname", sqlalchemy.String),
sqlalchemy.Column("tag_manufacturer", sqlalchemy.String),
sqlalchemy.Column("tag_manufacturermodelname", sqlalchemy.String),
sqlalchemy.Column("tag_magneticfieldstrength", sqlalchemy.String),
sqlalchemy.Column("tag_deviceserialnumber", sqlalchemy.String),
sqlalchemy.Column("tag_softwareversions", sqlalchemy.String),
sqlalchemy.Column("tag_stationname", sqlalchemy.String),
)
task_events = sqlalchemy.Table(
"task_events",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("task_id", sqlalchemy.String, sqlalchemy.ForeignKey("tasks.id"), nullable=True),
sqlalchemy.Column("time", sqlalchemy.DateTime),
sqlalchemy.Column("sender", sqlalchemy.String, default="Unknown"),
sqlalchemy.Column("event", sqlalchemy.String),
# sqlalchemy.Column("series_uid", sqlalchemy.String),
sqlalchemy.Column("file_count", sqlalchemy.Integer),
sqlalchemy.Column("target", sqlalchemy.String),
sqlalchemy.Column("info", sqlalchemy.String),
sqlalchemy.Column("client_timestamp", sqlalchemy.Integer),
)
file_events = sqlalchemy.Table(
"file_events",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("time", sqlalchemy.DateTime),
sqlalchemy.Column("dicom_file", sqlalchemy.Integer),
sqlalchemy.Column("event", sqlalchemy.Integer),
)
dicom_series_map = sqlalchemy.Table(
"dicom_series_map",
metadata,
sqlalchemy.Column("id_file", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("id_series", sqlalchemy.Integer),
)
series_sequence_data = sqlalchemy.Table(
"series_sequence_data",
metadata,
sqlalchemy.Column("uid", sqlalchemy.String, primary_key=True),
sqlalchemy.Column("data", sqlalchemy.JSON),
)
tasks_table = sqlalchemy.Table(
"tasks",
metadata,
sqlalchemy.Column("id", sqlalchemy.String, primary_key=True),
sqlalchemy.Column("parent_id", sqlalchemy.String, nullable=True),
sqlalchemy.Column("time", sqlalchemy.DateTime),
sqlalchemy.Column("series_uid", sqlalchemy.String, nullable=True),
sqlalchemy.Column("study_uid", sqlalchemy.String, nullable=True),
sqlalchemy.Column("data", JSONB),
)
tests_table = sqlalchemy.Table(
"tests",
metadata,
sqlalchemy.Column("id", sqlalchemy.String, primary_key=True),
sqlalchemy.Column("type", sqlalchemy.String, nullable=True),
sqlalchemy.Column("rule_type", sqlalchemy.String, nullable=True),
sqlalchemy.Column("time_begin", sqlalchemy.DateTime, nullable=True),
sqlalchemy.Column("time_end", sqlalchemy.DateTime, nullable=True),
sqlalchemy.Column("status", sqlalchemy.String, nullable=True),
sqlalchemy.Column("task_id", sqlalchemy.String, nullable=True),
sqlalchemy.Column("data", JSONB, nullable=True),
)
processor_logs_table = sqlalchemy.Table(
"processor_logs",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("task_id", sqlalchemy.String, sqlalchemy.ForeignKey("tasks.id"), nullable=True),
sqlalchemy.Column("module_name", sqlalchemy.String, nullable=True),
sqlalchemy.Column("logs", sqlalchemy.String, nullable=True),
sqlalchemy.Column("time", sqlalchemy.DateTime, nullable=True),
)
processor_outputs_table = sqlalchemy.Table(
"processor_outputs",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
sqlalchemy.Column("time", sqlalchemy.DateTime(timezone=True), server_default=func.now()),
sqlalchemy.Column("task_id", sqlalchemy.String, sqlalchemy.ForeignKey("tasks.id"), nullable=True),
sqlalchemy.Column("task_acc", sqlalchemy.String),
sqlalchemy.Column("task_mrn", sqlalchemy.String),
sqlalchemy.Column("module", sqlalchemy.String),
sqlalchemy.Column("index", sqlalchemy.Integer),
sqlalchemy.Column("settings", JSONB),
sqlalchemy.Column("output", JSONB),
)
return database