facebook / lexical

Lexical is an extensible text editor framework that provides excellent reliability, accessibility and performance.
https://lexical.dev
MIT License
19.1k stars 1.61k forks source link

Bug: Collaboration between multiple users not working #6353

Open excelwebzone opened 2 months ago

excelwebzone commented 2 months ago

When using the Lexical editor’s CollaborationPlugin with the MercureProvider (based on y-websocket code), real-time collaboration features such as user cursor visibility and document updates are not working. Messages are being sent and received, but changes are not reflected across different users’ views.

Lexical version:

@lexical/react: ^0.13.1
lexical: ^0.13.1
y-protocols: ^1.0.6
yjs: ^13.6.18

Steps To Reproduce

  1. Initialize the Lexical editor with the CollaborationPlugin.
  2. Configure the plugin to use the MercureProvider as described in the collaborator.ts code below.
  3. Open the editor in multiple browsers with different user credentials.
  4. Attempt to edit the document and observe the behavior.
import { CollaborationPlugin } from '@lexical/react/LexicalCollaborationPlugin';
import { createMercureProvider } from './collaboration';

const initialConfig = {
  namespace: 'Streamer',
  nodes: [...EditorNodes],
  onError: (error: Error) => {
    throw error;
  },
  theme: EditorTheme,
  editorState: initialEditorState,
  editable: !readOnly,
};

