Cloudflare Workers

@pluv/io is designed to be self-hosted on the Cloudflare Workers runtime via Durable Objects.

While @pluv/io does support key-value storage-backed Durable Objects and the standard WebSocket API, this documentation will walkthrough the recommended way to setup @pluv/io - using WebSocket hibernation on SQLite-backed Durable Objects.

Installation

To install pluv.io for Cloudflare Workers (self-hosted), we will install the following packages from pluv.io:

PurposeLocationInstall command
Register websockets and custom eventsServernpm install @pluv/io
Adapter for Cloudflare runtimeServernpm install @pluv/platform-cloudflare
yjs CRDTBothnpm install @pluv/crdt-yjs

Installation Example

Here is an example installation for npm for you to copy:

# For the server
npm install @pluv/io @pluv/platform-cloudflare
# Server peer dependencies
npm install zod

# Optional if you wish to use CRDT storage
# For storage capabilities
npm install @pluv/crdt-yjs
# Storage peer dependencies
npm install yjs

Create PluvIO instance

Define an io (websocket client) instance on the server codebase:

// server/io.ts

import { yjs } from "@pluv/crdt-yjs";
import { createIO } from "@pluv/io";
import { infer, platformCloudflare } from "@pluv/platform-cloudflare";
import { Database } from "./db";

export type Env = {
    DB: D1Database;
};

/**
 * Infer type of Cloudflare Env
 * Note that it is defined outside of the `createIO` function. This is to
 * work around TypeScript type-inference limitations.
 */
const types = infer((i) => ({ env: i<Env> }));
export const io = createIO(
    platformCloudflare({
        // Provide the `env` type
        types,
        // Optional: Authorization is optional for `@pluv/platform-cloudflare`
        // If excluded, your `user` will be `null` on the frontend.
        authorize: ({ env }) => ({
            // Your own secret for generating JWTs
            secret: env.PLUV_AUTH_SECRET,
            // The shape of your user object. `id` must be provided
            user: z.object({
                id: z.string(),
                // Here is an additional field we can add
                name: z.string(),
            }),
        }),
        // Optional: Context that will be made available in event
        // handlers and procedures
        context: ({ env }) => ({
            db: new Database(env.DATABASE_URL),
        }),
        // Optional: Only if you want to use CRDT storage features
        crdt: yjs,
        // Optional: If you want to enable development logging
        debug: true,
        /**
         * Optional: Specify whether WebSocket event listeners should be
         * "attached" to or "detached" from the WebSocket upon registration.
         * 
         * Event listeners should be "detached" to use Cloudflare's WebSocket
         * hibernation.
         */
        mode: "detached", // @default = "detached"
        types,
    })
);

export const ioServer = io.server({
    // Optional: Only if you're using storage, and persisting
    // to a database
    getInitialStorage: async ({ room, context }) => {
        const { db } = context;

        // Stubbed example DB query
        const rows = await db.sql(
            "SELECT storage FROM room WHERE name = ?;",
            [room]
        );
        const storage = rows[0]?.storage ?? null;

        return storage;
    },
    // Optional: Only if you want to run code when a room
    // is deleted, such as saving the last storage state
    onRoomDeleted: async ({ room, encodedState, context }) => {
        // Upsert the db room with last storage state
    },
});

Attach to a RoomDurableObject

Next, create a RoomDurableObject and attach our new PluvServer to the RoomDurableObject:

// server/RoomDurableObject.ts

import { InferIORoom } from "@pluv/io";
import { DurableObject } from "cloudflare:workers";
import { Env, ioServer } from "./io";

export class RoomDurableObject extends DurableObject<Env> {
    private _room: InferIORoom<typeof ioServer>;

    constructor(state: DurableObjectState, env: Env) {
        this._room = ioServer.createRoom(state.id.toString(), { env, state });
    }

    // Only needed if using "detached" mode (i.e. hibernation)
    public async webSocketClose(
        ws: WebSocket,
        code: number,
        reason: string
    ): Promise<void> {
        const handler = this._room.onClose(ws);

        await handler({ code, reason });
    }

    // Only needed if using "detached" mode (i.e. hibernation)
    public async webSocketError(
        ws: WebSocket,
        error: unknown
    ): Promise<void> {
        const handler = this._room.onError(ws);
        const eventError = error instanceof Error ? error : new Error("Internal Error");

        await handler({ error: eventError, message: eventError.message });
    }

    // Only needed if using "detached" mode (i.e. hibernation)
    public async webSocketMessage(
        ws: WebSocket,
        message: string | ArrayBuffer
    ): Promise<void> {
        const handler = this._room.onMessage(ws);

        await handler({ data: message });
    }

    async fetch(request: Request) {
        if (request.headers.get("Upgrade") !== "websocket") {
            return new Response("Expected WebSocket", { status: 400 });
        }

        const alarm = await this.ctx.storage.getAlarm();
        if (alarm !== null) await this.ctx.storage.setAlarm(Date.now() + 60_000);

        const { 0: client, 1: server } = new WebSocketPair();

        // Only needed if you have specified authorize
        const token = new URL(request.url).searchParams.get("token");
        await this._room.register(server, { request, token });

        return new Response(null, { status: 101, webSocket: client });
    }

    // Recommended to run garbage collection periodically due to edge cases around
    // WebSocket hibernation
    public async alarm(alarmInfo?: AlarmInvocationInfo): Promise<void> {
        await this._room.garbageCollect();
        await this.ctx.storage.setAlarm(Date.now() + 60_000);
    }
}

Forward request to RoomDurableObject

Lastly, integrate your RoomDurableObject with your Cloudflare Worker's default handler:

// server/index.ts
import { Hono } from "hono";
import { ioServer } from "./io";

// Bind this in your wrangler.jsonc file
export { RoomDurableObject } from "./RoomDurableObject";

// Stub example to get roomId from url
const parseRoomId = (url: string): string => {
    /* get room from req.url */
};

const app = new Hono<{ Bindings: Env }>()
    // Only if you specified `authorize` on @pluv/io
    .get("/api/pluv/auth", async (c) => {
        const room = c.req.query("room") as string;

        const request = c.req.raw;
        // Example stub. However you get the authed user here
        const user = await getUser(request);
        const token = await ioServer.createToken({
            user,
            env: c.env,
            request,
            room
        });

        return c.text(token, 200);
    })
    // Setup your websocket request handler
    .get("/api/pluv/room". async (c) => {
        const env = c.env;
        const roomId = c.req.query("room") as string;

        // Assuming wrangler.toml:
        // [durable_objects]
        // bindings = [{ name = "rooms", class_name = "RoomDurableObject" }]
        const durableObjectId = c.env.rooms.idFromName(roomId);
        const room = c.env.rooms.get(durableObjectId);

        return room.fetch(request);
    });