| import sqlite3 |
| import os |
| import json |
| from typing import Optional |
|
|
| try: |
| import psycopg2 |
| from psycopg2 import pool |
| except ImportError: |
| psycopg2 = None |
|
|
# Storage configuration, read once from the environment at import time.

# Path of the SQLite database file (used when no Postgres URL is set).
DB_FILE = os.environ.get("MEP_SQLITE_PATH", "ledger.db")
# Postgres connection string; a non-empty value selects the Postgres backend.
DB_URL = os.environ.get("MEP_DATABASE_URL")
# Lower and upper bounds handed to the Postgres connection pool.
PG_POOL_MIN = int(os.environ.get("MEP_PG_POOL_MIN", "1"))
PG_POOL_MAX = int(os.environ.get("MEP_PG_POOL_MAX", "5"))
# Created lazily by _get_pg_pool(); None until the first Postgres connection.
_pg_pool: Optional["pool.SimpleConnectionPool"] = None
|
|
def _is_postgres() -> bool:
    """Tell whether the Postgres backend is selected.

    Returns:
        True when ``DB_URL`` is set to a non-empty value, otherwise False
        (meaning the SQLite backend is used).
    """
    return True if DB_URL else False
|
|
def _get_pg_pool():
    """Return the module-wide Postgres connection pool, building it on first use.

    Returns:
        The ``SimpleConnectionPool`` stored in the module-level ``_pg_pool``.

    Raises:
        RuntimeError: If the pool has to be created but psycopg2 could not
            be imported.
    """
    global _pg_pool
    # Fast path: the pool was already created by an earlier call.
    if _pg_pool is not None:
        return _pg_pool
    if psycopg2 is None:
        raise RuntimeError("psycopg2 is required for Postgres")
    _pg_pool = pool.SimpleConnectionPool(PG_POOL_MIN, PG_POOL_MAX, DB_URL)
    return _pg_pool
|
|
def _get_conn():
    """Obtain a database connection for the configured backend.

    Returns:
        A connection checked out of the Postgres pool when Postgres is
        selected, otherwise a freshly opened SQLite connection to ``DB_FILE``.
        Hand it back with ``_release_conn`` when done.
    """
    if not _is_postgres():
        return sqlite3.connect(DB_FILE, check_same_thread=False)
    pg_pool = _get_pg_pool()
    return pg_pool.getconn()
|
|
def _release_conn(conn):
    """Give back a connection obtained from ``_get_conn``.

    Args:
        conn: The connection to release. SQLite connections are closed;
            Postgres connections are returned to the pool.
    """
    if not _is_postgres():
        conn.close()
        return
    _get_pg_pool().putconn(conn)
|
|
def _row_to_dict(cursor, row):
    """Convert one fetched row into a mapping keyed by column name.

    Args:
        cursor: The cursor that produced ``row``; its ``description`` supplies
            the column names (first element of each entry).
        row: A sequence of column values, or None.

    Returns:
        A dict of column name to value, or None when ``row`` is None.
    """
    if row is None:
        return None
    return {column[0]: value for column, value in zip(cursor.description, row)}
|
|
def init_db():
    """Create the ledger, tasks and idempotency tables if they are missing.

    Every statement is ``CREATE TABLE IF NOT EXISTS``, so running this against
    a database that already has the tables leaves them untouched. The
    connection is released even when a statement raises, so a failure here
    does not leak a pooled connection.
    """
    schema_statements = (
        '''
        CREATE TABLE IF NOT EXISTS ledger (
            node_id TEXT PRIMARY KEY,
            pub_pem TEXT NOT NULL,
            balance REAL NOT NULL
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS tasks (
            task_id TEXT PRIMARY KEY,
            consumer_id TEXT NOT NULL,
            provider_id TEXT,
            payload TEXT NOT NULL,
            bounty REAL NOT NULL,
            status TEXT NOT NULL,
            target_node TEXT,
            model_requirement TEXT,
            result_payload TEXT,
            created_at REAL NOT NULL,
            updated_at REAL NOT NULL
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS idempotency (
            node_id TEXT NOT NULL,
            endpoint TEXT NOT NULL,
            idem_key TEXT NOT NULL,
            response TEXT NOT NULL,
            status_code INTEGER NOT NULL,
            created_at REAL NOT NULL,
            PRIMARY KEY (node_id, endpoint, idem_key)
        )
        ''',
    )
    conn = _get_conn()
    try:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Always hand the connection back, including on a failed statement.
        _release_conn(conn)
