File size: 4,771 Bytes
c32bf13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import json
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path

# Location of the SQLite database file: "hospital_copilot.db" in the parent
# of the directory that contains this module.
DB_PATH = Path(__file__).parent.parent / "hospital_copilot.db"


def get_conn():
    """Open and return a new SQLite connection to ``DB_PATH``.

    The connection's row factory is ``sqlite3.Row``, so fetched rows support
    access by column name and can be turned into plain dicts with
    ``dict(row)``. The connection is returned open; closing it is left to
    the caller.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection


def init_db() -> None:
    """Create the application tables if they do not already exist.

    Creates the ``patients``, ``sessions``, ``notes`` and ``symptoms``
    tables. Every statement is ``CREATE TABLE IF NOT EXISTS``, so calling
    this on an already-initialised database leaves existing tables as-is.
    """
    # A sqlite3 connection used as a context manager only commits/rolls back;
    # closing() is what actually releases the connection afterwards.
    with closing(get_conn()) as conn, conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS patients (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                name        TEXT NOT NULL,
                dob         TEXT,
                gender      TEXT,
                phone       TEXT,
                language    TEXT DEFAULT 'en',
                created_at  TEXT DEFAULT (datetime('now'))
            );

            CREATE TABLE IF NOT EXISTS sessions (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                patient_id  INTEGER NOT NULL REFERENCES patients(id),
                doctor      TEXT,
                date        TEXT DEFAULT (datetime('now')),
                transcript  TEXT,
                status      TEXT DEFAULT 'open'
            );

            CREATE TABLE IF NOT EXISTS notes (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id  INTEGER NOT NULL REFERENCES sessions(id),
                soap_note   TEXT,
                summary_en  TEXT,
                summary_twi TEXT,
                created_at  TEXT DEFAULT (datetime('now'))
            );

            CREATE TABLE IF NOT EXISTS symptoms (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id  INTEGER NOT NULL REFERENCES sessions(id),
                data        TEXT,
                created_at  TEXT DEFAULT (datetime('now'))
            );
        """)


# --- Patient helpers ---

def create_patient(name: str, dob: str = "", gender: str = "", phone: str = "", language: str = "en") -> int:
    """Insert a new patient row and return its id.

    Args:
        name: Patient name (the column is ``NOT NULL``).
        dob: Date of birth, stored as text exactly as given.
        gender: Gender, stored as text exactly as given.
        phone: Phone number, stored as text exactly as given.
        language: Language code stored with the patient; defaults to ``"en"``.

    Returns:
        The ``id`` of the newly inserted row.
    """
    # The inner `conn` context commits (or rolls back on error); closing()
    # then closes the connection, which the sqlite3 context manager does not.
    with closing(get_conn()) as conn, conn:
        cur = conn.execute(
            "INSERT INTO patients (name, dob, gender, phone, language) VALUES (?, ?, ?, ?, ?)",
            (name, dob, gender, phone, language),
        )
        return cur.lastrowid


def get_all_patients() -> list[dict]:
    """Return every patient as a dict, ordered by name.

    Returns:
        One dict per row of ``patients`` (column name -> value); an empty
        list when the table has no rows.
    """
    # closing() ensures the connection is closed; the sqlite3 connection's
    # own context manager would leave it open.
    with closing(get_conn()) as conn:
        rows = conn.execute("SELECT * FROM patients ORDER BY name").fetchall()
    return [dict(r) for r in rows]


def get_patient(patient_id: int) -> dict | None:
    """Look up a single patient by primary key.

    Args:
        patient_id: Value of ``patients.id`` to look up.

    Returns:
        The row as a dict (column name -> value), or ``None`` when no
        patient has that id.
    """
    # closing() ensures the connection is closed; the sqlite3 connection's
    # own context manager would leave it open.
    with closing(get_conn()) as conn:
        row = conn.execute("SELECT * FROM patients WHERE id = ?", (patient_id,)).fetchone()
    return dict(row) if row else None


# --- Session helpers ---

def create_session(patient_id: int, doctor: str = "Dr. Unknown") -> int:
    """Insert a new session row for a patient and return its id.

    Only ``patient_id`` and ``doctor`` are supplied; the remaining columns
    take their schema defaults.

    Args:
        patient_id: Id of the patient the session belongs to.
        doctor: Doctor name stored with the session.

    Returns:
        The ``id`` of the newly inserted session row.
    """
    # The inner `conn` context commits (or rolls back on error); closing()
    # then closes the connection, which the sqlite3 context manager does not.
    with closing(get_conn()) as conn, conn:
        cur = conn.execute(
            "INSERT INTO sessions (patient_id, doctor) VALUES (?, ?)",
            (patient_id, doctor),
        )
        return cur.lastrowid


