f5ca91ee02
- Layout: mobile hamburger menu + overlay backdrop + close button; content area always full-width - Media browser: filter chips (default still+turntable); advanced toggle for GLB/STL; thumbnail_url previews for non-image types; video hover-play for turntable - Backend: asset_types multi-filter, thumbnail_url in MediaAssetOut, download proxy endpoint for MinIO/local files - Admin: "Import Existing Media" button → POST /api/admin/import-media-assets - Billing: fix invoice create 500 (MissingGreenlet — use selectinload after commit); PDF download uses axios blob instead of bare <a href> (auth header missing); fix storage.upload() accepting str|Path - SSE task logs: task_logs.py core + router, LiveRenderLog component - CadPreview: fix infinite loop when no gltf_geometry assets; loading screen before ThreeDViewer render - render-worker: add trimesh layer to Dockerfile Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
79 lines
2.7 KiB
Python
79 lines
2.7 KiB
Python
"""SSE endpoint for live task log streaming."""
|
|
from __future__ import annotations
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from fastapi import APIRouter, Depends
|
|
from fastapi.responses import StreamingResponse
|
|
from app.utils.auth import get_current_user
|
|
from app.config import settings
|
|
|
|
router = APIRouter(prefix="/tasks", tags=["task-logs"])
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@router.get("/{task_id}/logs")
|
|
async def stream_task_logs(
|
|
task_id: str,
|
|
current_user=Depends(get_current_user),
|
|
):
|
|
"""SSE stream of task log lines. Use fetch() with Authorization header on the frontend."""
|
|
import redis.asyncio as aioredis
|
|
|
|
async def event_stream():
|
|
r = aioredis.from_url(settings.redis_url)
|
|
try:
|
|
# Send heartbeat first
|
|
yield "data: {\"type\":\"connected\"}\n\n"
|
|
|
|
# Send existing log lines
|
|
existing = await r.lrange(f"task_logs:{task_id}", 0, -1)
|
|
for line in existing:
|
|
data = line.decode() if isinstance(line, bytes) else line
|
|
yield f"data: {data}\n\n"
|
|
|
|
# Subscribe and stream new entries
|
|
pubsub = r.pubsub()
|
|
await pubsub.subscribe(f"task_logs_ch:{task_id}")
|
|
|
|
timeout_seconds = 600 # 10 minutes max
|
|
deadline = asyncio.get_event_loop().time() + timeout_seconds
|
|
|
|
while asyncio.get_event_loop().time() < deadline:
|
|
try:
|
|
msg = await asyncio.wait_for(
|
|
pubsub.get_message(ignore_subscribe_messages=True),
|
|
timeout=2.0
|
|
)
|
|
if msg and msg["type"] == "message":
|
|
data = msg["data"].decode() if isinstance(msg["data"], bytes) else msg["data"]
|
|
yield f"data: {data}\n\n"
|
|
# Check if task completed
|
|
try:
|
|
parsed = json.loads(data)
|
|
if parsed.get("level") == "done":
|
|
break
|
|
except Exception:
|
|
pass
|
|
else:
|
|
# Heartbeat every 2s
|
|
yield ": heartbeat\n\n"
|
|
except asyncio.TimeoutError:
|
|
yield ": heartbeat\n\n"
|
|
except Exception as exc:
|
|
logger.error("SSE stream error for task %s: %s", task_id, exc)
|
|
finally:
|
|
try:
|
|
await r.aclose()
|
|
except Exception:
|
|
pass
|
|
|
|
return StreamingResponse(
|
|
event_stream(),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"X-Accel-Buffering": "no",
|
|
},
|
|
)
|