|
|
def register_node(node_id: str, pub_pem: str) -> float:
    """Register a node if it is unknown and return its current balance.

    A node that is not yet in the ledger is inserted with a starting balance
    of 10.0. The insert ignores conflicts, so a concurrent registration of the
    same ``node_id`` does not raise; the balance is re-read afterwards.
    An existing node keeps its stored ``pub_pem`` and balance.

    Args:
        node_id: Identifier of the node (ledger primary key).
        pub_pem: Public key text stored for a newly created node.

    Returns:
        The balance stored for ``node_id`` (10.0 if the row still cannot be
        read back after the insert).
    """
    initial_balance = 10.0
    if _is_postgres():
        select_sql = "SELECT balance FROM ledger WHERE node_id = %s"
        insert_sql = (
            "INSERT INTO ledger (node_id, pub_pem, balance) VALUES (%s, %s, %s) "
            "ON CONFLICT (node_id) DO NOTHING"
        )
    else:
        select_sql = "SELECT balance FROM ledger WHERE node_id = ?"
        insert_sql = (
            "INSERT OR IGNORE INTO ledger (node_id, pub_pem, balance) VALUES (?, ?, ?)"
        )

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(select_sql, (node_id,))
        row = cursor.fetchone()
        if not row:
            cursor.execute(insert_sql, (node_id, pub_pem, initial_balance))
            conn.commit()
            # Re-read: another writer may have inserted the row first.
            cursor.execute(select_sql, (node_id,))
            row = cursor.fetchone()
        return row[0] if row else initial_balance
    finally:
        # Release the connection even if a statement raised.
        _release_conn(conn)
|
|
def get_pub_pem(node_id: str) -> Optional[str]:
    """Look up the public key stored for a node.

    Args:
        node_id: Identifier of the node in the ledger table.

    Returns:
        The stored ``pub_pem`` value, or None when the node is not registered.
    """
    if _is_postgres():
        query = "SELECT pub_pem FROM ledger WHERE node_id = %s"
    else:
        query = "SELECT pub_pem FROM ledger WHERE node_id = ?"

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query, (node_id,))
        row = cursor.fetchone()
        return row[0] if row else None
    finally:
        # Release the connection even if the query raised.
        _release_conn(conn)
|
|
def get_balance(node_id: str) -> Optional[float]:
    """Read the current balance of a node.

    Args:
        node_id: Identifier of the node in the ledger table.

    Returns:
        The stored balance, or None when the node is not registered.
    """
    if _is_postgres():
        query = "SELECT balance FROM ledger WHERE node_id = %s"
    else:
        query = "SELECT balance FROM ledger WHERE node_id = ?"

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query, (node_id,))
        row = cursor.fetchone()
        return row[0] if row else None
    finally:
        # Release the connection even if the query raised.
        _release_conn(conn)
|
|
def set_balance(node_id: str, balance: float):
    """Overwrite the balance of a node.

    No row is changed when ``node_id`` is not present in the ledger.

    Args:
        node_id: Identifier of the node in the ledger table.
        balance: New balance value to store.
    """
    if _is_postgres():
        statement = "UPDATE ledger SET balance = %s WHERE node_id = %s"
    else:
        statement = "UPDATE ledger SET balance = ? WHERE node_id = ?"

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(statement, (balance, node_id))
        conn.commit()
    finally:
        # Release the connection even if the update raised.
        _release_conn(conn)
|
|
def add_balance(node_id: str, amount: float):
    """Add ``amount`` to a node's balance in a single UPDATE statement.

    No row is changed when ``node_id`` is not present in the ledger.

    Args:
        node_id: Identifier of the node in the ledger table.
        amount: Value added to the stored balance.
    """
    if _is_postgres():
        statement = "UPDATE ledger SET balance = balance + %s WHERE node_id = %s"
    else:
        statement = "UPDATE ledger SET balance = balance + ? WHERE node_id = ?"

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(statement, (amount, node_id))
        conn.commit()
    finally:
        # Release the connection even if the update raised.
        _release_conn(conn)
