Files
HartOMat/hartomat_mcp_server.py

648 lines
22 KiB
Python

#!/usr/bin/env python3
"""HartOMat MCP Server.
Exposes the render pipeline, product library, material system, and order
management as MCP tools for Claude Code.
Requirements (install once):
uv pip install "mcp[cli]" psycopg2-binary httpx
Register in Claude Code:
claude mcp add hartomat -- python hartomat_mcp_server.py
"""
import json
import os
from datetime import datetime
import httpx
import psycopg2
import psycopg2.extras
from mcp.server.fastmcp import FastMCP
# ── Configuration ────────────────────────────────────────────────────────────
DB_URL = os.environ.get(
"DATABASE_URL",
"postgresql://hartomat:hartomat@localhost:5432/hartomat",
)
API_URL = os.environ.get("API_URL", "http://localhost:8888")
API_EMAIL = os.environ.get("API_EMAIL", "admin@hartomat.com")
API_PASSWORD = os.environ.get("API_PASSWORD", "Admin1234!")
# ── Server setup ─────────────────────────────────────────────────────────────
mcp = FastMCP(
"HartOMat",
instructions=(
"MCP server for the HartOMat render pipeline. "
"Provides tools to query orders, products, materials, render status, "
"worker health, and run read-only SQL against the PostgreSQL database."
),
)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _db_query(sql: str, params: tuple = ()) -> list[dict]:
"""Execute a read-only SQL query and return rows as dicts."""
conn = psycopg2.connect(DB_URL)
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(sql, params)
rows = cur.fetchall()
result = []
for row in rows:
clean = {}
for k, v in row.items():
if isinstance(v, datetime):
clean[k] = v.isoformat()
elif hasattr(v, "__str__") and not isinstance(
v, (str, int, float, bool, type(None), list, dict)
):
clean[k] = str(v)
else:
clean[k] = v
result.append(clean)
return result
finally:
conn.close()
_token_cache: dict[str, str] = {}
def _api_token() -> str:
"""Get a cached API auth token."""
if "token" in _token_cache:
return _token_cache["token"]
resp = httpx.post(
f"{API_URL}/api/auth/login",
json={"email": API_EMAIL, "password": API_PASSWORD},
)
resp.raise_for_status()
_token_cache["token"] = resp.json()["access_token"]
return _token_cache["token"]
def _api_get(path: str) -> dict | list:
"""Authenticated GET against the backend API."""
resp = httpx.get(
f"{API_URL}{path}",
headers={"Authorization": f"Bearer {_api_token()}"},
timeout=30,
)
resp.raise_for_status()
return resp.json()
def _api_post(path: str, body: dict | None = None) -> dict | list:
"""Authenticated POST against the backend API."""
resp = httpx.post(
f"{API_URL}{path}",
headers={"Authorization": f"Bearer {_api_token()}"},
json=body or {},
timeout=60,
)
resp.raise_for_status()
return resp.json()
# ── Tools ────────────────────────────────────────────────────────────────────
@mcp.tool()
def query_database(sql: str) -> str:
"""Execute a read-only SQL query against the HartOMat PostgreSQL database.
Only SELECT queries are allowed. The database contains tables for orders,
order_lines, products, cad_files, materials, material_aliases,
output_types, media_assets, render_templates, and more.
Args:
sql: A SELECT SQL query to execute.
"""
sql_upper = sql.strip().upper()
if not sql_upper.startswith("SELECT") and not sql_upper.startswith("WITH"):
return "Error: Only SELECT/WITH queries are allowed (read-only)."
for kw in ("INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE"):
# Check outside of string literals (simple heuristic)
check = sql_upper.split("--")[0].split("/*")[0]
if f" {kw} " in f" {check} ":
return f"Error: {kw} statements are not allowed (read-only)."
try:
rows = _db_query(sql)
if not rows:
return "Query returned 0 rows."
return json.dumps(rows[:100], indent=2, default=str)
except Exception as e:
return f"Query error: {e}"
@mcp.tool()
def list_orders(status: str = "", limit: int = 10) -> str:
"""List recent orders with their render progress.
Args:
status: Filter by status (draft, submitted, processing, completed, rejected). Empty for all.
limit: Maximum number of orders (default 10, max 50).
"""
limit = min(limit, 50)
params = f"?limit={limit}"
if status:
params += f"&status={status}"
data = _api_get(f"/api/orders{params}")
summary = []
for o in data:
summary.append({
"id": o["id"],
"order_number": o["order_number"],
"status": o["status"],
"items": o.get("item_count", 0),
"lines": o.get("line_count", 0),
"render_progress": o.get("render_progress"),
"created_at": o["created_at"],
})
return json.dumps(summary, indent=2)
@mcp.tool()
def get_order_detail(order_id: str) -> str:
"""Get detailed information about a specific order including all lines and render statuses.
Args:
order_id: UUID of the order.
"""
data = _api_get(f"/api/orders/{order_id}")
return json.dumps(data, indent=2, default=str)
@mcp.tool()
def search_products(query: str = "", category: str = "", limit: int = 20) -> str:
"""Search products in the product library.
Args:
query: Search text (matches name, PIM-ID, baureihe).
category: Filter by category key.
limit: Max results (default 20).
"""
params = f"?limit={min(limit, 50)}"
if query:
params += f"&q={query}"
if category:
params += f"&category={category}"
data = _api_get(f"/api/products{params}")
summary = []
for p in data:
summary.append({
"id": p["id"],
"name": p["name"],
"pim_id": p.get("pim_id"),
"category": p.get("category_key"),
"has_step": p.get("cad_file_id") is not None,
"processing_status": p.get("processing_status"),
"part_count": len(p.get("cad_part_materials") or []),
})
return json.dumps(summary, indent=2)
@mcp.tool()
def check_materials(order_id: str) -> str:
"""Check if all materials in an order are mapped to library materials.
Returns unmapped materials with suggestions.
Args:
order_id: UUID of the order to check.
"""
data = _api_get(f"/api/orders/{order_id}/check-materials")
return json.dumps(data, indent=2)
@mcp.tool()
def list_materials(include_aliases: bool = False) -> str:
"""List all materials in the material library.
Args:
include_aliases: If true, include alias names for each material.
"""
data = _api_get("/api/materials")
if not include_aliases:
for m in data:
m.pop("aliases", None)
return json.dumps(data, indent=2)
@mcp.tool()
def dispatch_renders(order_id: str) -> str:
"""Dispatch (or retry) renders for all pending/failed lines in an order.
Args:
order_id: UUID of the order.
"""
data = _api_post(f"/api/orders/{order_id}/dispatch-renders")
return json.dumps(data, indent=2)
@mcp.tool()
def get_worker_activity() -> str:
"""Get recent worker activity — last 30 STEP processing and render tasks with timing."""
data = _api_get("/api/worker/activity")
return json.dumps(data, indent=2, default=str)
@mcp.tool()
def get_render_stats() -> str:
"""Get render pipeline statistics: throughput, material coverage, product/order counts."""
data = _api_get("/api/admin/dashboard-stats")
return json.dumps(data, indent=2, default=str)
@mcp.tool()
def list_output_types() -> str:
"""List all configured output types with their render settings."""
data = _api_get("/api/output-types?include_inactive=true")
summary = []
for ot in data:
summary.append({
"id": ot["id"],
"name": ot["name"],
"renderer": ot["renderer"],
"format": ot["output_format"],
"is_animation": ot["is_animation"],
"material_override": ot.get("material_override"),
"is_active": ot["is_active"],
"render_settings": ot.get("render_settings", {}),
})
return json.dumps(summary, indent=2)
@mcp.tool()
def set_material_override(order_id: str, material_name: str = "") -> str:
"""Set a material override on all lines of an order (batch).
All product parts will be rendered with this single material.
Pass empty string to clear the override.
Args:
order_id: UUID of the order.
material_name: HARTOMAT library material name, or empty to clear.
"""
data = _api_post(
f"/api/orders/{order_id}/batch-material-override",
{"material_override": material_name or None},
)
return json.dumps(data, indent=2)
@mcp.tool()
def create_order(
product_ids: list[str],
output_type_id: str = "",
output_type_name: str = "",
render_overrides: dict | None = None,
material_override: str = "",
notes: str = "",
) -> str:
"""Create a new order with the given products and output type.
Either output_type_id or output_type_name must be provided. If both are given,
output_type_id takes priority. render_overrides allows overriding render settings
like output_format, width, height, samples, engine per line.
Args:
product_ids: List of product UUIDs to include in the order.
output_type_id: UUID of the output type (takes priority over name).
output_type_name: Name of the output type (used if output_type_id is empty).
render_overrides: Optional dict of render setting overrides (e.g. {"output_format": "webp", "width": 2048, "height": 2048}).
material_override: Optional HARTOMAT library material name to apply to all lines.
notes: Optional notes for the order.
"""
# Resolve output_type_id from name if needed
ot_id = output_type_id or None
if not ot_id and output_type_name:
rows = _db_query(
"SELECT id FROM output_types WHERE name ILIKE %s AND is_active = true LIMIT 1",
(output_type_name,),
)
if rows:
ot_id = rows[0]["id"]
else:
return f"Error: No active output type found matching '{output_type_name}'."
lines = []
for pid in product_ids:
line: dict = {"product_id": pid}
if ot_id:
line["output_type_id"] = ot_id
if render_overrides:
line["render_overrides"] = render_overrides
if material_override:
line["material_override"] = material_override
lines.append(line)
body: dict = {"lines": lines}
if notes:
body["notes"] = notes
try:
data = _api_post("/api/orders", body)
return json.dumps(
{
"order_id": data["id"],
"order_number": data["order_number"],
"status": data["status"],
"line_count": data.get("line_count", len(lines)),
"render_overrides": render_overrides,
},
indent=2,
)
except Exception as e:
return f"Error creating order: {e}"
@mcp.tool()
def set_render_overrides(order_id: str, render_overrides: dict | None = None) -> str:
"""Set render overrides on all lines of an order (batch).
Overrides any output type render settings at order time. Common overrides:
output_format (png/jpg/webp), width, height, samples, engine (cycles/eevee),
transparent_bg, bg_color, noise_threshold, denoiser.
Pass None/empty to clear all overrides.
Args:
order_id: UUID of the order.
render_overrides: Dict of render setting overrides, or None to clear.
"""
data = _api_post(
f"/api/orders/{order_id}/batch-render-overrides",
{"render_overrides": render_overrides},
)
return json.dumps(data, indent=2)
@mcp.tool()
def get_queue_status() -> str:
"""Get current render queue status — pending, active, completed/failed counts."""
rows = _db_query("""
SELECT
(SELECT count(*) FROM order_lines
WHERE render_status = 'pending') AS pending_renders,
(SELECT count(*) FROM order_lines
WHERE render_status = 'processing') AS active_renders,
(SELECT count(*) FROM order_lines
WHERE render_status = 'completed'
AND render_completed_at > now() - interval '1 hour') AS completed_last_hour,
(SELECT count(*) FROM order_lines
WHERE render_status = 'failed'
AND render_completed_at > now() - interval '1 hour') AS failed_last_hour,
(SELECT count(*) FROM cad_files
WHERE processing_status = 'processing') AS processing_step_files
""")
return json.dumps(rows[0] if rows else {}, indent=2)
@mcp.tool()
def get_product_detail(product_id: str) -> str:
"""Get detailed information about a product: metadata, CAD file, parts, materials, render history.
Args:
product_id: UUID of the product.
"""
rows = _db_query("""
SELECT p.id, p.name, p.pim_id, p.category_key, p.ebene1, p.ebene2,
p.baureihe, p.produkt_baureihe, p.lagertyp, p.name_cad_modell,
p.is_active, p.cad_file_id,
p.cad_part_materials, p.components,
cf.original_name AS step_filename,
cf.processing_status AS step_status,
cf.parsed_objects,
cf.file_hash AS step_hash,
p.created_at, p.updated_at
FROM products p
LEFT JOIN cad_files cf ON cf.id = p.cad_file_id
WHERE p.id = %s
""", (product_id,))
if not rows:
return f"Product {product_id} not found."
product = rows[0]
# Get render history
renders = _db_query("""
SELECT ol.id AS line_id, ol.render_status, ol.result_path,
ol.material_override, ol.render_started_at, ol.render_completed_at,
ol.render_backend_used,
ot.name AS output_type_name,
o.order_number,
ol.render_log->>'engine_used' AS engine,
ol.render_log->>'render_duration_s' AS render_duration_s,
ol.render_log->>'total_duration_s' AS total_duration_s
FROM order_lines ol
JOIN orders o ON o.id = ol.order_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
WHERE ol.product_id = %s
ORDER BY ol.render_completed_at DESC NULLS LAST
LIMIT 20
""", (product_id,))
# Get media assets
assets = _db_query("""
SELECT id, asset_type, storage_key, file_size_bytes, mime_type, created_at
FROM media_assets
WHERE product_id = %s
ORDER BY created_at DESC
LIMIT 20
""", (product_id,))
product["renders"] = renders
product["media_assets"] = assets
return json.dumps(product, indent=2, default=str)
@mcp.tool()
def get_render_detail(order_line_id: str) -> str:
"""Get full render metadata for a specific order line: timing, engine, materials, log.
Args:
order_line_id: UUID of the order line.
"""
rows = _db_query("""
SELECT ol.id, ol.order_id, ol.product_id, ol.output_type_id,
ol.render_status, ol.result_path, ol.material_override,
ol.render_started_at, ol.render_completed_at,
ol.render_backend_used, ol.render_log,
ol.unit_price, ol.item_status,
p.name AS product_name, p.pim_id,
ot.name AS output_type_name, ot.renderer, ot.output_format,
ot.render_settings, ot.material_override AS ot_material_override,
o.order_number
FROM order_lines ol
JOIN products p ON p.id = ol.product_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
JOIN orders o ON o.id = ol.order_id
WHERE ol.id = %s
""", (order_line_id,))
if not rows:
return f"Order line {order_line_id} not found."
return json.dumps(rows[0], indent=2, default=str)
@mcp.tool()
def get_completed_renders(
product_name: str = "",
order_number: str = "",
output_type: str = "",
limit: int = 20,
) -> str:
"""List completed renders with metadata: product, output type, timing, file path.
Args:
product_name: Filter by product name (partial match).
order_number: Filter by order number (partial match).
output_type: Filter by output type name (partial match).
limit: Max results (default 20, max 100).
"""
limit = min(limit, 100)
conditions = ["ol.render_status = 'completed'"]
params: list = []
if product_name:
conditions.append("p.name ILIKE %s")
params.append(f"%{product_name}%")
if order_number:
conditions.append("o.order_number ILIKE %s")
params.append(f"%{order_number}%")
if output_type:
conditions.append("ot.name ILIKE %s")
params.append(f"%{output_type}%")
where = " AND ".join(conditions)
rows = _db_query(f"""
SELECT ol.id AS line_id, o.order_number,
p.name AS product_name, p.pim_id,
ot.name AS output_type_name, ot.output_format,
ol.result_path, ol.material_override,
ol.render_started_at, ol.render_completed_at,
ol.render_backend_used,
ol.render_log->>'engine_used' AS engine,
ol.render_log->>'render_duration_s' AS render_duration_s,
ol.render_log->>'total_duration_s' AS total_duration_s,
ol.render_log->>'output_size_bytes' AS output_size_bytes
FROM order_lines ol
JOIN products p ON p.id = ol.product_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
JOIN orders o ON o.id = ol.order_id
WHERE {where}
ORDER BY ol.render_completed_at DESC
LIMIT %s
""", tuple(params) + (limit,))
return json.dumps(rows, indent=2, default=str)
@mcp.tool()
def get_media_assets(
product_id: str = "",
asset_type: str = "",
limit: int = 30,
) -> str:
"""List media assets (renders, thumbnails, GLBs, USDs) with metadata.
Args:
product_id: Filter by product UUID.
asset_type: Filter by type (thumbnail, still, turntable, gltf_geometry, usd_master, blend_production).
limit: Max results (default 30, max 100).
"""
limit = min(limit, 100)
conditions = ["1=1"]
params: list = []
if product_id:
conditions.append("ma.product_id = %s")
params.append(product_id)
if asset_type:
conditions.append("ma.asset_type = %s")
params.append(asset_type)
where = " AND ".join(conditions)
rows = _db_query(f"""
SELECT ma.id, ma.asset_type, ma.storage_key,
ma.file_size_bytes, ma.mime_type,
ma.product_id, p.name AS product_name, p.pim_id,
ma.order_line_id, ma.cad_file_id,
ma.created_at
FROM media_assets ma
LEFT JOIN products p ON p.id = ma.product_id
WHERE {where}
ORDER BY ma.created_at DESC
LIMIT %s
""", tuple(params) + (limit,))
return json.dumps(rows, indent=2, default=str)
@mcp.tool()
def get_failed_renders(limit: int = 20) -> str:
"""List recently failed renders with error details.
Args:
limit: Max results (default 20).
"""
rows = _db_query("""
SELECT ol.id AS line_id, o.order_number,
p.name AS product_name, p.pim_id,
ot.name AS output_type_name,
ol.render_completed_at AS failed_at,
ol.render_log->>'error' AS error,
ol.render_log->>'engine_used' AS engine,
ol.render_backend_used
FROM order_lines ol
JOIN products p ON p.id = ol.product_id
LEFT JOIN output_types ot ON ot.id = ol.output_type_id
JOIN orders o ON o.id = ol.order_id
WHERE ol.render_status = 'failed'
ORDER BY ol.render_completed_at DESC NULLS LAST
LIMIT %s
""", (min(limit, 50),))
return json.dumps(rows, indent=2, default=str)
# ── Resources ────────────────────────────────────────────────────────────────
@mcp.resource("hartomat://schema")
def get_database_schema() -> str:
"""Database schema overview — table names and column types."""
rows = _db_query("""
SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
""")
tables: dict[str, list] = {}
for r in rows:
t = r["table_name"]
if t not in tables:
tables[t] = []
tables[t].append(
f" {r['column_name']}: {r['data_type']}"
+ (" (nullable)" if r["is_nullable"] == "YES" else "")
)
lines = []
for table, cols in sorted(tables.items()):
lines.append(f"\n{table}:")
lines.extend(cols)
return "\n".join(lines)
@mcp.resource("hartomat://output-types")
def get_output_types_resource() -> str:
"""All configured output types with settings."""
data = _api_get("/api/output-types?include_inactive=true")
return json.dumps(data, indent=2)
# ── Run ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
mcp.run()