feat(phase8.1-8.2): dynamic worker concurrency via worker_configs
- Migration 054: worker_configs table (queue_name PK, max/min_concurrency,
enabled, updated_at); seeds step_processing(8/2), thumbnail_rendering(1/1),
ai_validation(4/1)
- WorkerConfig SQLAlchemy model
- apply_worker_concurrency beat task: reads enabled configs, broadcasts
pool_grow to all Celery workers every 5min
- GET/PUT /api/worker/configs (admin): list + update per-queue concurrency
- docker-compose.yml: worker uses --autoscale=${MAX_CONCURRENCY:-8},${MIN_CONCURRENCY:-2};
render-worker uses --autoscale=1,1 --concurrency=1
- WorkerManagement.tsx: "Concurrency Settings" section with +/- steppers
and Save button per queue
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
"""Add worker_configs table for dynamic concurrency settings.
|
||||
|
||||
Revision ID: 054
|
||||
Revises: 053
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
# Alembic revision identifiers for this migration.
revision = "054"
# Parent revision that this migration is applied on top of.
down_revision = "053"
# No branch label and no cross-branch dependency is declared.
branch_labels = None
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Create the ``worker_configs`` table and seed one row per known queue."""
    table_columns = [
        # One row per Celery queue; the queue name is the primary key.
        sa.Column("queue_name", sa.String(100), primary_key=True),
        sa.Column("max_concurrency", sa.Integer, nullable=False, server_default="8"),
        sa.Column("min_concurrency", sa.Integer, nullable=False, server_default="2"),
        sa.Column("enabled", sa.Boolean, nullable=False, server_default="true"),
        sa.Column("updated_at", sa.DateTime, nullable=False, server_default=sa.text("now()")),
    ]
    op.create_table("worker_configs", *table_columns)

    # Seed default rows. updated_at is omitted so the server default applies;
    # ON CONFLICT DO NOTHING leaves any already-present row untouched.
    seed_rows_sql = """
        INSERT INTO worker_configs (queue_name, max_concurrency, min_concurrency, enabled)
        VALUES
            ('step_processing', 8, 2, true),
            ('thumbnail_rendering', 1, 1, true),
            ('ai_validation', 4, 1, true)
        ON CONFLICT DO NOTHING
    """
    op.execute(seed_rows_sql)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Revert this migration by dropping ``worker_configs`` (seed rows included)."""
    table_name = "worker_configs"
    op.drop_table(table_name)
|
||||
@@ -1,7 +1,8 @@
|
||||
"""Worker activity router — exposes recent background task status."""
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import selectinload
|
||||
@@ -14,7 +15,8 @@ from app.models.order import Order
|
||||
from app.models.order_line import OrderLine
|
||||
from app.models.product import Product
|
||||
from app.models.user import User
|
||||
from app.utils.auth import get_current_user, require_admin_or_pm
|
||||
from app.models.worker_config import WorkerConfig
|
||||
from app.utils.auth import get_current_user, require_admin_or_pm, require_admin
|
||||
|
||||
router = APIRouter(prefix="/worker", tags=["worker"])
|
||||
|
||||
@@ -569,3 +571,86 @@ async def render_health(
|
||||
last_render_age_minutes=last_render_age_minutes,
|
||||
details=details,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Worker concurrency configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Response schema for a single worker_configs row, as returned by the
# GET /worker/configs and PUT /worker/configs/{queue_name} endpoints.
class WorkerConfigOut(BaseModel):
    # Queue this row configures (primary key of the table).
    queue_name: str
    # Upper and lower pool-size limits stored for the queue.
    max_concurrency: int
    min_concurrency: int
    # Whether the row is enabled.
    enabled: bool
    # Timestamp rendered as a string; the endpoints fill it with
    # datetime.isoformat().
    updated_at: str
|
||||
|
||||
|
||||
# Request body for PUT /worker/configs/{queue_name}. Every field is optional;
# a field left as None means "keep the stored value".
class WorkerConfigUpdate(BaseModel):
    # New upper limit; the endpoint rejects values below 1.
    max_concurrency: Optional[int] = None
    # New lower limit; the endpoint rejects values below 1 or above the max.
    min_concurrency: Optional[int] = None
    # New enabled flag.
    enabled: Optional[bool] = None
|
||||
|
||||
|
||||
@router.get("/configs", response_model=list[WorkerConfigOut])
async def list_worker_configs(
    user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """List all worker concurrency configurations (admin only)."""
    # Rows come back sorted by queue name so the response order is stable.
    query = select(WorkerConfig).order_by(WorkerConfig.queue_name)
    rows = (await db.execute(query)).scalars().all()

    payload = []
    for row in rows:
        # A row without a timestamp is reported with the current UTC time.
        stamp = row.updated_at if row.updated_at else datetime.utcnow()
        payload.append(
            WorkerConfigOut(
                queue_name=row.queue_name,
                max_concurrency=row.max_concurrency,
                min_concurrency=row.min_concurrency,
                enabled=row.enabled,
                updated_at=stamp.isoformat(),
            )
        )
    return payload
|
||||
|
||||
|
||||
@router.put("/configs/{queue_name}", response_model=WorkerConfigOut)
async def update_worker_config(
    queue_name: str,
    body: WorkerConfigUpdate,
    user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Update concurrency settings for a specific queue (admin only)."""
    # Responses: 404 when the queue has no row; 400 when a supplied limit is
    # below 1 or the resulting min would exceed the resulting max.
    result = await db.execute(
        select(WorkerConfig).where(WorkerConfig.queue_name == queue_name)
    )
    cfg = result.scalar_one_or_none()
    if cfg is None:
        raise HTTPException(404, detail=f"No worker config found for queue '{queue_name}'")

    # Work out the effective limits on plain locals first. The ORM row is only
    # touched once the whole request has passed validation, so a rejected
    # request never leaves invalid values pending on the session.
    new_max = cfg.max_concurrency
    if body.max_concurrency is not None:
        if body.max_concurrency < 1:
            raise HTTPException(400, detail="max_concurrency must be >= 1")
        new_max = body.max_concurrency

    new_min = cfg.min_concurrency
    if body.min_concurrency is not None:
        if body.min_concurrency < 1:
            raise HTTPException(400, detail="min_concurrency must be >= 1")
        new_min = body.min_concurrency

    # Cross-field check uses stored values for any field the caller omitted.
    if new_min > new_max:
        raise HTTPException(400, detail="min_concurrency cannot exceed max_concurrency")

    cfg.max_concurrency = new_max
    cfg.min_concurrency = new_min
    if body.enabled is not None:
        cfg.enabled = body.enabled

    # Naive UTC timestamp, matching the timezone-less DateTime column.
    cfg.updated_at = datetime.utcnow()
    await db.commit()
    await db.refresh(cfg)

    return WorkerConfigOut(
        queue_name=cfg.queue_name,
        max_concurrency=cfg.max_concurrency,
        min_concurrency=cfg.min_concurrency,
        enabled=cfg.enabled,
        updated_at=cfg.updated_at.isoformat(),
    )