return (
  <LexicalComposer key={editorKey} initialConfig={initialConfig}>
    <ToolbarPlugin
      setIsLinkEditMode={setIsLinkEditMode}
      {...toolbarProps}
    />

    <div
      ref={containerRef}
      className={overrideTailwindClasses(
        clsx(
          'relative block w-full rounded-t-lg bg-white',
          containerClassName
        )
      )}
    >
      <GlobalEventsPlugin
        isTypeaheadMenuActive={isTypeaheadMenuActive}
        onEnter={submitOnEnter ? submit : undefined}
        onEscape={onCancel}
      />
      <DragDropPastePlugin
        canEmbedImages={
          toolbarProps?.buttons?.insertImage ||
          floatingToolbarProps?.buttons?.insertImage
        }
        onFilePaste={onFilePaste}
      />
      {collaborationId ? (
        <CollaborationPlugin
          id={collaborationId}
          username={currentUser?.name}
          cursorColor={currentUser?.color || undefined}
          cursorsContainerRef={containerRef}
          providerFactory={createMercureProvider}
          initialEditorState={initialEditorState}
          shouldBootstrap={!skipCollaborationInit}
        />
      ) : (
        <HistoryPlugin />
      )}

// ...
import { Provider, ProviderAwareness } from '@lexical/yjs';
import * as decoding from 'lib0/decoding';
import * as encoding from 'lib0/encoding';
import { Observable } from 'lib0/observable';
import * as authProtocol from 'y-protocols/auth';
import * as awarenessProtocol from 'y-protocols/awareness';
import * as syncProtocol from 'y-protocols/sync';
import * as Y from 'yjs';

import MercureClient, { MercureMessage } from '../../utils/mercureClient';

export const messageSync = 0;
export const messageQueryAwareness = 3;
export const messageAwareness = 1;
export const messageAuth = 2;

const messageHandlers: Array<
  (
    encoder: encoding.Encoder,
    decoder: decoding.Decoder,
    provider: MercureProvider,
    emitSynced: boolean,
    messageType: number
  ) => void
> = [];

messageHandlers[messageSync] = (
  encoder: encoding.Encoder,
  decoder: decoding.Decoder,
  provider: MercureProvider,
  emitSynced: boolean,
  _messageType: number
) => {
  encoding.writeVarUint(encoder, messageSync);
  const syncMessageType = syncProtocol.readSyncMessage(
    decoder,
    encoder,
    provider.doc,
    provider
  );
  if (
    emitSynced &&
    syncMessageType === syncProtocol.messageYjsSyncStep2 &&
    !provider.getSynced()
  ) {
    provider.setSynced(true);
  }
};

messageHandlers[messageQueryAwareness] = (
  encoder: encoding.Encoder,
  _decoder: decoding.Decoder,
  provider: MercureProvider,
  _emitSynced: boolean,
  _messageType: number
) => {
  encoding.writeVarUint(encoder, messageAwareness);
  encoding.writeVarUint8Array(
    encoder,
    awarenessProtocol.encodeAwarenessUpdate(
      provider.awareness as unknown as awarenessProtocol.Awareness,
      Array.from(provider.awareness.getStates().keys())
    )
  );
};

messageHandlers[messageAwareness] = (
  _encoder: encoding.Encoder,
  decoder: decoding.Decoder,
  provider: MercureProvider,
  _emitSynced: boolean,
  _messageType: number
) => {
  awarenessProtocol.applyAwarenessUpdate(
    provider.awareness as unknown as awarenessProtocol.Awareness,
    decoding.readVarUint8Array(decoder),
    provider
  );
};

messageHandlers[messageAuth] = (
  _encoder: encoding.Encoder,
  decoder: decoding.Decoder,
  provider: MercureProvider,
  _emitSynced: boolean,
  _messageType: number
) => {
  authProtocol.readAuthMessage(decoder, provider.doc, (_ydoc, reason) =>
    console.warn(
      `Permission denied to access ${provider.getRoomID()}.\n${reason}`
    )
  );
};

class MercureProvider extends Observable<string> implements Provider {
  public doc: Y.Doc;
  public awareness: ProviderAwareness;
  private synced = false;
  private roomID: string;
  private eventListeners: Map<string, Set<(arg: any) => void>> = new Map();

  constructor(roomID: string, doc: Y.Doc, { connect = true } = {}) {
    super();
    this.roomID = roomID;
    this.doc = doc;
    this.awareness = new awarenessProtocol.Awareness(
      doc
    ) as unknown as ProviderAwareness;

    this.doc.on('update', this.yjsUpdateHandler.bind(this));
    (this.awareness as unknown as awarenessProtocol.Awareness).on(
      'update',
      this.yjsAwarenessUpdateHandler.bind(this)
    );

    if (connect) {
      this.connect();
    }
  }

  private readMessage(buf: Uint8Array, emitSynced: boolean): encoding.Encoder {
    const decoder = decoding.createDecoder(buf);
    const encoder = encoding.createEncoder();
    const messageType = decoding.readVarUint(decoder);
    const messageHandler = messageHandlers[messageType];
    if (messageHandler) {
      messageHandler(encoder, decoder, this, emitSynced, messageType);
    } else {
      console.error(`Unknown message type: ${messageType}`);
    }
    return encoder;
  }

  private ocpDataUpdateHandler(data: MercureMessage): void {
    const message = typeof data === 'string' ? data : JSON.stringify(data);
    const uint8Array = new Uint8Array(
      message.split('').map((char) => char.charCodeAt(0))
    );
    const encoder = this.readMessage(uint8Array, true);
    if (encoding.length(encoder) > 1) {
      this.send(encoding.toUint8Array(encoder));
    }
  }

  private yjsUpdateHandler(update: Uint8Array, origin: unknown): void {
    if (origin !== this) {
      const encoder = encoding.createEncoder();
      encoding.writeVarUint(encoder, messageSync);
      syncProtocol.writeUpdate(encoder, update);
      this.send(encoding.toUint8Array(encoder));
    }

    this.dispatchEvent('update', update);
  }

  private yjsAwarenessUpdateHandler({
    added,
    updated,
    removed,
  }: {
    added: number[];
    updated: number[];
    removed: number[];
  }): void {
    const changedClients = added.concat(updated).concat(removed);
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, messageAwareness);
    encoding.writeVarUint8Array(
      encoder,
      awarenessProtocol.encodeAwarenessUpdate(
        this.awareness as unknown as awarenessProtocol.Awareness,
        changedClients
      )
    );
    this.send(encoding.toUint8Array(encoder));
  }

  connect(): void {
    MercureClient.subscribe(
      `/rooms/${this.roomID}`,
      this.ocpDataUpdateHandler.bind(this)
    );

    if (this.doc && this.awareness) {
      this.sendSyncMessages();
      this.dispatchEvent('status', { status: 'connected' });
    }
  }

  disconnect(): void {
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, messageAwareness);
    encoding.writeVarUint8Array(
      encoder,
      awarenessProtocol.encodeAwarenessUpdate(
        this.awareness as unknown as awarenessProtocol.Awareness,
        [this.doc.clientID],
        new Map()
      )
    );
    this.send(encoding.toUint8Array(encoder));

    MercureClient.unsubscribe(`/rooms/${this.roomID}`);

    this.dispatchEvent('status', { status: 'disconnected' });
  }

  dispose(): void {
    this.disconnect();
    (this.awareness as unknown as awarenessProtocol.Awareness).off(
      'update',
      this.yjsAwarenessUpdateHandler
    );
    this.doc.off('update', this.yjsUpdateHandler);
    super.destroy();
  }

  private sendSyncMessages(): void {
    const encoderSync = encoding.createEncoder();
    encoding.writeVarUint(encoderSync, messageSync);
    syncProtocol.writeSyncStep1(encoderSync, this.doc);
    this.send(encoding.toUint8Array(encoderSync));

    const encoderState = encoding.createEncoder();
    encoding.writeVarUint(encoderState, messageSync);
    syncProtocol.writeSyncStep2(encoderState, this.doc);
    this.send(encoding.toUint8Array(encoderState));

    const encoderAwarenessQuery = encoding.createEncoder();
    encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness);
    this.send(encoding.toUint8Array(encoderAwarenessQuery));

    const encoderAwarenessState = encoding.createEncoder();
    encoding.writeVarUint(encoderAwarenessState, messageAwareness);
    encoding.writeVarUint8Array(
      encoderAwarenessState,
      awarenessProtocol.encodeAwarenessUpdate(
        this.awareness as unknown as awarenessProtocol.Awareness,
        [this.doc.clientID]
      )
    );
    this.send(encoding.toUint8Array(encoderAwarenessState));
  }

  private send(message: Uint8Array): void {
    MercureClient.publish(
      `/rooms/${this.roomID}`,
      String.fromCharCode(...message)
    );
  }

  getRoomID(): string {
    return this.roomID;
  }

  getSynced(): boolean {
    return this.synced;
  }

  setSynced(value: boolean): void {
    this.synced = value;
  }

  private dispatchEvent(type: string, arg: any): void {
    const listeners = this.eventListeners.get(type);
    if (listeners) {
      listeners.forEach((cb) => cb(arg));
    }
  }

  on(
    type: 'sync' | 'update' | 'status' | 'reload',
    cb: (arg: any) => void
  ): void {
    if (!this.eventListeners.has(type)) {
      this.eventListeners.set(type, new Set());
    }
    this.eventListeners.get(type)!.add(cb);
  }

  off(
    type: 'sync' | 'update' | 'status' | 'reload',
    cb: (arg: any) => void
  ): void {
    if (this.eventListeners.has(type)) {
      this.eventListeners.get(type)!.delete(cb);
    }
  }
}