|
|
def deduct_balance(node_id: str, amount: float) -> bool:
    """Subtract ``amount`` from a node's balance if the balance covers it.

    The sufficiency check and the subtraction are part of the same UPDATE
    statement (``... AND balance >= amount``), so the balance is only changed
    when it is at least ``amount``.

    Args:
        node_id: Identifier of the node in the ledger table.
        amount: Value to subtract from the stored balance.

    Returns:
        True when a row was updated; False when the node is unknown or its
        balance is lower than ``amount``.
    """
    # NOTE(review): a negative ``amount`` passes the ``balance >= amount``
    # guard whenever balance >= amount and would increase the balance;
    # presumably callers validate that amount is positive -- confirm.
    if _is_postgres():
        statement = (
            "UPDATE ledger SET balance = balance - %s WHERE node_id = %s AND balance >= %s"
        )
    else:
        statement = (
            "UPDATE ledger SET balance = balance - ? WHERE node_id = ? AND balance >= ?"
        )

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(statement, (amount, node_id, amount))
        updated = cursor.rowcount
        conn.commit()
        return updated > 0
    finally:
        # Release the connection even if the update raised.
        _release_conn(conn)
|
|
def create_task(
    task_id: str,
    consumer_id: str,
    payload: str,
    bounty: float,
    status: str,
    target_node: Optional[str],
    model_requirement: Optional[str],
    created_at: float,
):
    """Insert a new row into the tasks table.

    ``provider_id`` and ``result_payload`` start as NULL, and ``updated_at``
    is initialised to ``created_at``. The connection is released even when
    the insert raises (for example on a duplicate ``task_id``, which is the
    table's primary key).

    Args:
        task_id: Primary key of the new task.
        consumer_id: Identifier stored in the ``consumer_id`` column.
        payload: Task payload text.
        bounty: Value stored in the ``bounty`` column.
        status: Initial status string.
        target_node: Optional value for the ``target_node`` column.
        model_requirement: Optional value for the ``model_requirement`` column.
        created_at: Timestamp stored as both ``created_at`` and ``updated_at``.
    """
    columns = (
        "task_id, consumer_id, provider_id, payload, bounty, status, "
        "target_node, model_requirement, result_payload, created_at, updated_at"
    )
    if _is_postgres():
        statement = (
            "INSERT INTO tasks (" + columns + ") "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        )
    else:
        statement = (
            "INSERT INTO tasks (" + columns + ") "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        )
    values = (
        task_id, consumer_id, None, payload, bounty, status,
        target_node, model_requirement, None, created_at, created_at,
    )

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(statement, values)
        conn.commit()
    finally:
        # Release the connection even if the insert raised.
        _release_conn(conn)
|
|
def update_task_assignment(task_id: str, provider_id: str, status: str, updated_at: float):
    """Record which provider a task is assigned to and its new status.

    No row is changed when ``task_id`` does not exist.

    Args:
        task_id: Primary key of the task to update.
        provider_id: Value written to the ``provider_id`` column.
        status: New status string.
        updated_at: Timestamp written to ``updated_at``.
    """
    if _is_postgres():
        statement = (
            "UPDATE tasks SET provider_id = %s, status = %s, updated_at = %s WHERE task_id = %s"
        )
    else:
        statement = (
            "UPDATE tasks SET provider_id = ?, status = ?, updated_at = ? WHERE task_id = ?"
        )

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(statement, (provider_id, status, updated_at, task_id))
        conn.commit()
    finally:
        # Release the connection even if the update raised.
        _release_conn(conn)
|
|
def update_task_result(task_id: str, provider_id: str, result_payload: str, status: str, updated_at: float):
    """Store a task's result together with its provider and new status.

    No row is changed when ``task_id`` does not exist.

    Args:
        task_id: Primary key of the task to update.
        provider_id: Value written to the ``provider_id`` column.
        result_payload: Result text written to ``result_payload``.
        status: New status string.
        updated_at: Timestamp written to ``updated_at``.
    """
    if _is_postgres():
        statement = (
            "UPDATE tasks SET provider_id = %s, result_payload = %s, status = %s, "
            "updated_at = %s WHERE task_id = %s"
        )
    else:
        statement = (
            "UPDATE tasks SET provider_id = ?, result_payload = ?, status = ?, "
            "updated_at = ? WHERE task_id = ?"
        )

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(
            statement,
            (provider_id, result_payload, status, updated_at, task_id),
        )
        conn.commit()
    finally:
        # Release the connection even if the update raised.
        _release_conn(conn)
