Files
HartOMat/backend/app/tasks/gpu_tasks.py
T
Hartmut 1321ef2bd4 refactor: rename thumbnail_rendering queue to asset_pipeline
The queue handles far more than thumbnails: OCC tessellation, USD master
generation, GLB production, order line renders, and workflow renders.
asset_pipeline better reflects its role as the render-worker's primary queue.

Updated all references in: task decorators, celery_app.py, beat_tasks.py,
docker-compose.yml worker command, worker.py MONITORED_QUEUES, admin.py,
CLAUDE.md, LEARNINGS.md, Dockerfile, helpTexts.ts, test files,
and all .claude/commands/*.md skill files.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 22:28:38 +01:00

89 lines
3.3 KiB
Python

"""Celery task for GPU health probe."""
import logging
from app.tasks.celery_app import celery_app
logger = logging.getLogger(__name__)
# Single source of truth for the probe timeout so the subprocess limit and the
# reported error message cannot drift apart.
_PROBE_TIMEOUT_S = 60


@celery_app.task(name="app.tasks.gpu_tasks.probe_gpu", queue="asset_pipeline")
def probe_gpu() -> dict:
    """Run Blender GPU probe on the render-worker. Stores result in system_settings.

    Launches Blender in background mode with ``/render-scripts/gpu_probe.py``
    and interprets the ``GPU_PROBE_OK:`` / ``GPU_PROBE_FAIL:`` marker line the
    script prints on stdout.

    Returns:
        A dict with the keys:

        * ``status`` -- ``"ok"``, ``"failed"``, ``"error"`` or ``"unknown"``.
        * ``device_type`` -- value of ``device_type=`` on the OK line, else ``None``.
        * ``devices`` -- list parsed from ``devices=[...]`` on the OK line, or
          ``[]`` when absent or unparseable.
        * ``error`` -- human-readable failure reason, or ``None`` on success.
        * ``probed_at`` -- UTC ISO-8601 timestamp taken when the task started.

    Raises:
        Whatever ``_save_probe_result`` raises; probe failures themselves are
        reported through ``status`` / ``error`` rather than raised.
    """
    import subprocess
    from datetime import datetime, timezone
    from pathlib import Path

    from app.services.render_blender import find_blender

    result = {
        "status": "unknown",
        "device_type": None,
        "devices": [],
        "error": None,
        "probed_at": datetime.now(timezone.utc).isoformat(),
    }

    try:
        blender_bin = find_blender()
        probe_script = Path("/render-scripts/gpu_probe.py")
        if not blender_bin:
            result["status"] = "error"
            result["error"] = "Blender binary not found — check BLENDER_BIN env or PATH"
        elif not probe_script.exists():
            result["status"] = "error"
            result["error"] = f"gpu_probe.py not found at {probe_script}"
        else:
            proc = subprocess.run(
                [blender_bin, "--background", "--python", str(probe_script)],
                capture_output=True,
                text=True,
                timeout=_PROBE_TIMEOUT_S,
            )
            result.update(
                _parse_probe_output(proc.stdout, proc.stderr, proc.returncode)
            )
    except subprocess.TimeoutExpired:
        logger.warning("GPU probe timed out after %ss", _PROBE_TIMEOUT_S)
        result["status"] = "error"
        result["error"] = f"GPU probe timed out after {_PROBE_TIMEOUT_S}s"
    except Exception as exc:
        # Task boundary: keep reporting the failure through the result dict,
        # but also leave a traceback in the worker log for diagnosis.
        logger.exception("GPU probe failed")
        result["status"] = "error"
        result["error"] = str(exc)

    # Save to system_settings
    _save_probe_result(result)
    return result


def _parse_probe_output(stdout: str, stderr: str, returncode: int) -> dict:
    """Translate the probe process output into result-dict updates.

    The first line containing ``GPU_PROBE_OK:`` or ``GPU_PROBE_FAIL:`` decides
    the outcome; later lines are ignored. Only the keys that should change are
    returned, so the caller's defaults survive for everything else.
    """
    for line in stdout.splitlines():
        if "GPU_PROBE_OK:" in line:
            # Expected shape: GPU_PROBE_OK: device_type=OPTIX devices=[...]
            payload = line.split("GPU_PROBE_OK:", 1)[1].strip()
            updates = {"status": "ok", "devices": _parse_device_list(payload)}
            for token in payload.split():
                if token.startswith("device_type="):
                    updates["device_type"] = token.split("=", 1)[1]
                    break
            return updates
        if "GPU_PROBE_FAIL:" in line:
            return {
                "status": "failed",
                "error": line.split("GPU_PROBE_FAIL:", 1)[1].strip(),
            }

    # No marker at all: a non-zero exit code means failure, otherwise the
    # outcome stays "unknown". stderr is truncated to keep the stored value small.
    return {
        "status": "failed" if returncode != 0 else "unknown",
        "error": stderr[:500] if stderr else "No probe output",
    }


def _parse_device_list(payload: str) -> list:
    """Best-effort parse of the ``devices=[...]`` part of an OK line.

    Returns ``[]`` when the field is missing, is not a bracketed literal, or
    would not survive JSON serialisation (the result is stored as JSON).
    NOTE(review): assumes gpu_probe.py prints the list as a Python/JSON-style
    literal -- confirm against the probe script.
    """
    import ast
    import json

    _, found, tail = payload.partition("devices=")
    start, end = tail.find("["), tail.rfind("]")
    if not found or start < 0 or end < start:
        return []
    try:
        # literal_eval only accepts literals, so probe output is never executed.
        devices = ast.literal_eval(tail[start:end + 1])
        json.dumps(devices)
    except (ValueError, SyntaxError, TypeError):
        return []
    return devices if isinstance(devices, list) else []
def _save_probe_result(result: dict) -> None:
    """Upsert the probe result into ``system_settings``.

    The result dict is serialised with ``json.dumps`` and written under the
    key ``gpu_probe_last_result``; an existing row for that key has its value
    overwritten.

    Args:
        result: JSON-serialisable probe result.

    Raises:
        Any error from JSON serialisation or from the database layer
        propagates to the caller; the engine is disposed either way.
    """
    import json

    from sqlalchemy import create_engine, text

    from app.config import settings as app_settings

    # The configured URL carries the "+asyncpg" driver suffix; stripping it
    # gives a URL usable with a plain synchronous engine (presumably the
    # dialect's default driver is picked up -- verify against deployment).
    engine = create_engine(app_settings.database_url.replace("+asyncpg", ""))
    try:
        upsert = text("""
            INSERT INTO system_settings (key, value) VALUES (:key, :value)
            ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
        """)
        params = {"key": "gpu_probe_last_result", "value": json.dumps(result)}
        with engine.connect() as connection:
            connection.execute(upsert, params)
            connection.commit()
    finally:
        # A throwaway engine is built per call, so release its pool explicitly.
        engine.dispose()