|
||||
|
||||
@@ -17,11 +17,12 @@ from app.domains.admin.models import DashboardConfig
|
||||
|
||||
# Also re-export SystemSetting (no domain assigned — stays as-is)
|
||||
from app.models.system_setting import SystemSetting
|
||||
from app.models.worker_config import WorkerConfig
|
||||
|
||||
__all__ = [
|
||||
"Tenant", "User", "Template", "CadFile", "Product", "Order", "OrderItem", "OrderLine",
|
||||
"AuditLog", "PricingTier", "OutputType", "RenderTemplate", "ProductRenderPosition",
|
||||
"WorkflowDefinition", "WorkflowRun", "WorkflowNodeResult",
|
||||
"Material", "MaterialAlias", "AssetLibrary", "MediaAsset", "MediaAssetType", "SystemSetting",
|
||||
"DashboardConfig",
|
||||
"DashboardConfig", "WorkerConfig",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
from datetime import datetime
|
||||
from sqlalchemy import String, Integer, Boolean, DateTime
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
from app.database import Base
|
||||
|
||||
|
||||
class WorkerConfig(Base):
    """ORM mapping for the ``worker_configs`` table (one row per queue)."""

    __tablename__ = "worker_configs"

    # Queue name doubles as the primary key.
    queue_name: Mapped[str] = mapped_column(String(100), primary_key=True)
    # Upper / lower pool-size limits; defaults are 8 and 2.
    max_concurrency: Mapped[int] = mapped_column(Integer, nullable=False, default=8)
    min_concurrency: Mapped[int] = mapped_column(Integer, nullable=False, default=2)
    # Whether the row is enabled; defaults to True.
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    # Naive timestamp defaulting to datetime.utcnow; no onupdate hook is
    # declared here, so writers must refresh it themselves.
    updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow)
