socketio / socket.io

Realtime application framework (Node.JS server)
https://socket.io
MIT License

Extending state recovery into full message replay #4697

Open joelluijmes opened 1 year ago

joelluijmes commented 1 year ago

Is your feature request related to a problem? Please describe.

Currently, socket.io does not have built-in message replay functionality. Implementing this feature at the application level is challenging. This capability is particularly beneficial for clients who join a session or room later but still need access to all previously transmitted messages.

Describe the solution you'd like

I would like a message replay feature that allows clients to request the retransmission of missed messages from a certain point in time or from a message sequence number. Replay could be requested through a dedicated protocol message, or when connecting to the server (similar to state recovery).
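To make the request semantics concrete, here is a minimal in-memory sketch of replaying from either a sequence number or a point in time. The `ReplayLog` class, the `StoredMessage` shape, and the `ReplayFrom` type are hypothetical names used for illustration only; none of them exist in socket.io.

```typescript
// Hypothetical shape of a stored message; not part of socket.io.
interface StoredMessage {
  seq: number; // monotonically increasing sequence number
  timestamp: number; // milliseconds since epoch
  event: string;
  payload: unknown;
}

// A client asks for replay either by sequence number or by point in time.
type ReplayFrom = { seq: number } | { timestamp: number };

class ReplayLog {
  private messages: StoredMessage[] = [];
  private nextSeq = 1;

  // Stores a message and assigns it the next sequence number.
  append(event: string, payload: unknown, timestamp: number = Date.now()): StoredMessage {
    const message: StoredMessage = { seq: this.nextSeq++, timestamp, event, payload };
    this.messages.push(message);
    return message;
  }

  // Returns messages strictly after a sequence number,
  // or at/after a timestamp, in the order they were appended.
  replay(from: ReplayFrom): StoredMessage[] {
    if ("seq" in from) {
      return this.messages.filter((m) => m.seq > from.seq);
    }
    return this.messages.filter((m) => m.timestamp >= from.timestamp);
  }
}

const replayLog = new ReplayLog();
replayLog.append("chat", "first", 1000);
replayLog.append("chat", "second", 2000);
replayLog.append("chat", "third", 3000);

// A client that last saw sequence number 1 asks for everything after it.
const missedBySeq = replayLog.replay({ seq: 1 });
// A client that joined late asks for everything from a point in time.
const missedByTime = replayLog.replay({ timestamp: 3000 });
```

A real implementation would need to keep this log consistent with what the adapter actually broadcasts, which is the hard part at the application level.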

Describe alternatives you've considered

  1. Manual message replay: Create a mirrored copy of messages on the socket.io server. However, this approach could be susceptible to race conditions, such as synchronization problems where the mirrored message set does not align with the original messages managed by socket.io.
  2. Repurpose the recently added state recovery feature to perform message replay. See additional context.

Additional context

I'm using socket.io in combination with the mongo-adapter. I managed to repurpose the state recovery feature to provide message replay. A significant advantage of this approach is that messages are transmitted before the connect event is triggered, allowing for more customization in handling those messages.

socket.io recently introduced limited support for message replay, known as connection state recovery. I repurposed this feature into full message replay by doing two things:

  1. When the collection is created, I insert a special BROADCAST message with a fixed id of 000000000000000000000000. Upon client connection, when state recovery is enabled, socket.io replays the messages after the offset supplied by the client (using a greater-than filter). However, the backend doesn't keep track of the last message sent, so this specially crafted message gives us an offset that always exists and is smaller than every other id.
  2. In my case, I already know the identifier of the client before it connects to the socket.io server. It is established by a regular HTTP call that generates an authentication token. When generating the token, I call persistSession to prepare a session. In regular state recovery, this is only triggered when the socket disconnects. Upon client connection, socket.io searches for an existing session to restore the client to.
    • I'm generating the pid as a combination of the room id and the client id (provided by the client)
    • I'm generating the sid the same way the socket.io server does, using base64id.generateId
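The sentinel trick can be illustrated without a database. The sketch below assumes ids are represented as 24-character lowercase hex strings (so plain string comparison matches their ordering) and uses an in-memory array in place of the MongoDB collection; `eventsAfter`, `buildPid`, and the sample ids are hypothetical and only mimic the greater-than filter described above.

```typescript
// The fixed id of the dummy message; every real id compares greater than it.
const SENTINEL_OFFSET = "000000000000000000000000";

// Hypothetical in-memory stand-in for a stored broadcast document.
interface StoredEvent {
  id: string; // 24-char lowercase hex string (assumption)
  room: string;
  payload: unknown;
}

// Mimics "give me everything after this offset for these rooms".
function eventsAfter(events: StoredEvent[], offset: string, rooms: string[]): StoredEvent[] {
  return events
    .filter((e) => e.id > offset && rooms.includes(e.room))
    .sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0));
}

// Same pid format as in the implementation code in this post.
function buildPid(sessionId: string, clientId: string): string {
  return `sessionId:${sessionId}-clientId:${clientId}`;
}

const storedEvents: StoredEvent[] = [
  { id: "64b7f0c2a1b2c3d4e5f60001", room: "session-1", payload: "hello" },
  { id: "64b7f0c2a1b2c3d4e5f60002", room: "session-2", payload: "other session" },
  { id: "64b7f0c2a1b2c3d4e5f60003", room: "session-1", payload: "world" },
];

// Connecting with the sentinel offset replays the whole room history.
const fullHistory = eventsAfter(storedEvents, SENTINEL_OFFSET, ["session-1"]);
// Connecting with a real offset replays only what came after it.
const tail = eventsAfter(storedEvents, "64b7f0c2a1b2c3d4e5f60001", ["session-1"]);
const examplePid = buildPid("session-1", "client-42");
```

Because the rooms come from the pre-registered session, the replay stays scoped to a single session even though the offset itself is global.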

For reference, see the source code:

The client code then looks like this:

```typescript
const { sessionId, clientId, pid } = decode(token) as any;
console.log(`Session ID: ${sessionId}`)
console.log(`Client ID: ${clientId}`)
console.log(`pid: ${pid}`)

const socket = io.connect(endpoint, {
  path: "/socket.io/interaction-backbone",
  transports: ["websocket"],
  auth: {
    token: process.env.TOKEN,
    offset: "000000000000000000000000",
    pid,
  }
});
```

Actual implementation code:

```typescript
/**
 * Creates the collection if it does not exist. If it already exists, it does nothing.
 * If it fails to create the collection, it throws an error.
 */
private async createCollectionIfNotExists() {
  try {
    await this.connection.db.createCollection(MONGO_SOCKETIO_COLLECTION);

    // This is hacky workaround to trick socketio into providing replay functionality. When
    // a client connects with the backbone, socketio can replay the messages from a certain
    // threshold (using greater than filter).
    // The backbone doesn't keep track of the last message sent. So instead, we create a
    // dummy message that is always the first message in the collection, with a known id.
    // This works because every id is larger than the dummy id.
    this.connection.db.collection(MONGO_SOCKETIO_COLLECTION).insertOne({
      _id: new Types.ObjectId('000000000000000000000000'),
      type: EVENT_BROADCAST,
    });
  } catch (e) {
    if (e.codeName === 'NamespaceExists') {
      return;
    }

    // rethrow if not collection already exists
    throw e;
  }
}

/**
 * Preregisters a session for a client. This is needed to provide clients with message replay. When the
 * client connects with the backbone, this ensures all previous messages are replayed.
 * @returns private identifier for the session, client - must be provided when connecting to the socket.
 */
public async registerSessionForClient(sessionId: string, clientId: string): Promise<string> {
  const collection = this.connection.db.collection(MONGO_SOCKETIO_COLLECTION);

  // TODO: hash the pid to prevent leaking implementation details?
  // pid is a private unique identifier. This id is reused for all connections of the same client.
  // sessionId is generated when the session is created. clientId is provided by the client.
  // By combining both, we trick state recovery to replay all messages scoped to single session.
  const pid = `sessionId:${sessionId}-clientId:${clientId}`;

  // TODO: race condition exists here. Mitigation would be to create some locking mechanism.
  // out of scope for the current hack.
  const existingSession = await collection.findOne({
    type: EVENT_SESSION,
    'data.pid': pid,
  });
  if (existingSession) {
    return pid;
  }

  // sid is socket id. We don't actually depend on it, but it is expected to exist during state recovery.
  // socket.io generates the sid on connection, so we need to generate it ourselves. This is the same
  // implementation as in the socket.io source code.
  const sid = base64id.generateId();

  // This function returns void, but internally the call to write the document to the database is async.
  const { adapter } = this.io.of('/');
  adapter.persistSession({
    pid,
    sid,
    rooms: [sessionId],
    data: {},
  });

  // So we have this hacky workaround here to mitigate with the unexposed promise. We just wait until
  // we can find the record in the database.
  // NOTE: this path is called from HTTP request, so we could potentially ignore this, as there will
  // be some delay at the client between generating the token, and actually connecting.
  for (let i = 0; i < 100; i++) {
    await new Promise((r) => {
      setTimeout(r, i > 40 ? 100 : 25);
    });

    const result = await collection.findOne({
      type: EVENT_SESSION,
      'data.pid': pid,
    });
    if (result) {
      return pid;
    }
  }

  throw new Error(`Failed to find created session for pid ${pid}, sessionId ${sessionId}`);
}
```

I think state recovery is a great starting point for implementing message replay. I'm willing to put in the effort to actually implement this feature. However, before diving in, I wanted to check whether such a feature aligns with socket.io's direction.

darrachequesne commented 1 year ago

That's an interesting idea, thanks a lot for raising this :+1:

How do you decide which messages a given client has access to? With the connection state recovery feature, we store the rooms the client was in at the moment of the disconnection, and send the messages that were missed for the given rooms upon reconnection.

How would it work with message replay?

joelluijmes commented 1 year ago

That's a valid point. Perhaps we can make it work by providing the option to accept one or multiple rooms. Based on the given rooms, we can query the adapter for the corresponding messages, similar to how state recovery operates. Moreover, it could be advantageous if users could access global messages sent outside the scope of a room when no room is specified.
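As a rough sketch of that selection rule, assuming each stored message records the rooms it was sent to and that an empty room list marks a global message (both are assumptions about the data format, and `LoggedMessage` and `selectForReplay` are hypothetical names):

```typescript
// Hypothetical stored message: an empty `rooms` array marks a global broadcast.
interface LoggedMessage {
  seq: number;
  rooms: string[];
  payload: unknown;
}

// Picks the messages to replay for the requested rooms, after a given sequence number.
// With no rooms requested, only global (room-less) messages are returned.
function selectForReplay(
  log: LoggedMessage[],
  requestedRooms: string[],
  afterSeq: number = 0
): LoggedMessage[] {
  return log.filter((m) => {
    if (m.seq <= afterSeq) {
      return false;
    }
    if (requestedRooms.length === 0) {
      return m.rooms.length === 0;
    }
    return m.rooms.some((room) => requestedRooms.includes(room));
  });
}

const messageLog: LoggedMessage[] = [
  { seq: 1, rooms: ["room-a"], payload: "to room a" },
  { seq: 2, rooms: [], payload: "global announcement" },
  { seq: 3, rooms: ["room-b"], payload: "to room b" },
  { seq: 4, rooms: ["room-a", "room-b"], payload: "to both rooms" },
];

const roomAHistory = selectForReplay(messageLog, ["room-a"]);
const globalHistory = selectForReplay(messageLog, []);
const roomBTail = selectForReplay(messageLog, ["room-b"], 3);
```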

However, I must admit that I am uncertain about how we can distinguish these messages and whether we need to make any other modifications to the data format.

darrachequesne commented 1 year ago

I think we would need an additional abstraction here, as the client does not know the rooms it's in (it's a server side only concept).

Food for thought: https://ably.com/docs/realtime/history
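One possible shape for such an abstraction, sketched here purely for discussion: the client asks for history by a public channel name, and the server resolves that name to rooms only if the client has been granted access. `ChannelDirectory` and its methods are hypothetical and are not part of socket.io or of any existing API.

```typescript
// Hypothetical server-side directory that maps client-visible channel names
// to the server-only rooms behind them, with a per-client access check.
class ChannelDirectory {
  private roomsByChannel = new Map<string, string[]>();
  private channelsByClient = new Map<string, Set<string>>();

  // Declares which rooms back a channel.
  defineChannel(channel: string, rooms: string[]): void {
    this.roomsByChannel.set(channel, [...rooms]);
  }

  // Allows a client to request history for a channel.
  grant(clientId: string, channel: string): void {
    const granted = this.channelsByClient.get(clientId) ?? new Set<string>();
    granted.add(channel);
    this.channelsByClient.set(clientId, granted);
  }

  // Returns the rooms to replay from, or an empty list if the channel
  // is unknown or the client has not been granted access to it.
  resolveRooms(clientId: string, channel: string): string[] {
    const granted = this.channelsByClient.get(clientId);
    if (!granted || !granted.has(channel)) {
      return [];
    }
    return this.roomsByChannel.get(channel) ?? [];
  }
}

const directory = new ChannelDirectory();
directory.defineChannel("session-1-history", ["session-1"]);
directory.grant("client-42", "session-1-history");

const allowedRooms = directory.resolveRooms("client-42", "session-1-history");
const deniedRooms = directory.resolveRooms("client-99", "session-1-history");
```

The point of the indirection is that room names never leave the server, while the client still has a stable name to ask for.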