"""KPI / analytics query functions. All functions return plain dicts or lists of dicts. Uses text() for raw SQL to avoid ORM lazy-loading surprises. Every function accepts date_from / date_to ISO-date strings to scope metrics. """ from datetime import date as _date from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession def _parse_date(s: str) -> _date: """Convert ISO date string to datetime.date for asyncpg compatibility.""" return _date.fromisoformat(s) async def order_throughput_by_week( db: AsyncSession, date_from: str, date_to: str, ) -> list[dict]: """Weekly order creation + completion counts within the date range.""" sql = text( """ SELECT TO_CHAR(DATE_TRUNC('week', created_at), 'IYYY-"W"IW') AS week, COUNT(*) AS count, COUNT(*) FILTER (WHERE status = 'completed') AS completed FROM orders WHERE created_at >= CAST(:date_from AS date) AND created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY DATE_TRUNC('week', created_at) ORDER BY DATE_TRUNC('week', created_at) """ ) result = await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) rows = result.fetchall() return [{"week": r[0], "count": r[1], "completed": r[2]} for r in rows] async def processing_time_stats( db: AsyncSession, date_from: str, date_to: str, ) -> dict: """Average and percentile processing times for completed orders in range.""" sql = text( """ SELECT EXTRACT(EPOCH FROM AVG(completed_at - submitted_at))::FLOAT AS avg_submit_to_complete_s, EXTRACT(EPOCH FROM AVG(processing_started_at - submitted_at))::FLOAT AS avg_submit_to_processing_s, EXTRACT(EPOCH FROM PERCENTILE_CONT(0.5) WITHIN GROUP ( ORDER BY completed_at - submitted_at ))::FLOAT AS p50_s, EXTRACT(EPOCH FROM PERCENTILE_CONT(0.95) WITHIN GROUP ( ORDER BY completed_at - submitted_at ))::FLOAT AS p95_s FROM orders WHERE status = 'completed' AND submitted_at IS NOT NULL AND completed_at IS NOT NULL AND submitted_at >= CAST(:date_from AS date) AND submitted_at < CAST(:date_to AS date) + INTERVAL '1 day' """ ) result = await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) row = result.fetchone() if row is None: return { "avg_submit_to_complete_s": None, "avg_submit_to_processing_s": None, "p50_s": None, "p95_s": None, } return { "avg_submit_to_complete_s": row[0], "avg_submit_to_processing_s": row[1], "p50_s": row[2], "p95_s": row[3], } async def revenue_overview( db: AsyncSession, date_from: str, date_to: str, ) -> list[dict]: """Monthly revenue (sum of estimated_price for completed orders) in range.""" sql = text( """ SELECT TO_CHAR(DATE_TRUNC('month', completed_at), 'YYYY-MM') AS month, COALESCE(SUM(estimated_price), 0)::FLOAT AS revenue, COUNT(*) AS order_count FROM orders WHERE status = 'completed' AND completed_at >= CAST(:date_from AS date) AND completed_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY DATE_TRUNC('month', completed_at) ORDER BY DATE_TRUNC('month', completed_at) """ ) result = await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) rows = result.fetchall() return [{"month": r[0], "revenue": r[1], "order_count": r[2]} for r in rows] async def item_status_breakdown( db: AsyncSession, date_from: str, date_to: str, ) -> dict: """Count of order lines grouped by item_status, scoped to orders in range.""" sql = text( """ SELECT ol.item_status, COUNT(*) AS cnt FROM order_lines ol JOIN orders o ON o.id = ol.order_id WHERE o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY ol.item_status """ ) result = await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) rows = result.fetchall() out: dict = {"pending": 0, "approved": 0, "rejected": 0} for row in rows: key = str(row[0]) out[key] = int(row[1]) return out async def render_time_breakdown( db: AsyncSession, date_from: str, date_to: str, ) -> dict: """Average render duration from completed order lines, scoped to date range. Uses render_started_at / render_completed_at on order_lines (added in migration 015). avg_stl_s is not tracked at order-line level, so only avg_render_s and sample_count are meaningful here; avg_stl_s is left None for UI compatibility. """ sql = text( """ SELECT AVG(EXTRACT(EPOCH FROM (render_completed_at - render_started_at))) AS avg_render_s, COUNT(*) AS sample_count FROM order_lines WHERE render_status = 'completed' AND render_started_at IS NOT NULL AND render_completed_at IS NOT NULL AND render_completed_at >= CAST(:date_from AS date) AND render_completed_at < CAST(:date_to AS date) + INTERVAL '1 day' """ ) result = await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) row = result.fetchone() if row is None or row[1] == 0: return {"avg_stl_s": None, "avg_render_s": None, "avg_total_s": None, "sample_count": 0} return { "avg_stl_s": None, "avg_render_s": float(row[0]) if row[0] is not None else None, "avg_total_s": float(row[0]) if row[0] is not None else None, "sample_count": int(row[1]), } async def render_time_by_output_type( db: AsyncSession, date_from: str, date_to: str, ) -> list[dict]: """Render time statistics per output type for completed order lines.""" sql = text( """ SELECT COALESCE(ot.name, 'Unknown') AS output_type, COUNT(*) AS job_count, AVG(EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at))) AS avg_render_s, MIN(EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at))) AS min_render_s, MAX(EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at))) AS max_render_s, PERCENTILE_CONT(0.5) WITHIN GROUP ( ORDER BY EXTRACT(EPOCH FROM (ol.render_completed_at - ol.render_started_at)) ) AS p50_render_s FROM order_lines ol LEFT JOIN output_types ot ON ot.id = ol.output_type_id WHERE ol.render_status = 'completed' AND ol.render_started_at IS NOT NULL AND ol.render_completed_at IS NOT NULL AND ol.render_completed_at >= CAST(:date_from AS date) AND ol.render_completed_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY ot.id, ot.name ORDER BY avg_render_s DESC NULLS LAST """ ) result = await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) return [ { "output_type": r[0], "job_count": int(r[1]), "avg_render_s": float(r[2]) if r[2] is not None else None, "min_render_s": float(r[3]) if r[3] is not None else None, "max_render_s": float(r[4]) if r[4] is not None else None, "p50_render_s": float(r[5]) if r[5] is not None else None, } for r in result.fetchall() ] async def top_level_summary( db: AsyncSession, date_from: str, date_to: str, ) -> dict: """High-level summary counts and totals within the date range.""" sql = text( """ SELECT COUNT(*) AS total_orders, COUNT(*) FILTER (WHERE status = 'completed') AS completed_orders, COALESCE(SUM(estimated_price) FILTER (WHERE status = 'completed'), 0)::FLOAT AS total_revenue FROM orders WHERE created_at >= CAST(:date_from AS date) AND created_at < CAST(:date_to AS date) + INTERVAL '1 day' """ ) result = await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) row = result.fetchone() items_sql = text( """ SELECT COUNT(*) FROM order_lines ol JOIN orders o ON o.id = ol.order_id WHERE ol.output_type_id IS NOT NULL AND o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' """ ) items_result = await db.execute(items_sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)}) items_count = items_result.scalar() or 0 return { "total_orders": int(row[0]) if row else 0, "completed_orders": int(row[1]) if row else 0, "total_revenue": float(row[2]) if row else 0.0, "total_rendering_items": int(items_count), } async def product_and_category_stats( db: AsyncSession, date_from: str, date_to: str, ) -> dict: """Product-level stats: unique rendered, total products, CAD coverage, by category.""" params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)} rendered_sql = text( """ SELECT COUNT(DISTINCT ol.product_id) FROM order_lines ol JOIN orders o ON o.id = ol.order_id WHERE ol.render_status = 'completed' AND o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' """ ) rendered = (await db.execute(rendered_sql, params)).scalar() or 0 totals_sql = text( """ SELECT COUNT(*) AS total, COUNT(cad_file_id) AS with_cad FROM products """ ) totals_row = (await db.execute(totals_sql)).fetchone() total_products = int(totals_row[0]) if totals_row else 0 products_with_cad = int(totals_row[1]) if totals_row else 0 cat_sql = text( """ SELECT COALESCE(category_key, 'unknown') AS category, COUNT(*) AS cnt FROM products GROUP BY category_key ORDER BY cnt DESC """ ) cat_rows = (await db.execute(cat_sql)).fetchall() return { "unique_products_rendered": int(rendered), "total_products": total_products, "products_with_cad": products_with_cad, "products_by_category": [ {"category": r[0], "count": int(r[1])} for r in cat_rows ], } async def output_type_usage( db: AsyncSession, date_from: str, date_to: str, ) -> list[dict]: """Order lines grouped by output type name.""" sql = text( """ SELECT ot.name AS output_type, COUNT(*) AS cnt FROM order_lines ol JOIN output_types ot ON ot.id = ol.output_type_id JOIN orders o ON o.id = ol.order_id WHERE o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY ot.name ORDER BY cnt DESC """ ) rows = (await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)})).fetchall() return [{"output_type": r[0], "count": int(r[1])} for r in rows] async def render_status_distribution( db: AsyncSession, date_from: str, date_to: str, ) -> tuple[dict, list[dict]]: """(a) order_lines by render_status, (b) cad_files by renderer from render_log.""" params = {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)} status_sql = text( """ SELECT ol.render_status, COUNT(*) AS cnt FROM order_lines ol JOIN orders o ON o.id = ol.order_id WHERE o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY ol.render_status """ ) status_rows = (await db.execute(status_sql, params)).fetchall() status_map: dict = {"pending": 0, "processing": 0, "completed": 0, "failed": 0} for row in status_rows: key = str(row[0]) status_map[key] = int(row[1]) renderer_sql = text( """ SELECT render_log->>'renderer' AS renderer, COUNT(*) AS cnt FROM cad_files WHERE render_log IS NOT NULL AND render_log->>'renderer' IS NOT NULL AND created_at >= CAST(:date_from AS date) AND created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY render_log->>'renderer' ORDER BY cnt DESC """ ) renderer_rows = (await db.execute(renderer_sql, params)).fetchall() renderer_usage = [{"renderer": r[0], "count": int(r[1])} for r in renderer_rows] return status_map, renderer_usage async def top_products( db: AsyncSession, date_from: str, date_to: str, limit: int = 10, ) -> list[dict]: """Top N most-ordered products by order line count.""" sql = text( """ SELECT p.pim_id, p.name AS product_name, COALESCE(p.category_key, 'unknown') AS category, COUNT(*) AS order_count FROM order_lines ol JOIN products p ON p.id = ol.product_id JOIN orders o ON o.id = ol.order_id WHERE o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY p.id, p.pim_id, p.name, p.category_key ORDER BY order_count DESC LIMIT :lim """ ) rows = (await db.execute(sql, { "date_from": _parse_date(date_from), "date_to": _parse_date(date_to), "lim": limit, })).fetchall() return [ {"pim_id": r[0], "product_name": r[1], "category": r[2], "order_count": int(r[3])} for r in rows ] async def category_revenue( db: AsyncSession, date_from: str, date_to: str, ) -> list[dict]: """Proportional revenue by category: order price / line count, summed per category.""" sql = text( """ WITH order_share AS ( SELECT o.id AS order_id, COALESCE(o.estimated_price, 0) / GREATEST(COUNT(ol.id), 1) AS per_line_price, COALESCE(p.category_key, 'unknown') AS category FROM orders o JOIN order_lines ol ON ol.order_id = o.id JOIN products p ON p.id = ol.product_id WHERE o.status = 'completed' AND o.completed_at >= CAST(:date_from AS date) AND o.completed_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY o.id, o.estimated_price, p.category_key ) SELECT category, COUNT(DISTINCT order_id) AS order_count, COALESCE(SUM(per_line_price), 0)::FLOAT AS revenue FROM order_share GROUP BY category ORDER BY revenue DESC """ ) rows = (await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)})).fetchall() return [{"category": r[0], "order_count": int(r[1]), "revenue": r[2]} for r in rows] async def render_backend_stats( db: AsyncSession, date_from: str, date_to: str, ) -> list[dict]: """Render time + count by backend (celery vs flamenco).""" sql = text( """ SELECT COALESCE(render_backend_used, 'unknown') AS backend, COUNT(*) AS total, COUNT(*) FILTER (WHERE render_status = 'completed') AS completed, COUNT(*) FILTER (WHERE render_status = 'failed') AS failed, EXTRACT(EPOCH FROM AVG( render_completed_at - render_started_at ) FILTER (WHERE render_status = 'completed'))::FLOAT AS avg_render_s, EXTRACT(EPOCH FROM PERCENTILE_CONT(0.5) WITHIN GROUP ( ORDER BY render_completed_at - render_started_at ) FILTER (WHERE render_status = 'completed'))::FLOAT AS p50_render_s FROM order_lines ol JOIN orders o ON o.id = ol.order_id WHERE render_backend_used IS NOT NULL AND o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY render_backend_used ORDER BY total DESC """ ) rows = (await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)})).fetchall() return [ { "backend": r[0], "total": int(r[1]), "completed": int(r[2]), "failed": int(r[3]), "avg_render_s": r[4], "p50_render_s": r[5], } for r in rows ] async def orders_by_user( db: AsyncSession, date_from: str, date_to: str, ) -> list[dict]: """Orders grouped by user with counts and revenue.""" sql = text( """ SELECT u.full_name, u.email, u.role, COUNT(*) AS order_count, COALESCE(SUM(o.estimated_price) FILTER (WHERE o.status = 'completed'), 0)::FLOAT AS revenue FROM orders o JOIN users u ON u.id = o.created_by WHERE o.created_at >= CAST(:date_from AS date) AND o.created_at < CAST(:date_to AS date) + INTERVAL '1 day' GROUP BY u.id, u.full_name, u.email, u.role ORDER BY order_count DESC """ ) rows = (await db.execute(sql, {"date_from": _parse_date(date_from), "date_to": _parse_date(date_to)})).fetchall() return [ {"full_name": r[0], "email": r[1], "role": r[2], "order_count": int(r[3]), "revenue": r[4]} for r in rows ]