|
||||
@@ -10,6 +10,50 @@ from celery import shared_task
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(name="app.tasks.beat_tasks.apply_worker_concurrency", queue="step_processing")
def apply_worker_concurrency() -> None:
    """Apply the limits stored in ``worker_configs`` to the running workers.

    Runs every 5 minutes via Celery Beat. Enabled rows are read from the
    database, then each worker that consumes at least one configured queue is
    sent the ``autoscale`` remote-control command with absolute max/min values.

    ``pool_grow`` is deliberately not used: it adds ``n`` processes on every
    call instead of setting a target, so repeating it on a schedule would grow
    pools without bound, and broadcasting it to all workers would ignore which
    queue a worker actually serves.

    A worker that consumes several configured queues shares one pool between
    them, so it receives the largest max and the largest min among its queues.
    Workers started without ``--autoscale`` cannot honour the command; because
    no reply is requested, that case is not reported back here.

    All errors are logged and swallowed so a failure never breaks the beat
    schedule.
    """
    try:
        from sqlalchemy import create_engine, select
        from sqlalchemy.orm import Session
        from app.config import settings as app_settings
        from app.models.worker_config import WorkerConfig
        from app.tasks.celery_app import celery_app

        # The app is configured with an asyncpg URL; this task needs a sync driver.
        sync_url = app_settings.database_url.replace("+asyncpg", "")
        eng = create_engine(sync_url)
        try:
            with Session(eng) as session:
                rows = session.execute(
                    select(WorkerConfig).where(WorkerConfig.enabled.is_(True))
                ).scalars().all()
                # Copy to plain tuples so nothing depends on the session later.
                limits = {
                    row.queue_name: (row.max_concurrency, row.min_concurrency)
                    for row in rows
                }
        finally:
            eng.dispose()

        if not limits:
            return

        # Maps worker name -> list of queue descriptors; None when no worker replies.
        active_queues = celery_app.control.inspect().active_queues() or {}
        for worker_name, queues in active_queues.items():
            applicable = [
                limits[queue["name"]]
                for queue in queues
                if queue.get("name") in limits
            ]
            if not applicable:
                continue

            max_concurrency = max(upper for upper, _ in applicable)
            min_concurrency = max(lower for _, lower in applicable)
            try:
                celery_app.control.autoscale(
                    max_concurrency,
                    min_concurrency,
                    destination=[worker_name],
                )
                logger.info(
                    "[WORKER_SCALE] Signalled autoscale max=%d min=%d to worker %s",
                    max_concurrency, min_concurrency, worker_name,
                )
            except Exception as exc:
                # Best effort: one unreachable worker must not block the others.
                logger.warning(
                    "[WORKER_SCALE] autoscale failed for %s: %s",
                    worker_name, exc,
                )
    except Exception as exc:
        logger.exception("apply_worker_concurrency failed: %s", exc)
