import hashlib
import logging
import uuid
from pathlib import Path

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel

from app.config import settings
from app.database import get_db
from app.models.cad_file import CadFile, ProcessingStatus
from app.models.order import Order
from app.models.order_item import OrderItem
from app.models.order_line import OrderLine
from app.models.user import User
from app.schemas.upload import ParsedExcelResponse, ParsedRow, ParsedComponent, StepUploadResponse
from app.schemas.order import OrderDetailOut
from app.services.excel_parser import parse_excel, parsed_excel_to_dict
from app.services.excel_import import import_excel_to_products, preview_excel_rows
from app.services.order_service import generate_order_number
from app.utils.auth import get_current_user

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/uploads", tags=["uploads"])


# ── Preview response models ────────────────────────────────────────────

# One annotated Excel row as returned by the preview endpoint.
class ExcelPreviewRow(BaseModel):
    row_index: int
    pim_id: str | None = None
    produkt_baureihe: str | None = None
    gewaehltes_produkt: str | None = None
    product_exists: bool = False
    product_id: str | None = None
    medias_rendering: bool | None = None
    category_key: str | None = None
    has_step: bool = False
    is_duplicate: bool = False
    duplicate_of_row: int | None = None


# Full preview payload: the stored file path, aggregate counters, warnings
# and the per-row annotations.
class ExcelPreviewResponse(BaseModel):
    excel_path: str
    filename: str
    category_key: str | None
    row_count: int
    existing_product_count: int
    new_product_count: int
    no_pim_id_count: int
    has_step_count: int = 0
    no_step_count: int = 0
    duplicate_count: int = 0
    warnings: list[str]
    rows: list[ExcelPreviewRow]
    column_headers: list[str] = []
    template_name: str | None = None
    validation_id: str | None = None


# ── Finalize request models ────────────────────────────────────────────

# Output types requested for a single Excel row.
class OutputTypeSelection(BaseModel):
    row_index: int
    output_type_ids: list[uuid.UUID]


# Body of the finalize call: which previously uploaded file to use, which
# rows to include and which output types to order per row.
class ExcelFinalizeRequest(BaseModel):
    excel_path: str
    included_row_indices: list[int]
    output_type_selections: list[OutputTypeSelection] = []
    notes: str | None = None
    template_id: uuid.UUID | None = None


ALLOWED_EXCEL = {".xlsx", ".xls"}
ALLOWED_STEP = {".stp", ".step"}


def _is_uploaded_excel(raw_path: str) -> bool:
    """Return True if *raw_path* is an existing file inside the Excel upload dir.

    The path comes from the request body, so it must not be trusted: both the
    candidate and the upload root are resolved (symlinks and ``..`` collapsed)
    before the containment test.
    """
    try:
        root = (Path(settings.upload_dir) / "excel_files").resolve()
        candidate = Path(raw_path).resolve()
    except (OSError, RuntimeError, ValueError):
        # Malformed path (e.g. embedded NUL byte, symlink loop).
        return False
    return candidate.is_relative_to(root) and candidate.is_file()


