Files
HartOMat/schaeffler_mcp_server.py
T
Hartmut 2a9337b8a3 feat: MCP server for Claude Code integration
Exposes the render pipeline, product library, material system, and database
as MCP tools. 12 tools + 2 resources:

Tools: query_database, list_orders, get_order_detail, search_products,
check_materials, list_materials, dispatch_renders, set_material_override,
list_output_types, get_worker_activity, get_render_stats, get_queue_status

Resources: schaeffler://schema, schaeffler://output-types

- Uses FastMCP (Python SDK) with stdio transport
- .mcp.json for automatic team-wide registration
- uv-managed dependencies (no global install needed)
- Documentation in docs/mcp-server.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-14 23:46:52 +01:00

357 lines
12 KiB
Python

#!/usr/bin/env python3
"""Schaeffler Automat MCP Server.
Exposes the render pipeline, product library, material system, and order
management as MCP tools for Claude Code.
Requirements (install once):
uv pip install "mcp[cli]" psycopg2-binary httpx
Register in Claude Code:
claude mcp add schaeffler -- python schaeffler_mcp_server.py
"""
import json
import os
import re
from datetime import datetime
from urllib.parse import quote, urlencode

import httpx
import psycopg2
import psycopg2.extras
from mcp.server.fastmcp import FastMCP
# ── Configuration ────────────────────────────────────────────────────────────
# Connection settings. Each value is read from the environment and falls back
# to the literal default given here when the variable is not set.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://schaeffler:schaeffler@localhost:5432/schaeffler",
)
API_URL = os.getenv("API_URL", "http://localhost:8888")
# NOTE(review): the fallback credentials below are hard-coded in source;
# confirm they are only valid for local development setups.
API_EMAIL = os.getenv("API_EMAIL", "admin@schaeffler.com")
API_PASSWORD = os.getenv("API_PASSWORD", "Admin1234!")
# ── Server setup ─────────────────────────────────────────────────────────────
# Free-text description handed to FastMCP as the server's ``instructions``.
_SERVER_INSTRUCTIONS = (
    "MCP server for the Schaeffler Automat render pipeline. Provides tools "
    "to query orders, products, materials, render status, worker health, "
    "and run read-only SQL against the PostgreSQL database."
)

