1321ef2bd4
The queue handles far more than thumbnails: OCC tessellation, USD master generation, GLB production, order line renders, and workflow renders. asset_pipeline better reflects its role as the render-worker's primary queue. Updated all references in: task decorators, celery_app.py, beat_tasks.py, docker-compose.yml worker command, worker.py MONITORED_QUEUES, admin.py, CLAUDE.md, LEARNINGS.md, Dockerfile, helpTexts.ts, test files, and all .claude/commands/*.md skill files. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
89 lines
3.3 KiB
Python
89 lines
3.3 KiB
Python
"""Celery task for GPU health probe."""
|
|
import logging
|
|
from app.tasks.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task(name="app.tasks.gpu_tasks.probe_gpu", queue="asset_pipeline")
def probe_gpu() -> dict:
    """Run the Blender GPU probe on the render-worker and persist the result.

    Launches Blender headless with ``/render-scripts/gpu_probe.py``, parses the
    ``GPU_PROBE_OK:`` / ``GPU_PROBE_FAIL:`` marker lines from stdout, stores the
    outcome in the ``system_settings`` table via ``_save_probe_result``, and
    returns it.

    Returns:
        dict with keys ``status`` ("ok" | "failed" | "error" | "unknown"),
        ``device_type`` (e.g. "OPTIX"), ``devices``, ``error``, and
        ``probed_at`` (UTC ISO-8601 timestamp).
    """
    import subprocess
    from datetime import datetime, timezone
    from pathlib import Path

    from app.services.render_blender import find_blender

    result = {
        "status": "unknown",
        "device_type": None,
        # NOTE(review): never populated below — the probe line advertises
        # "devices=[...]" but only device_type is parsed. Confirm intent.
        "devices": [],
        "error": None,
        "probed_at": datetime.now(timezone.utc).isoformat(),
    }

    try:
        blender_bin = find_blender()
        if not blender_bin:
            result["status"] = "error"
            result["error"] = "Blender binary not found — check BLENDER_BIN env or PATH"
        else:
            probe_script = Path("/render-scripts/gpu_probe.py")

            if not probe_script.exists():
                result["status"] = "error"
                result["error"] = f"gpu_probe.py not found at {probe_script}"
            else:
                proc = subprocess.run(
                    [blender_bin, "--background", "--python", str(probe_script)],
                    capture_output=True, text=True, timeout=60,
                )

                for line in proc.stdout.splitlines():
                    if "GPU_PROBE_OK:" in line:
                        result["status"] = "ok"
                        # Expected marker format:
                        # GPU_PROBE_OK: device_type=OPTIX devices=[...]
                        parts = line.split("GPU_PROBE_OK:", 1)[1].strip()
                        for p in parts.split():
                            if p.startswith("device_type="):
                                result["device_type"] = p.split("=", 1)[1]
                                break
                    elif "GPU_PROBE_FAIL:" in line:
                        result["status"] = "failed"
                        result["error"] = line.split("GPU_PROBE_FAIL:", 1)[1].strip()
                        break

                # No marker line seen at all: treat a non-zero exit code as a
                # failure and surface a truncated stderr snippet for diagnosis.
                if result["status"] == "unknown":
                    result["status"] = "failed" if proc.returncode != 0 else "unknown"
                    result["error"] = proc.stderr[:500] if proc.stderr else "No probe output"

    except subprocess.TimeoutExpired:
        result["status"] = "error"
        result["error"] = "GPU probe timed out after 60s"
    except Exception as exc:
        # Defensive boundary for a background task: report rather than crash.
        result["status"] = "error"
        result["error"] = str(exc)

    # Persist so the admin UI can show the latest probe without re-running it.
    _save_probe_result(result)
    return result
|
|
|
|
|
|
def _save_probe_result(result: dict) -> None:
    """Upsert the probe result JSON into ``system_settings``.

    Stores *result* under the fixed key ``gpu_probe_last_result``. Uses a
    short-lived synchronous engine, since this runs inside a Celery worker
    rather than the app's async event loop.

    Args:
        result: JSON-serializable probe outcome produced by ``probe_gpu``.
    """
    import json
    from sqlalchemy import create_engine, text
    from app.config import settings as app_settings

    # Strip the asyncpg driver suffix to obtain a synchronous DB URL.
    sync_url = app_settings.database_url.replace("+asyncpg", "")
    eng = create_engine(sync_url)
    try:
        # engine.begin() commits on success and rolls back on error,
        # replacing the previous manual connect() + conn.commit() pair.
        with eng.begin() as conn:
            conn.execute(text("""
                INSERT INTO system_settings (key, value) VALUES (:key, :value)
                ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
            """), {"key": "gpu_probe_last_result", "value": json.dumps(result)})
    finally:
        eng.dispose()
|