Files
Hartmut 121fbdafd3 refactor(phase3): remove dead services + STL remnant cleanup
Phase 3.2 — Delete orphaned service directories:
  - blender-renderer/ (HTTP microservice replaced by render-worker subprocess)
  - threejs-renderer/ (replaced by render-worker)
  - flamenco/ (removed in migration 032, directory still existed on disk)

Phase 3.2 — Remove STL workflow remnants:
  - analytics.py: remove avg_stl_s from RenderTimeBreakdown schema (always None)
  - kpi_service.py: remove avg_stl_s from return dicts + update docstring
  - frontend/src/api/analytics.ts: remove avg_stl_s from RenderTimeBreakdown interface
  - admin.py: remove dead blender-renderer HTTP configure call (service gone)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 19:30:52 +01:00

469 lines
18 KiB
Python

"""KPI / analytics query functions.
All functions return plain dicts or lists of dicts.
Uses text() for raw SQL to avoid ORM lazy-loading surprises.
Every function accepts date_from / date_to ISO-date strings to scope metrics.
"""
from datetime import date as _date
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
def _parse_date(s: str) -> _date:
    """Parse an ISO-8601 date string into a ``datetime.date``.

    asyncpg binds ``date``-typed parameters natively, so every query helper
    converts its incoming string filters through this one function.
    """
    parsed = _date.fromisoformat(s)
    return parsed
