#!/usr/bin/env python3
"""HartOMat MCP Server.

Exposes the render pipeline, product library, material system, and order
management as MCP tools for Claude Code.

Requirements (install once):
    uv pip install "mcp[cli]" psycopg2-binary httpx

Register in Claude Code:
    claude mcp add hartomat -- python hartomat_mcp_server.py
"""

import json
import os
import re
from contextlib import closing
from datetime import datetime
from urllib.parse import quote

import httpx
import psycopg2
import psycopg2.extras
from mcp.server.fastmcp import FastMCP

# ── Configuration ────────────────────────────────────────────────────────────

# NOTE: the fallback values below are local-development defaults only.
# Set the corresponding environment variables for any real deployment.
DB_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://hartomat:hartomat@localhost:5432/hartomat",
)
API_URL = os.environ.get("API_URL", "http://localhost:8888")
API_EMAIL = os.environ.get("API_EMAIL", "admin@hartomat.com")
API_PASSWORD = os.environ.get("API_PASSWORD", "Admin1234!")

# ── Server setup ─────────────────────────────────────────────────────────────

mcp = FastMCP(
    "HartOMat",
    instructions=(
        "MCP server for the HartOMat render pipeline. "
        "Provides tools to query orders, products, materials, render status, "
        "worker health, and run read-only SQL against the PostgreSQL database."
    ),
)

# ── Helpers ──────────────────────────────────────────────────────────────────

# Value types that json.dumps can emit without help; everything else that
# comes back from the database is stringified by _jsonable().
_JSON_NATIVE_TYPES = (str, int, float, bool, type(None), list, dict)

# Parts of a SQL text that must not take part in keyword screening:
# single-quoted literals ('' is an escaped quote), line comments and
# block comments.
_SQL_NOISE = re.compile(
    r"'(?:[^']|'')*'"
    r"|--[^\n]*"
    r"|/\*.*?\*/",
    re.DOTALL,
)

# Statement keywords rejected by query_database(), matched as whole words.
_FORBIDDEN_SQL = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|CREATE)\b"
)


def _jsonable(value):
    """Convert a single database value into something json.dumps accepts.

    datetimes become ISO-8601 strings, JSON-native values pass through
    unchanged, and anything else is converted with str().
    """
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, _JSON_NATIVE_TYPES):
        return value
    return str(value)


def _clamp_limit(limit: int, maximum: int) -> int:
    """Restrict a caller-supplied row limit to the range 1..maximum."""
    return max(1, min(limit, maximum))


def _path_segment(value: str) -> str:
    """Percent-encode a value so it is safe as one URL path segment."""
    return quote(str(value), safe="")


def _db_query(
    sql: str,
    params: tuple = (),
    max_rows: int | None = None,
) -> list[dict]:
    """Execute a read-only SQL query and return rows as dicts.

    Args:
        sql: SQL text, using %s placeholders when params are given.
        params: Values bound to the placeholders. Empty means "no
            parameters"; the SQL text is then sent as written.
        max_rows: If given, convert and return at most this many rows.

    Returns:
        One dict per row, with values made JSON-serialisable by _jsonable().

    Raises:
        psycopg2.Error: on connection or query failure.
    """
    with closing(psycopg2.connect(DB_URL)) as conn:
        # Have the database reject writes on this session instead of
        # relying on callers to pass only SELECT statements.
        conn.set_session(readonly=True)
        with conn.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor
        ) as cur:
            # Pass None rather than an empty tuple when there are no
            # parameters, so a literal '%' in the SQL text (for example
            # LIKE '%abc%') is not run through placeholder substitution.
            cur.execute(sql, params or None)
            if max_rows is None:
                rows = cur.fetchall()
            else:
                rows = cur.fetchmany(max_rows)
    return [
        {column: _jsonable(value) for column, value in row.items()}
        for row in rows
    ]


_token_cache: dict[str, str] = {}


