chore(repo): initialize planarchy workspace
This commit is contained in:
@@ -0,0 +1,133 @@
|
||||
import { Redis } from "ioredis";
|
||||
import { SSE_EVENT_TYPES, type SseEventType } from "@planarchy/shared";
|
||||
|
||||
export interface SseEvent {
|
||||
type: SseEventType;
|
||||
payload: Record<string, unknown>;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
type Subscriber = (event: SseEvent) => void;
|
||||
|
||||
// Module-level subscriber registry (shared between EventBus and publishLocal)
|
||||
const subscribers = new Set<Subscriber>();
|
||||
|
||||
// Redis connection — use env var REDIS_URL or fallback to default dev URL
|
||||
const REDIS_URL = process.env["REDIS_URL"] ?? "redis://localhost:6380";
|
||||
const CHANNEL = "planarchy:sse";
|
||||
|
||||
let publisher: Redis | null = null;
|
||||
let subscriber: Redis | null = null;
|
||||
|
||||
function getPublisher(): Redis {
|
||||
if (!publisher) {
|
||||
publisher = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
|
||||
publisher.on("error", (e: unknown) => console.error("[Redis publisher]", e));
|
||||
}
|
||||
return publisher;
|
||||
}
|
||||
|
||||
function setupSubscriber(): void {
|
||||
if (subscriber) return;
|
||||
try {
|
||||
subscriber = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
|
||||
subscriber.on("error", (e: unknown) => console.error("[Redis subscriber]", e));
|
||||
void subscriber.subscribe(CHANNEL).catch((err: unknown) => {
|
||||
console.error("[Redis subscribe]", err);
|
||||
});
|
||||
subscriber.on("message", (_channel: string, message: string) => {
|
||||
try {
|
||||
const parsed = JSON.parse(message) as { type: SseEventType; payload: Record<string, unknown>; timestamp: string };
|
||||
publishLocal({ type: parsed.type, payload: parsed.payload, timestamp: parsed.timestamp });
|
||||
} catch { /* ignore parse errors */ }
|
||||
});
|
||||
} catch (e) {
|
||||
console.warn("[Redis setupSubscriber] Redis unavailable, SSE will be local-only:", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* SSE Event Bus with Redis Pub/Sub for multi-instance support.
|
||||
* Gracefully degrades to in-memory delivery when Redis is unavailable.
|
||||
*/
|
||||
class EventBus {
|
||||
subscribe(fn: Subscriber): () => void {
|
||||
subscribers.add(fn);
|
||||
return () => subscribers.delete(fn);
|
||||
}
|
||||
|
||||
publish(event: SseEvent): void {
|
||||
// Broadcast via Redis (all instances receive via subscriber.on("message"))
|
||||
try {
|
||||
const pub = getPublisher();
|
||||
void pub.publish(CHANNEL, JSON.stringify({ type: event.type, payload: event.payload, timestamp: event.timestamp }));
|
||||
} catch (e) {
|
||||
console.warn("[Redis emit] fallback to local-only:", e);
|
||||
// Deliver locally when Redis is unavailable
|
||||
publishLocal(event);
|
||||
}
|
||||
}
|
||||
|
||||
emit(type: SseEventType, payload: Record<string, unknown>): void {
|
||||
this.publish({
|
||||
type,
|
||||
payload,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
get subscriberCount(): number {
|
||||
return subscribers.size;
|
||||
}
|
||||
}
|
||||
|
||||
// Local delivery: deliver to subscribers connected to THIS instance (called from Redis subscriber)
|
||||
function publishLocal(event: SseEvent): void {
|
||||
for (const fn of subscribers) {
|
||||
fn(event);
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton event bus
|
||||
export const eventBus = new EventBus();
|
||||
|
||||
// Start Redis subscriber once at module init (best-effort)
|
||||
setupSubscriber();
|
||||
|
||||
// Helper emitters
|
||||
export const emitAllocationCreated = (allocation: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation);
|
||||
|
||||
export const emitAllocationUpdated = (allocation: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation);
|
||||
|
||||
export const emitAllocationDeleted = (allocationId: string, projectId: string) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_DELETED, { allocationId, projectId });
|
||||
|
||||
export const emitProjectShifted = (project: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project);
|
||||
|
||||
export const emitBudgetWarning = (projectId: string, payload: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.BUDGET_WARNING, { projectId, ...payload });
|
||||
|
||||
export const emitVacationCreated = (vacation: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation);
|
||||
|
||||
export const emitVacationUpdated = (vacation: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation);
|
||||
|
||||
export const emitVacationDeleted = (vacationId: string, resourceId: string) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.VACATION_DELETED, { vacationId, resourceId });
|
||||
|
||||
export const emitRoleCreated = (role: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.ROLE_CREATED, role);
|
||||
|
||||
export const emitRoleUpdated = (role: Record<string, unknown>) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, role);
|
||||
|
||||
export const emitRoleDeleted = (roleId: string) =>
|
||||
eventBus.emit(SSE_EVENT_TYPES.ROLE_DELETED, { roleId });
|
||||
|
||||
export function emitNotificationCreated(userId: string, notificationId: string): void {
|
||||
eventBus.emit(SSE_EVENT_TYPES.NOTIFICATION_CREATED, { userId, notificationId });
|
||||
}
|
||||
Reference in New Issue
Block a user