async def order_throughput_by_week(
    db: AsyncSession, date_from: str, date_to: str,
) -> list[dict]:
    """Count orders created per ISO week in range, plus how many completed.

    Returns a list of {"week", "count", "completed"} dicts ordered by week.
    """
    stmt = text(
        """
        SELECT
            TO_CHAR(DATE_TRUNC('week', created_at), 'IYYY-"W"IW') AS week,
            COUNT(*) AS count,
            COUNT(*) FILTER (WHERE status = 'completed') AS completed
        FROM orders
        WHERE created_at >= CAST(:date_from AS date)
          AND created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY DATE_TRUNC('week', created_at)
        ORDER BY DATE_TRUNC('week', created_at)
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    result = await db.execute(stmt, params)
    return [
        {"week": week, "count": count, "completed": completed}
        for week, count, completed in result.fetchall()
    ]
async def processing_time_stats(
    db: AsyncSession, date_from: str, date_to: str,
) -> dict:
    """Mean and percentile processing times (seconds) for completed orders.

    All values are None when the range contains no qualifying orders.
    """
    stmt = text(
        """
        SELECT
            EXTRACT(EPOCH FROM AVG(completed_at - submitted_at))::FLOAT
                AS avg_submit_to_complete_s,
            EXTRACT(EPOCH FROM AVG(processing_started_at - submitted_at))::FLOAT
                AS avg_submit_to_processing_s,
            EXTRACT(EPOCH FROM PERCENTILE_CONT(0.5) WITHIN GROUP (
                ORDER BY completed_at - submitted_at
            ))::FLOAT AS p50_s,
            EXTRACT(EPOCH FROM PERCENTILE_CONT(0.95) WITHIN GROUP (
                ORDER BY completed_at - submitted_at
            ))::FLOAT AS p95_s
        FROM orders
        WHERE status = 'completed'
          AND submitted_at IS NOT NULL
          AND completed_at IS NOT NULL
          AND submitted_at >= CAST(:date_from AS date)
          AND submitted_at < CAST(:date_to AS date) + INTERVAL '1 day'
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    row = (await db.execute(stmt, params)).fetchone()
    # Column order in the SELECT above matches this key order exactly.
    keys = (
        "avg_submit_to_complete_s",
        "avg_submit_to_processing_s",
        "p50_s",
        "p95_s",
    )
    if row is None:
        return dict.fromkeys(keys)
    return dict(zip(keys, row))
async def revenue_overview(
    db: AsyncSession, date_from: str, date_to: str,
) -> list[dict]:
    """Sum of estimated_price for completed orders, bucketed by month.

    Returns {"month", "revenue", "order_count"} dicts in chronological order.
    """
    stmt = text(
        """
        SELECT
            TO_CHAR(DATE_TRUNC('month', completed_at), 'YYYY-MM') AS month,
            COALESCE(SUM(estimated_price), 0)::FLOAT AS revenue,
            COUNT(*) AS order_count
        FROM orders
        WHERE status = 'completed'
          AND completed_at >= CAST(:date_from AS date)
          AND completed_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY DATE_TRUNC('month', completed_at)
        ORDER BY DATE_TRUNC('month', completed_at)
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    result = await db.execute(stmt, params)
    return [
        {"month": month, "revenue": revenue, "order_count": order_count}
        for month, revenue, order_count in result.fetchall()
    ]
async def item_status_breakdown(
    db: AsyncSession, date_from: str, date_to: str,
) -> dict:
    """Order-line counts keyed by item_status, for orders created in range."""
    stmt = text(
        """
        SELECT ol.item_status, COUNT(*) AS cnt
        FROM order_lines ol
        JOIN orders o ON o.id = ol.order_id
        WHERE o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY ol.item_status
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    rows = (await db.execute(stmt, params)).fetchall()
    # Known statuses default to 0; any other status values coming back from
    # the DB are merged in as additional keys.
    return {"pending": 0, "approved": 0, "rejected": 0} | {
        str(status): int(cnt) for status, cnt in rows
    }
async def render_time_breakdown(
    db: AsyncSession, date_from: str, date_to: str,
) -> dict:
    """Mean render duration (seconds) over completed order lines in range.

    Based on render_started_at / render_completed_at on order_lines (added
    in migration 015).  avg_total_s mirrors avg_render_s — presumably kept
    for API compatibility with existing consumers; confirm against callers.
    """
    stmt = text(
        """
        SELECT
            AVG(EXTRACT(EPOCH FROM (render_completed_at - render_started_at))) AS avg_render_s,
            COUNT(*) AS sample_count
        FROM order_lines
        WHERE render_status = 'completed'
          AND render_started_at IS NOT NULL
          AND render_completed_at IS NOT NULL
          AND render_completed_at >= CAST(:date_from AS date)
          AND render_completed_at < CAST(:date_to AS date) + INTERVAL '1 day'
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    row = (await db.execute(stmt, params)).fetchone()
    if row is None or row[1] == 0:
        return {"avg_render_s": None, "avg_total_s": None, "sample_count": 0}
    avg_s = float(row[0]) if row[0] is not None else None
    return {
        "avg_render_s": avg_s,
        "avg_total_s": avg_s,
        "sample_count": int(row[1]),
    }
async def render_time_by_output_type(
    db: AsyncSession, date_from: str, date_to: str,
) -> list[dict]:
    """Per-output-type render duration stats for completed order lines.

    Rows are ordered slowest-average first; lines without an output type
    are bucketed as 'Unknown'.
    """
    stmt = text(
        """
        SELECT
            COALESCE(ot.name, 'Unknown') AS output_type,
            COUNT(*) AS job_count,
            AVG(EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at))) AS avg_render_s,
            MIN(EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at))) AS min_render_s,
            MAX(EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at))) AS max_render_s,
            PERCENTILE_CONT(0.5) WITHIN GROUP (
                ORDER BY EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at))
            ) AS p50_render_s
        FROM order_lines ol
        LEFT JOIN output_types ot ON ot.id = ol.output_type_id
        WHERE ol.render_status = 'completed'
          AND ol.render_started_at IS NOT NULL
          AND ol.render_completed_at IS NOT NULL
          AND ol.render_completed_at >= CAST(:date_from AS date)
          AND ol.render_completed_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY ot.id, ot.name
        ORDER BY avg_render_s DESC NULLS LAST
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    rows = (await db.execute(stmt, params)).fetchall()

    def _opt_float(value):
        # Aggregates come back as NULL (None) when there is nothing to aggregate.
        return float(value) if value is not None else None

    return [
        {
            "output_type": name,
            "job_count": int(jobs),
            "avg_render_s": _opt_float(avg_s),
            "min_render_s": _opt_float(min_s),
            "max_render_s": _opt_float(max_s),
            "p50_render_s": _opt_float(p50_s),
        }
        for name, jobs, avg_s, min_s, max_s, p50_s in rows
    ]
async def top_level_summary(
    db: AsyncSession, date_from: str, date_to: str,
) -> dict:
    """High-level counts for the range: orders, completions, revenue, and
    the number of order lines that have an output type assigned."""
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    orders_stmt = text(
        """
        SELECT
            COUNT(*) AS total_orders,
            COUNT(*) FILTER (WHERE status = 'completed') AS completed_orders,
            COALESCE(SUM(estimated_price) FILTER (WHERE status = 'completed'), 0)::FLOAT
                AS total_revenue
        FROM orders
        WHERE created_at >= CAST(:date_from AS date)
          AND created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        """
    )
    summary = (await db.execute(orders_stmt, params)).fetchone()
    items_stmt = text(
        """
        SELECT COUNT(*)
        FROM order_lines ol
        JOIN orders o ON o.id = ol.order_id
        WHERE ol.output_type_id IS NOT NULL
          AND o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        """
    )
    rendering_items = (await db.execute(items_stmt, params)).scalar() or 0
    if summary is None:
        return {
            "total_orders": 0,
            "completed_orders": 0,
            "total_revenue": 0.0,
            "total_rendering_items": int(rendering_items),
        }
    return {
        "total_orders": int(summary[0]),
        "completed_orders": int(summary[1]),
        "total_revenue": float(summary[2]),
        "total_rendering_items": int(rendering_items),
    }