|
|
def get_task(task_id: str) -> Optional[dict]:
    """Fetch one task row as a dict keyed by column name.

    Args:
        task_id: Primary key of the task to load.

    Returns:
        A dict containing every column of the tasks row, or None when no task
        has this ``task_id``.
    """
    if _is_postgres():
        query = "SELECT * FROM tasks WHERE task_id = %s"
    else:
        query = "SELECT * FROM tasks WHERE task_id = ?"

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query, (task_id,))
        row = cursor.fetchone()
        if not row:
            return None
        # Both backends expose column names through cursor.description, so
        # one conversion path serves SQLite and Postgres alike.
        return _row_to_dict(cursor, row)
    finally:
        # Single release point for every exit path, including exceptions.
        _release_conn(conn)
|
|
def get_active_tasks() -> list:
    """Return every task whose status is 'bidding' or 'assigned'.

    Returns:
        A list of dicts, one per matching tasks row, keyed by column name.
        The list is empty when no task matches.
    """
    # The statement takes no parameters, so it is identical for both backends.
    query = "SELECT * FROM tasks WHERE status IN ('bidding', 'assigned')"

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        # Resolve the column names once rather than once per row.
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in rows]
    finally:
        # Release the connection even if the query raised.
        _release_conn(conn)
|
|
def get_idempotency(node_id: str, endpoint: str, idem_key: str) -> Optional[dict]:
    """Look up a previously stored response for an idempotency key.

    Args:
        node_id: Node part of the composite key.
        endpoint: Endpoint part of the composite key.
        idem_key: Idempotency-key part of the composite key.

    Returns:
        ``{"response": <decoded JSON>, "status_code": <int>}`` for a stored
        entry, or None when nothing is stored under this key.

    Raises:
        json.JSONDecodeError: If the stored response text is not valid JSON.
    """
    if _is_postgres():
        query = (
            "SELECT response, status_code FROM idempotency "
            "WHERE node_id = %s AND endpoint = %s AND idem_key = %s"
        )
    else:
        query = (
            "SELECT response, status_code FROM idempotency "
            "WHERE node_id = ? AND endpoint = ? AND idem_key = ?"
        )

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query, (node_id, endpoint, idem_key))
        row = cursor.fetchone()
    finally:
        # Release before decoding so a JSON error cannot leak the connection.
        _release_conn(conn)

    if not row:
        return None
    return {"response": json.loads(row[0]), "status_code": row[1]}
|
|
def set_idempotency(node_id: str, endpoint: str, idem_key: str, response: dict, status_code: int, created_at: float):
    """Store the response for an idempotency key unless one already exists.

    The insert ignores conflicts on ``(node_id, endpoint, idem_key)``, so the
    first stored response for a key is kept and later calls are no-ops.

    Args:
        node_id: Node part of the composite key.
        endpoint: Endpoint part of the composite key.
        idem_key: Idempotency-key part of the composite key.
        response: JSON-serialisable response body; stored as JSON text.
        status_code: Status code stored alongside the response.
        created_at: Timestamp stored in ``created_at``.

    Raises:
        TypeError: If ``response`` cannot be serialised by ``json.dumps``.
    """
    # Serialise before taking a connection so a bad payload cannot leak one.
    payload = json.dumps(response)
    if _is_postgres():
        statement = (
            "INSERT INTO idempotency (node_id, endpoint, idem_key, response, status_code, created_at) "
            "VALUES (%s, %s, %s, %s, %s, %s) "
            "ON CONFLICT (node_id, endpoint, idem_key) DO NOTHING"
        )
    else:
        statement = (
            "INSERT OR IGNORE INTO idempotency "
            "(node_id, endpoint, idem_key, response, status_code, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?)"
        )

    conn = _get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(
            statement,
            (node_id, endpoint, idem_key, payload, status_code, created_at),
        )
        conn.commit()
    finally:
        # Release the connection even if the insert raised.
        _release_conn(conn)
|
|
# Create any missing tables as soon as this module is imported, so the
# functions above can assume the schema exists.
init_db()
|
|