"""MediaAsset service.""" import uuid from sqlalchemy import select, update as sql_update from sqlalchemy.ext.asyncio import AsyncSession from app.domains.media.models import MediaAsset, MediaAssetType _SORT_COLUMNS = { "created_at": MediaAsset.created_at, "file_size_bytes": MediaAsset.file_size_bytes, "storage_key": MediaAsset.storage_key, } async def list_media_assets( db: AsyncSession, product_id: uuid.UUID | None = None, order_line_id: uuid.UUID | None = None, cad_file_id: uuid.UUID | None = None, asset_type: MediaAssetType | None = None, asset_types: list[MediaAssetType] | None = None, is_archived: bool | None = False, skip: int = 0, limit: int = 50, sort_by: str = "created_at", sort_dir: str = "desc", ) -> list[MediaAsset]: from sqlalchemy import asc, desc col = _SORT_COLUMNS.get(sort_by, MediaAsset.created_at) order = desc(col) if sort_dir == "desc" else asc(col) q = select(MediaAsset).order_by(order) if product_id: q = q.where(MediaAsset.product_id == product_id) if order_line_id: q = q.where(MediaAsset.order_line_id == order_line_id) if cad_file_id: q = q.where(MediaAsset.cad_file_id == cad_file_id) if asset_types: q = q.where(MediaAsset.asset_type.in_(asset_types)) elif asset_type is not None: q = q.where(MediaAsset.asset_type == asset_type) if is_archived is not None: q = q.where(MediaAsset.is_archived == is_archived) q = q.offset(skip).limit(limit) result = await db.execute(q) return list(result.scalars().all()) async def get_media_asset(db: AsyncSession, asset_id: uuid.UUID) -> MediaAsset | None: result = await db.execute(select(MediaAsset).where(MediaAsset.id == asset_id)) return result.scalar_one_or_none() async def create_media_asset(db: AsyncSession, **kwargs) -> MediaAsset: asset = MediaAsset(**kwargs) db.add(asset) await db.commit() await db.refresh(asset) return asset async def archive_media_asset(db: AsyncSession, asset_id: uuid.UUID) -> MediaAsset | None: await db.execute( sql_update(MediaAsset).where(MediaAsset.id == asset_id).values(is_archived=True) ) await db.commit() return await get_media_asset(db, asset_id) async def delete_media_asset(db: AsyncSession, asset_id: uuid.UUID) -> bool: asset = await get_media_asset(db, asset_id) if not asset: return False await db.delete(asset) await db.commit() return True def get_download_url(asset: MediaAsset) -> str | None: """Return a backend proxy URL so the browser can always download the file.""" return f"/api/media/{asset.id}/download" def get_thumbnail_url(asset: MediaAsset) -> str | None: """Return CAD thumbnail URL if asset has a cad_file_id.""" if asset.cad_file_id: return f"/api/cad/{asset.cad_file_id}/thumbnail" return None