def _api_token() -> str:
    """Get a cached API auth token, logging in on first use."""
    if "token" in _token_cache:
        return _token_cache["token"]
    resp = httpx.post(
        f"{API_URL}/api/auth/login",
        json={"email": API_EMAIL, "password": API_PASSWORD},
        timeout=30,
    )
    resp.raise_for_status()
    _token_cache["token"] = resp.json()["access_token"]
    return _token_cache["token"]


def _api_request(
    method: str,
    path: str,
    *,
    params: dict | None = None,
    body: dict | None = None,
    timeout: float = 30,
) -> dict | list:
    """Send an authenticated request to the backend API and decode the JSON.

    If the backend answers 401, the cached token is discarded and the
    request is repeated once with a freshly obtained token, so an expired
    token does not require restarting the server.

    Raises:
        httpx.HTTPError: on transport failure or a non-success status.
    """
    url = f"{API_URL}{path}"

    def send() -> httpx.Response:
        return httpx.request(
            method,
            url,
            headers={"Authorization": f"Bearer {_api_token()}"},
            # None (not an empty dict) leaves any query string already
            # present in `path` untouched.
            params=params or None,
            json=body,
            timeout=timeout,
        )

    resp = send()
    if resp.status_code == 401:
        _token_cache.pop("token", None)
        resp = send()
    resp.raise_for_status()
    return resp.json()


def _api_get(path: str, params: dict | None = None) -> dict | list:
    """Authenticated GET against the backend API.

    Args:
        path: URL path below API_URL; may already contain a query string.
        params: Optional query parameters, URL-encoded by httpx.
    """
    return _api_request("GET", path, params=params, timeout=30)


def _api_post(path: str, body: dict | None = None) -> dict | list:
    """Authenticated POST against the backend API (JSON body, {} if omitted)."""
    return _api_request("POST", path, body=body or {}, timeout=60)


# ── Tools ────────────────────────────────────────────────────────────────────


@mcp.tool()
def query_database(sql: str) -> str:
    """Execute a read-only SQL query against the HartOMat PostgreSQL database.

    Only SELECT queries are allowed. The database contains tables for orders,
    order_lines, products, cad_files, materials, material_aliases, output_types,
    media_assets, render_templates, and more.

    Args:
        sql: A SELECT SQL query to execute.
    """
    # Screen the statement with comments and string literals removed so
    # that keywords inside them neither hide nor trigger a rejection.
    screened = _SQL_NOISE.sub(" ", sql).strip().upper()
    if not screened.startswith(("SELECT", "WITH")):
        return "Error: Only SELECT/WITH queries are allowed (read-only)."
    forbidden = _FORBIDDEN_SQL.search(screened)
    if forbidden:
        return (
            f"Error: {forbidden.group(1)} statements are not allowed "
            "(read-only)."
        )
    try:
        rows = _db_query(sql, max_rows=100)
    except psycopg2.Error as e:
        return f"Query error: {e}"
    if not rows:
        return "Query returned 0 rows."
    return json.dumps(rows, indent=2, default=str)


@mcp.tool()
def list_orders(status: str = "", limit: int = 10) -> str:
    """List recent orders with their render progress.

    Args:
        status: Filter by status (draft, submitted, processing, completed, rejected). Empty for all.
        limit: Maximum number of orders (default 10, max 50).
    """
    params: dict = {"limit": _clamp_limit(limit, 50)}
    if status:
        params["status"] = status
    data = _api_get("/api/orders", params=params)
    summary = [
        {
            "id": o["id"],
            "order_number": o["order_number"],
            "status": o["status"],
            "items": o.get("item_count", 0),
            "lines": o.get("line_count", 0),
            "render_progress": o.get("render_progress"),
            "created_at": o["created_at"],
        }
        for o in data
    ]
    return json.dumps(summary, indent=2)


@mcp.tool()
def get_order_detail(order_id: str) -> str:
    """Get detailed information about a specific order including all lines and render statuses.

    Args:
        order_id: UUID of the order.
    """
    data = _api_get(f"/api/orders/{_path_segment(order_id)}")
    return json.dumps(data, indent=2, default=str)