# The single server instance; tools and resources below register on it via
# the ``@mcp.tool()`` / ``@mcp.resource()`` decorators.
mcp = FastMCP("Schaeffler Automat", instructions=_SERVER_INSTRUCTIONS)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _db_query(sql: str, params: tuple = ()) -> list[dict]:
    """Run a SQL query in a read-only session and return the rows as dicts.

    A new connection is opened for every call and always closed afterwards.
    Before executing, the session is switched to read-only so that the
    database server refuses data-modifying statements even when a caller's
    own SQL screening misses one.

    Args:
        sql: SQL text to execute.
        params: Values bound to the placeholders in ``sql``.

    Returns:
        One dict per row, keyed by column name. ``datetime`` values are
        converted with ``isoformat()``; every other value that is not a
        str, int, float, bool, None, list or dict is converted with ``str()``
        so the result can be serialized as JSON.

    Raises:
        psycopg2.Error: If connecting or executing the query fails.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        # Defense in depth: the docstring promises read-only access, so ask
        # the server to enforce it. Must be called before the first statement.
        # NOTE(review): a dedicated read-only database role remains the
        # robust safeguard; confirm which role DB_URL connects as.
        conn.set_session(readonly=True)
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql, params)
            rows = cur.fetchall()
    finally:
        conn.close()

    # Types json.dumps can handle natively; everything else is stringified.
    json_native = (str, int, float, bool, type(None), list, dict)
    result = []
    for row in rows:
        clean = {}
        for key, value in row.items():
            if isinstance(value, datetime):
                clean[key] = value.isoformat()
            elif isinstance(value, json_native):
                clean[key] = value
            else:
                clean[key] = str(value)
        result.append(clean)
    return result
# Process-wide cache holding the bearer token under the key "token".
_token_cache: dict[str, str] = {}


def _api_token() -> str:
    """Return the API bearer token, logging in on first use.

    The token is obtained by POSTing API_EMAIL / API_PASSWORD to
    ``/api/auth/login`` and is kept in ``_token_cache`` for later calls.

    Raises:
        httpx.HTTPStatusError: If the login request returns an error status.
    """
    try:
        return _token_cache["token"]
    except KeyError:
        pass

    credentials = {"email": API_EMAIL, "password": API_PASSWORD}
    login = httpx.post(f"{API_URL}/api/auth/login", json=credentials)
    login.raise_for_status()

    token = login.json()["access_token"]
    _token_cache["token"] = token
    return token
def _api_get(path: str) -> dict | list:
    """Authenticated GET against the backend API.

    The bearer token is cached for the lifetime of the process. When the API
    answers 401 (for example because the cached token has expired), the
    cached token is discarded and the request is retried once with a fresh
    login instead of failing until the server is restarted.

    Args:
        path: URL path (including any query string) appended to API_URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the final response has an error status.
    """
    url = f"{API_URL}{path}"
    for _ in range(2):
        resp = httpx.get(
            url,
            headers={"Authorization": f"Bearer {_api_token()}"},
            timeout=30,
        )
        if resp.status_code != 401:
            break
        # Drop the rejected token so the next _api_token() call logs in again.
        _token_cache.pop("token", None)
    resp.raise_for_status()
    return resp.json()
def _api_post(path: str, body: dict | None = None) -> dict | list:
    """Authenticated POST against the backend API.

    The bearer token is cached for the lifetime of the process. When the API
    answers 401 (for example because the cached token has expired), the
    cached token is discarded and the request is retried once with a fresh
    login. A 401 means the request was rejected before being processed, so
    the retry does not repeat any server-side action.

    Args:
        path: URL path appended to API_URL.
        body: JSON payload; an empty object is sent when omitted.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the final response has an error status.
    """
    url = f"{API_URL}{path}"
    for _ in range(2):
        resp = httpx.post(
            url,
            headers={"Authorization": f"Bearer {_api_token()}"},
            json=body or {},
            timeout=60,
        )
        if resp.status_code != 401:
            break
        # Drop the rejected token so the next _api_token() call logs in again.
        _token_cache.pop("token", None)
    resp.raise_for_status()
    return resp.json()
# ── Tools ────────────────────────────────────────────────────────────────────
# Keywords that must not appear anywhere in a query accepted by
# query_database. INTO covers "SELECT ... INTO new_table"; MERGE covers
# data-modifying CTEs alongside INSERT/UPDATE/DELETE.
_FORBIDDEN_SQL_KEYWORDS = (
    "INSERT",
    "UPDATE",
    "DELETE",
    "DROP",
    "ALTER",
    "TRUNCATE",
    "CREATE",
    "MERGE",
    "INTO",
)


def _read_only_violation(sql: str) -> str | None:
    """Return an error message if ``sql`` is not one read-only statement.

    The check is deliberately conservative: the whole text is inspected,
    including comments and string literals, because a lightweight scan cannot
    reliably tell them apart from code. A forbidden word inside a literal is
    therefore rejected too, which is preferable to letting a write slip by.

    Returns:
        The error text to show to the caller, or None if the query passes.
    """
    # A trailing semicolon is harmless; any other semicolon could separate
    # an additional statement (e.g. "; COMMIT").
    statement = sql.strip().rstrip("; \t\r\n").upper()
    if not statement.startswith(("SELECT", "WITH")):
        return "Error: Only SELECT/WITH queries are allowed (read-only)."
    if ";" in statement:
        return "Error: Only a single statement is allowed (read-only)."
    # Compare whole identifier-like tokens so that newlines, tabs or
    # parentheses next to a keyword cannot hide it, while names such as
    # CREATED_AT or UPDATED_AT are not mistaken for CREATE / UPDATE.
    words = set(re.findall(r"[A-Z_][A-Z0-9_]*", statement))
    for kw in _FORBIDDEN_SQL_KEYWORDS:
        if kw in words:
            return f"Error: {kw} statements are not allowed (read-only)."
    return None


@mcp.tool()
def query_database(sql: str) -> str:
    """Execute a read-only SQL query against the Schaeffler PostgreSQL database.

    Only a single SELECT (or WITH ... SELECT) statement is allowed. The
    database contains tables for orders, order_lines, products, cad_files,
    materials, material_aliases, output_types, media_assets,
    render_templates, and more. At most the first 100 rows are returned.

    Args:
        sql: A SELECT SQL query to execute.

    Returns:
        The rows as a JSON array, or a plain-text message when the query is
        rejected, fails, or yields no rows.
    """
    violation = _read_only_violation(sql)
    if violation is not None:
        return violation
    try:
        rows = _db_query(sql)
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return f"Query error: {e}"
    if not rows:
        return "Query returned 0 rows."
    return json.dumps(rows[:100], indent=2, default=str)
@mcp.tool()
def list_orders(status: str = "", limit: int = 10) -> str:
    """List recent orders with their render progress.

    Args:
        status: Filter by status (draft, submitted, processing, completed, rejected). Empty for all.
        limit: Maximum number of orders (default 10, max 50). Values below 1
            are raised to 1.

    Returns:
        A JSON array with one summary object per order.
    """
    # Clamp to 1..50 so a zero or negative limit is never sent to the API.
    query: dict[str, str | int] = {"limit": max(1, min(limit, 50))}
    if status:
        query["status"] = status
    # urlencode escapes the values, so characters such as "&" or "#" in
    # ``status`` cannot add or truncate query parameters.
    data = _api_get(f"/api/orders?{urlencode(query)}")
    summary = [
        {
            "id": o["id"],
            "order_number": o["order_number"],
            "status": o["status"],
            "items": o.get("item_count", 0),
            "lines": o.get("line_count", 0),
            "render_progress": o.get("render_progress"),
            "created_at": o["created_at"],
        }
        for o in data
    ]
    return json.dumps(summary, indent=2)
@mcp.tool()
def get_order_detail(order_id: str) -> str:
    """Get detailed information about a specific order including all lines and render statuses.

    Args:
        order_id: UUID of the order.

    Returns:
        The API response for the order as pretty-printed JSON.
    """
    # Percent-encode the id so it stays a single path segment; a well-formed
    # UUID is unaffected, while "/", "?" or "#" can no longer redirect the
    # request to a different endpoint.
    data = _api_get(f"/api/orders/{quote(order_id, safe='')}")
    return json.dumps(data, indent=2, default=str)
@mcp.tool()
def search_products(query: str = "", category: str = "", limit: int = 20) -> str:
    """Search products in the product library.

    Args:
        query: Search text (matches name, PIM-ID, baureihe).
        category: Filter by category key.
        limit: Max results (default 20, capped at 50). Values below 1 are
            raised to 1.

    Returns:
        A JSON array with one summary object per matching product.
    """
    # Clamp to 1..50 so a zero or negative limit is never sent to the API.
    params: dict[str, str | int] = {"limit": max(1, min(limit, 50))}
    if query:
        params["q"] = query
    if category:
        params["category"] = category
    # urlencode escapes the values; free-text searches containing "&", "#",
    # "+" or spaces would otherwise corrupt the query string.
    data = _api_get(f"/api/products?{urlencode(params)}")
    summary = [
        {
            "id": p["id"],
            "name": p["name"],
            "pim_id": p.get("pim_id"),
            "category": p.get("category_key"),
            "has_step": p.get("cad_file_id") is not None,
            "processing_status": p.get("processing_status"),
            "part_count": len(p.get("cad_part_materials") or []),
        }
        for p in data
    ]
    return json.dumps(summary, indent=2)
@mcp.tool()
def check_materials(order_id: str) -> str:
    """Check if all materials in an order are mapped to library materials.

    Returns unmapped materials with suggestions.

    Args:
        order_id: UUID of the order to check.

    Returns:
        The API response as pretty-printed JSON.
    """
    # Percent-encode the id so it stays a single path segment; a well-formed
    # UUID is unaffected.
    data = _api_get(f"/api/orders/{quote(order_id, safe='')}/check-materials")
    return json.dumps(data, indent=2)
@mcp.tool()
def list_materials(include_aliases: bool = False) -> str:
    """List all materials in the material library.

    Args:
        include_aliases: If true, include alias names for each material.

    Returns:
        The materials returned by ``/api/materials`` as pretty-printed JSON,
        with each entry's ``aliases`` key removed unless requested.
    """
    materials = _api_get("/api/materials")
    if include_aliases:
        return json.dumps(materials, indent=2)
    # Build copies without the "aliases" key; key order is otherwise kept.
    trimmed = [
        {key: value for key, value in material.items() if key != "aliases"}
        for material in materials
    ]
    return json.dumps(trimmed, indent=2)
@mcp.tool()
def dispatch_renders(order_id: str) -> str:
    """Dispatch (or retry) renders for all pending/failed lines in an order.

    Args:
        order_id: UUID of the order.

    Returns:
        The API response as pretty-printed JSON.
    """
    # Percent-encode the id so it stays a single path segment; this POST
    # changes state, so it must not be steerable to another endpoint.
    data = _api_post(f"/api/orders/{quote(order_id, safe='')}/dispatch-renders")
    return json.dumps(data, indent=2)
@mcp.tool()
def get_worker_activity() -> str:
    """Get recent worker activity — last 30 STEP processing and render tasks with timing."""
    # default=str stringifies any value json cannot encode natively.
    return json.dumps(_api_get("/api/worker/activity"), indent=2, default=str)
@mcp.tool()
def get_render_stats() -> str:
    """Get render pipeline statistics: throughput, material coverage, product/order counts."""
    # default=str stringifies any value json cannot encode natively.
    return json.dumps(_api_get("/api/admin/dashboard-stats"), indent=2, default=str)
@mcp.tool()
def list_output_types() -> str:
    """List all configured output types with their render settings.

    Returns:
        A JSON array with one summary object per output type; inactive
        output types are requested as well (``include_inactive=true``).
    """
    output_types = _api_get("/api/output-types?include_inactive=true")
    overview = [
        {
            "id": entry["id"],
            "name": entry["name"],
            "renderer": entry["renderer"],
            "format": entry["output_format"],
            "is_animation": entry["is_animation"],
            "material_override": entry.get("material_override"),
            "is_active": entry["is_active"],
            "render_settings": entry.get("render_settings", {}),
        }
        for entry in output_types
    ]
    return json.dumps(overview, indent=2)
@mcp.tool()
def set_material_override(order_id: str, material_name: str = "") -> str:
    """Set a material override on all lines of an order (batch).

    All product parts will be rendered with this single material.
    Pass empty string to clear the override.

    Args:
        order_id: UUID of the order.
        material_name: SCHAEFFLER library material name, or empty to clear.

    Returns:
        The API response as pretty-printed JSON.
    """
    # Percent-encode the id so it stays a single path segment; this POST
    # changes state, so it must not be steerable to another endpoint.
    data = _api_post(
        f"/api/orders/{quote(order_id, safe='')}/batch-material-override",
        # An empty name is sent as JSON null, which clears the override.
        {"material_override": material_name or None},
    )
    return json.dumps(data, indent=2)
# Single-row query: one scalar sub-select per counter reported by
# get_queue_status.
_QUEUE_STATUS_SQL = """
SELECT
(SELECT count(*) FROM order_lines
WHERE render_status = 'pending') AS pending_renders,
(SELECT count(*) FROM order_lines
WHERE render_status = 'processing') AS active_renders,
(SELECT count(*) FROM order_lines
WHERE render_status = 'completed'
AND render_completed_at > now() - interval '1 hour') AS completed_last_hour,
(SELECT count(*) FROM order_lines
WHERE render_status = 'failed'
AND render_completed_at > now() - interval '1 hour') AS failed_last_hour,
(SELECT count(*) FROM cad_files
WHERE processing_status = 'processing') AS processing_step_files
"""


@mcp.tool()
def get_queue_status() -> str:
    """Get current render queue status — pending, active, completed/failed counts."""
    rows = _db_query(_QUEUE_STATUS_SQL)
    if rows:
        counters = rows[0]
    else:
        counters = {}
    return json.dumps(counters, indent=2)
# ── Resources ────────────────────────────────────────────────────────────────
# Lists every column of the "public" schema, grouped by table in column order.
_SCHEMA_SQL = """
SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
"""


@mcp.resource("schaeffler://schema")
def get_database_schema() -> str:
    """Database schema overview — table names and column types."""
    columns_by_table: dict[str, list] = {}
    for col in _db_query(_SCHEMA_SQL):
        table = col["table_name"]
        entry = f" {col['column_name']}: {col['data_type']}"
        if col["is_nullable"] == "YES":
            entry += " (nullable)"
        columns_by_table.setdefault(table, []).append(entry)

    # One "<table>:" header (preceded by a blank line) followed by its columns.
    out = []
    for table in sorted(columns_by_table):
        out.append(f"\n{table}:")
        out.extend(columns_by_table[table])
    return "\n".join(out)
@mcp.resource("schaeffler://output-types")
def get_output_types_resource() -> str:
    """All configured output types with settings."""
    # Unlike the list_output_types tool, the API payload is passed through
    # unchanged rather than summarized.
    return json.dumps(_api_get("/api/output-types?include_inactive=true"), indent=2)
# ── Run ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Start the MCP server with FastMCP's default run settings."""
    mcp.run()


# Only start serving when executed as a script, not when imported.
if __name__ == "__main__":
    main()