async def product_and_category_stats(
    db: AsyncSession, date_from: str, date_to: str,
) -> dict:
    """Product metrics: distinct products rendered in range, catalog totals,
    CAD-file coverage, and a per-category product count.

    Note the catalog totals and category breakdown are NOT date-scoped —
    only the rendered-product count honors the range.
    """
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    # Distinct products with at least one completed render within the window.
    rendered_stmt = text(
        """
        SELECT COUNT(DISTINCT ol.product_id)
        FROM order_lines ol
        JOIN orders o ON o.id = ol.order_id
        WHERE ol.render_status = 'completed'
          AND o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        """
    )
    unique_rendered = (await db.execute(rendered_stmt, params)).scalar() or 0
    # Catalog-wide totals.
    totals_stmt = text(
        """
        SELECT COUNT(*) AS total, COUNT(cad_file_id) AS with_cad
        FROM products
        """
    )
    totals = (await db.execute(totals_stmt)).fetchone()
    total_products = int(totals[0]) if totals else 0
    products_with_cad = int(totals[1]) if totals else 0
    category_stmt = text(
        """
        SELECT COALESCE(category_key, 'unknown') AS category, COUNT(*) AS cnt
        FROM products
        GROUP BY category_key
        ORDER BY cnt DESC
        """
    )
    by_category = [
        {"category": category, "count": int(cnt)}
        for category, cnt in (await db.execute(category_stmt)).fetchall()
    ]
    return {
        "unique_products_rendered": int(unique_rendered),
        "total_products": total_products,
        "products_with_cad": products_with_cad,
        "products_by_category": by_category,
    }