def update_transcript(session_id: int, transcript: str) -> None:
    """Overwrite the stored transcript of a session.

    Args:
        session_id: Id of the session row to update.
        transcript: New transcript text; replaces the previous value.
    """
    # The inner `conn` context commits (or rolls back on error); closing()
    # then closes the connection, which the sqlite3 context manager does not.
    with closing(get_conn()) as conn, conn:
        conn.execute(
            "UPDATE sessions SET transcript = ? WHERE id = ?",
            (transcript, session_id),
        )


def close_session(session_id: int) -> None:
    """Set a session's ``status`` column to ``'closed'``.

    Args:
        session_id: Id of the session row to update.
    """
    # The inner `conn` context commits (or rolls back on error); closing()
    # then closes the connection, which the sqlite3 context manager does not.
    with closing(get_conn()) as conn, conn:
        conn.execute(
            "UPDATE sessions SET status = 'closed' WHERE id = ?",
            (session_id,),
        )


def get_sessions_for_patient(patient_id: int) -> list[dict]:
    """Return all sessions of a patient, newest first.

    Args:
        patient_id: Id of the patient whose sessions are wanted.

    Returns:
        One dict per matching ``sessions`` row (column name -> value); an
        empty list when the patient has no sessions.
    """
    # closing() ensures the connection is closed; the sqlite3 connection's
    # own context manager would leave it open.
    with closing(get_conn()) as conn:
        # `date` defaults to datetime('now'), which has whole-second
        # resolution, so the autoincrement id breaks ties between sessions
        # created within the same second.
        rows = conn.execute(
            "SELECT * FROM sessions WHERE patient_id = ? ORDER BY date DESC, id DESC",
            (patient_id,),
        ).fetchall()
    return [dict(r) for r in rows]


# --- Notes helpers ---

def save_note(session_id: int, soap_note: str, summary_en: str, summary_twi: str) -> int:
    """Insert a note row for a session and return its id.

    Each call inserts a new row; earlier notes for the same session are
    left in place.

    Args:
        session_id: Id of the session the note belongs to.
        soap_note: Text stored in the ``soap_note`` column.
        summary_en: Text stored in the ``summary_en`` column.
        summary_twi: Text stored in the ``summary_twi`` column.

    Returns:
        The ``id`` of the newly inserted note row.
    """
    # The inner `conn` context commits (or rolls back on error); closing()
    # then closes the connection, which the sqlite3 context manager does not.
    with closing(get_conn()) as conn, conn:
        cur = conn.execute(
            "INSERT INTO notes (session_id, soap_note, summary_en, summary_twi) VALUES (?, ?, ?, ?)",
            (session_id, soap_note, summary_en, summary_twi),
        )
        return cur.lastrowid


def get_note_for_session(session_id: int) -> dict | None:
    """Return the most recently saved note of a session.

    Args:
        session_id: Id of the session whose note is wanted.

    Returns:
        The newest ``notes`` row for the session as a dict (column name ->
        value), or ``None`` when the session has no notes.
    """
    # closing() ensures the connection is closed; the sqlite3 connection's
    # own context manager would leave it open.
    with closing(get_conn()) as conn:
        # `created_at` defaults to datetime('now'), which has whole-second
        # resolution; the autoincrement id decides which note is newest when
        # two were saved within the same second.
        row = conn.execute(
            "SELECT * FROM notes WHERE session_id = ? ORDER BY created_at DESC, id DESC LIMIT 1",
            (session_id,),
        ).fetchone()
    return dict(row) if row else None


# --- Symptom helpers ---

def save_symptoms(session_id: int, symptoms: dict) -> None:
    """Store a symptoms dict for a session as a JSON string.

    Each call inserts a new row; earlier rows for the same session are
    left in place.

    Args:
        session_id: Id of the session the symptoms belong to.
        symptoms: Data to store; serialised with ``json.dumps``, so it must
            be JSON-serialisable.
    """
    # The inner `conn` context commits (or rolls back on error); closing()
    # then closes the connection, which the sqlite3 context manager does not.
    with closing(get_conn()) as conn, conn:
        conn.execute(
            "INSERT INTO symptoms (session_id, data) VALUES (?, ?)",
            (session_id, json.dumps(symptoms)),
        )


def get_symptoms_for_session(session_id: int) -> dict:
    """Return the most recently saved symptoms of a session.

    Args:
        session_id: Id of the session whose symptoms are wanted.

    Returns:
        The decoded JSON stored in the newest ``symptoms`` row for the
        session, or ``{}`` when there is no row or its ``data`` is NULL.
    """
    # closing() ensures the connection is closed; the sqlite3 connection's
    # own context manager would leave it open.
    with closing(get_conn()) as conn:
        # `created_at` defaults to datetime('now'), which has whole-second
        # resolution; the autoincrement id decides which row is newest when
        # two were saved within the same second.
        row = conn.execute(
            "SELECT data FROM symptoms WHERE session_id = ? ORDER BY created_at DESC, id DESC LIMIT 1",
            (session_id,),
        ).fetchone()
    # The `data` column is nullable; json.loads(None) would raise TypeError.
    if row is None or row["data"] is None:
        return {}
    return json.loads(row["data"])