Files
HartOMat/schaeffler_mcp_server.py
T
Hartmut b892f72f7e feat: per-line render overrides — override any output type setting at order time
Instead of duplicating output types for every variation (WebP vs PNG,
different resolution), keep one canonical output type and override
specific fields per order line via render_overrides JSONB.

Backend:
- render_overrides JSONB column on OrderLine (DB migration)
- Render task merges overrides with output type settings (format, width,
  height, samples, engine, denoiser, transparent_bg, cycles_device)
- POST /orders/{id}/batch-render-overrides endpoint for bulk override
- PatchLineBody accepts render_overrides for per-line patching

Frontend:
- Batch render overrides section on OrderDetail: output format dropdown
  (PNG/JPG/WebP) + resolution dropdown (512-4096)
- Clear button to remove overrides

MCP:
- create_order tool: accepts product_ids, output_type, render_overrides,
  material_override — enables "render all products as WebP" via Claude
- set_render_overrides tool: batch override on existing orders

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 12:26:38 +01:00

648 lines
22 KiB
Python

#!/usr/bin/env python3
"""Schaeffler Automat MCP Server.
Exposes the render pipeline, product library, material system, and order
management as MCP tools for Claude Code.
Requirements (install once):
uv pip install "mcp[cli]" psycopg2-binary httpx
Register in Claude Code:
claude mcp add schaeffler -- python schaeffler_mcp_server.py
"""
import json
import os
from datetime import datetime
import httpx
import psycopg2
import psycopg2.extras
from mcp.server.fastmcp import FastMCP
# ── Configuration ────────────────────────────────────────────────────────────
DB_URL = os.environ.get(
"DATABASE_URL",
"postgresql://schaeffler:schaeffler@localhost:5432/schaeffler",
)
API_URL = os.environ.get("API_URL", "http://localhost:8888")
API_EMAIL = os.environ.get("API_EMAIL", "admin@schaeffler.com")
API_PASSWORD = os.environ.get("API_PASSWORD", "Admin1234!")
# ── Server setup ─────────────────────────────────────────────────────────────
mcp = FastMCP(
"Schaeffler Automat",
instructions=(
"MCP server for the Schaeffler Automat render pipeline. "
"Provides tools to query orders, products, materials, render status, "
"worker health, and run read-only SQL against the PostgreSQL database."
),
)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _db_query(sql: str, params: tuple = ()) -> list[dict]:
"""Execute a read-only SQL query and return rows as dicts."""
conn = psycopg2.connect(DB_URL)
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(sql, params)
rows = cur.fetchall()
result = []
for row in rows:
clean = {}
for k, v in row.items():
if isinstance(v, datetime):
clean[k] = v.isoformat()
elif hasattr(v, "__str__") and not isinstance(
v, (str, int, float, bool, type(None), list, dict)
):
clean[k] = str(v)
else:
clean[k] = v
result.append(clean)
return result
finally:
conn.close()
_token_cache: dict[str, str] = {}
def _api_token() -> str:
"""Get a cached API auth token."""
if "token" in _token_cache:
return _token_cache["token"]
resp = httpx.post(
f"{API_URL}/api/auth/login",
json={"email": API_EMAIL, "password": API_PASSWORD},
)
resp.raise_for_status()
_token_cache["token"] = resp.json()["access_token"]
return _token_cache["token"]
def _api_get(path: str) -> dict | list:
"""Authenticated GET against the backend API."""
resp = httpx.get(
f"{API_URL}{path}",
headers={"Authorization": f"Bearer {_api_token()}"},
timeout=30,
)
resp.raise_for_status()
return resp.json()
def _api_post(path: str, body: dict | None = None) -> dict | list:
"""Authenticated POST against the backend API."""
resp = httpx.post(
f"{API_URL}{path}",
headers={"Authorization": f"Bearer {_api_token()}"},
json=body or {},
timeout=60,
)
resp.raise_for_status()
return resp.json()
# ── Tools ────────────────────────────────────────────────────────────────────
@mcp.tool()
def query_database(sql: str) -> str:
"""Execute a read-only SQL query against the Schaeffler PostgreSQL database.
Only SELECT queries are allowed. The database contains tables for orders,
order_lines, products, cad_files, materials, material_aliases,
output_types, media_assets, render_templates, and more.
Args:
sql: A SELECT SQL query to execute.
"""
sql_upper = sql.strip().upper()
if not sql_upper.startswith("SELECT") and not sql_upper.startswith("WITH"):
return "Error: Only SELECT/WITH queries are allowed (read-only)."
for kw in ("INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE"):
# Check outside of string literals (simple heuristic)
check = sql_upper.split("--")[0].split("/*")[0]
if f" {kw} " in f" {check} ":
return f"Error: {kw} statements are not allowed (read-only)."
try:
rows = _db_query(sql)
if not rows:
return "Query returned 0 rows."
return json.dumps(rows[:100], indent=2, default=str)
except Exception as e:
return f"Query error: {e}"
@mcp.tool()
def list_orders(status: str = "", limit: int = 10) -> str:
"""List recent orders with their render progress.
Args:
status: Filter by status (draft, submitted, processing, completed, rejected). Empty for all.
limit: Maximum number of orders (default 10, max 50).
"""
limit = min(limit, 50)
params = f"?limit={limit}"
if status:
params += f"&status={status}"
data = _api_get(f"/api/orders{params}")
summary = []
for o in data:
summary.append({
"id": o["id"],
"order_number": o["order_number"],
"status": o["status"],
"items": o.get("item_count", 0),
"lines": o.get("line_count", 0),
"render_progress": o.get("render_progress"),
"created_at": o["created_at"],
})
return json.dumps(summary, indent=2)
@mcp.tool()
def get_order_detail(order_id: str) -> str:
"""Get detailed information about a specific order including all lines and render statuses.
Args:
order_id: UUID of the order.
"""
data = _api_get(f"/api/orders/{order_id}")
return json.dumps(data, indent=2, default=str)
@mcp.tool()
def search_products(query: str = "", category: str = "", limit: int = 20) -> str:
"""Search products in the product library.
Args:
query: Search text (matches name, PIM-ID, baureihe).
category: Filter by category key.
limit: Max results (default 20).
"""
params = f"?limit={min(limit, 50)}"
if query:
params += f"&q={query}"
if category:
params += f"&category={category}"
data = _api_get(f"/api/products{params}")
summary = []
for p in data:
summary.append({
"id": p["id"],
"name": p["name"],
"pim_id": p.get("pim_id"),
"category": p.get("category_key"),
"has_step": p.get("cad_file_id") is not None,
"processing_status": p.get("processing_status"),
"part_count": len(p.get("cad_part_materials") or []),
})
return json.dumps(summary, indent=2)
@mcp.tool()
def check_materials(order_id: str) -> str:
"""Check if all materials in an order are mapped to library materials.
Returns unmapped materials with suggestions.
Args:
order_id: UUID of the order to check.
"""
data = _api_get(f"/api/orders/{order_id}/check-materials")
return json.dumps(data, indent=2)
@mcp.tool()
def list_materials(include_aliases: bool = False) -> str:
"""List all materials in the material library.
Args:
include_aliases: If true, include alias names for each material.
"""
data = _api_get("/api/materials")
if not include_aliases:
for m in data:
m.pop("aliases", None)
return json.dumps(data, indent=2)
@mcp.tool()
def dispatch_renders(order_id: str) -> str:
"""Dispatch (or retry) renders for all pending/failed lines in an order.
Args:
order_id: UUID of the order.
"""
data = _api_post(f"/api/orders/{order_id}/dispatch-renders")
return json.dumps(data, indent=2)
@mcp.tool()
def get_worker_activity() -> str:
"""Get recent worker activity — last 30 STEP processing and render tasks with timing."""
data = _api_get("/api/worker/activity")
return json.dumps(data, indent=2, default=str)
@mcp.tool()
def get_render_stats() -> str:
"""Get render pipeline statistics: throughput, material coverage, product/order counts."""
data = _api_get("/api/admin/dashboard-stats")
return json.dumps(data, indent=2, default=str)
@mcp.tool()
def list_output_types() -> str:
"""List all configured output types with their render settings."""
data = _api_get("/api/output-types?include_inactive=true")
summary = []
for ot in data:
summary.append({
"id": ot["id"],
"name": ot["name"],
"renderer": ot["renderer"],
"format": ot["output_format"],
"is_animation": ot["is_animation"],
"material_override": ot.get("material_override"),
"is_active": ot["is_active"],
"render_settings": ot.get("render_settings", {}),
})
return json.dumps(summary, indent=2)
@mcp.tool()
def set_material_override(order_id: str, material_name: str = "") -> str:
"""Set a material override on all lines of an order (batch).
All product parts will be rendered with this single material.
Pass empty string to clear the override.
Args:
order_id: UUID of the order.
material_name: SCHAEFFLER library material name, or empty to clear.
"""
data = _api_post(
f"/api/orders/{order_id}/batch-material-override",
{"material_override": material_name or None},
)
return json.dumps(data, indent=2)
@mcp.tool()
def create_order(
product_ids: list[str],
output_type_id: str = "",
output_type_name: str = "",
render_overrides: dict | None = None,
material_override: str = "",
notes: str = "",
) -> str:
"""Create a new order with the given products and output type.
Either output_type_id or output_type_name must be provided. If both are given,
output_type_id takes priority. render_overrides allows overriding render settings
like output_format, width, height, samples, engine per line.
Args:
product_ids: List of product UUIDs to include in the order.
output_type_id: UUID of the output type (takes priority over name).
output_type_name: Name of the output type (used if output_type_id is empty).
render_overrides: Optional dict of render setting overrides (e.g. {"output_format": "webp", "width": 2048, "height": 2048}).
material_override: Optional SCHAEFFLER library material name to apply to all lines.
notes: Optional notes for the order.
"""
# Resolve output_type_id from name if needed
ot_id = output_type_id or None
if not ot_id and output_type_name:
rows = _db_query(
"SELECT id FROM output_types WHERE name ILIKE %s AND is_active = true LIMIT 1",
(output_type_name,),
)
if rows:
ot_id = rows[0]["id"]
else:
return f"Error: No active output type found matching '{output_type_name}'."
lines = []
for pid in product_ids:
line: dict = {"product_id": pid}
if ot_id:
line["output_type_id"] = ot_id
if render_overrides:
line["render_overrides"] = render_overrides
if material_override:
line["material_override"] = material_override
lines.append(line)
body: dict = {"lines": lines}
if notes:
body["notes"] = notes
try:
data = _api_post("/api/orders", body)
return json.dumps(
{
"order_id": data["id"],
"order_number": data["order_number"],
"status": data["status"],
"line_count": data.get("line_count", len(lines)),
"render_overrides": render_overrides,
},
indent=2,
)
except Exception as e:
return f"Error creating order: {e}"
@mcp.tool()
def set_render_overrides(order_id: str, render_overrides: dict | None = None) -> str:
"""Set render overrides on all lines of an order (batch).
Overrides any output type render settings at order time. Common overrides:
output_format (png/jpg/webp), width, height, samples, engine (cycles/eevee),
transparent_bg, bg_color, noise_threshold, denoiser.
Pass None/empty to clear all overrides.
Args:
order_id: UUID of the order.
render_overrides: Dict of render setting overrides, or None to clear.
"""
data = _api_post(
f"/api/orders/{order_id}/batch-render-overrides",
{"render_overrides": render_overrides},
)
return json.dumps(data, indent=2)
@mcp.tool()
def get_queue_status() -> str:
"""Get current render queue status — pending, active, completed/failed counts."""
rows = _db_query("""
SELECT
(SELECT count(*) FROM order_lines
WHERE render_status = 'pending') AS pending_renders,
(SELECT count(*) FROM order_lines
WHERE render_status = 'processing') AS active_renders,
(SELECT count(*) FROM order_lines
WHERE render_status = 'completed'
AND render_completed_at > now() - interval '1 hour') AS completed_last_hour,
(SELECT count(*) FROM order_lines
WHERE render_status = 'failed'
AND render_completed_at > now() - interval '1 hour') AS failed_last_hour,
(SELECT count(*) FROM cad_files
WHERE processing_status = 'processing') AS processing_step_files
""")
return json.dumps(rows[0] if rows else {}, indent=2)
@mcp.tool()
def get_product_detail(product_id: str) -> str:
"""Get detailed information about a product: metadata, CAD file, parts, materials, render history.
Args:
product_id: UUID of the product.
"""
rows = _db_query("""
SELECT p.id, p.name, p.pim_id, p.category_key, p.ebene1, p.ebene2,
p.baureihe, p.produkt_baureihe, p.lagertyp, p.name_cad_modell,
p.is_active, p.cad_file_id,
p.cad_part_materials, p.components,
cf.original_name AS step_filename,
cf.processing_status AS step_status,
cf.parsed_objects,
cf.file_hash AS step_hash,
p.created_at, p.updated_at
FROM products p
LEFT JOIN cad_files cf ON cf.id = p.cad_file_id
WHERE p.id = %s
""", (product_id,))
if not rows:
return f"Product {product_id} not found."
product = rows[0]
# Get render history
renders = _db_query("""
SELECT ol.id AS line_id, ol.render_status, ol.result_path,
ol.material_override, ol.render_started_at, ol.render_completed_at,
ol.render_backend_used,
ot.name AS output_type_name,
o.order_number,
ol.render_log->>'engine_used' AS engine,
ol.render_log->>'render_duration_s' AS render_duration_s,
ol.render_log->>'total_duration_s' AS total_duration_s
FROM order_lines ol
JOIN orders o ON o.id = ol.order_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
WHERE ol.product_id = %s
ORDER BY ol.render_completed_at DESC NULLS LAST
LIMIT 20
""", (product_id,))
# Get media assets
assets = _db_query("""
SELECT id, asset_type, storage_key, file_size_bytes, mime_type, created_at
FROM media_assets
WHERE product_id = %s
ORDER BY created_at DESC
LIMIT 20
""", (product_id,))
product["renders"] = renders
product["media_assets"] = assets
return json.dumps(product, indent=2, default=str)
@mcp.tool()
def get_render_detail(order_line_id: str) -> str:
"""Get full render metadata for a specific order line: timing, engine, materials, log.
Args:
order_line_id: UUID of the order line.
"""
rows = _db_query("""
SELECT ol.id, ol.order_id, ol.product_id, ol.output_type_id,
ol.render_status, ol.result_path, ol.material_override,
ol.render_started_at, ol.render_completed_at,
ol.render_backend_used, ol.render_log,
ol.unit_price, ol.item_status,
p.name AS product_name, p.pim_id,
ot.name AS output_type_name, ot.renderer, ot.output_format,
ot.render_settings, ot.material_override AS ot_material_override,
o.order_number
FROM order_lines ol
JOIN products p ON p.id = ol.product_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
JOIN orders o ON o.id = ol.order_id
WHERE ol.id = %s
""", (order_line_id,))
if not rows:
return f"Order line {order_line_id} not found."
return json.dumps(rows[0], indent=2, default=str)
@mcp.tool()
def get_completed_renders(
product_name: str = "",
order_number: str = "",
output_type: str = "",
limit: int = 20,
) -> str:
"""List completed renders with metadata: product, output type, timing, file path.
Args:
product_name: Filter by product name (partial match).
order_number: Filter by order number (partial match).
output_type: Filter by output type name (partial match).
limit: Max results (default 20, max 100).
"""
limit = min(limit, 100)
conditions = ["ol.render_status = 'completed'"]
params: list = []
if product_name:
conditions.append("p.name ILIKE %s")
params.append(f"%{product_name}%")
if order_number:
conditions.append("o.order_number ILIKE %s")
params.append(f"%{order_number}%")
if output_type:
conditions.append("ot.name ILIKE %s")
params.append(f"%{output_type}%")
where = " AND ".join(conditions)
rows = _db_query(f"""
SELECT ol.id AS line_id, o.order_number,
p.name AS product_name, p.pim_id,
ot.name AS output_type_name, ot.output_format,
ol.result_path, ol.material_override,
ol.render_started_at, ol.render_completed_at,
ol.render_backend_used,
ol.render_log->>'engine_used' AS engine,
ol.render_log->>'render_duration_s' AS render_duration_s,
ol.render_log->>'total_duration_s' AS total_duration_s,
ol.render_log->>'output_size_bytes' AS output_size_bytes
FROM order_lines ol
JOIN products p ON p.id = ol.product_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
JOIN orders o ON o.id = ol.order_id
WHERE {where}
ORDER BY ol.render_completed_at DESC
LIMIT %s
""", tuple(params) + (limit,))
return json.dumps(rows, indent=2, default=str)
@mcp.tool()
def get_media_assets(
product_id: str = "",
asset_type: str = "",
limit: int = 30,
) -> str:
"""List media assets (renders, thumbnails, GLBs, USDs) with metadata.
Args:
product_id: Filter by product UUID.
asset_type: Filter by type (thumbnail, still, turntable, gltf_geometry, usd_master, blend_production).
limit: Max results (default 30, max 100).
"""
limit = min(limit, 100)
conditions = ["1=1"]
params: list = []
if product_id:
conditions.append("ma.product_id = %s")
params.append(product_id)
if asset_type:
conditions.append("ma.asset_type = %s")
params.append(asset_type)
where = " AND ".join(conditions)
rows = _db_query(f"""
SELECT ma.id, ma.asset_type, ma.storage_key,
ma.file_size_bytes, ma.mime_type,
ma.product_id, p.name AS product_name, p.pim_id,
ma.order_line_id, ma.cad_file_id,
ma.created_at
FROM media_assets ma
LEFT JOIN products p ON p.id = ma.product_id
WHERE {where}
ORDER BY ma.created_at DESC
LIMIT %s
""", tuple(params) + (limit,))
return json.dumps(rows, indent=2, default=str)
@mcp.tool()
def get_failed_renders(limit: int = 20) -> str:
"""List recently failed renders with error details.
Args:
limit: Max results (default 20).
"""
rows = _db_query("""
SELECT ol.id AS line_id, o.order_number,
p.name AS product_name, p.pim_id,
ot.name AS output_type_name,
ol.render_completed_at AS failed_at,
ol.render_log->>'error' AS error,
ol.render_log->>'engine_used' AS engine,
ol.render_backend_used
FROM order_lines ol
JOIN products p ON p.id = ol.product_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
JOIN orders o ON o.id = ol.order_id
WHERE ol.render_status = 'failed'
ORDER BY ol.render_completed_at DESC NULLS LAST
LIMIT %s
""", (min(limit, 50),))
return json.dumps(rows, indent=2, default=str)
# ── Resources ────────────────────────────────────────────────────────────────
@mcp.resource("schaeffler://schema")
def get_database_schema() -> str:
"""Database schema overview — table names and column types."""
rows = _db_query("""
SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
""")
tables: dict[str, list] = {}
for r in rows:
t = r["table_name"]
if t not in tables:
tables[t] = []
tables[t].append(
f" {r['column_name']}: {r['data_type']}"
+ (" (nullable)" if r["is_nullable"] == "YES" else "")
)
lines = []
for table, cols in sorted(tables.items()):
lines.append(f"\n{table}:")
lines.extend(cols)
return "\n".join(lines)
@mcp.resource("schaeffler://output-types")
def get_output_types_resource() -> str:
"""All configured output types with settings."""
data = _api_get("/api/output-types?include_inactive=true")
return json.dumps(data, indent=2)
# ── Run ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
mcp.run()