From 98b3eadcb2b54ef710820ffd137c811de5110a62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Tue, 7 Apr 2026 09:57:39 +0200 Subject: [PATCH] feat: extract workflow notifications phase 3 --- .../pipeline/tasks/render_order_line.py | 87 +++------ .../rendering/workflow_runtime_services.py | 84 +++++++++ .../domains/test_workflow_runtime_services.py | 177 ++++++++++++++++++ docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md | 2 +- .../WORKFLOW_IMPLEMENTATION_BACKLOG.md | 2 +- docs/workflows/WORKFLOW_MIGRATION_PLAN.md | 2 +- 6 files changed, 291 insertions(+), 63 deletions(-) diff --git a/backend/app/domains/pipeline/tasks/render_order_line.py b/backend/app/domains/pipeline/tasks/render_order_line.py index 013366e..fe32d0c 100644 --- a/backend/app/domains/pipeline/tasks/render_order_line.py +++ b/backend/app/domains/pipeline/tasks/render_order_line.py @@ -5,6 +5,7 @@ Covers: - render_order_line_task — full still/turntable render pipeline for one order line """ import logging +from datetime import datetime from app.tasks.celery_app import celery_app from app.core.task_logs import log_task_event @@ -70,10 +71,11 @@ def render_order_line_task(self, order_line_id: str): emit(order_line_id, "Celery render task started") try: - from sqlalchemy import create_engine, select + from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.config import settings as app_settings from app.domains.rendering.workflow_runtime_services import ( + emit_order_line_render_notifications, persist_order_line_output, prepare_order_line_render_context, resolve_order_line_template_context, @@ -455,7 +457,6 @@ def render_order_line_task(self, order_line_id: str): else: pl.step_error("blender_still", "render_to_file returned False") - new_status = "completed" if success else "failed" render_end = datetime.utcnow() elapsed = (render_end - render_start).total_seconds() try: @@ -476,51 +477,18 @@ def render_order_line_task(self, order_line_id: str): else: emit(order_line_id, f"Render failed after {elapsed:.1f}s", "error") - # Broadcast WebSocket event for live UI updates - try: - from app.core.websocket import publish_event_sync - _tenant_id = str(line.product.cad_file.tenant_id) if ( + emit_order_line_render_notifications( + success=success, + order_line_id=order_line_id, + tenant_id=str(line.product.cad_file.tenant_id) if ( line.product and line.product.cad_file and line.product.cad_file.tenant_id - ) else None - if _tenant_id: - publish_event_sync(_tenant_id, { - "type": "render_complete" if success else "render_failed", - "order_line_id": order_line_id, - "order_id": str(line.order_id), - "status": new_status, - }) - except Exception: - logger.debug("WebSocket publish skipped (non-fatal)") - - # Emit per-render activity event (channel=activity, not shown in bell dropdown) - try: - from app.models.order import Order as OrderModel - order_row = session.execute( - select(OrderModel.created_by, OrderModel.order_number) - .where(OrderModel.id == line.order_id) - ).one_or_none() - if order_row: - from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY - details: dict = { - "order_number": order_row[1], - "product_name": product_name, - "output_type": ot_name, - } - if not success and isinstance(render_log, dict): - err = render_log.get("error") or render_log.get("stderr", "") - if err: - details["error"] = str(err)[:300] - emit_notification_sync( - actor_user_id=None, - target_user_id=str(order_row[0]), - action="render.completed" if success else "render.failed", - entity_type="order", - entity_id=str(line.order_id), - details=details, - channel=CHANNEL_ACTIVITY, - ) - except Exception: - logger.exception("Failed to emit render activity event") + ) else None, + product_name=product_name, + output_type_name=ot_name, + render_log=render_log if isinstance(render_log, dict) else None, + session=session, + line=line, + ) # Check if all lines for this order are done → auto-advance order_id_str = str(line.order_id) @@ -571,6 +539,9 @@ def render_order_line_task(self, order_line_id: str): try: from sqlalchemy import select as sel2 from app.models.order import Order as OrderModel2 + from app.domains.rendering.workflow_runtime_services import ( + emit_order_line_render_notifications, + ) eng4 = create_engine(sync_url2) with SyncSession(eng4) as s4: set_tenant_context_sync(s4, _tenant_id) @@ -581,20 +552,16 @@ def render_order_line_task(self, order_line_id: str): ).one_or_none() eng4.dispose() if order_row2: - from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY - emit_notification_sync( - actor_user_id=None, - target_user_id=str(order_row2[0]), - action="render.failed", - entity_type="order", - entity_id=None, - details={ - "order_number": order_row2[1], - "product_name": "unknown", - "output_type": "unknown", - "error": str(exc)[:300], - }, - channel=CHANNEL_ACTIVITY, + emit_order_line_render_notifications( + success=False, + order_line_id=order_line_id, + order_number=order_row2[1], + order_creator_id=str(order_row2[0]), + product_name="unknown", + output_type_name="unknown", + render_log={"error": str(exc)}, + emit_websocket=False, + activity_entity_id=None, ) except Exception: logger.exception("Failed to emit render failure activity event") diff --git a/backend/app/domains/rendering/workflow_runtime_services.py b/backend/app/domains/rendering/workflow_runtime_services.py index 9c602cb..33e7f29 100644 --- a/backend/app/domains/rendering/workflow_runtime_services.py +++ b/backend/app/domains/rendering/workflow_runtime_services.py @@ -227,6 +227,90 @@ def _resolve_output_mime_type(output_path: str) -> str: return "image/png" +def _extract_render_error(render_log: dict[str, Any] | None) -> str | None: + if not isinstance(render_log, dict): + return None + error_value = render_log.get("error") or render_log.get("stderr", "") + if not error_value: + return None + return str(error_value)[:300] + + +def emit_order_line_render_notifications( + *, + success: bool, + order_line_id: str, + order_id: str | None = None, + order_number: str | None = None, + order_creator_id: str | None = None, + tenant_id: str | None = None, + product_name: str, + output_type_name: str, + render_log: dict[str, Any] | None = None, + session: Session | None = None, + line: OrderLine | None = None, + emit_websocket: bool = True, + emit_activity: bool = True, + activity_entity_id: str | None = None, +) -> None: + """Emit the legacy websocket and activity notifications for an order-line render.""" + resolved_order_id = order_id or (str(line.order_id) if line is not None else None) + resolved_entity_id = activity_entity_id if activity_entity_id is not None else resolved_order_id + + if session is not None and resolved_order_id and (order_creator_id is None or order_number is None): + order_row = session.execute( + select(Order.created_by, Order.order_number).where(Order.id == resolved_order_id) + ).one_or_none() + if order_row: + if order_creator_id is None: + order_creator_id = str(order_row[0]) + if order_number is None: + order_number = order_row[1] + + if emit_websocket and tenant_id: + try: + from app.core.websocket import publish_event_sync + + publish_event_sync( + tenant_id, + { + "type": "render_complete" if success else "render_failed", + "order_line_id": order_line_id, + "order_id": resolved_order_id, + "status": "completed" if success else "failed", + }, + ) + except Exception: + logger.debug("WebSocket publish skipped (non-fatal)") + + if not emit_activity or not order_creator_id: + return + + try: + from app.services.notification_service import CHANNEL_ACTIVITY, emit_notification_sync + + details: dict[str, Any] = { + "order_number": order_number, + "product_name": product_name, + "output_type": output_type_name, + } + error_message = _extract_render_error(render_log) + if not success and error_message: + details["error"] = error_message + + emit_notification_sync( + actor_user_id=None, + target_user_id=order_creator_id, + action="render.completed" if success else "render.failed", + entity_type="order", + entity_id=resolved_entity_id, + details=details, + channel=CHANNEL_ACTIVITY, + ) + except Exception: + logger.exception("Failed to emit render activity event") + + def persist_order_line_output( session: Session, line: OrderLine, diff --git a/backend/tests/domains/test_workflow_runtime_services.py b/backend/tests/domains/test_workflow_runtime_services.py index 18efb66..29ad629 100644 --- a/backend/tests/domains/test_workflow_runtime_services.py +++ b/backend/tests/domains/test_workflow_runtime_services.py @@ -17,12 +17,14 @@ from app.domains.products.models import CadFile, Product from app.domains.rendering.models import OutputType, RenderTemplate from app.domains.rendering.workflow_runtime_services import ( auto_populate_materials_for_cad, + emit_order_line_render_notifications, persist_order_line_output, resolve_cad_bbox, prepare_order_line_render_context, resolve_order_line_material_map, resolve_order_line_template_context, ) +from app.domains.tenants.models import Tenant import app.models # noqa: F401 @@ -543,3 +545,178 @@ def test_persist_order_line_output_marks_failure_without_result_path(sync_sessio assert line.result_path is None assert line.render_log == {"error": "boom"} assert assets == [] + + +def test_emit_order_line_render_notifications_emits_websocket_and_activity( + sync_session, + tmp_path, +): + line = _seed_order_line_graph(sync_session, tmp_path) + tenant = Tenant(name="Workflow Tenant", slug=f"workflow-{uuid.uuid4().hex[:8]}") + sync_session.add(tenant) + sync_session.commit() + + line.product.cad_file.tenant_id = tenant.id + line.product.tenant_id = tenant.id + line.order.tenant_id = tenant.id + sync_session.commit() + + websocket_events: list[tuple[str, dict]] = [] + activity_events: list[dict] = [] + + def _capture_websocket(tenant_id: str, event: dict) -> None: + websocket_events.append((tenant_id, event)) + + def _capture_activity(**payload) -> None: + activity_events.append(payload) + + monkeypatch = pytest.MonkeyPatch() + monkeypatch.setattr( + "app.core.websocket.publish_event_sync", + _capture_websocket, + ) + monkeypatch.setattr( + "app.services.notification_service.emit_notification_sync", + _capture_activity, + ) + + try: + emit_order_line_render_notifications( + success=True, + order_line_id=str(line.id), + tenant_id=str(tenant.id), + product_name=line.product.name or "product", + output_type_name=line.output_type.name, + session=sync_session, + line=line, + ) + finally: + monkeypatch.undo() + + assert websocket_events == [ + ( + str(tenant.id), + { + "type": "render_complete", + "order_line_id": str(line.id), + "order_id": str(line.order_id), + "status": "completed", + }, + ) + ] + assert activity_events == [ + { + "actor_user_id": None, + "target_user_id": str(line.order.created_by), + "action": "render.completed", + "entity_type": "order", + "entity_id": str(line.order_id), + "details": { + "order_number": line.order.order_number, + "product_name": line.product.name, + "output_type": line.output_type.name, + }, + "channel": "activity", + } + ] + + +def test_emit_order_line_render_notifications_truncates_failure_error_and_skips_websocket_without_tenant( + sync_session, + tmp_path, +): + line = _seed_order_line_graph(sync_session, tmp_path) + activity_events: list[dict] = [] + websocket_events: list[tuple[str, dict]] = [] + + def _capture_websocket(tenant_id: str, event: dict) -> None: + websocket_events.append((tenant_id, event)) + + def _capture_activity(**payload) -> None: + activity_events.append(payload) + + monkeypatch = pytest.MonkeyPatch() + monkeypatch.setattr( + "app.core.websocket.publish_event_sync", + _capture_websocket, + ) + monkeypatch.setattr( + "app.services.notification_service.emit_notification_sync", + _capture_activity, + ) + + try: + emit_order_line_render_notifications( + success=False, + order_line_id=str(line.id), + product_name=line.product.name or "product", + output_type_name=line.output_type.name, + render_log={"error": "x" * 400}, + session=sync_session, + line=line, + ) + finally: + monkeypatch.undo() + + assert websocket_events == [] + assert activity_events == [ + { + "actor_user_id": None, + "target_user_id": str(line.order.created_by), + "action": "render.failed", + "entity_type": "order", + "entity_id": str(line.order_id), + "details": { + "order_number": line.order.order_number, + "product_name": line.product.name, + "output_type": line.output_type.name, + "error": "x" * 300, + }, + "channel": "activity", + } + ] + + +def test_emit_order_line_render_notifications_supports_retry_exhausted_activity_payload(): + activity_events: list[dict] = [] + + def _capture_activity(**payload) -> None: + activity_events.append(payload) + + monkeypatch = pytest.MonkeyPatch() + monkeypatch.setattr( + "app.services.notification_service.emit_notification_sync", + _capture_activity, + ) + + try: + emit_order_line_render_notifications( + success=False, + order_line_id="line-1", + order_number="ORD-FAIL", + order_creator_id="user-1", + product_name="unknown", + output_type_name="unknown", + render_log={"error": "retry exhausted"}, + emit_websocket=False, + activity_entity_id=None, + ) + finally: + monkeypatch.undo() + + assert activity_events == [ + { + "actor_user_id": None, + "target_user_id": "user-1", + "action": "render.failed", + "entity_type": "order", + "entity_id": None, + "details": { + "order_number": "ORD-FAIL", + "product_name": "unknown", + "output_type": "unknown", + "error": "retry exhausted", + }, + "channel": "activity", + } + ] diff --git a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md index 704d8a8..aec0697 100644 --- a/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md +++ b/docs/workflows/WORKFLOW_DELIVERY_CHECKLIST.md @@ -22,7 +22,7 @@ - [ ] Missing legacy steps extracted into reusable executors - [ ] Extracted node behavior matches legacy services - [ ] Node-level tests cover success and failure paths -- Progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`, and `output_save` are extracted and covered by targeted backend tests; remaining parity nodes are still open. +- Progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`, `output_save`, and `notify` are extracted and covered by targeted backend tests; remaining parity nodes are still open. ### Phase 4 diff --git a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md index bd78e70..5b3d014 100644 --- a/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md +++ b/docs/workflows/WORKFLOW_IMPLEMENTATION_BACKLOG.md @@ -57,7 +57,7 @@ - `E3-T5` Extract `auto_populate_materials`. `completed` - `E3-T6` Extract `glb_bbox`. `completed` - `E3-T7` Extract `output_save`. `completed` -- `E3-T8` Extract `notify`. +- `E3-T8` Extract `notify`. `completed` - `E3-T9` Add executor tests for all extracted nodes. ### Legacy Sources diff --git a/docs/workflows/WORKFLOW_MIGRATION_PLAN.md b/docs/workflows/WORKFLOW_MIGRATION_PLAN.md index 192c004..90c2202 100644 --- a/docs/workflows/WORKFLOW_MIGRATION_PLAN.md +++ b/docs/workflows/WORKFLOW_MIGRATION_PLAN.md @@ -8,7 +8,7 @@ Bring `/workflows` to full production parity with the existing legacy render pip - Phase 1 completed on canonical config storage, preset migration, and legacy-safe runtime extraction. - Phase 2 completed on backend node registry, node definitions API, and schema-driven editor palette/settings. -- Phase 3 in progress: `order_line_setup` and `resolve_template` are extracted behind the legacy task boundary and validated with targeted backend tests. +- Phase 3 in progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`, `output_save`, and `notify` are extracted behind the legacy task boundary and validated with targeted backend tests. ## Non-Negotiables