diff --git a/backend/alembic/versions/054_worker_configs.py b/backend/alembic/versions/054_worker_configs.py new file mode 100644 index 0000000..c8c8ca8 --- /dev/null +++ b/backend/alembic/versions/054_worker_configs.py @@ -0,0 +1,46 @@
+"""Add worker_configs table for dynamic concurrency settings.
+
+Revision ID: 054
+Revises: 053
+"""
+from alembic import op
+import sqlalchemy as sa
+
+revision = "054"
+down_revision = "053"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    """Create ``worker_configs`` and seed one row per known Celery queue."""
+    op.create_table(
+        "worker_configs",
+        sa.Column("queue_name", sa.String(100), primary_key=True),
+        sa.Column("max_concurrency", sa.Integer, nullable=False, server_default="8"),
+        sa.Column("min_concurrency", sa.Integer, nullable=False, server_default="2"),
+        sa.Column("enabled", sa.Boolean, nullable=False, server_default="true"),
+        # The column is a naive timestamp and the application writes
+        # datetime.utcnow() into it, so the server default must produce UTC as
+        # well; a bare now() would be cast using the session time zone.
+        sa.Column(
+            "updated_at",
+            sa.DateTime,
+            nullable=False,
+            server_default=sa.text("timezone('utc', now())"),
+        ),
+    )
+    # Seed default rows; updated_at is filled in by the server default.
+    op.execute("""
+        INSERT INTO worker_configs (queue_name, max_concurrency, min_concurrency, enabled)
+        VALUES
+            ('step_processing', 8, 2, true),
+            ('thumbnail_rendering', 1, 1, true),
+            ('ai_validation', 4, 1, true)
+        ON CONFLICT DO NOTHING
+    """)
+
+
+def downgrade() -> None:
+    """Drop ``worker_configs``, discarding any stored settings."""
+    op.drop_table("worker_configs")
diff --git a/backend/app/api/routers/worker.py b/backend/app/api/routers/worker.py index fc61346..b52e94f 100644 --- a/backend/app/api/routers/worker.py +++ b/backend/app/api/routers/worker.py @@ -1,7 +1,8 @@ """Worker activity router — exposes recent background task status.""" from datetime import datetime +from typing import Optional -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from sqlalchemy.orm import selectinload @@ -14,7 +15,8 @@ from app.models.order import Order from app.models.order_line import OrderLine from app.models.product import Product from app.models.user import User -from 
app.utils.auth import get_current_user, require_admin_or_pm +from app.models.worker_config import WorkerConfig +from app.utils.auth import get_current_user, require_admin_or_pm, require_admin router = APIRouter(prefix="/worker", tags=["worker"]) @@ -569,3 +571,91 @@ async def render_health( last_render_age_minutes=last_render_age_minutes, details=details, )
+
+
+# ---------------------------------------------------------------------------
+# Worker concurrency configuration
+# ---------------------------------------------------------------------------
+
+class WorkerConfigOut(BaseModel):
+    """Serialized form of one ``worker_configs`` row."""
+
+    queue_name: str
+    max_concurrency: int
+    min_concurrency: int
+    enabled: bool
+    updated_at: str  # ISO-8601 string produced by datetime.isoformat()
+
+
+class WorkerConfigUpdate(BaseModel):
+    """Partial update payload; fields left as ``None`` are not changed."""
+
+    max_concurrency: Optional[int] = None
+    min_concurrency: Optional[int] = None
+    enabled: Optional[bool] = None
+
+
+def _worker_config_out(cfg: WorkerConfig) -> WorkerConfigOut:
+    """Convert a ``WorkerConfig`` row into its API representation."""
+    # Fall back to the current time if the row somehow carries no timestamp.
+    updated_at = cfg.updated_at if cfg.updated_at else datetime.utcnow()
+    return WorkerConfigOut(
+        queue_name=cfg.queue_name,
+        max_concurrency=cfg.max_concurrency,
+        min_concurrency=cfg.min_concurrency,
+        enabled=cfg.enabled,
+        updated_at=updated_at.isoformat(),
+    )
+
+
+@router.get("/configs", response_model=list[WorkerConfigOut])
+async def list_worker_configs(
+    user: User = Depends(require_admin),
+    db: AsyncSession = Depends(get_db),
+):
+    """List all worker concurrency configurations (admin only)."""
+    result = await db.execute(select(WorkerConfig).order_by(WorkerConfig.queue_name))
+    return [_worker_config_out(cfg) for cfg in result.scalars().all()]
+
+
+@router.put("/configs/{queue_name}", response_model=WorkerConfigOut)
+async def update_worker_config(
+    queue_name: str,
+    body: WorkerConfigUpdate,
+    user: User = Depends(require_admin),
+    db: AsyncSession = Depends(get_db),
+):
+    """Update concurrency settings for a specific queue (admin only).
+
+    Only the fields present in ``body`` are changed. Responds with 404 when
+    no row exists for ``queue_name`` and with 400 when a limit is below 1 or
+    the resulting minimum would exceed the resulting maximum.
+    """
+    result = await db.execute(
+        select(WorkerConfig).where(WorkerConfig.queue_name == queue_name)
+    )
+    cfg = result.scalar_one_or_none()
+    if not cfg:
+        raise HTTPException(404, detail=f"No worker config found for queue '{queue_name}'")
+
+    if body.max_concurrency is not None and body.max_concurrency < 1:
+        raise HTTPException(400, detail="max_concurrency must be >= 1")
+    if body.min_concurrency is not None and body.min_concurrency < 1:
+        raise HTTPException(400, detail="min_concurrency must be >= 1")
+
+    # Work out the resulting limits first so that a rejected request never
+    # leaves half-applied changes on the session-tracked row.
+    new_max = cfg.max_concurrency if body.max_concurrency is None else body.max_concurrency
+    new_min = cfg.min_concurrency if body.min_concurrency is None else body.min_concurrency
+    if new_min > new_max:
+        raise HTTPException(400, detail="min_concurrency cannot exceed max_concurrency")
+
+    cfg.max_concurrency = new_max
+    cfg.min_concurrency = new_min
+    if body.enabled is not None:
+        cfg.enabled = body.enabled
+    cfg.updated_at = datetime.utcnow()
+    await db.commit()
+    await db.refresh(cfg)
+
+    return _worker_config_out(cfg)
diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index f16f158..cda2b12 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -17,11 +17,12 @@ from app.domains.admin.models import DashboardConfig # Also re-export SystemSetting (no domain assigned — stays as-is) from app.models.system_setting import SystemSetting +from app.models.worker_config import WorkerConfig __all__ = [ "Tenant", "User", "Template", "CadFile", "Product", "Order", "OrderItem", "OrderLine", "AuditLog", "PricingTier", "OutputType", "RenderTemplate", "ProductRenderPosition", "WorkflowDefinition", "WorkflowRun", "WorkflowNodeResult", "Material", "MaterialAlias", "AssetLibrary", "MediaAsset", "MediaAssetType", "SystemSetting", - "DashboardConfig", + "DashboardConfig", "WorkerConfig", ] diff --git a/backend/app/models/worker_config.py b/backend/app/models/worker_config.py new file mode 100644 index 0000000..c962065 --- /dev/null +++ b/backend/app/models/worker_config.py @@ -0,0 +1,14 @@ +from datetime import datetime +from sqlalchemy 
import String, Integer, Boolean, DateTime +from sqlalchemy.orm import Mapped, mapped_column +from app.database import Base + + +class WorkerConfig(Base): + __tablename__ = "worker_configs" + + queue_name: Mapped[str] = mapped_column(String(100), primary_key=True) + max_concurrency: Mapped[int] = mapped_column(Integer, nullable=False, default=8) + min_concurrency: Mapped[int] = mapped_column(Integer, nullable=False, default=2) + enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow) diff --git a/backend/app/tasks/beat_tasks.py b/backend/app/tasks/beat_tasks.py index dbad854..e654dfe 100644 --- a/backend/app/tasks/beat_tasks.py +++ b/backend/app/tasks/beat_tasks.py @@ -10,6 +10,69 @@ from celery import shared_task logger = logging.getLogger(__name__)
+@shared_task(name="app.tasks.beat_tasks.apply_worker_concurrency", queue="step_processing")
+def apply_worker_concurrency() -> None:
+    """Read worker_configs from DB and push autoscale limits to workers.
+
+    Runs every 5 minutes via Celery Beat. Each worker that answers
+    ``inspect().active_queues()`` receives one ``autoscale`` command built
+    from the enabled configs of the queues it consumes. Workers have to run
+    with ``--autoscale`` for the command to take effect.
+
+    ``autoscale`` sets absolute limits, so repeating it is harmless. The
+    earlier ``pool_grow`` broadcast told every worker to add ``n`` processes
+    on each run and for each queue, so pool sizes kept climbing.
+    """
+    try:
+        from sqlalchemy import create_engine, select
+        from sqlalchemy.orm import Session
+        from app.config import settings as app_settings
+        from app.models.worker_config import WorkerConfig
+        from app.tasks.celery_app import celery_app
+
+        sync_url = app_settings.database_url.replace("+asyncpg", "")
+        eng = create_engine(sync_url)
+        try:
+            with Session(eng) as session:
+                configs = session.execute(
+                    select(WorkerConfig).where(WorkerConfig.enabled == True)  # noqa: E712
+                ).scalars().all()
+                # Copy plain values so nothing depends on the session later.
+                limits = {
+                    cfg.queue_name: (cfg.max_concurrency, cfg.min_concurrency)
+                    for cfg in configs
+                }
+        finally:
+            eng.dispose()
+
+        # Maps worker hostname -> list of queue descriptions; None if no reply.
+        active = celery_app.control.inspect().active_queues() or {}
+        if not active:
+            logger.warning("[WORKER_SCALE] No workers replied; nothing to apply")
+            return
+
+        for worker, queues in active.items():
+            served = [limits[q["name"]] for q in queues if q.get("name") in limits]
+            if not served:
+                continue  # no enabled config for this worker's queues
+            # One pool serves all of a worker's queues: use the largest limits.
+            max_c = max(mx for mx, _ in served)
+            min_c = max(mn for _, mn in served)
+            try:
+                celery_app.control.autoscale(max_c, min_c, destination=[worker])
+                logger.info(
+                    "[WORKER_SCALE] Signalled autoscale max=%d min=%d for worker %s",
+                    max_c, min_c, worker,
+                )
+            except Exception as exc:
+                logger.warning(
+                    "[WORKER_SCALE] autoscale failed for %s: %s",
+                    worker, exc,
+                )
+    except Exception as exc:
+        logger.error("apply_worker_concurrency failed: %s", exc)
+
+
@shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing") def broadcast_queue_status() -> None: """Broadcast current queue depths to all WebSocket clients every 10s. 
diff --git a/backend/app/tasks/celery_app.py b/backend/app/tasks/celery_app.py index 15a1c55..76d569e 100644 --- a/backend/app/tasks/celery_app.py +++ b/backend/app/tasks/celery_app.py @@ -46,5 +46,9 @@ celery_app.conf.update( "task": "app.tasks.beat_tasks.recover_stalled_renders", "schedule": 300.0, # every 5 minutes }, + "apply-worker-concurrency-every-5m": { + "task": "app.tasks.beat_tasks.apply_worker_concurrency", + "schedule": 300.0, # every 5 minutes + }, }, ) diff --git a/docker-compose.yml b/docker-compose.yml index 03c5123..9603694 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -87,7 +87,7 @@ services: build: context: ./backend dockerfile: Dockerfile - command: celery -A app.tasks.celery_app worker --loglevel=info -Q step_processing,ai_validation --concurrency=${CELERY_WORKER_CONCURRENCY:-8} + command: celery -A app.tasks.celery_app worker --loglevel=info -Q step_processing,ai_validation --autoscale=${MAX_CONCURRENCY:-8},${MIN_CONCURRENCY:-2} --concurrency=${MIN_CONCURRENCY:-2} environment: - POSTGRES_DB=${POSTGRES_DB:-schaeffler} - POSTGRES_USER=${POSTGRES_USER:-schaeffler} @@ -121,6 +121,7 @@ services: dockerfile: render-worker/Dockerfile args: - BLENDER_VERSION=${BLENDER_VERSION:-5.0.1} + command: bash -c "python3 /check_version.py && celery -A app.tasks.celery_app worker --loglevel=info -Q thumbnail_rendering --autoscale=1,1 --concurrency=1" environment: - POSTGRES_DB=${POSTGRES_DB:-schaeffler} - POSTGRES_USER=${POSTGRES_USER:-schaeffler} diff --git a/frontend/src/api/worker.ts b/frontend/src/api/worker.ts index 1e48e47..65a6141 100644 --- a/frontend/src/api/worker.ts +++ b/frontend/src/api/worker.ts @@ -166,3 +166,39 @@ export async function scaleWorkers(req: ScaleRequest): Promise { const res = await api.post('/worker/scale', req) return res.data }
+
+// ---------------------------------------------------------------------------
+// Worker concurrency configuration
+// ---------------------------------------------------------------------------
+
+/** One row of the backend's worker_configs table. */
+export interface WorkerConfig {
+  queue_name: string
+  max_concurrency: number
+  min_concurrency: number
+  enabled: boolean
+  updated_at: string
+}
+
+/** Partial update; omitted fields keep their stored value. */
+export interface WorkerConfigUpdate {
+  max_concurrency?: number
+  min_concurrency?: number
+  enabled?: boolean
+}
+
+/** Fetch every queue's concurrency settings (GET /worker/configs). */
+export async function getWorkerConfigs(): Promise<WorkerConfig[]> {
+  const res = await api.get('/worker/configs')
+  return res.data
+}
+
+/** Save new settings for one queue (PUT /worker/configs/{queueName}). */
+export async function updateWorkerConfig(
+  queueName: string,
+  update: WorkerConfigUpdate,
+): Promise<WorkerConfig> {
+  // Encode the name so characters such as "/" or "?" cannot alter the path.
+  const res = await api.put(`/worker/configs/${encodeURIComponent(queueName)}`, update)
+  return res.data
+}
diff --git a/frontend/src/pages/WorkerManagement.tsx b/frontend/src/pages/WorkerManagement.tsx index 1b22025..4a87396 100644 --- a/frontend/src/pages/WorkerManagement.tsx +++ b/frontend/src/pages/WorkerManagement.tsx @@ -1,13 +1,16 @@ import { useState } from 'react' import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query' import { toast } from 'sonner' -import { RefreshCw, ChevronDown, ChevronRight, Cpu, Layers, Minus, Plus } from 'lucide-react' +import { RefreshCw, ChevronDown, ChevronRight, Cpu, Layers, Minus, Plus, Settings2 } from 'lucide-react' import { getCeleryWorkers, getQueueStatus, scaleWorkers, + getWorkerConfigs, + updateWorkerConfig, type CeleryWorker, type ScaleRequest, + type WorkerConfig, } from '../api/worker' // --------------------------------------------------------------------------- @@ -161,6 +164,93 @@ function QueueDepthRow({ queue, depth }: { queue: string; depth: number }) { ) } +// --------------------------------------------------------------------------- +// Concurrency config row +// --------------------------------------------------------------------------- + +function ConcurrencyConfigRow({ config }: { config: WorkerConfig }) { + const qc = useQueryClient() + const [minVal, setMinVal] = useState(config.min_concurrency) + const [maxVal, setMaxVal] = 
useState(config.max_concurrency) + + const saveMut = useMutation({ + mutationFn: () => + updateWorkerConfig(config.queue_name, { + min_concurrency: minVal, + max_concurrency: maxVal, + }), + onSuccess: () => { + toast.success(`Saved concurrency for ${config.queue_name}`) + qc.invalidateQueries({ queryKey: ['worker-configs'] }) + }, + onError: (e: unknown) => { + const detail = (e as { response?: { data?: { detail?: string } } })?.response?.data?.detail + toast.error(detail ?? `Failed to save ${config.queue_name}`) + }, + }) + + const isDirty = minVal !== config.min_concurrency || maxVal !== config.max_concurrency + + return ( +
+
+

{config.queue_name}

+

+ {config.enabled ? 'enabled' : 'disabled'} · updated{' '} + {new Date(config.updated_at).toLocaleString()} +

+
+
+ {/* Min concurrency */} +
+ Min +
+ + {minVal} + +
+
+ {/* Max concurrency */} +
+ Max +
+ + {maxVal} + +
+
+ +
+
+ ) +} + // --------------------------------------------------------------------------- // Main page // --------------------------------------------------------------------------- @@ -180,9 +270,15 @@ export default function WorkerManagement() { refetchInterval: 5_000, }) + const { data: workerConfigs, isLoading: configsLoading } = useQuery({ + queryKey: ['worker-configs'], + queryFn: getWorkerConfigs, + }) + function refresh() { qc.invalidateQueries({ queryKey: ['celery-workers'] }) qc.invalidateQueries({ queryKey: ['queue-status'] }) + qc.invalidateQueries({ queryKey: ['worker-configs'] }) } const workers = workerData?.workers ?? [] @@ -263,6 +359,33 @@ export default function WorkerManagement() { )} + {/* Concurrency settings */} +
+
+ +

Concurrency Settings

+
+

+ Configure min/max concurrency per queue. The beat scheduler applies these settings + every 5 minutes via Celery pool signals. Changes are persisted in the database. +

+ {configsLoading ? ( +
+ {[0, 1, 2].map((i) => ( +
+ ))} +
+ ) : !workerConfigs || workerConfigs.length === 0 ? ( +

No worker configs available.

+ ) : ( +
+ {workerConfigs.map((cfg) => ( + + ))} +
+ )} +
+ {/* Scale controls */}

Scale Services