initial: forecastlab_db read-only MCP server
Read-only MariaDB facade for Claude Code. 4 tools: list_databases / list_tables / describe_table / query. Safety: - 3 databases allowlisted (forecastlab, weewx_db, homeassistant) - SELECT / SHOW / DESCRIBE / EXPLAIN / WITH only - multi-statement injection rejected - 1000 row hard cap (200 default) - prefers MCP_DB_RO_USER if set; falls back to DB_USER Tested: 10/10 read-only guard cases pass.
This commit is contained in:
180
forecastlab_db/server.py
Normal file
180
forecastlab_db/server.py
Normal file
@@ -0,0 +1,180 @@
|
||||
"""
|
||||
forecastlab-db MCP server — read-only SQL access to local MariaDB.
|
||||
|
||||
Exposes:
|
||||
list_databases() → visible DBs (allowlisted)
|
||||
list_tables(database) → tables in a database
|
||||
describe_table(database, table) → column schema
|
||||
query(sql, max_rows=200) → execute a read-only SELECT/SHOW
|
||||
|
||||
Safety:
|
||||
- Only databases in ALLOWED_DATABASES are reachable
|
||||
- Statements other than SELECT / SHOW / DESCRIBE / EXPLAIN are rejected
|
||||
- Multi-statement queries (`;` outside strings) are rejected
|
||||
- Hard cap on returned rows
|
||||
- Connection uses a read-only DB user (set MCP_DB_RO_USER) — falls back to DB_USER
|
||||
|
||||
Created: 2026-04-26
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
import mysql.connector
|
||||
from mysql.connector import Error as MySQLError
|
||||
|
||||
load_dotenv("/home/help4bis/.env")
|
||||
|
||||
DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
|
||||
DB_PORT = int(os.getenv("DB_PORT", "3306"))
|
||||
DB_USER = os.getenv("MCP_DB_RO_USER", os.getenv("DB_USER", ""))
|
||||
DB_PASS = os.getenv("MCP_DB_RO_PASSWORD", os.getenv("DB_PASSWORD", ""))
|
||||
|
||||
ALLOWED_DATABASES = {
|
||||
"forecastlab", # forecasts, accuracy, prices, energy, Gann
|
||||
"weewx_db", # real-time weather, archive, lightning
|
||||
"homeassistant", # HA recorder
|
||||
}
|
||||
|
||||
ROW_LIMIT_HARD_CAP = 1000
|
||||
ROW_LIMIT_DEFAULT = 200
|
||||
|
||||
# Match SELECT, SHOW, DESCRIBE, EXPLAIN at start (case-insensitive, after optional whitespace).
|
||||
RE_READ_ONLY = re.compile(r"^\s*(SELECT|SHOW|DESCRIBE|DESC|EXPLAIN|WITH)\b", re.IGNORECASE)
|
||||
# Crude multi-statement guard — more robust would be a real SQL parser.
|
||||
RE_MULTI_STMT = re.compile(r";\s*\S", re.MULTILINE)
|
||||
|
||||
mcp = FastMCP("forecastlab-db")
|
||||
|
||||
|
||||
def _connect(database: str | None = None):
|
||||
if database is not None and database not in ALLOWED_DATABASES:
|
||||
raise ValueError(
|
||||
f"Database {database!r} not allowed. "
|
||||
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
||||
)
|
||||
return mysql.connector.connect(
|
||||
host=DB_HOST,
|
||||
port=DB_PORT,
|
||||
user=DB_USER,
|
||||
password=DB_PASS,
|
||||
database=database,
|
||||
connection_timeout=10,
|
||||
)
|
||||
|
||||
|
||||
def _validate_read_only(sql: str) -> None:
|
||||
if not sql or not sql.strip():
|
||||
raise ValueError("SQL is empty")
|
||||
if RE_MULTI_STMT.search(sql):
|
||||
raise ValueError("Multi-statement queries are not allowed")
|
||||
if not RE_READ_ONLY.match(sql):
|
||||
raise ValueError(
|
||||
"Only SELECT / SHOW / DESCRIBE / EXPLAIN / WITH are allowed (read-only)"
|
||||
)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def list_databases() -> list[str]:
|
||||
"""Return the list of databases this MCP is allowed to query."""
|
||||
return sorted(ALLOWED_DATABASES)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def list_tables(database: str) -> list[str]:
|
||||
"""List tables in an allowed database."""
|
||||
if database not in ALLOWED_DATABASES:
|
||||
raise ValueError(
|
||||
f"Database {database!r} not allowed. "
|
||||
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
||||
)
|
||||
with _connect(database) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SHOW TABLES")
|
||||
return [row[0] for row in cur.fetchall()]
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def describe_table(database: str, table: str) -> list[dict[str, Any]]:
|
||||
"""Return column schema (Field, Type, Null, Key, Default, Extra) for a table."""
|
||||
if database not in ALLOWED_DATABASES:
|
||||
raise ValueError(
|
||||
f"Database {database!r} not allowed. "
|
||||
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
||||
)
|
||||
if not re.match(r"^[A-Za-z0-9_]+$", table):
|
||||
raise ValueError("Table name must be alphanumeric / underscore only")
|
||||
with _connect(database) as conn:
|
||||
with conn.cursor(dictionary=True) as cur:
|
||||
cur.execute(f"DESCRIBE `{table}`")
|
||||
return cur.fetchall()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def query(
|
||||
sql: str,
|
||||
database: str | None = None,
|
||||
max_rows: int = ROW_LIMIT_DEFAULT,
|
||||
) -> dict[str, Any]:
|
||||
"""Execute a read-only SQL query. Returns {'columns': [...], 'rows': [...], 'truncated': bool}.
|
||||
|
||||
Args:
|
||||
sql: The SELECT / SHOW / DESCRIBE / EXPLAIN / WITH statement.
|
||||
database: Optional database to USE first. Must be in the allowlist.
|
||||
max_rows: Cap on returned rows (default 200, hard ceiling 1000).
|
||||
"""
|
||||
_validate_read_only(sql)
|
||||
if database is not None and database not in ALLOWED_DATABASES:
|
||||
raise ValueError(
|
||||
f"Database {database!r} not allowed. "
|
||||
f"Allowed: {sorted(ALLOWED_DATABASES)}"
|
||||
)
|
||||
cap = min(max(int(max_rows), 1), ROW_LIMIT_HARD_CAP)
|
||||
|
||||
try:
|
||||
with _connect(database) as conn:
|
||||
with conn.cursor(dictionary=True) as cur:
|
||||
cur.execute(sql)
|
||||
rows = cur.fetchmany(cap + 1)
|
||||
truncated = len(rows) > cap
|
||||
rows = rows[:cap]
|
||||
# Get column order from cursor description for stable ordering
|
||||
columns = [d[0] for d in cur.description] if cur.description else []
|
||||
# mysql-connector dictionary cursor returns keys in insertion order
|
||||
# so dict iteration is column order; convert datetimes to ISO strings
|
||||
clean = []
|
||||
for r in rows:
|
||||
clean.append(
|
||||
{
|
||||
k: (v.isoformat() if hasattr(v, "isoformat") else v)
|
||||
for k, v in r.items()
|
||||
}
|
||||
)
|
||||
return {
|
||||
"columns": columns,
|
||||
"rows": clean,
|
||||
"row_count": len(clean),
|
||||
"truncated": truncated,
|
||||
"max_rows_requested": cap,
|
||||
}
|
||||
except MySQLError as e:
|
||||
raise RuntimeError(f"DB error: {e}") from e
|
||||
|
||||
|
||||
def main() -> int:
|
||||
if not DB_USER or not DB_PASS:
|
||||
print("ERROR: DB_USER / DB_PASSWORD missing from /home/help4bis/.env", file=sys.stderr)
|
||||
return 1
|
||||
mcp.run(transport="stdio")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user