Read-only MariaDB facade for Claude Code. 4 tools: list_databases / list_tables / describe_table / query. Safety: - 3 databases allowlisted (forecastlab, weewx_db, homeassistant) - SELECT / SHOW / DESCRIBE / EXPLAIN / WITH only - multi-statement injection rejected - 1000 row hard cap (200 default) - prefers MCP_DB_RO_USER if set; falls back to DB_USER Tested: 10/10 read-only guard cases pass.
181 lines
6.0 KiB
Python
181 lines
6.0 KiB
Python
"""
|
|
forecastlab-db MCP server — read-only SQL access to local MariaDB.
|
|
|
|
Exposes:
|
|
list_databases() → visible DBs (allowlisted)
|
|
list_tables(database) → tables in a database
|
|
describe_table(database, table) → column schema
|
|
query(sql, max_rows=200) → execute a read-only SELECT/SHOW
|
|
|
|
Safety:
|
|
- Only databases in ALLOWED_DATABASES are reachable
|
|
- Statements other than SELECT / SHOW / DESCRIBE / EXPLAIN are rejected
|
|
- Multi-statement queries (`;` outside strings) are rejected
|
|
- Hard cap on returned rows
|
|
- Connection uses a read-only DB user (set MCP_DB_RO_USER) — falls back to DB_USER
|
|
|
|
Created: 2026-04-26
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from dotenv import load_dotenv
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
import mysql.connector
|
|
from mysql.connector import Error as MySQLError
|
|
|
|
load_dotenv("/home/help4bis/.env")
|
|
|
|
DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
|
|
DB_PORT = int(os.getenv("DB_PORT", "3306"))
|
|
DB_USER = os.getenv("MCP_DB_RO_USER", os.getenv("DB_USER", ""))
|
|
DB_PASS = os.getenv("MCP_DB_RO_PASSWORD", os.getenv("DB_PASSWORD", ""))
|
|
|
|
ALLOWED_DATABASES = {
|
|
"forecastlab", # forecasts, accuracy, prices, energy, Gann
|
|
"weewx_db", # real-time weather, archive, lightning
|
|
"homeassistant", # HA recorder
|
|
}
|
|
|
|
ROW_LIMIT_HARD_CAP = 1000
|
|
ROW_LIMIT_DEFAULT = 200
|
|
|
|
# Match SELECT, SHOW, DESCRIBE, EXPLAIN at start (case-insensitive, after optional whitespace).
|
|
RE_READ_ONLY = re.compile(r"^\s*(SELECT|SHOW|DESCRIBE|DESC|EXPLAIN|WITH)\b", re.IGNORECASE)
|
|
# Crude multi-statement guard — more robust would be a real SQL parser.
|
|
RE_MULTI_STMT = re.compile(r";\s*\S", re.MULTILINE)
|
|
|
|
mcp = FastMCP("forecastlab-db")
|
|
|
|
|
|
def _connect(database: str | None = None):
|
|
if database is not None and database not in ALLOWED_DATABASES:
|
|
raise ValueError(
|
|
f"Database {database!r} not allowed. "
|
|
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
|
)
|
|
return mysql.connector.connect(
|
|
host=DB_HOST,
|
|
port=DB_PORT,
|
|
user=DB_USER,
|
|
password=DB_PASS,
|
|
database=database,
|
|
connection_timeout=10,
|
|
)
|
|
|
|
|
|
def _validate_read_only(sql: str) -> None:
|
|
if not sql or not sql.strip():
|
|
raise ValueError("SQL is empty")
|
|
if RE_MULTI_STMT.search(sql):
|
|
raise ValueError("Multi-statement queries are not allowed")
|
|
if not RE_READ_ONLY.match(sql):
|
|
raise ValueError(
|
|
"Only SELECT / SHOW / DESCRIBE / EXPLAIN / WITH are allowed (read-only)"
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
def list_databases() -> list[str]:
|
|
"""Return the list of databases this MCP is allowed to query."""
|
|
return sorted(ALLOWED_DATABASES)
|
|
|
|
|
|
@mcp.tool()
|
|
def list_tables(database: str) -> list[str]:
|
|
"""List tables in an allowed database."""
|
|
if database not in ALLOWED_DATABASES:
|
|
raise ValueError(
|
|
f"Database {database!r} not allowed. "
|
|
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
|
)
|
|
with _connect(database) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SHOW TABLES")
|
|
return [row[0] for row in cur.fetchall()]
|
|
|
|
|
|
@mcp.tool()
|
|
def describe_table(database: str, table: str) -> list[dict[str, Any]]:
|
|
"""Return column schema (Field, Type, Null, Key, Default, Extra) for a table."""
|
|
if database not in ALLOWED_DATABASES:
|
|
raise ValueError(
|
|
f"Database {database!r} not allowed. "
|
|
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
|
)
|
|
if not re.match(r"^[A-Za-z0-9_]+$", table):
|
|
raise ValueError("Table name must be alphanumeric / underscore only")
|
|
with _connect(database) as conn:
|
|
with conn.cursor(dictionary=True) as cur:
|
|
cur.execute(f"DESCRIBE `{table}`")
|
|
return cur.fetchall()
|
|
|
|
|
|
@mcp.tool()
|
|
def query(
|
|
sql: str,
|
|
database: str | None = None,
|
|
max_rows: int = ROW_LIMIT_DEFAULT,
|
|
) -> dict[str, Any]:
|
|
"""Execute a read-only SQL query. Returns {'columns': [...], 'rows': [...], 'truncated': bool}.
|
|
|
|
Args:
|
|
sql: The SELECT / SHOW / DESCRIBE / EXPLAIN / WITH statement.
|
|
database: Optional database to USE first. Must be in the allowlist.
|
|
max_rows: Cap on returned rows (default 200, hard ceiling 1000).
|
|
"""
|
|
_validate_read_only(sql)
|
|
if database is not None and database not in ALLOWED_DATABASES:
|
|
raise ValueError(
|
|
f"Database {database!r} not allowed. "
|
|
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
|
)
|
|
cap = min(max(int(max_rows), 1), ROW_LIMIT_HARD_CAP)
|
|
|
|
try:
|
|
with _connect(database) as conn:
|
|
with conn.cursor(dictionary=True) as cur:
|
|
cur.execute(sql)
|
|
rows = cur.fetchmany(cap + 1)
|
|
truncated = len(rows) > cap
|
|
rows = rows[:cap]
|
|
# Get column order from cursor description for stable ordering
|
|
columns = [d[0] for d in cur.description] if cur.description else []
|
|
# mysql-connector dictionary cursor returns keys in insertion order
|
|
# so dict iteration is column order; convert datetimes to ISO strings
|
|
clean = []
|
|
for r in rows:
|
|
clean.append(
|
|
{
|
|
k: (v.isoformat() if hasattr(v, "isoformat") else v)
|
|
for k, v in r.items()
|
|
}
|
|
)
|
|
return {
|
|
"columns": columns,
|
|
"rows": clean,
|
|
"row_count": len(clean),
|
|
"truncated": truncated,
|
|
"max_rows_requested": cap,
|
|
}
|
|
except MySQLError as e:
|
|
raise RuntimeError(f"DB error: {e}") from e
|
|
|
|
|
|
def main() -> int:
|
|
if not DB_USER or not DB_PASS:
|
|
print("ERROR: DB_USER / DB_PASSWORD missing from /home/help4bis/.env", file=sys.stderr)
|
|
return 1
|
|
mcp.run(transport="stdio")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|