@router.post("/excel", response_model=ExcelPreviewResponse)
async def upload_excel(
    file: UploadFile = File(...),
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Parse Excel and return a read-only preview. No products are created.

    The upload is stored under ``<upload_dir>/excel_files`` with a random
    name; that path is returned so the finalize endpoint can re-parse it.

    Raises HTTPException 400 for an unsupported extension, 422 when the
    parser rejects the file, and 500 when building the preview fails.
    """
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in ALLOWED_EXCEL:
        raise HTTPException(400, detail="Only .xlsx / .xls files are accepted")

    # Save the file
    upload_dir = Path(settings.upload_dir) / "excel_files"
    upload_dir.mkdir(parents=True, exist_ok=True)
    tmp_name = f"{uuid.uuid4()}{suffix}"
    tmp_path = upload_dir / tmp_name
    content = await file.read()
    tmp_path.write_bytes(content)

    try:
        parsed = parse_excel(tmp_path)
    except ValueError as exc:
        tmp_path.unlink(missing_ok=True)
        raise HTTPException(422, detail=str(exc)) from exc

    parsed_dict = parsed_excel_to_dict(parsed)
    parsed_dict["filename"] = file.filename
    parsed_dict["excel_path"] = str(tmp_path)

    rows = parsed_dict.get("rows", [])

    try:
        preview = await preview_excel_rows(
            db,
            rows,
            category_key=parsed_dict.get("category_key"),
        )
    except Exception as exc:
        logger.exception("Excel preview failed for %s", file.filename)
        # The client never learns the stored path on this branch, so the
        # file would otherwise be orphaned on disk.
        tmp_path.unlink(missing_ok=True)
        try:
            from app.services.notification_service import emit_notification

            await emit_notification(
                db,
                actor_user_id=user.id,
                target_user_id=user.id,
                action="excel.import_error",
                entity_type="upload",
                entity_id=None,
                details={
                    "filename": file.filename or "",
                    "error": str(exc)[:500],
                },
            )
        except Exception:
            # Best effort: the notification must not mask the real error.
            logger.warning("Could not emit excel.import_error notification", exc_info=True)
        raise HTTPException(500, detail=f"Preview failed: {str(exc)[:300]}") from exc

    annotated_rows = [
        ExcelPreviewRow(
            row_index=r.get("row_index", 0),
            pim_id=r.get("pim_id"),
            produkt_baureihe=r.get("produkt_baureihe"),
            gewaehltes_produkt=r.get("gewaehltes_produkt"),
            product_exists=r.get("product_exists", False),
            product_id=r.get("product_id"),
            medias_rendering=r.get("medias_rendering"),
            category_key=r.get("category_key"),
            has_step=r.get("has_step", False),
            is_duplicate=r.get("is_duplicate", False),
            duplicate_of_row=r.get("duplicate_of_row"),
        )
        for r in preview.rows
    ]

    all_warnings = preview.warnings + parsed_dict.get("warnings", [])

    if all_warnings:
        from app.services.notification_service import emit_notification

        await emit_notification(
            db,
            actor_user_id=user.id,
            target_user_id=user.id,
            action="excel.import_warnings",
            entity_type="upload",
            entity_id=None,
            details={
                "filename": file.filename or "",
                "warning_count": len(all_warnings),
                "warnings": all_warnings[:10],  # only the first ten are sent
            },
        )

    # Queue sanity-check validation task
    validation_id: str | None = None
    try:
        from app.domains.imports.models import ImportValidation

        val = ImportValidation(
            excel_path=str(tmp_path),
            tenant_id=getattr(user, "tenant_id", None),
        )
        db.add(val)
        await db.commit()
        await db.refresh(val)
        validation_id = str(val.id)

        from app.domains.imports.tasks import validate_excel_import

        validate_excel_import.delay(
            validation_id,
            str(tmp_path),
            str(getattr(user, "tenant_id", "") or ""),
        )
    except Exception:
        # validation is non-critical, but leave a trace for operators
        logger.warning("Could not queue Excel validation for %s", tmp_path, exc_info=True)

    return ExcelPreviewResponse(
        excel_path=str(tmp_path),
        filename=file.filename or "",
        category_key=parsed_dict.get("category_key"),
        row_count=parsed_dict.get("row_count", len(rows)),
        existing_product_count=preview.existing_product_count,
        new_product_count=preview.new_product_count,
        no_pim_id_count=preview.no_pim_id_count,
        has_step_count=preview.has_step_count,
        no_step_count=preview.no_step_count,
        duplicate_count=preview.duplicate_count,
        warnings=all_warnings,
        rows=annotated_rows,
        column_headers=parsed_dict.get("column_headers", []),
        template_name=parsed_dict.get("template_name"),
        validation_id=validation_id,
    )


@router.post("/excel/finalize", response_model=OrderDetailOut, status_code=status.HTTP_201_CREATED)
async def finalize_excel(
    body: ExcelFinalizeRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create products + order + lines from a previously parsed Excel file.

    This is the second step: the user has reviewed the preview and confirmed
    which rows to include and which output types to request.

    Raises HTTPException 404 when the file is missing or lies outside the
    Excel upload directory, 422 when it no longer parses, 400 when no rows
    are selected, 409 on an order-number collision and 500 on any other
    commit failure.
    """
    # 1. Validate Excel file still exists. The path is client-supplied, so it
    #    must also be confined to the upload directory.
    if not _is_uploaded_excel(body.excel_path):
        raise HTTPException(404, detail="Excel file not found — please re-upload")
    # Keep the path exactly as the client sent it (it is persisted below as
    # source_excel); the resolved form is used for the safety check only.
    excel_path = Path(body.excel_path)

    # 2. Re-parse the Excel
    try:
        parsed = parse_excel(excel_path)
    except ValueError as exc:
        raise HTTPException(422, detail=str(exc)) from exc

    parsed_dict = parsed_excel_to_dict(parsed)
    all_rows = parsed_dict.get("rows", [])

    # 3. Filter to included rows
    included_set = set(body.included_row_indices)
    included_rows = [r for r in all_rows if r.get("row_index") in included_set]

    if not included_rows:
        raise HTTPException(400, detail="No rows selected")

    tenant_id = getattr(user, "tenant_id", None)

    # 4. Import into product library (creates/updates products)
    import_result = await import_excel_to_products(
        db,
        included_rows,
        source_excel=str(excel_path),
        category_key=parsed_dict.get("category_key"),
        tenant_id=tenant_id,
    )

    # 5. Seed material aliases
    material_mappings = parsed_dict.get("material_mappings", [])
    if material_mappings:
        try:
            from app.services.material_service import seed_material_aliases_from_mappings

            await seed_material_aliases_from_mappings(db, material_mappings)
        except Exception:
            # non-critical
            logger.warning("Seeding material aliases failed", exc_info=True)

    # 6. Create Order
    order_number = await generate_order_number(db)
    order = Order(
        order_number=order_number,
        template_id=body.template_id,
        created_by=user.id,
        source_excel=str(excel_path),
        notes=body.notes,
        tenant_id=tenant_id,
    )
    db.add(order)
    await db.flush()
    # Capture the primary key now: reading order.id after commit() could
    # trigger a lazy reload if the session expires instances on commit.
    order_id = order.id

    # 7. Create OrderItems (legacy compat — one per included row)
    for row in import_result.rows:
        # If the matched product already has a STEP file linked (from a
        # previous order or direct product-library upload), inherit it so the
        # submit validation passes without requiring a re-upload.
        inherited_cad = (
            uuid.UUID(row["product_cad_file_id"])
            if row.get("product_cad_file_id")
            else None
        )
        item = OrderItem(
            order_id=order_id,
            row_index=row.get("row_index", 0),
            ebene1=row.get("ebene1"),
            ebene2=row.get("ebene2"),
            baureihe=row.get("baureihe"),
            pim_id=row.get("pim_id"),
            produkt_baureihe=row.get("produkt_baureihe"),
            gewaehltes_produkt=row.get("gewaehltes_produkt"),
            name_cad_modell=row.get("name_cad_modell"),
            gewuenschte_bildnummer=row.get("gewuenschte_bildnummer"),
            lagertyp=row.get("lagertyp"),
            medias_rendering=row.get("medias_rendering"),
            components=list(row.get("components", [])),
            cad_file_id=inherited_cad,
            tenant_id=tenant_id,
        )
        db.add(item)

    # 8. Build output type selections lookup: row_index → list[UUID]
    #    (a later selection for the same row replaces an earlier one)
    ot_map: dict[int, list[uuid.UUID]] = {
        sel.row_index: sel.output_type_ids for sel in body.output_type_selections
    }

    # 9. Create OrderLines
    for row in import_result.rows:
        raw_product_id = row.get("product_id")
        if not raw_product_id:
            continue
        product_uuid = uuid.UUID(raw_product_id)
        # An empty selection yields one tracking-only line (no output type).
        type_ids = ot_map.get(row.get("row_index", 0), []) or [None]
        for type_id in type_ids:
            db.add(
                OrderLine(
                    order_id=order_id,
                    product_id=product_uuid,
                    output_type_id=type_id,
                    gewuenschte_bildnummer=row.get("gewuenschte_bildnummer"),
                    tenant_id=tenant_id,
                )
            )

    # 10. Commit, then snapshot prices into the new draft order
    try:
        await db.commit()
    except Exception as exc:
        await db.rollback()
        logger.exception("Order creation from %s failed", excel_path.name)

        from sqlalchemy.exc import IntegrityError

        duplicate_number = isinstance(exc, IntegrityError) and "order_number" in str(exc)

        # Emit error notification via its own connection (session is now invalid)
        try:
            from app.services.notification_service import emit_notification_sync

            if duplicate_number:
                error_msg = "Duplicate order number — please try again"
            else:
                error_msg = str(exc)[:300]
            emit_notification_sync(
                actor_user_id=user.id,
                target_user_id=str(user.id),
                action="excel.finalize_error",
                entity_type="upload",
                entity_id=None,
                details={
                    "filename": Path(body.excel_path).name,
                    "error": error_msg,
                },
            )
        except Exception:
            logger.warning("Could not emit excel.finalize_error notification", exc_info=True)

        if duplicate_number:
            raise HTTPException(409, detail="Duplicate order number — please try again") from exc
        raise HTTPException(500, detail=f"Order creation failed: {str(exc)[:200]}") from exc

    # Snapshot prices into the draft order so the estimate is visible immediately
    try:
        from app.services.pricing_service import refresh_order_price

        await refresh_order_price(db, order_id)
    except Exception:
        # non-critical — estimate can be computed on first view
        logger.warning("Price snapshot for order %s failed", order_id, exc_info=True)

    # Load and return full order detail
    from app.api.routers.orders import _load_order_detail, _order_detail_out

    order_loaded = await _load_order_detail(db, order_id)
    return _order_detail_out(order_loaded)


@router.post("/step", response_model=StepUploadResponse, status_code=status.HTTP_201_CREATED)
async def upload_step(
    file: UploadFile = File(...),
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Upload a single STEP/STP CAD file.

    Files are de-duplicated by SHA-256 of their content: when a CadFile with
    the same hash exists, it is returned with status "already_exists" and
    nothing is written. Otherwise the file is stored under
    ``<upload_dir>/step_files`` and a background processing task is enqueued
    on a best-effort basis.

    Raises HTTPException 400 for an unsupported extension.
    """
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in ALLOWED_STEP:
        raise HTTPException(400, detail="Only .stp / .step files are accepted")

    content = await file.read()
    file_hash = hashlib.sha256(content).hexdigest()

    # Check dedup
    # NOTE(review): the lookup is not filtered by tenant — confirm that sharing
    # CAD files across tenants is intended.
    result = await db.execute(select(CadFile).where(CadFile.file_hash == file_hash))
    existing = result.scalar_one_or_none()
    if existing:
        return StepUploadResponse(
            cad_file_id=str(existing.id),
            original_name=existing.original_name,
            file_hash=file_hash,
            status="already_exists",
        )

    # Save file
    step_dir = Path(settings.upload_dir) / "step_files"
    step_dir.mkdir(parents=True, exist_ok=True)
    stored_name = f"{uuid.uuid4()}{suffix}"
    stored_path = step_dir / stored_name
    stored_path.write_bytes(content)

    cad_file = CadFile(
        original_name=file.filename,
        stored_path=str(stored_path),
        file_hash=file_hash,
        file_size=len(content),
        processing_status=ProcessingStatus.pending,
        tenant_id=getattr(user, "tenant_id", None),
    )
    db.add(cad_file)
    await db.commit()
    await db.refresh(cad_file)

    # Enqueue background processing task (Phase 3)
    try:
        from app.tasks.step_tasks import process_step_file

        process_step_file.delay(str(cad_file.id))
    except Exception:
        # Worker not configured yet — the upload itself still succeeds.
        logger.warning("Could not enqueue STEP processing for %s", cad_file.id, exc_info=True)

    return StepUploadResponse(
        cad_file_id=str(cad_file.id),
        original_name=file.filename,
        file_hash=file_hash,
        status="uploaded",
    )


@router.get("/validations/{validation_id}")
async def get_import_validation(
    validation_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Poll the result of an Excel sanity-check validation.

    Raises HTTPException 404 when no validation with that id exists.
    """
    from app.domains.imports.models import ImportValidation
    from app.domains.imports.schemas import ImportValidationOut

    # NOTE(review): the lookup is by id only, not by the caller's tenant —
    # confirm whether cross-tenant reads are acceptable here.
    result = await db.execute(select(ImportValidation).where(ImportValidation.id == validation_id))
    val = result.scalar_one_or_none()
    if not val:
        raise HTTPException(404, detail="Validation not found")
    return ImportValidationOut.model_validate(val)


# Body of the add-alias call: the part name to register as an alias and the
# name of the existing material it should map to.
class AddAliasRequest(BaseModel):
    part_name: str
    material_name: str


@router.post("/validations/{validation_id}/add-alias", status_code=status.HTTP_201_CREATED)
async def add_material_alias_from_validation(
    validation_id: uuid.UUID,
    body: AddAliasRequest,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Create a MaterialAlias entry mapping part_name to an existing material.

    Requires admin or project_manager role.

    Raises HTTPException 403 for other roles, 404 when the validation or the
    material does not exist, and 409 when an alias with the same name
    (compared case-insensitively) is already present.
    """
    from app.domains.imports.models import ImportValidation
    from app.domains.materials.models import Material, MaterialAlias

    # Gate to admin/PM
    if user.role.value not in ("admin", "project_manager"):
        raise HTTPException(status_code=403, detail="Admin or project_manager required")

    # Verify the validation exists
    val_result = await db.execute(select(ImportValidation).where(ImportValidation.id == validation_id))
    if not val_result.scalar_one_or_none():
        raise HTTPException(404, detail="Validation not found")

    # Find the target material by name
    mat_result = await db.execute(select(Material).where(Material.name == body.material_name))
    material = mat_result.scalar_one_or_none()
    if not material:
        raise HTTPException(404, detail=f"Material '{body.material_name}' not found in library")

    # Check for duplicate alias (case-insensitive)
    from sqlalchemy import func as sql_func

    dup_result = await db.execute(
        select(MaterialAlias).where(
            sql_func.lower(MaterialAlias.alias) == body.part_name.lower()
        )
    )
    existing_alias = dup_result.scalar_one_or_none()
    if existing_alias:
        raise HTTPException(409, detail=f"Alias '{body.part_name}' already exists")

    # Read the material's columns before commit(): afterwards the instance may
    # be expired and attribute access would need another database round trip.
    material_id = material.id
    material_name = material.name

    alias = MaterialAlias(material_id=material_id, alias=body.part_name)
    db.add(alias)
    await db.commit()
    await db.refresh(alias)

    return {
        "id": str(alias.id),
        "alias": alias.alias,
        "material_id": str(material_id),
        "material_name": material_name,
    }