126 lines
3.5 KiB
Python
126 lines
3.5 KiB
Python
"""Add tenant_id FK to all tables + enable Row Level Security.

Revision ID: 036
Revises: 035
Create Date: 2026-03-06
"""
|
|
import sqlalchemy as sa
|
|
from alembic import op
|
|
from sqlalchemy.dialects.postgresql import UUID
|
|
|
|
# Revision identifiers read by Alembic: this migration is 036 and follows 035.
revision = "036"
down_revision = "035"
branch_labels = None
depends_on = None
|
|
|
|
# Every table listed here gets a tenant_id column and Row Level Security.
# Tables that no longer exist at migration time are skipped through an
# existence check, which is why product_variants (removed in migration 027)
# can stay in the list.
TENANT_TABLES = [
    "users",
    "orders", "order_items", "order_lines",
    "products",
    "cad_files",
    "materials", "material_aliases",
    "render_templates",
    "output_types",
    "pricing_tiers",
    "audit_log",
    "templates",
    "product_variants",  # dropped in 027 — handled with existence check
]
|
|
|
|
|
|
def _table_exists(table_name: str) -> bool:
    """Return True when ``table_name`` is present in the ``public`` schema.

    The name is looked up in ``information_schema.tables`` over the
    migration's current connection and is passed as a bound parameter.
    """
    lookup = sa.text(
        "SELECT 1 FROM information_schema.tables "
        "WHERE table_schema = 'public' AND table_name = :t"
    )
    row = op.get_bind().execute(lookup, {"t": table_name}).fetchone()
    return row is not None
|
|
|
|
|
|
def upgrade():
    """Add ``tenant_id`` to the tenant-scoped tables and enable RLS on them.

    For every table in ``TENANT_TABLES`` that currently exists:

    1. add a nullable ``tenant_id`` UUID column (FK to ``tenants.id``,
       declared with ``index=True``),
    2. backfill it with the id of the tenant whose slug is ``'hartomat'``,
    3. make the column NOT NULL,
    4. enable Row Level Security,
    5. create the ``tenant_isolation`` policy,
    6. create the ``admin_bypass`` policy.

    Tables that do not exist are skipped.
    """
    # Best-effort grant of BYPASSRLS to the migrating role. This needs
    # superuser rights, so insufficient_privilege is downgraded to a NOTICE.
    # quote_ident() keeps the dynamic statement valid for role names that
    # require quoting (mixed case, dashes, reserved words).
    op.execute("""
        DO $$
        BEGIN
            EXECUTE 'ALTER ROLE ' || quote_ident(current_user) || ' BYPASSRLS';
        EXCEPTION WHEN insufficient_privilege THEN
            RAISE NOTICE 'Could not set BYPASSRLS — run manually as superuser if needed';
        END;
        $$;
    """)

    for table in TENANT_TABLES:
        if not _table_exists(table):
            continue

        # 1. Add nullable tenant_id column
        op.add_column(
            table,
            sa.Column(
                "tenant_id",
                UUID(as_uuid=True),
                sa.ForeignKey("tenants.id"),
                nullable=True,
                index=True,
            ),
        )

        # 2. Backfill with the default 'hartomat' tenant
        op.execute(
            f"UPDATE {table} "
            "SET tenant_id = (SELECT id FROM tenants WHERE slug = 'hartomat')"
        )

        # 3. Make NOT NULL now that every row has a value
        op.alter_column(table, "tenant_id", nullable=False)

        # 4. Enable Row Level Security
        op.execute(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY")

        # 5. Main isolation policy: tenant_id must match current session tenant.
        #    The setting is not always a UUID: it is 'bypass' for the admin
        #    policy below and may be an empty string. Casting either value
        #    to uuid raises, and PostgreSQL does not promise that the
        #    admin_bypass policy is evaluated first, so both are mapped to
        #    NULL before the cast. A NULL comparison simply matches no rows.
        op.execute(f"""
            CREATE POLICY tenant_isolation ON {table}
            USING (
                tenant_id = NULLIF(
                    NULLIF(current_setting('app.current_tenant_id', true), ''),
                    'bypass'
                )::uuid
            )
        """)

        # 6. Admin bypass policy: allows queries when setting is 'bypass'
        op.execute(f"""
            CREATE POLICY admin_bypass ON {table}
            USING (
                current_setting('app.current_tenant_id', true) = 'bypass'
            )
        """)
|
|
|
|
|
|
def downgrade():
    """Remove the RLS policies and the ``tenant_id`` column again.

    Walks ``TENANT_TABLES`` in reverse order and, for every table that
    currently exists, drops the ``admin_bypass`` and ``tenant_isolation``
    policies, disables Row Level Security and drops ``tenant_id``.
    """
    # Grant BYPASSRLS so the downgrade itself can see all rows. This is
    # best-effort: insufficient_privilege is downgraded to a NOTICE.
    # quote_ident() keeps the dynamic statement valid for role names that
    # require quoting (mixed case, dashes, reserved words).
    op.execute("""
        DO $$
        BEGIN
            EXECUTE 'ALTER ROLE ' || quote_ident(current_user) || ' BYPASSRLS';
        EXCEPTION WHEN insufficient_privilege THEN
            RAISE NOTICE 'Could not set BYPASSRLS';
        END;
        $$;
    """)

    for table in reversed(TENANT_TABLES):
        if not _table_exists(table):
            continue

        # Policies go first, then RLS itself, then the column they refer to.
        op.execute(f"DROP POLICY IF EXISTS admin_bypass ON {table}")
        op.execute(f"DROP POLICY IF EXISTS tenant_isolation ON {table}")
        op.execute(f"ALTER TABLE {table} DISABLE ROW LEVEL SECURITY")
        op.drop_column(table, "tenant_id")
|