@mcp.tool()
def search_products(query: str = "", category: str = "", limit: int = 20) -> str:
    """Search products in the product library.

    Args:
        query: Search text (matches name, PIM-ID, baureihe).
        category: Filter by category key.
        limit: Max results (default 20).
    """
    params: dict = {"limit": _clamp_limit(limit, 50)}
    if query:
        params["q"] = query
    if category:
        params["category"] = category
    data = _api_get("/api/products", params=params)
    summary = [
        {
            "id": p["id"],
            "name": p["name"],
            "pim_id": p.get("pim_id"),
            "category": p.get("category_key"),
            "has_step": p.get("cad_file_id") is not None,
            "processing_status": p.get("processing_status"),
            "part_count": len(p.get("cad_part_materials") or []),
        }
        for p in data
    ]
    return json.dumps(summary, indent=2)


@mcp.tool()
def check_materials(order_id: str) -> str:
    """Check if all materials in an order are mapped to library materials.

    Returns unmapped materials with suggestions.

    Args:
        order_id: UUID of the order to check.
    """
    data = _api_get(f"/api/orders/{_path_segment(order_id)}/check-materials")
    return json.dumps(data, indent=2)


@mcp.tool()
def list_materials(include_aliases: bool = False) -> str:
    """List all materials in the material library.

    Args:
        include_aliases: If true, include alias names for each material.
    """
    data = _api_get("/api/materials")
    if not include_aliases:
        for material in data:
            material.pop("aliases", None)
    return json.dumps(data, indent=2)


@mcp.tool()
def dispatch_renders(order_id: str) -> str:
    """Dispatch (or retry) renders for all pending/failed lines in an order.

    Args:
        order_id: UUID of the order.
    """
    data = _api_post(f"/api/orders/{_path_segment(order_id)}/dispatch-renders")
    return json.dumps(data, indent=2)


@mcp.tool()
def get_worker_activity() -> str:
    """Get recent worker activity — last 30 STEP processing and render tasks with timing."""
    activity = _api_get("/api/worker/activity")
    return json.dumps(activity, indent=2, default=str)


@mcp.tool()
def get_render_stats() -> str:
    """Get render pipeline statistics: throughput, material coverage, product/order counts."""
    stats = _api_get("/api/admin/dashboard-stats")
    return json.dumps(stats, indent=2, default=str)


@mcp.tool()
def list_output_types() -> str:
    """List all configured output types with their render settings."""
    data = _api_get("/api/output-types?include_inactive=true")
    summary = [
        {
            "id": ot["id"],
            "name": ot["name"],
            "renderer": ot["renderer"],
            "format": ot["output_format"],
            "is_animation": ot["is_animation"],
            "material_override": ot.get("material_override"),
            "is_active": ot["is_active"],
            "render_settings": ot.get("render_settings", {}),
        }
        for ot in data
    ]
    return json.dumps(summary, indent=2)


@mcp.tool()
def set_material_override(order_id: str, material_name: str = "") -> str:
    """Set a material override on all lines of an order (batch).

    All product parts will be rendered with this single material.
    Pass empty string to clear the override.

    Args:
        order_id: UUID of the order.
        material_name: HARTOMAT library material name, or empty to clear.
    """
    data = _api_post(
        f"/api/orders/{_path_segment(order_id)}/batch-material-override",
        # An empty name is sent as null, which clears the override.
        {"material_override": material_name or None},
    )
    return json.dumps(data, indent=2)


