""" forecastlab-db MCP server — read-only SQL access to local MariaDB. Exposes: list_databases() → visible DBs (allowlisted) list_tables(database) → tables in a database describe_table(database, table) → column schema query(sql, max_rows=200) → execute a read-only SELECT/SHOW Safety: - Only databases in ALLOWED_DATABASES are reachable - Statements other than SELECT / SHOW / DESCRIBE / EXPLAIN are rejected - Multi-statement queries (`;` outside strings) are rejected - Hard cap on returned rows - Connection uses a read-only DB user (set MCP_DB_RO_USER) — falls back to DB_USER Created: 2026-04-26 """ from __future__ import annotations import os import re import sys from pathlib import Path from typing import Any from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP import mysql.connector from mysql.connector import Error as MySQLError load_dotenv("/home/help4bis/.env") DB_HOST = os.getenv("DB_HOST", "127.0.0.1") DB_PORT = int(os.getenv("DB_PORT", "3306")) DB_USER = os.getenv("MCP_DB_RO_USER", os.getenv("DB_USER", "")) DB_PASS = os.getenv("MCP_DB_RO_PASSWORD", os.getenv("DB_PASSWORD", "")) ALLOWED_DATABASES = { "forecastlab", # forecasts, accuracy, prices, energy, Gann "weewx_db", # real-time weather, archive, lightning "homeassistant", # HA recorder } ROW_LIMIT_HARD_CAP = 1000 ROW_LIMIT_DEFAULT = 200 # Match SELECT, SHOW, DESCRIBE, EXPLAIN at start (case-insensitive, after optional whitespace). RE_READ_ONLY = re.compile(r"^\s*(SELECT|SHOW|DESCRIBE|DESC|EXPLAIN|WITH)\b", re.IGNORECASE) # Crude multi-statement guard — more robust would be a real SQL parser. RE_MULTI_STMT = re.compile(r";\s*\S", re.MULTILINE) mcp = FastMCP("forecastlab-db") def _connect(database: str | None = None): if database is not None and database not in ALLOWED_DATABASES: raise ValueError( f"Database {database!r} not allowed. " f"Allowed: {sorted(ALLOWED_DATABASES)}" ) return mysql.connector.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS, database=database, connection_timeout=10, ) def _validate_read_only(sql: str) -> None: if not sql or not sql.strip(): raise ValueError("SQL is empty") if RE_MULTI_STMT.search(sql): raise ValueError("Multi-statement queries are not allowed") if not RE_READ_ONLY.match(sql): raise ValueError( "Only SELECT / SHOW / DESCRIBE / EXPLAIN / WITH are allowed (read-only)" ) @mcp.tool() def list_databases() -> list[str]: """Return the list of databases this MCP is allowed to query.""" return sorted(ALLOWED_DATABASES) @mcp.tool() def list_tables(database: str) -> list[str]: """List tables in an allowed database.""" if database not in ALLOWED_DATABASES: raise ValueError( f"Database {database!r} not allowed. " f"Allowed: {sorted(ALLOWED_DATABASES)}" ) with _connect(database) as conn: with conn.cursor() as cur: cur.execute("SHOW TABLES") return [row[0] for row in cur.fetchall()] @mcp.tool() def describe_table(database: str, table: str) -> list[dict[str, Any]]: """Return column schema (Field, Type, Null, Key, Default, Extra) for a table.""" if database not in ALLOWED_DATABASES: raise ValueError( f"Database {database!r} not allowed. " f"Allowed: {sorted(ALLOWED_DATABASES)}" ) if not re.match(r"^[A-Za-z0-9_]+$", table): raise ValueError("Table name must be alphanumeric / underscore only") with _connect(database) as conn: with conn.cursor(dictionary=True) as cur: cur.execute(f"DESCRIBE `{table}`") return cur.fetchall() @mcp.tool() def query( sql: str, database: str | None = None, max_rows: int = ROW_LIMIT_DEFAULT, ) -> dict[str, Any]: """Execute a read-only SQL query. Returns {'columns': [...], 'rows': [...], 'truncated': bool}. Args: sql: The SELECT / SHOW / DESCRIBE / EXPLAIN / WITH statement. database: Optional database to USE first. Must be in the allowlist. max_rows: Cap on returned rows (default 200, hard ceiling 1000). """ _validate_read_only(sql) if database is not None and database not in ALLOWED_DATABASES: raise ValueError( f"Database {database!r} not allowed. " f"Allowed: {sorted(ALLOWED_DATABASES)}" ) cap = min(max(int(max_rows), 1), ROW_LIMIT_HARD_CAP) try: with _connect(database) as conn: with conn.cursor(dictionary=True) as cur: cur.execute(sql) rows = cur.fetchmany(cap + 1) truncated = len(rows) > cap rows = rows[:cap] # Get column order from cursor description for stable ordering columns = [d[0] for d in cur.description] if cur.description else [] # mysql-connector dictionary cursor returns keys in insertion order # so dict iteration is column order; convert datetimes to ISO strings clean = [] for r in rows: clean.append( { k: (v.isoformat() if hasattr(v, "isoformat") else v) for k, v in r.items() } ) return { "columns": columns, "rows": clean, "row_count": len(clean), "truncated": truncated, "max_rows_requested": cap, } except MySQLError as e: raise RuntimeError(f"DB error: {e}") from e def main() -> int: if not DB_USER or not DB_PASS: print("ERROR: DB_USER / DB_PASSWORD missing from /home/help4bis/.env", file=sys.stderr) return 1 mcp.run(transport="stdio") return 0 if __name__ == "__main__": sys.exit(main())