export function createMercureProvider(
  id: string,
  yjsDocMap: Map<string, Y.Doc>
): Provider {
  let doc = yjsDocMap.get(id);
  if (!doc) {
    doc = new Y.Doc();
    yjsDocMap.set(id, doc);
  } else {
    doc.load();
  }

  return new MercureProvider(id, doc);
}
import axios from 'axios';
import * as pako from 'pako';

import { mergeDeep } from './mergeDeep';

export type MercureMessage = Record<string, any> | string;
export type MercureDependencies = Record<string, any>;

export type MercureEventSource = {
  topic: string;
  data: MercureMessage;
  chunkIndex?: number;
  totalChunks?: number;
};

class MercureClient {
  private mercureHubUrl: string;
  private mercureJwt: string;
  private subscribedTopics: Record<
    string,
    (data: MercureMessage, dependencies: MercureDependencies) => void
  > = {};
  private dependencies: MercureDependencies = {};
  private eventSource: EventSource | null = null;
  private receivedChunks: Record<
    string,
    { chunks: string[]; totalChunks: number }
  > = {};

  constructor(mercureHubUrl: string, mercureJwt: string) {
    this.mercureHubUrl = mercureHubUrl;
    this.mercureJwt = mercureJwt;

    if (this.mercureHubUrl) {
      this.initializeEventSource();
    }
  }