@mcp.tool()
def create_order(
    product_ids: list[str],
    output_type_id: str = "",
    output_type_name: str = "",
    render_overrides: dict | None = None,
    material_override: str = "",
    notes: str = "",
) -> str:
    """Create a new order with the given products and output type.

    Either output_type_id or output_type_name must be provided.
    If both are given, output_type_id takes priority.

    render_overrides allows overriding render settings like output_format,
    width, height, samples, engine per line.

    Args:
        product_ids: List of product UUIDs to include in the order.
        output_type_id: UUID of the output type (takes priority over name).
        output_type_name: Name of the output type (used if output_type_id is empty).
        render_overrides: Optional dict of render setting overrides (e.g. {"output_format": "webp", "width": 2048, "height": 2048}).
        material_override: Optional HARTOMAT library material name to apply to all lines.
        notes: Optional notes for the order.
    """
    if not product_ids:
        return "Error: product_ids must contain at least one product UUID."

    # Resolve output_type_id from name if needed
    ot_id = output_type_id or None
    if not ot_id and output_type_name:
        rows = _db_query(
            "SELECT id FROM output_types "
            "WHERE name ILIKE %s AND is_active = true LIMIT 1",
            (output_type_name,),
        )
        if not rows:
            return (
                "Error: No active output type found matching "
                f"'{output_type_name}'."
            )
        ot_id = rows[0]["id"]

    # Settings shared by every line; only truthy values are sent.
    shared: dict = {}
    if ot_id:
        shared["output_type_id"] = ot_id
    if render_overrides:
        shared["render_overrides"] = render_overrides
    if material_override:
        shared["material_override"] = material_override
    lines = [{"product_id": pid, **shared} for pid in product_ids]

    body: dict = {"lines": lines}
    if notes:
        body["notes"] = notes

    try:
        data = _api_post("/api/orders", body)
        return json.dumps(
            {
                "order_id": data["id"],
                "order_number": data["order_number"],
                "status": data["status"],
                "line_count": data.get("line_count", len(lines)),
                "render_overrides": render_overrides,
            },
            indent=2,
        )
    except (httpx.HTTPError, KeyError, TypeError, ValueError) as e:
        # HTTP/transport failure, or a response that is not the expected
        # JSON object with id / order_number / status.
        return f"Error creating order: {e}"


@mcp.tool()
def set_render_overrides(order_id: str, render_overrides: dict | None = None) -> str:
    """Set render overrides on all lines of an order (batch).

    Overrides any output type render settings at order time.
    Common overrides: output_format (png/jpg/webp), width, height, samples,
    engine (cycles/eevee), transparent_bg, bg_color, noise_threshold, denoiser.

    Pass None/empty to clear all overrides.

    Args:
        order_id: UUID of the order.
        render_overrides: Dict of render setting overrides, or None to clear.
    """
    data = _api_post(
        f"/api/orders/{_path_segment(order_id)}/batch-render-overrides",
        {"render_overrides": render_overrides},
    )
    return json.dumps(data, indent=2)


@mcp.tool()
def get_queue_status() -> str:
    """Get current render queue status — pending, active, completed/failed counts."""
    rows = _db_query("""
        SELECT
            (SELECT count(*) FROM order_lines
              WHERE render_status = 'pending') AS pending_renders,
            (SELECT count(*) FROM order_lines
              WHERE render_status = 'processing') AS active_renders,
            (SELECT count(*) FROM order_lines
              WHERE render_status = 'completed'
                AND render_completed_at > now() - interval '1 hour')
                AS completed_last_hour,
            (SELECT count(*) FROM order_lines
              WHERE render_status = 'failed'
                AND render_completed_at > now() - interval '1 hour')
                AS failed_last_hour,
            (SELECT count(*) FROM cad_files
              WHERE processing_status = 'processing') AS processing_step_files
    """)
    status = rows[0] if rows else {}
    return json.dumps(status, indent=2)


