"""SSE endpoint for live task log streaming.""" from __future__ import annotations import asyncio import json import logging from fastapi import APIRouter, Depends from fastapi.responses import StreamingResponse from app.utils.auth import get_current_user from app.config import settings router = APIRouter(prefix="/tasks", tags=["task-logs"]) logger = logging.getLogger(__name__) @router.get("/{task_id}/logs") async def stream_task_logs( task_id: str, current_user=Depends(get_current_user), ): """SSE stream of task log lines. Use fetch() with Authorization header on the frontend.""" import redis.asyncio as aioredis async def event_stream(): r = aioredis.from_url(settings.redis_url) try: # Send heartbeat first yield "data: {\"type\":\"connected\"}\n\n" # Send existing log lines existing = await r.lrange(f"task_logs:{task_id}", 0, -1) for line in existing: data = line.decode() if isinstance(line, bytes) else line yield f"data: {data}\n\n" # Subscribe and stream new entries pubsub = r.pubsub() await pubsub.subscribe(f"task_logs_ch:{task_id}") timeout_seconds = 600 # 10 minutes max deadline = asyncio.get_event_loop().time() + timeout_seconds while asyncio.get_event_loop().time() < deadline: try: msg = await asyncio.wait_for( pubsub.get_message(ignore_subscribe_messages=True), timeout=2.0 ) if msg and msg["type"] == "message": data = msg["data"].decode() if isinstance(msg["data"], bytes) else msg["data"] yield f"data: {data}\n\n" # Check if task completed try: parsed = json.loads(data) if parsed.get("level") == "done": break except Exception: pass else: # Heartbeat every 2s yield ": heartbeat\n\n" except asyncio.TimeoutError: yield ": heartbeat\n\n" except Exception as exc: logger.error("SSE stream error for task %s: %s", task_id, exc) finally: try: await r.aclose() except Exception: pass return StreamingResponse( event_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, )