|
||||
|
||||
|
||||
@shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing")
|
||||
def broadcast_queue_status() -> None:
|
||||
"""Broadcast current queue depths to all WebSocket clients every 10s.
|
||||
|
||||
@@ -46,5 +46,9 @@ celery_app.conf.update(
|
||||
"task": "app.tasks.beat_tasks.recover_stalled_renders",
|
||||
"schedule": 300.0, # every 5 minutes
|
||||
},
|
||||
"apply-worker-concurrency-every-5m": {
|
||||
"task": "app.tasks.beat_tasks.apply_worker_concurrency",
|
||||
"schedule": 300.0, # every 5 minutes
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
+2
-1
@@ -87,7 +87,7 @@ services:
|
||||
build:
|
||||
context: ./backend
|
||||
dockerfile: Dockerfile
|
||||
command: celery -A app.tasks.celery_app worker --loglevel=info -Q step_processing,ai_validation --concurrency=${CELERY_WORKER_CONCURRENCY:-8}
|
||||
command: celery -A app.tasks.celery_app worker --loglevel=info -Q step_processing,ai_validation --autoscale=${MAX_CONCURRENCY:-8},${MIN_CONCURRENCY:-2} --concurrency=${MIN_CONCURRENCY:-2}
|
||||
environment:
|
||||
- POSTGRES_DB=${POSTGRES_DB:-schaeffler}
|
||||
- POSTGRES_USER=${POSTGRES_USER:-schaeffler}
|
||||
@@ -121,6 +121,7 @@ services:
|
||||
dockerfile: render-worker/Dockerfile
|
||||
args:
|
||||
- BLENDER_VERSION=${BLENDER_VERSION:-5.0.1}
|
||||
command: bash -c "python3 /check_version.py && celery -A app.tasks.celery_app worker --loglevel=info -Q thumbnail_rendering --autoscale=1,1 --concurrency=1"
|
||||
environment:
|
||||
- POSTGRES_DB=${POSTGRES_DB:-schaeffler}
|
||||
- POSTGRES_USER=${POSTGRES_USER:-schaeffler}
|
||||
|
||||
@@ -166,3 +166,34 @@ export async function scaleWorkers(req: ScaleRequest): Promise<ScaleResponse> {
|
||||
const res = await api.post<ScaleResponse>('/worker/scale', req)
|
||||
return res.data
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Worker concurrency configuration
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** One per-queue concurrency configuration as returned by `/worker/configs`. */
export interface WorkerConfig {
  /** Queue the configuration belongs to; used as the path key when updating. */
  queue_name: string
  /** Upper pool-size limit. */
  max_concurrency: number
  /** Lower pool-size limit. */
  min_concurrency: number
  /** Whether the configuration is enabled. */
  enabled: boolean
  /** Last-update timestamp as a string. */
  updated_at: string
}
|
||||
|
||||
/** Partial update payload for a worker config; every field is optional. */
export interface WorkerConfigUpdate {
  /** New upper pool-size limit. */
  max_concurrency?: number
  /** New lower pool-size limit. */
  min_concurrency?: number
  /** New enabled flag. */
  enabled?: boolean
}
|
||||
|
||||
/** Fetch every worker concurrency configuration from `/worker/configs`. */
export async function getWorkerConfigs(): Promise<WorkerConfig[]> {
  const { data } = await api.get<WorkerConfig[]>('/worker/configs')
  return data
}
|
||||
|
||||
/**
 * Update the concurrency configuration of one queue.
 *
 * @param queueName Queue whose configuration is updated.
 * @param update Fields to change; omitted fields are not sent.
 * @returns The configuration as returned by the server after the update.
 */
export async function updateWorkerConfig(
  queueName: string,
  update: WorkerConfigUpdate,
): Promise<WorkerConfig> {
  // Escape the name so characters such as `/`, `?` or `#` cannot alter the
  // request path.
  const path = `/worker/configs/${encodeURIComponent(queueName)}`
  const res = await api.put<WorkerConfig>(path, update)
  return res.data
}
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
import { useState } from 'react'
|
||||
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'
|
||||
import { toast } from 'sonner'
|
||||
import { RefreshCw, ChevronDown, ChevronRight, Cpu, Layers, Minus, Plus } from 'lucide-react'
|
||||
import { RefreshCw, ChevronDown, ChevronRight, Cpu, Layers, Minus, Plus, Settings2 } from 'lucide-react'
|
||||
import {
|
||||
getCeleryWorkers,
|
||||
getQueueStatus,
|
||||
scaleWorkers,
|
||||
getWorkerConfigs,
|
||||
updateWorkerConfig,
|
||||
type CeleryWorker,
|
||||
type ScaleRequest,
|
||||
type WorkerConfig,
|
||||
} from '../api/worker'
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -161,6 +164,93 @@ function QueueDepthRow({ queue, depth }: { queue: string; depth: number }) {
|
||||
)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Concurrency config row
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Labelled -/+ stepper showing a single numeric value.
 * Clamping is left to the caller through the two callbacks.
 */
function ConcurrencyStepper({
  label,
  queue,
  value,
  onDecrement,
  onIncrement,
}: {
  label: string
  queue: string
  value: number
  onDecrement: () => void
  onIncrement: () => void
}) {
  const buttonClass =
    'p-1 rounded-md bg-surface-muted hover:bg-surface-hover text-content transition-colors'
  const target = `${label.toLowerCase()} concurrency for ${queue}`

  return (
    <div className="flex flex-col items-center gap-1">
      <span className="text-xs text-content-muted">{label}</span>
      <div className="flex items-center gap-1">
        {/* Icon-only buttons: aria-label supplies the accessible name, and
            type="button" prevents an accidental form submit. */}
        <button
          type="button"
          aria-label={`Decrease ${target}`}
          onClick={onDecrement}
          className={buttonClass}
        >
          <Minus size={12} />
        </button>
        <span className="w-6 text-center text-sm font-semibold text-content">{value}</span>
        <button
          type="button"
          aria-label={`Increase ${target}`}
          onClick={onIncrement}
          className={buttonClass}
        >
          <Plus size={12} />
        </button>
      </div>
    </div>
  )
}

/**
 * One editable row of the "Concurrency Settings" section.
 *
 * Min/max are edited locally and sent together when Save is pressed. Save is
 * enabled only while the local values differ from `config`. Min is clamped to
 * [1, max] and max to [min, 64].
 */
function ConcurrencyConfigRow({ config }: { config: WorkerConfig }) {
  const qc = useQueryClient()
  const [minVal, setMinVal] = useState(config.min_concurrency)
  const [maxVal, setMaxVal] = useState(config.max_concurrency)

  const saveMut = useMutation({
    mutationFn: () =>
      updateWorkerConfig(config.queue_name, {
        min_concurrency: minVal,
        max_concurrency: maxVal,
      }),
    onSuccess: () => {
      toast.success(`Saved concurrency for ${config.queue_name}`)
      // Refetch so `config` (and therefore isDirty) reflects the saved values.
      qc.invalidateQueries({ queryKey: ['worker-configs'] })
    },
    onError: (e: unknown) => {
      // Prefer the server-provided `detail` message when the error carries one.
      const detail = (e as { response?: { data?: { detail?: string } } })?.response?.data?.detail
      toast.error(detail ?? `Failed to save ${config.queue_name}`)
    },
  })

  const isDirty = minVal !== config.min_concurrency || maxVal !== config.max_concurrency

  return (
    <div className="rounded-xl border border-border-default p-4 flex items-center justify-between gap-4 flex-wrap">
      <div className="min-w-0">
        <p className="text-sm font-medium text-content font-mono">{config.queue_name}</p>
        <p className="text-xs text-content-muted mt-0.5">
          {config.enabled ? 'enabled' : 'disabled'} · updated{' '}
          {new Date(config.updated_at).toLocaleString()}
        </p>
      </div>
      <div className="flex items-center gap-6 shrink-0">
        {/* Min concurrency */}
        <ConcurrencyStepper
          label="Min"
          queue={config.queue_name}
          value={minVal}
          onDecrement={() => setMinVal((v) => Math.max(1, v - 1))}
          onIncrement={() => setMinVal((v) => Math.min(maxVal, v + 1))}
        />
        {/* Max concurrency */}
        <ConcurrencyStepper
          label="Max"
          queue={config.queue_name}
          value={maxVal}
          onDecrement={() => setMaxVal((v) => Math.max(minVal, v - 1))}
          onIncrement={() => setMaxVal((v) => Math.min(64, v + 1))}
        />
        <button
          type="button"
          onClick={() => saveMut.mutate()}
          disabled={saveMut.isPending || !isDirty}
          className={`btn-primary text-xs px-3 py-1.5 ${!isDirty ? 'opacity-50 cursor-not-allowed' : ''}`}
        >
          {saveMut.isPending ? 'Saving…' : 'Save'}
        </button>
      </div>
    </div>
  )
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Main page
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -180,9 +270,15 @@ export default function WorkerManagement() {
|
||||
refetchInterval: 5_000,
|
||||
})
|
||||
|
||||
const { data: workerConfigs, isLoading: configsLoading } = useQuery({
|
||||
queryKey: ['worker-configs'],
|
||||
queryFn: getWorkerConfigs,
|
||||
})
|
||||
|
||||
function refresh() {
|
||||
qc.invalidateQueries({ queryKey: ['celery-workers'] })
|
||||
qc.invalidateQueries({ queryKey: ['queue-status'] })
|
||||
qc.invalidateQueries({ queryKey: ['worker-configs'] })
|
||||
}
|
||||
|
||||
const workers = workerData?.workers ?? []
|
||||
@@ -263,6 +359,33 @@ export default function WorkerManagement() {
|
||||
)}
|
||||
</section>
|
||||
|
||||
{/* Concurrency settings */}
|
||||
<section>
|
||||
<div className="flex items-center gap-2 mb-3">
|
||||
<Settings2 size={16} className="text-accent" />
|
||||
<h2 className="text-base font-semibold text-content">Concurrency Settings</h2>
|
||||
</div>
|
||||
<p className="text-xs text-content-muted mb-4">
|
||||
Configure min/max concurrency per queue. The beat scheduler applies these settings
|
||||
every 5 minutes via Celery pool signals. Changes are persisted in the database.
|
||||
</p>
|
||||
{configsLoading ? (
|
||||
<div className="space-y-2">
|
||||
{[0, 1, 2].map((i) => (
|
||||
<div key={i} className="h-16 rounded-xl bg-surface-muted animate-pulse" />
|
||||
))}
|
||||
</div>
|
||||
) : !workerConfigs || workerConfigs.length === 0 ? (
|
||||
<p className="text-sm text-content-muted">No worker configs available.</p>
|
||||
) : (
|
||||
<div className="space-y-2">
|
||||
{workerConfigs.map((cfg) => (
|
||||
<ConcurrencyConfigRow key={cfg.queue_name} config={cfg} />
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
</section>
|
||||
|
||||
{/* Scale controls */}
|
||||
<section>
|
||||
<h2 className="text-base font-semibold text-content mb-3">Scale Services</h2>
|
||||
|
||||
Reference in New Issue
Block a user