@mcp.tool()
def get_product_detail(product_id: str) -> str:
    """Get detailed information about a product: metadata, CAD file, parts, materials, render history.

    Args:
        product_id: UUID of the product.
    """
    key = (product_id,)

    rows = _db_query("""
        SELECT p.id, p.name, p.pim_id, p.category_key,
               p.ebene1, p.ebene2, p.baureihe, p.produkt_baureihe,
               p.lagertyp, p.name_cad_modell, p.is_active,
               p.cad_file_id, p.cad_part_materials, p.components,
               cf.original_name AS step_filename,
               cf.processing_status AS step_status,
               cf.parsed_objects,
               cf.file_hash AS step_hash,
               p.created_at, p.updated_at
        FROM products p
        LEFT JOIN cad_files cf ON cf.id = p.cad_file_id
        WHERE p.id = %s
    """, key)
    if not rows:
        return f"Product {product_id} not found."
    product = rows[0]

    # Get render history
    product["renders"] = _db_query("""
        SELECT ol.id AS line_id, ol.render_status, ol.result_path,
               ol.material_override,
               ol.render_started_at, ol.render_completed_at,
               ol.render_backend_used,
               ot.name AS output_type_name,
               o.order_number,
               ol.render_log->>'engine_used' AS engine,
               ol.render_log->>'render_duration_s' AS render_duration_s,
               ol.render_log->>'total_duration_s' AS total_duration_s
        FROM order_lines ol
        JOIN orders o ON o.id = ol.order_id
        LEFT JOIN output_types ot ON ot.id = ol.output_type_id
        WHERE ol.product_id = %s
        ORDER BY ol.render_completed_at DESC NULLS LAST
        LIMIT 20
    """, key)

    # Get media assets
    product["media_assets"] = _db_query("""
        SELECT id, asset_type, storage_key, file_size_bytes, mime_type,
               created_at
        FROM media_assets
        WHERE product_id = %s
        ORDER BY created_at DESC
        LIMIT 20
    """, key)

    return json.dumps(product, indent=2, default=str)


@mcp.tool()
def get_render_detail(order_line_id: str) -> str:
    """Get full render metadata for a specific order line: timing, engine, materials, log.

    Args:
        order_line_id: UUID of the order line.
    """
    matches = _db_query("""
        SELECT ol.id, ol.order_id, ol.product_id, ol.output_type_id,
               ol.render_status, ol.result_path, ol.material_override,
               ol.render_started_at, ol.render_completed_at,
               ol.render_backend_used, ol.render_log,
               ol.unit_price, ol.item_status,
               p.name AS product_name, p.pim_id,
               ot.name AS output_type_name, ot.renderer, ot.output_format,
               ot.render_settings,
               ot.material_override AS ot_material_override,
               o.order_number
        FROM order_lines ol
        JOIN products p ON p.id = ol.product_id
        LEFT JOIN output_types ot ON ot.id = ol.output_type_id
        JOIN orders o ON o.id = ol.order_id
        WHERE ol.id = %s
    """, (order_line_id,))
    if matches:
        return json.dumps(matches[0], indent=2, default=str)
    return f"Order line {order_line_id} not found."


@mcp.tool()
def get_completed_renders(
    product_name: str = "",
    order_number: str = "",
    output_type: str = "",
    limit: int = 20,
) -> str:
    """List completed renders with metadata: product, output type, timing, file path.

    Args:
        product_name: Filter by product name (partial match).
        order_number: Filter by order number (partial match).
        output_type: Filter by output type name (partial match).
        limit: Max results (default 20, max 100).
    """
    limit = _clamp_limit(limit, 100)

    # Only fixed SQL fragments are interpolated into the statement; all
    # caller-supplied values travel as bound parameters.
    conditions = ["ol.render_status = 'completed'"]
    params: list = []
    optional_filters = (
        ("p.name ILIKE %s", product_name),
        ("o.order_number ILIKE %s", order_number),
        ("ot.name ILIKE %s", output_type),
    )
    for clause, needle in optional_filters:
        if needle:
            conditions.append(clause)
            params.append(f"%{needle}%")
    where = " AND ".join(conditions)

    rows = _db_query(f"""
        SELECT ol.id AS line_id, o.order_number,
               p.name AS product_name, p.pim_id,
               ot.name AS output_type_name, ot.output_format,
               ol.result_path, ol.material_override,
               ol.render_started_at, ol.render_completed_at,
               ol.render_backend_used,
               ol.render_log->>'engine_used' AS engine,
               ol.render_log->>'render_duration_s' AS render_duration_s,
               ol.render_log->>'total_duration_s' AS total_duration_s,
               ol.render_log->>'output_size_bytes' AS output_size_bytes
        FROM order_lines ol
        JOIN products p ON p.id = ol.product_id
        LEFT JOIN output_types ot ON ot.id = ol.output_type_id
        JOIN orders o ON o.id = ol.order_id
        WHERE {where}
        ORDER BY ol.render_completed_at DESC NULLS LAST
        LIMIT %s
    """, (*params, limit))
    return json.dumps(rows, indent=2, default=str)


