"""Add tenant_id FK to all tables + enable Row Level Security.

Revision ID: 036
Revises: 035
Create Date: 2026-03-06
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import UUID

revision = '036'
down_revision = '035'
branch_labels = None
depends_on = None

# Tables that receive tenant_id + RLS.
# product_variants was removed in migration 027, so we check existence before acting.
TENANT_TABLES = [
    "users",
    "orders",
    "order_items",
    "order_lines",
    "products",
    "cad_files",
    "materials",
    "material_aliases",
    "render_templates",
    "output_types",
    "pricing_tiers",
    "audit_log",
    "templates",
    "product_variants",  # dropped in 027 — handled with existence check
]


def _table_exists(table_name: str) -> bool:
    """Return True if ``table_name`` exists in the ``public`` schema.

    Queries ``information_schema.tables`` on the migration's live
    connection, so it needs a real database bind (``op.get_bind()``).
    """
    conn = op.get_bind()
    result = conn.execute(
        sa.text(
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema = 'public' AND table_name = :t"
        ),
        {"t": table_name},
    )
    return result.fetchone() is not None


def _grant_bypassrls(notice: str) -> None:
    """Best-effort grant of BYPASSRLS to the role running the migration.

    ``notice`` is the text emitted via ``RAISE NOTICE`` when the role lacks
    the privilege to alter itself; in that case the migration carries on.
    It is interpolated into a SQL string literal, so it must not contain
    single quotes or ``%`` placeholders.
    """
    # quote_ident() keeps the dynamic statement valid for role names that
    # need quoting (upper-case letters, dashes, reserved words, ...).
    # NOTE(review): a role with BYPASSRLS is not restricted by the policies
    # created in upgrade(); if the application connects as this same role,
    # confirm that this is intended.
    op.execute(f"""
        DO $$
        BEGIN
            EXECUTE 'ALTER ROLE ' || quote_ident(current_user) || ' BYPASSRLS';
        EXCEPTION WHEN insufficient_privilege THEN
            RAISE NOTICE '{notice}';
        END;
        $$;
    """)


def upgrade() -> None:
    """Add ``tenant_id`` to every existing tenant table and enable RLS on it.

    For each table in ``TENANT_TABLES`` that exists: add a nullable
    ``tenant_id`` FK to ``tenants.id``, backfill it with the 'schaeffler'
    tenant, make it NOT NULL, enable Row Level Security and create the
    ``tenant_isolation`` and ``admin_bypass`` policies.
    """
    # Grant BYPASSRLS to the DB user if possible (superuser op — ignore if
    # insufficient privilege)
    _grant_bypassrls("Could not set BYPASSRLS — run manually as superuser if needed")

    for table in TENANT_TABLES:
        if not _table_exists(table):
            continue

        # 1. Add nullable tenant_id column
        op.add_column(
            table,
            sa.Column(
                "tenant_id",
                UUID(as_uuid=True),
                sa.ForeignKey("tenants.id"),
                nullable=True,
                index=True,
            ),
        )

        # 2. Backfill with the default 'schaeffler' tenant.
        #    If no such tenant row exists the subquery yields NULL, and step 3
        #    then fails for any table that already contains rows.
        op.execute(
            f"UPDATE {table} "
            "SET tenant_id = (SELECT id FROM tenants WHERE slug = 'schaeffler')"
        )

        # 3. Make NOT NULL now that every row has a value
        op.alter_column(table, "tenant_id", nullable=False)

        # 4. Enable Row Level Security
        # NOTE(review): without FORCE ROW LEVEL SECURITY the table owner is
        # exempt from the policies — confirm which role the app connects as.
        op.execute(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY")

        # 5. Main isolation policy: tenant_id must match current session tenant.
        #    NULLIF maps an empty setting to NULL so the comparison matches no
        #    rows; casting '' to uuid would raise an error instead.
        # NOTE(review): with the setting at 'bypass' this cast would still fail
        # if the expression is evaluated — confirm bypass mode works as intended.
        op.execute(f"""
            CREATE POLICY tenant_isolation ON {table}
            USING (
                tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::uuid
            )
        """)

        # 6. Admin bypass policy: allows queries when setting is 'bypass'
        op.execute(f"""
            CREATE POLICY admin_bypass ON {table}
            USING (
                current_setting('app.current_tenant_id', true) = 'bypass'
            )
        """)


def downgrade() -> None:
    """Drop the RLS policies, disable RLS and remove ``tenant_id``.

    Tables are processed in reverse ``TENANT_TABLES`` order; tables that do
    not exist are skipped. The BYPASSRLS attribute granted to the role is
    not revoked.
    """
    # Grant BYPASSRLS so the downgrade itself can see all rows
    _grant_bypassrls("Could not set BYPASSRLS")

    for table in reversed(TENANT_TABLES):
        if not _table_exists(table):
            continue

        op.execute(f"DROP POLICY IF EXISTS admin_bypass ON {table}")
        op.execute(f"DROP POLICY IF EXISTS tenant_isolation ON {table}")
        op.execute(f"ALTER TABLE {table} DISABLE ROW LEVEL SECURITY")
        op.drop_column(table, "tenant_id")