Pub/Sub Adapters
JSandy’s WebSocket layer is provider‑agnostic. You can bring your own real‑time backend by implementing a tiny adapter that satisfies a simple interface and configuring it on the router.
This page shows:
- The PubSubAdapter interface
- How to configure the adapter on your router
- Ready‑to‑use examples:
  - Upstash REST (Cloudflare Workers‑friendly)
  - In‑memory (local dev)
  - Cloudflare Pub/Sub (Kafka API): conceptual outline
  - Native Redis (Node‑only)
JSandy’s WebSocket server is designed to run on Cloudflare Workers. An adapter that needs TCP (e.g., native Redis) won’t work on Workers. For Workers, pick an HTTP/SSE‑compatible provider (e.g., Upstash REST); otherwise, run your WebSocket server on a Node runtime.
The Adapter Interface
JSandy expects a minimal adapter with publish/subscribe:
export interface PubSubAdapter {
  /**
   * Publish a message to a topic/room.
   */
  publish(topic: string, payload: unknown): Promise<void>;
  /**
   * Subscribe to a topic/room. Call `onMessage` for every payload.
   * Must respect `options.signal` for cancellation.
   */
  subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: {
      signal?: AbortSignal;
      onOpen?: () => void;
      onError?: (error: unknown) => void;
    },
  ): Promise<void>;
}
Payload convention:
- JSandy sends/receives ["eventName", data] tuples over pub/sub.
- Your adapter should pass the parsed tuple back to JSandy’s socket.
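The tuple convention can be enforced at the adapter boundary. The sketch below is one way to do it, not JSandy's own code: the names EventTuple, isEventTuple, encodeEvent, and decodeEvent are hypothetical helpers, and it assumes a transport that carries strings.

```typescript
// Hypothetical helpers; none of these names are part of @jsandy/rpc.
type EventTuple = [eventName: string, data: unknown]

/** Type guard: true only for a two-element array whose first item is a string. */
function isEventTuple(value: unknown): value is EventTuple {
  return Array.isArray(value) && value.length === 2 && typeof value[0] === "string"
}

/** Serialize a tuple for transports that carry strings (e.g., Redis channels). */
function encodeEvent(event: string, data: unknown): string {
  return JSON.stringify([event, data])
}

/** Parse a raw message; returns null for malformed JSON or a wrong shape. */
function decodeEvent(raw: string): EventTuple | null {
  try {
    const parsed: unknown = JSON.parse(raw)
    return isEventTuple(parsed) ? parsed : null
  } catch {
    return null
  }
}
```

An adapter could call decodeEvent on each incoming message and skip null results instead of forwarding them to onMessage.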
Wiring the Adapter into the Router
Set an adapter per request via router.config({ getPubSubAdapter }):
import { jsandy, type PubSubAdapter } from "@jsandy/rpc"
export const j = jsandy.init()
// Choose an adapter (see examples below)
function getPubSubAdapter(_c: unknown): PubSubAdapter {
  // return new MyAdapter(...)
  throw new Error("Implement getPubSubAdapter")
}
export const api = j.router().config({ getPubSubAdapter })
That’s it. All WebSocket procedures on this router will use your adapter.
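If the getter runs on every request, as "per request" suggests, an adapter that holds state or connections is usually better built once and reused. The sketch below shows one way to do that. lazySingleton and DemoAdapter are hypothetical names, and the per-request call pattern is an assumption rather than documented JSandy behavior.

```typescript
// Hypothetical helper (not part of @jsandy/rpc): build the value on first use,
// then hand back the same instance on every later call.
function lazySingleton<T>(create: () => T): () => T {
  let cached: { value: T } | undefined
  return () => {
    if (!cached) cached = { value: create() }
    return cached.value
  }
}

// Stand-in for a real adapter class so the sketch runs on its own.
class DemoAdapter {
  static created = 0
  constructor() {
    DemoAdapter.created++
  }
}

// The factory runs once, no matter how many times the getter is called.
const getAdapter = lazySingleton(() => new DemoAdapter())
```

A getter like getAdapter could be passed as getPubSubAdapter when the adapter does not depend on the request context. Adapters configured from per-request environment values, as is common on Workers, are better constructed inside the callback.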
Example 1: Upstash REST (Cloudflare Workers‑friendly)
Upstash exposes Redis pub/sub over HTTP REST + SSE, which works on Workers.
import { jsandy, UpstashRestPubSub } from "@jsandy/rpc"
import { env } from "hono/adapter"
export const j = jsandy.init()
export const api = j.router().config({
  getPubSubAdapter: (c) => {
    const { UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN } = env(c)
    return new UpstashRestPubSub(UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN)
  },
})
Cloudflare secrets (if you choose Upstash):
wrangler secret put UPSTASH_REDIS_REST_URL
wrangler secret put UPSTASH_REDIS_REST_TOKEN
Example 2: In‑Memory Adapter (Local Dev)
Great for quick local testing — no external infra required.
import type { PubSubAdapter } from "@jsandy/rpc"
export class InMemoryPubSub implements PubSubAdapter {
  // topic -> set of subscriber callbacks
  private subs = new Map<string, Set<(p: unknown) => void>>()
  async publish(topic: string, payload: unknown) {
    const set = this.subs.get(topic)
    if (!set) return
    for (const cb of set) cb(payload)
  }
  async subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: { signal?: AbortSignal; onOpen?: () => void; onError?: (e: unknown) => void },
  ) {
    // Nothing to register if the caller cancelled before we started
    if (options?.signal?.aborted) return
    const set = this.subs.get(topic) ?? new Set<(p: unknown) => void>()
    this.subs.set(topic, set)
    set.add(onMessage)
    options?.onOpen?.()
    // Clean up when cancelled
    options?.signal?.addEventListener(
      "abort",
      () => {
        set.delete(onMessage)
      },
      { once: true },
    )
  }
}
Wire it up:
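import { jsandy } from "@jsandy/rpc"
import { InMemoryPubSub } from "./in-memory-pubsub"
export const j = jsandy.init()
// Share one instance: a new adapter per call would give every request its own
// empty subscriber map, so publishers would never reach subscribers.
const pubsub = new InMemoryPubSub()
export const api = j.router().config({
  getPubSubAdapter: () => pubsub,
})
Note: This is single‑process only; it won’t scale across multiple instances.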
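A quick way to check that an adapter delivers messages and honors options.signal is a small smoke test. The sketch below is self-contained, so it declares its own Adapter shape and a compact TinyPubSub, similar in spirit to the in-memory example above. smokeTest, TinyPubSub, and the "room:1" topic are hypothetical names used only for illustration.

```typescript
// Local copy of the adapter shape so the sketch runs on its own.
type Handler = (payload: unknown) => void
interface Adapter {
  publish(topic: string, payload: unknown): Promise<void>
  subscribe(topic: string, onMessage: Handler, options?: { signal?: AbortSignal }): Promise<void>
}

// Compact in-memory adapter used as the system under test.
class TinyPubSub implements Adapter {
  private subs = new Map<string, Set<Handler>>()
  async publish(topic: string, payload: unknown): Promise<void> {
    for (const cb of this.subs.get(topic) ?? []) cb(payload)
  }
  async subscribe(topic: string, onMessage: Handler, options?: { signal?: AbortSignal }): Promise<void> {
    if (options?.signal?.aborted) return
    const set = this.subs.get(topic) ?? new Set<Handler>()
    this.subs.set(topic, set)
    set.add(onMessage)
    options?.signal?.addEventListener("abort", () => set.delete(onMessage), { once: true })
  }
}

// Subscribe, publish once, abort, publish again; return what the subscriber saw.
async function smokeTest(adapter: Adapter): Promise<unknown[]> {
  const seen: unknown[] = []
  const controller = new AbortController()
  await adapter.subscribe("room:1", (p) => seen.push(p), { signal: controller.signal })
  await adapter.publish("room:1", ["message", { text: "hello" }])
  controller.abort() // cancels the subscription
  await adapter.publish("room:1", ["message", { text: "ignored" }])
  return seen
}
```

An adapter that respects cancellation should report only the first message. Networked adapters may need a short wait between publish and the assertion, since delivery is not synchronous there.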
Example 3: Cloudflare Pub/Sub (Kafka API) — Conceptual Outline
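This example assumes a Kafka‑compatible endpoint. Cloudflare’s own documentation describes Pub/Sub as an MQTT‑based service, so confirm which protocol your broker exposes before adapting this outline. In Workers, you’d typically integrate through a client that supports the Workers runtime or a Cloudflare‑provided binding.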
This outline shows the shape — consult Cloudflare’s documentation for a production‑ready implementation and authentication details.
import type { PubSubAdapter } from "@jsandy/rpc"
/**
 * Conceptual Cloudflare Pub/Sub adapter (Kafka API)
 * Note: This is illustrative; the actual producer/consumer setup depends on the client you use
 * and how you configure bindings/credentials in `wrangler.toml`.
 */
export class CloudflareKafkaPubSub implements PubSubAdapter {
  constructor(
    private opts: {
      // e.g., SASL/SSL config, broker URLs, topic prefix, etc.
      bootstrapServers: string[]
      username: string
      password: string
      topicPrefix?: string
    },
  ) {}
  async publish(topic: string, payload: unknown): Promise<void> {
    const fullTopic = `${this.opts.topicPrefix || ""}${topic}`
    // 1) Create/Reuse a producer
    // 2) Serialize payload (JSON.stringify)
    // 3) send({ topic: fullTopic, value: JSON.stringify(payload) })
    // PSEUDOCODE:
    // await producer.send([{ topic: fullTopic, value: JSON.stringify(payload) }])
  }
  async subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: { signal?: AbortSignal; onOpen?: () => void; onError?: (e: unknown) => void },
  ): Promise<void> {
    const fullTopic = `${this.opts.topicPrefix || ""}${topic}`
    // 1) Create/Reuse a consumer
    // 2) subscribe({ topic: fullTopic })
    // 3) on each message: JSON.parse and call onMessage(parsed)
    // 4) options?.onOpen?.() once connected
    // 5) Respect options.signal to close the consumer gracefully
    // PSEUDOCODE:
    // await consumer.subscribe({ topic: fullTopic })
    // options?.onOpen?.()
    // const abortHandler = () => consumer.close()
    // options?.signal?.addEventListener("abort", abortHandler, { once: true })
    // await consumer.run({
    //   eachMessage: async ({ message }) => {
    //     try { onMessage(JSON.parse(message.value.toString())) } catch {}
    //   },
    // })
  }
}
Wire it up (with your config/secrets):
import { jsandy } from "@jsandy/rpc"
import { CloudflareKafkaPubSub } from "./cf-pubsub"
export const j = jsandy.init()
export const api = j.router().config({
  getPubSubAdapter: () =>
    new CloudflareKafkaPubSub({
      bootstrapServers: ["<BROKER_1>", "<BROKER_2>"],
      username: "<SASL_USERNAME>",
      password: "<SASL_PASSWORD>",
      topicPrefix: "myapp.",
    }),
})
Important: The code above is intentionally a template. Use a Kafka client compatible with Workers and follow Cloudflare’s official guidance for bindings, credentials, and connectivity.
Example 4: Native Redis (Node‑only)
This approach uses a TCP Redis client (e.g., redis or ioredis) and thus requires a Node environment (not Cloudflare Workers).
import type { PubSubAdapter } from "@jsandy/rpc"
import { createClient, type RedisClientType } from "redis" // Node-only
export class NativeRedisPubSub implements PubSubAdapter {
  private pub!: RedisClientType
  private sub!: RedisClientType
  private ready?: Promise<void>
  constructor(private url: string) {}
  // Connect both clients once; concurrent callers await the same promise.
  private ensure(): Promise<void> {
    if (!this.ready) {
      // Subscriptions need their own connection, separate from the publisher
      this.pub = createClient({ url: this.url })
      this.sub = createClient({ url: this.url })
      this.ready = Promise.all([this.pub.connect(), this.sub.connect()]).then(() => undefined)
    }
    return this.ready
  }
  async publish(topic: string, payload: unknown): Promise<void> {
    await this.ensure()
    await this.pub.publish(topic, JSON.stringify(payload))
  }
  async subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: { signal?: AbortSignal; onOpen?: () => void; onError?: (e: unknown) => void },
  ): Promise<void> {
    if (options?.signal?.aborted) return
    // Keep a reference so abort removes only this listener, not every
    // subscriber on the same topic.
    const listener = (msg: string) => {
      try {
        onMessage(JSON.parse(msg))
      } catch {
        // ignore malformed
      }
    }
    try {
      await this.ensure()
      await this.sub.subscribe(topic, listener)
      options?.onOpen?.()
    } catch (e) {
      options?.onError?.(e)
      return
    }
    // Clean up on abort (including an abort that arrived while connecting)
    const cleanup = () => {
      this.sub.unsubscribe(topic, listener).catch(() => {})
    }
    if (options?.signal?.aborted) cleanup()
    else options?.signal?.addEventListener("abort", cleanup, { once: true })
  }
}
Wire it up (Node server):
import { jsandy } from "@jsandy/rpc"
import { NativeRedisPubSub } from "./native-redis-pubsub"
export const j = jsandy.init()
// Reuse one adapter so every request shares the same pair of Redis connections
const pubsub = new NativeRedisPubSub(process.env.REDIS_URL!)
export const api = j.router().config({
  getPubSubAdapter: () => pubsub,
})
Tips & Best Practices
- Always honor options.signal in subscribe() to avoid leaking open streams.
- Validate and JSON‑serialize payloads. JSandy expects ["event", data].
- For Workers, choose providers with HTTP/SSE/Web APIs (e.g., Upstash REST).
- For Node runtimes, native clients (Redis, NATS, Kafka) are fine, as long as your hosting platform supports persistent connections.
Next Steps
- See the full WebSockets guide: /docs/backend/websockets
- Build a simple chat with room broadcasts using your adapter
- Add metrics/logging around publish/subscribe for observability
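For the metrics/logging step, one option is to wrap an existing adapter rather than edit it. The following is a minimal sketch under stated assumptions: withLogging, LogFn, and the event names such as "pubsub.publish" are hypothetical, and the Adapter type is a local copy of the interface shape shown earlier.

```typescript
type Handler = (payload: unknown) => void
interface SubscribeOptions {
  signal?: AbortSignal
  onOpen?: () => void
  onError?: (error: unknown) => void
}
interface Adapter {
  publish(topic: string, payload: unknown): Promise<void>
  subscribe(topic: string, onMessage: Handler, options?: SubscribeOptions): Promise<void>
}
type LogFn = (event: string, fields: Record<string, unknown>) => void

// Hypothetical decorator: forwards every call to `inner` and reports what happened.
function withLogging(inner: Adapter, log: LogFn = (event, fields) => console.log(event, fields)): Adapter {
  return {
    async publish(topic, payload) {
      const started = Date.now()
      try {
        await inner.publish(topic, payload)
        log("pubsub.publish", { topic, ms: Date.now() - started })
      } catch (error) {
        log("pubsub.publish.error", { topic, error })
        throw error // logging must not swallow failures
      }
    },
    subscribe(topic, onMessage, options) {
      let received = 0
      // Report the message count when the caller cancels the subscription
      options?.signal?.addEventListener(
        "abort",
        () => log("pubsub.subscribe.closed", { topic, received }),
        { once: true },
      )
      return inner.subscribe(
        topic,
        (payload) => {
          received++
          onMessage(payload)
        },
        {
          ...options,
          onOpen: () => {
            log("pubsub.subscribe.open", { topic })
            options?.onOpen?.()
          },
          onError: (error) => {
            log("pubsub.subscribe.error", { topic, error })
            options?.onError?.(error)
          },
        },
      )
    },
  }
}
```

Because the wrapper returns the same shape it accepts, it can sit in front of any of the adapters on this page, and the log function can be swapped for a metrics client.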