@mcp.tool()
def get_media_assets(
    product_id: str = "",
    asset_type: str = "",
    limit: int = 30,
) -> str:
    """List media assets (renders, thumbnails, GLBs, USDs) with metadata.

    Args:
        product_id: Filter by product UUID.
        asset_type: Filter by type (thumbnail, still, turntable, gltf_geometry, usd_master, blend_production).
        limit: Max results (default 30, max 100).
    """
    limit = _clamp_limit(limit, 100)

    # "1=1" keeps the WHERE clause valid when no filter is supplied.
    conditions = ["1=1"]
    params: list = []
    if product_id:
        conditions.append("ma.product_id = %s")
        params.append(product_id)
    if asset_type:
        conditions.append("ma.asset_type = %s")
        params.append(asset_type)
    where = " AND ".join(conditions)

    rows = _db_query(f"""
        SELECT ma.id, ma.asset_type, ma.storage_key, ma.file_size_bytes,
               ma.mime_type, ma.product_id,
               p.name AS product_name, p.pim_id,
               ma.order_line_id, ma.cad_file_id, ma.created_at
        FROM media_assets ma
        LEFT JOIN products p ON p.id = ma.product_id
        WHERE {where}
        ORDER BY ma.created_at DESC
        LIMIT %s
    """, (*params, limit))
    return json.dumps(rows, indent=2, default=str)


@mcp.tool()
def get_failed_renders(limit: int = 20) -> str:
    """List recently failed renders with error details.

    Args:
        limit: Max results (default 20).
    """
    rows = _db_query("""
        SELECT ol.id AS line_id, o.order_number,
               p.name AS product_name, p.pim_id,
               ot.name AS output_type_name,
               ol.render_completed_at AS failed_at,
               ol.render_log->>'error' AS error,
               ol.render_log->>'engine_used' AS engine,
               ol.render_backend_used
        FROM order_lines ol
        JOIN products p ON p.id = ol.product_id
        LEFT JOIN output_types ot ON ot.id = ol.output_type_id
        JOIN orders o ON o.id = ol.order_id
        WHERE ol.render_status = 'failed'
        ORDER BY ol.render_completed_at DESC NULLS LAST
        LIMIT %s
    """, (_clamp_limit(limit, 50),))
    return json.dumps(rows, indent=2, default=str)


# ── Resources ────────────────────────────────────────────────────────────────


@mcp.resource("hartomat://schema")
def get_database_schema() -> str:
    """Database schema overview — table names and column types."""
    columns = _db_query("""
        SELECT table_name, column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = 'public'
        ORDER BY table_name, ordinal_position
    """)

    # Group the formatted column descriptions under their table name.
    tables: dict[str, list] = {}
    for col in columns:
        suffix = " (nullable)" if col["is_nullable"] == "YES" else ""
        tables.setdefault(col["table_name"], []).append(
            f"  {col['column_name']}: {col['data_type']}" + suffix
        )

    lines = []
    for table, cols in sorted(tables.items()):
        lines.append(f"\n{table}:")
        lines.extend(cols)
    return "\n".join(lines)


@mcp.resource("hartomat://output-types")
def get_output_types_resource() -> str:
    """All configured output types with settings."""
    output_types = _api_get("/api/output-types?include_inactive=true")
    return json.dumps(output_types, indent=2)


# ── Run ──────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    mcp.run()