async def output_type_usage(
    db: AsyncSession, date_from: str, date_to: str,
) -> list[dict]:
    """Count order lines per output-type name for orders created in range."""
    stmt = text(
        """
        SELECT ot.name AS output_type, COUNT(*) AS cnt
        FROM order_lines ol
        JOIN output_types ot ON ot.id = ol.output_type_id
        JOIN orders o ON o.id = ol.order_id
        WHERE o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY ot.name
        ORDER BY cnt DESC
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    result = await db.execute(stmt, params)
    return [
        {"output_type": name, "count": int(cnt)}
        for name, cnt in result.fetchall()
    ]
async def render_status_distribution(
    db: AsyncSession, date_from: str, date_to: str,
) -> tuple[dict, list[dict]]:
    """Two views of render activity for the range.

    Returns a tuple:
      1. order_lines counts keyed by render_status
      2. {"renderer", "count"} rows derived from cad_files.render_log JSON
    """
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    status_stmt = text(
        """
        SELECT ol.render_status, COUNT(*) AS cnt
        FROM order_lines ol
        JOIN orders o ON o.id = ol.order_id
        WHERE o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY ol.render_status
        """
    )
    status_rows = (await db.execute(status_stmt, params)).fetchall()
    # Known statuses default to 0; unexpected values merge in as extra keys.
    status_map: dict = {"pending": 0, "processing": 0, "completed": 0, "failed": 0} | {
        str(status): int(cnt) for status, cnt in status_rows
    }
    renderer_stmt = text(
        """
        SELECT render_log->>'renderer' AS renderer, COUNT(*) AS cnt
        FROM cad_files
        WHERE render_log IS NOT NULL
          AND render_log->>'renderer' IS NOT NULL
          AND created_at >= CAST(:date_from AS date)
          AND created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY render_log->>'renderer'
        ORDER BY cnt DESC
        """
    )
    renderer_usage = [
        {"renderer": renderer, "count": int(cnt)}
        for renderer, cnt in (await db.execute(renderer_stmt, params)).fetchall()
    ]
    return status_map, renderer_usage
async def top_products(
    db: AsyncSession, date_from: str, date_to: str,
    limit: int = 10,
) -> list[dict]:
    """Most-ordered products (by order line count) within the range.

    ``limit`` caps the number of rows returned (default 10).
    """
    stmt = text(
        """
        SELECT p.pim_id, p.name AS product_name,
               COALESCE(p.category_key, 'unknown') AS category,
               COUNT(*) AS order_count
        FROM order_lines ol
        JOIN products p ON p.id = ol.product_id
        JOIN orders o ON o.id = ol.order_id
        WHERE o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY p.id, p.pim_id, p.name, p.category_key
        ORDER BY order_count DESC
        LIMIT :lim
        """
    )
    params = {
        "date_from": _parse_date(date_from),
        "date_to": _parse_date(date_to),
        "lim": limit,
    }
    return [
        {
            "pim_id": pim_id,
            "product_name": product_name,
            "category": category,
            "order_count": int(order_count),
        }
        for pim_id, product_name, category, order_count in (
            await db.execute(stmt, params)
        ).fetchall()
    ]
async def category_revenue(
    db: AsyncSession, date_from: str, date_to: str,
) -> list[dict]:
    """Proportional revenue by product category for completed orders in range.

    Each order's estimated_price is split evenly across ALL of its lines
    (window count per order), and every line's share is attributed to its
    product's category — so per-order shares always sum back to the order
    total, even when one order spans several categories.

    Bug fixed: the previous CTE grouped by (order, category) and divided the
    full order price by the line count *within each category*, so an order
    with an uneven category split was over-attributed (e.g. price 100 with
    3 lines in A and 1 in B yielded 33.3 + 100 = 133.3 total revenue).

    Returns [{"category", "order_count", "revenue"}, ...] sorted by revenue
    descending.
    """
    sql = text(
        """
        WITH line_share AS (
            SELECT o.id AS order_id,
                   COALESCE(o.estimated_price, 0)
                       / COUNT(ol.id) OVER (PARTITION BY o.id) AS per_line_price,
                   COALESCE(p.category_key, 'unknown') AS category
            FROM orders o
            JOIN order_lines ol ON ol.order_id = o.id
            JOIN products p ON p.id = ol.product_id
            WHERE o.status = 'completed'
              AND o.completed_at >= CAST(:date_from AS date)
              AND o.completed_at < CAST(:date_to AS date) + INTERVAL '1 day'
        )
        SELECT category,
               COUNT(DISTINCT order_id) AS order_count,
               COALESCE(SUM(per_line_price), 0)::FLOAT AS revenue
        FROM line_share
        GROUP BY category
        ORDER BY revenue DESC
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    rows = (await db.execute(sql, params)).fetchall()
    return [
        {"category": r[0], "order_count": int(r[1]), "revenue": r[2]}
        for r in rows
    ]
async def render_backend_stats(
    db: AsyncSession, date_from: str, date_to: str,
) -> list[dict]:
    """Per-backend render counts and durations for order lines in range.

    Groups on order_lines.render_backend_used; rows where the backend is
    NULL are excluded by the WHERE clause, and the COALESCE to 'unknown'
    is therefore only a defensive label.  NOTE(review): the old docstring
    said "celery vs flamenco", but flamenco was removed (see migration 032
    per the repo history) — the actual backend values depend on what the
    render workers write; confirm against the worker code.
    """
    sql = text(
        """
        SELECT
            COALESCE(render_backend_used, 'unknown') AS backend,
            COUNT(*) AS total,
            COUNT(*) FILTER (WHERE render_status = 'completed') AS completed,
            COUNT(*) FILTER (WHERE render_status = 'failed') AS failed,
            EXTRACT(EPOCH FROM AVG(
                render_completed_at - render_started_at
            ) FILTER (WHERE render_status = 'completed'))::FLOAT AS avg_render_s,
            EXTRACT(EPOCH FROM PERCENTILE_CONT(0.5) WITHIN GROUP (
                ORDER BY render_completed_at - render_started_at
            ) FILTER (WHERE render_status = 'completed'))::FLOAT AS p50_render_s
        FROM order_lines ol
        JOIN orders o ON o.id = ol.order_id
        WHERE render_backend_used IS NOT NULL
          AND o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY render_backend_used
        ORDER BY total DESC
        """
    )
    rows = (await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)})).fetchall()
    return [
        {
            "backend": r[0],
            "total": int(r[1]),
            "completed": int(r[2]),
            "failed": int(r[3]),
            # Duration columns are already cast ::FLOAT by the query and may be
            # None when a backend has no completed lines in the window.
            "avg_render_s": r[4],
            "p50_render_s": r[5],
        }
        for r in rows
    ]
async def orders_by_user(
    db: AsyncSession, date_from: str, date_to: str,
) -> list[dict]:
    """Per-user order counts plus completed-order revenue within the range."""
    stmt = text(
        """
        SELECT u.full_name, u.email, u.role,
               COUNT(*) AS order_count,
               COALESCE(SUM(o.estimated_price) FILTER (WHERE o.status = 'completed'), 0)::FLOAT AS revenue
        FROM orders o
        JOIN users u ON u.id = o.created_by
        WHERE o.created_at >= CAST(:date_from AS date)
          AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day'
        GROUP BY u.id, u.full_name, u.email, u.role
        ORDER BY order_count DESC
        """
    )
    params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}
    return [
        {
            "full_name": full_name,
            "email": email,
            "role": role,
            "order_count": int(order_count),
            "revenue": revenue,
        }
        for full_name, email, role, order_count, revenue in (
            await db.execute(stmt, params)
        ).fetchall()
    ]