  private initializeEventSource() {
    const url = new URL(this.mercureHubUrl);
    url.searchParams.append('topic', '/datasync');

    this.eventSource = new EventSource(url.toString());

    this.eventSource.onmessage = (event) => {
      const { topic, data, chunkIndex, totalChunks }: MercureEventSource =
        JSON.parse(event.data);

      let jsonData;
      try {
        if (chunkIndex !== undefined && totalChunks !== undefined) {
          // Handle chunked data
          if (!this.receivedChunks[topic]) {
            this.receivedChunks[topic] = { chunks: [], totalChunks };
          }
          this.receivedChunks[topic].chunks[chunkIndex] = data as string;

          if (window.appConfig?.isDev) {
            console.debug(
              `Received chunk ${
                chunkIndex + 1
              }/${totalChunks} for topic ${topic}`
            );
          }

          if (
            this.receivedChunks[topic].chunks.filter(Boolean).length ===
            totalChunks
          ) {
            const fullData = this.receivedChunks[topic].chunks.join('');
            const decodedData = atob(fullData);
            const charData = decodedData.split('').map((x) => x.charCodeAt(0));
            const binData = new Uint8Array(charData);
            const decompressedData = pako.inflate(binData, { to: 'string' });
            jsonData = JSON.parse(decompressedData);

            // Clean up after processing
            delete this.receivedChunks[topic];
          } else {
            return; // Wait for more chunks
          }
        } else {
          // Handle non-chunked data
          try {
            const decodedData = atob(data as string);
            const charData = decodedData.split('').map((x) => x.charCodeAt(0));
            const binData = new Uint8Array(charData);
            const decompressedData = pako.inflate(binData, { to: 'string' });
            jsonData = JSON.parse(decompressedData);
          } catch (e) {
            // If decoding and decompressing fails, set jsonData to data directly
            jsonData = JSON.parse(data as string);
          }
        }
      } catch (error) {
        // If parsing fails, set jsonData to the original data
        jsonData = data;
      }

      const callback = this.subscribedTopics[topic];
      if (callback) {
        if (window.appConfig?.isDev) {
          console.debug(`Message received on topic ${topic}:`, jsonData);
        }

        callback(jsonData, this.dependencies);
      }
    };
  }

  setDependencies(dependencies: Record<string, any>) {
    this.dependencies = mergeDeep(this.dependencies, dependencies);
  }

  subscribe(
    topic: string,
    callback: (data: MercureMessage, dependencies: MercureDependencies) => void
  ) {
    if (!this.subscribedTopics[topic]) {
      this.subscribedTopics[topic] = callback;
    }
  }

  unsubscribe(topic: string) {
    delete this.subscribedTopics[topic];
  }

  transfer(topic: string, data: MercureMessage) {
    const callback = this.subscribedTopics[topic];
    if (callback) {
      callback(data, this.dependencies);
    }
  }

  async publish(topic: string, data: MercureMessage) {
    try {
      const formData = new URLSearchParams();
      formData.append('topic', '/datasync');
      formData.append('data', JSON.stringify({ topic, data }));

      await axios.post(this.mercureHubUrl, formData, {
        headers: {
          Authorization: `Bearer ${this.mercureJwt}`,
        },
      });
    } catch (error) {
      console.error('Error publishing data:', error);
    }
  }
}

export default new MercureClient(
  window.appConfig.mercureHubUrl,
  window.appConfig.mercureJwt
);

The current behavior

Although no errors are thrown, the collaboration does not seem to be working between multiple users. Messages are sent and received, but user cursors and updated content are not visible across different users.

The expected behavior

User cursors and document updates should be visible to all connected users in real-time.

Impact of fix

This bug affects all users attempting to use the collaboration feature, causing a lack of real-time collaboration functionality. Fixing this issue would benefit any team or group relying on real-time editing, improving the overall usability of the Lexical editor.

excelwebzone commented 2 months ago

Based on https://lexical.dev/docs/collaboration/react it's critical for collaboration plugin to set editor state to null:

const initialConfig = {
  namespace: 'Streamer',
  nodes: [...EditorNodes],
  onError: (error: Error) => {
    throw error;
  },
  theme: EditorTheme,
  editorState: !collaborationId ? initialEditorState : null,
  editable: !readOnly,
};

This seems to solve the sync issue, so I see both users making changes at the same time. But now, the editor is empty even though I pass the initialEditorState to the plugin:

<CollaborationPlugin
  id={collaborationId}
  username={currentUser?.name}
  cursorColor={currentUser?.color || undefined}
  cursorsContainerRef={containerRef}
  providerFactory={createMercureProvider}
  initialEditorState={initialEditorState}
  shouldBootstrap={!skipCollaborationInit}
/>

Note: Still don't see the cursor of the users..

excelwebzone commented 2 months ago

Quick update: I kinda solve the initialEditorState empty issue. Had to make sure the editor key and collaboration doesn't change on the component.

As for the cursor, it seems to be working, but when I open for a second user (new browser) it only start showing the cursor after I press enter, even though I see the change the second user name via the send/received message.

Any thought, anyone?