feat: extract workflow notifications phase 3
This commit is contained in:
@@ -5,6 +5,7 @@ Covers:
|
||||
- render_order_line_task — full still/turntable render pipeline for one order line
|
||||
"""
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from app.tasks.celery_app import celery_app
|
||||
from app.core.task_logs import log_task_event
|
||||
@@ -70,10 +71,11 @@ def render_order_line_task(self, order_line_id: str):
|
||||
|
||||
emit(order_line_id, "Celery render task started")
|
||||
try:
|
||||
from sqlalchemy import create_engine, select
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session
|
||||
from app.config import settings as app_settings
|
||||
from app.domains.rendering.workflow_runtime_services import (
|
||||
emit_order_line_render_notifications,
|
||||
persist_order_line_output,
|
||||
prepare_order_line_render_context,
|
||||
resolve_order_line_template_context,
|
||||
@@ -455,7 +457,6 @@ def render_order_line_task(self, order_line_id: str):
|
||||
else:
|
||||
pl.step_error("blender_still", "render_to_file returned False")
|
||||
|
||||
new_status = "completed" if success else "failed"
|
||||
render_end = datetime.utcnow()
|
||||
elapsed = (render_end - render_start).total_seconds()
|
||||
try:
|
||||
@@ -476,51 +477,18 @@ def render_order_line_task(self, order_line_id: str):
|
||||
else:
|
||||
emit(order_line_id, f"Render failed after {elapsed:.1f}s", "error")
|
||||
|
||||
# Broadcast WebSocket event for live UI updates
|
||||
try:
|
||||
from app.core.websocket import publish_event_sync
|
||||
_tenant_id = str(line.product.cad_file.tenant_id) if (
|
||||
emit_order_line_render_notifications(
|
||||
success=success,
|
||||
order_line_id=order_line_id,
|
||||
tenant_id=str(line.product.cad_file.tenant_id) if (
|
||||
line.product and line.product.cad_file and line.product.cad_file.tenant_id
|
||||
) else None
|
||||
if _tenant_id:
|
||||
publish_event_sync(_tenant_id, {
|
||||
"type": "render_complete" if success else "render_failed",
|
||||
"order_line_id": order_line_id,
|
||||
"order_id": str(line.order_id),
|
||||
"status": new_status,
|
||||
})
|
||||
except Exception:
|
||||
logger.debug("WebSocket publish skipped (non-fatal)")
|
||||
|
||||
# Emit per-render activity event (channel=activity, not shown in bell dropdown)
|
||||
try:
|
||||
from app.models.order import Order as OrderModel
|
||||
order_row = session.execute(
|
||||
select(OrderModel.created_by, OrderModel.order_number)
|
||||
.where(OrderModel.id == line.order_id)
|
||||
).one_or_none()
|
||||
if order_row:
|
||||
from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY
|
||||
details: dict = {
|
||||
"order_number": order_row[1],
|
||||
"product_name": product_name,
|
||||
"output_type": ot_name,
|
||||
}
|
||||
if not success and isinstance(render_log, dict):
|
||||
err = render_log.get("error") or render_log.get("stderr", "")
|
||||
if err:
|
||||
details["error"] = str(err)[:300]
|
||||
emit_notification_sync(
|
||||
actor_user_id=None,
|
||||
target_user_id=str(order_row[0]),
|
||||
action="render.completed" if success else "render.failed",
|
||||
entity_type="order",
|
||||
entity_id=str(line.order_id),
|
||||
details=details,
|
||||
channel=CHANNEL_ACTIVITY,
|
||||
) else None,
|
||||
product_name=product_name,
|
||||
output_type_name=ot_name,
|
||||
render_log=render_log if isinstance(render_log, dict) else None,
|
||||
session=session,
|
||||
line=line,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to emit render activity event")
|
||||
|
||||
# Check if all lines for this order are done → auto-advance
|
||||
order_id_str = str(line.order_id)
|
||||
@@ -571,6 +539,9 @@ def render_order_line_task(self, order_line_id: str):
|
||||
try:
|
||||
from sqlalchemy import select as sel2
|
||||
from app.models.order import Order as OrderModel2
|
||||
from app.domains.rendering.workflow_runtime_services import (
|
||||
emit_order_line_render_notifications,
|
||||
)
|
||||
eng4 = create_engine(sync_url2)
|
||||
with SyncSession(eng4) as s4:
|
||||
set_tenant_context_sync(s4, _tenant_id)
|
||||
@@ -581,20 +552,16 @@ def render_order_line_task(self, order_line_id: str):
|
||||
).one_or_none()
|
||||
eng4.dispose()
|
||||
if order_row2:
|
||||
from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY
|
||||
emit_notification_sync(
|
||||
actor_user_id=None,
|
||||
target_user_id=str(order_row2[0]),
|
||||
action="render.failed",
|
||||
entity_type="order",
|
||||
entity_id=None,
|
||||
details={
|
||||
"order_number": order_row2[1],
|
||||
"product_name": "unknown",
|
||||
"output_type": "unknown",
|
||||
"error": str(exc)[:300],
|
||||
},
|
||||
channel=CHANNEL_ACTIVITY,
|
||||
emit_order_line_render_notifications(
|
||||
success=False,
|
||||
order_line_id=order_line_id,
|
||||
order_number=order_row2[1],
|
||||
order_creator_id=str(order_row2[0]),
|
||||
product_name="unknown",
|
||||
output_type_name="unknown",
|
||||
render_log={"error": str(exc)},
|
||||
emit_websocket=False,
|
||||
activity_entity_id=None,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to emit render failure activity event")
|
||||
|
||||
@@ -227,6 +227,90 @@ def _resolve_output_mime_type(output_path: str) -> str:
|
||||
return "image/png"
|
||||
|
||||
|
||||
def _extract_render_error(render_log: dict[str, Any] | None) -> str | None:
|
||||
if not isinstance(render_log, dict):
|
||||
return None
|
||||
error_value = render_log.get("error") or render_log.get("stderr", "")
|
||||
if not error_value:
|
||||
return None
|
||||
return str(error_value)[:300]
|
||||
|
||||
|
||||
def emit_order_line_render_notifications(
|
||||
*,
|
||||
success: bool,
|
||||
order_line_id: str,
|
||||
order_id: str | None = None,
|
||||
order_number: str | None = None,
|
||||
order_creator_id: str | None = None,
|
||||
tenant_id: str | None = None,
|
||||
product_name: str,
|
||||
output_type_name: str,
|
||||
render_log: dict[str, Any] | None = None,
|
||||
session: Session | None = None,
|
||||
line: OrderLine | None = None,
|
||||
emit_websocket: bool = True,
|
||||
emit_activity: bool = True,
|
||||
activity_entity_id: str | None = None,
|
||||
) -> None:
|
||||
"""Emit the legacy websocket and activity notifications for an order-line render."""
|
||||
resolved_order_id = order_id or (str(line.order_id) if line is not None else None)
|
||||
resolved_entity_id = activity_entity_id if activity_entity_id is not None else resolved_order_id
|
||||
|
||||
if session is not None and resolved_order_id and (order_creator_id is None or order_number is None):
|
||||
order_row = session.execute(
|
||||
select(Order.created_by, Order.order_number).where(Order.id == resolved_order_id)
|
||||
).one_or_none()
|
||||
if order_row:
|
||||
if order_creator_id is None:
|
||||
order_creator_id = str(order_row[0])
|
||||
if order_number is None:
|
||||
order_number = order_row[1]
|
||||
|
||||
if emit_websocket and tenant_id:
|
||||
try:
|
||||
from app.core.websocket import publish_event_sync
|
||||
|
||||
publish_event_sync(
|
||||
tenant_id,
|
||||
{
|
||||
"type": "render_complete" if success else "render_failed",
|
||||
"order_line_id": order_line_id,
|
||||
"order_id": resolved_order_id,
|
||||
"status": "completed" if success else "failed",
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("WebSocket publish skipped (non-fatal)")
|
||||
|
||||
if not emit_activity or not order_creator_id:
|
||||
return
|
||||
|
||||
try:
|
||||
from app.services.notification_service import CHANNEL_ACTIVITY, emit_notification_sync
|
||||
|
||||
details: dict[str, Any] = {
|
||||
"order_number": order_number,
|
||||
"product_name": product_name,
|
||||
"output_type": output_type_name,
|
||||
}
|
||||
error_message = _extract_render_error(render_log)
|
||||
if not success and error_message:
|
||||
details["error"] = error_message
|
||||
|
||||
emit_notification_sync(
|
||||
actor_user_id=None,
|
||||
target_user_id=order_creator_id,
|
||||
action="render.completed" if success else "render.failed",
|
||||
entity_type="order",
|
||||
entity_id=resolved_entity_id,
|
||||
details=details,
|
||||
channel=CHANNEL_ACTIVITY,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to emit render activity event")
|
||||
|
||||
|
||||
def persist_order_line_output(
|
||||
session: Session,
|
||||
line: OrderLine,
|
||||
|
||||
@@ -17,12 +17,14 @@ from app.domains.products.models import CadFile, Product
|
||||
from app.domains.rendering.models import OutputType, RenderTemplate
|
||||
from app.domains.rendering.workflow_runtime_services import (
|
||||
auto_populate_materials_for_cad,
|
||||
emit_order_line_render_notifications,
|
||||
persist_order_line_output,
|
||||
resolve_cad_bbox,
|
||||
prepare_order_line_render_context,
|
||||
resolve_order_line_material_map,
|
||||
resolve_order_line_template_context,
|
||||
)
|
||||
from app.domains.tenants.models import Tenant
|
||||
|
||||
import app.models # noqa: F401
|
||||
|
||||
@@ -543,3 +545,178 @@ def test_persist_order_line_output_marks_failure_without_result_path(sync_sessio
|
||||
assert line.result_path is None
|
||||
assert line.render_log == {"error": "boom"}
|
||||
assert assets == []
|
||||
|
||||
|
||||
def test_emit_order_line_render_notifications_emits_websocket_and_activity(
|
||||
sync_session,
|
||||
tmp_path,
|
||||
):
|
||||
line = _seed_order_line_graph(sync_session, tmp_path)
|
||||
tenant = Tenant(name="Workflow Tenant", slug=f"workflow-{uuid.uuid4().hex[:8]}")
|
||||
sync_session.add(tenant)
|
||||
sync_session.commit()
|
||||
|
||||
line.product.cad_file.tenant_id = tenant.id
|
||||
line.product.tenant_id = tenant.id
|
||||
line.order.tenant_id = tenant.id
|
||||
sync_session.commit()
|
||||
|
||||
websocket_events: list[tuple[str, dict]] = []
|
||||
activity_events: list[dict] = []
|
||||
|
||||
def _capture_websocket(tenant_id: str, event: dict) -> None:
|
||||
websocket_events.append((tenant_id, event))
|
||||
|
||||
def _capture_activity(**payload) -> None:
|
||||
activity_events.append(payload)
|
||||
|
||||
monkeypatch = pytest.MonkeyPatch()
|
||||
monkeypatch.setattr(
|
||||
"app.core.websocket.publish_event_sync",
|
||||
_capture_websocket,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.services.notification_service.emit_notification_sync",
|
||||
_capture_activity,
|
||||
)
|
||||
|
||||
try:
|
||||
emit_order_line_render_notifications(
|
||||
success=True,
|
||||
order_line_id=str(line.id),
|
||||
tenant_id=str(tenant.id),
|
||||
product_name=line.product.name or "product",
|
||||
output_type_name=line.output_type.name,
|
||||
session=sync_session,
|
||||
line=line,
|
||||
)
|
||||
finally:
|
||||
monkeypatch.undo()
|
||||
|
||||
assert websocket_events == [
|
||||
(
|
||||
str(tenant.id),
|
||||
{
|
||||
"type": "render_complete",
|
||||
"order_line_id": str(line.id),
|
||||
"order_id": str(line.order_id),
|
||||
"status": "completed",
|
||||
},
|
||||
)
|
||||
]
|
||||
assert activity_events == [
|
||||
{
|
||||
"actor_user_id": None,
|
||||
"target_user_id": str(line.order.created_by),
|
||||
"action": "render.completed",
|
||||
"entity_type": "order",
|
||||
"entity_id": str(line.order_id),
|
||||
"details": {
|
||||
"order_number": line.order.order_number,
|
||||
"product_name": line.product.name,
|
||||
"output_type": line.output_type.name,
|
||||
},
|
||||
"channel": "activity",
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_emit_order_line_render_notifications_truncates_failure_error_and_skips_websocket_without_tenant(
|
||||
sync_session,
|
||||
tmp_path,
|
||||
):
|
||||
line = _seed_order_line_graph(sync_session, tmp_path)
|
||||
activity_events: list[dict] = []
|
||||
websocket_events: list[tuple[str, dict]] = []
|
||||
|
||||
def _capture_websocket(tenant_id: str, event: dict) -> None:
|
||||
websocket_events.append((tenant_id, event))
|
||||
|
||||
def _capture_activity(**payload) -> None:
|
||||
activity_events.append(payload)
|
||||
|
||||
monkeypatch = pytest.MonkeyPatch()
|
||||
monkeypatch.setattr(
|
||||
"app.core.websocket.publish_event_sync",
|
||||
_capture_websocket,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"app.services.notification_service.emit_notification_sync",
|
||||
_capture_activity,
|
||||
)
|
||||
|
||||
try:
|
||||
emit_order_line_render_notifications(
|
||||
success=False,
|
||||
order_line_id=str(line.id),
|
||||
product_name=line.product.name or "product",
|
||||
output_type_name=line.output_type.name,
|
||||
render_log={"error": "x" * 400},
|
||||
session=sync_session,
|
||||
line=line,
|
||||
)
|
||||
finally:
|
||||
monkeypatch.undo()
|
||||
|
||||
assert websocket_events == []
|
||||
assert activity_events == [
|
||||
{
|
||||
"actor_user_id": None,
|
||||
"target_user_id": str(line.order.created_by),
|
||||
"action": "render.failed",
|
||||
"entity_type": "order",
|
||||
"entity_id": str(line.order_id),
|
||||
"details": {
|
||||
"order_number": line.order.order_number,
|
||||
"product_name": line.product.name,
|
||||
"output_type": line.output_type.name,
|
||||
"error": "x" * 300,
|
||||
},
|
||||
"channel": "activity",
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_emit_order_line_render_notifications_supports_retry_exhausted_activity_payload():
|
||||
activity_events: list[dict] = []
|
||||
|
||||
def _capture_activity(**payload) -> None:
|
||||
activity_events.append(payload)
|
||||
|
||||
monkeypatch = pytest.MonkeyPatch()
|
||||
monkeypatch.setattr(
|
||||
"app.services.notification_service.emit_notification_sync",
|
||||
_capture_activity,
|
||||
)
|
||||
|
||||
try:
|
||||
emit_order_line_render_notifications(
|
||||
success=False,
|
||||
order_line_id="line-1",
|
||||
order_number="ORD-FAIL",
|
||||
order_creator_id="user-1",
|
||||
product_name="unknown",
|
||||
output_type_name="unknown",
|
||||
render_log={"error": "retry exhausted"},
|
||||
emit_websocket=False,
|
||||
activity_entity_id=None,
|
||||
)
|
||||
finally:
|
||||
monkeypatch.undo()
|
||||
|
||||
assert activity_events == [
|
||||
{
|
||||
"actor_user_id": None,
|
||||
"target_user_id": "user-1",
|
||||
"action": "render.failed",
|
||||
"entity_type": "order",
|
||||
"entity_id": None,
|
||||
"details": {
|
||||
"order_number": "ORD-FAIL",
|
||||
"product_name": "unknown",
|
||||
"output_type": "unknown",
|
||||
"error": "retry exhausted",
|
||||
},
|
||||
"channel": "activity",
|
||||
}
|
||||
]
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
- [ ] Missing legacy steps extracted into reusable executors
|
||||
- [ ] Extracted node behavior matches legacy services
|
||||
- [ ] Node-level tests cover success and failure paths
|
||||
- Progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`, and `output_save` are extracted and covered by targeted backend tests; remaining parity nodes are still open.
|
||||
- Progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`, `output_save`, and `notify` are extracted and covered by targeted backend tests; remaining parity nodes are still open.
|
||||
|
||||
### Phase 4
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@
|
||||
- `E3-T5` Extract `auto_populate_materials`. `completed`
|
||||
- `E3-T6` Extract `glb_bbox`. `completed`
|
||||
- `E3-T7` Extract `output_save`. `completed`
|
||||
- `E3-T8` Extract `notify`.
|
||||
- `E3-T8` Extract `notify`. `completed`
|
||||
- `E3-T9` Add executor tests for all extracted nodes.
|
||||
|
||||
### Legacy Sources
|
||||
|
||||
@@ -8,7 +8,7 @@ Bring `/workflows` to full production parity with the existing legacy render pip
|
||||
|
||||
- Phase 1 completed on canonical config storage, preset migration, and legacy-safe runtime extraction.
|
||||
- Phase 2 completed on backend node registry, node definitions API, and schema-driven editor palette/settings.
|
||||
- Phase 3 in progress: `order_line_setup` and `resolve_template` are extracted behind the legacy task boundary and validated with targeted backend tests.
|
||||
- Phase 3 in progress: `order_line_setup`, `resolve_template`, `material_map_resolve`, `auto_populate_materials`, `glb_bbox`, `output_save`, and `notify` are extracted behind the legacy task boundary and validated with targeted backend tests.
|
||||
|
||||
## Non-Negotiables
|
||||
|
||||
|
||||
Reference in New Issue
Block a user