kapv89 / yjs-scalable-ws-backend

MIT License

Current state of project and how to deploy it on k8s? #6

Open junoriosity opened 2 years ago

junoriosity commented 2 years ago

Hi @kapv89

Many thanks for your project. I would love to try it out myself. Before that, can you tell me whether the awareness issue you raised here is still a problem?

If not, could you tell me how I could deploy it in k8s. For instance:

It would be great if you could help me with these questions. 😀

kapv89 commented 2 years ago

Hey ... The awareness issue is sorted ... It's a bit late at night for me right now; tomorrow I'll share the test code where the awareness fix was tested. There is also another update which I have been meaning to push for some time, which I'll push tomorrow. And yes, k8s, horizontal pod scaling etc. are all possible. I'll answer your questions tomorrow.

In the meantime.. can you consider a request ... If you are able to get the k8s setup done .. would you share its details with me?

junoriosity commented 2 years ago

Hi @kapv89

Many thanks for getting back to me. Yes, it would be great if you could get back to me regarding my questions. 😀

With the help of this, I can try to set up a helm chart for deploying everything in k8s, and I will share it with you. 😀

junoriosity commented 2 years ago

@kapv89 Generally, I believe it should not be too much of a problem to build a helm chart for k8s deployment.

However, what I would need to understand is:

Long story short: I need to know more about how to use the current version and how my personal preferences can be implemented. Then I think I will know enough to write some helm charts for it. 😀

kapv89 commented 2 years ago

Hey .. https://github.com/kapv89/yjs-scalable-ws-backend-test ... this is the repo for inspecting sync of awareness across 2 separate processes. You can take a look at it, meanwhile let me get to your other questions

kapv89 commented 2 years ago

Added something to provide better performance of persistence of updates by applying eventual-consistency ... copying that code from my private project and pushing that too ... basically no transactions and redis queues help with better persistence

kapv89 commented 2 years ago

@junoriosity just updated the code and changed the persistence logic to not use transactions and to use redis queues instead, for better persistence performance through eventual consistency, and for better client behaviour when a new user connects to an active, in-use document. Coming to your questions now

kapv89 commented 2 years ago

Which prerequisites regarding redis or any other cloud components are necessary?

In a production environment, the crdt-wss (crdt-websocket-server) will need access to a redis-cluster (under a key-prefix), and access to your api server. In a real-world app, your api server will hold the logic for persisting and retrieving updates (persistUpdate & getUpdates in setupWSConnection.ts). It's simple to pass your auth-token (probably a JWT) to crdt-wss when establishing a connection with it, and have the endpoints called in persistUpdate and getUpdates run an authentication check on that token.

PS: you'd need to use js-base64 to send updates over the wire as json payload. This has CPU cost. There is a workaround for this, but I haven't gotten to the part where I need to apply that workaround.
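
A minimal sketch of the base64 round trip described in the PS, assuming each update travels as a string field in a JSON body. The thread names js-base64; this sketch swaps in the built-in `btoa`/`atob` so that it runs without dependencies. The `UpdatePayload` shape and all helper names are hypothetical and are not taken from the repo.

```typescript
// Hypothetical wire format: one Yjs update per JSON body, base64-encoded.
interface UpdatePayload {
  docId: string;
  update: string; // base64 text
}

// Encode raw update bytes as base64 so they survive JSON serialization.
const encodeUpdate = (update: Uint8Array): string => {
  let binary = '';
  for (let i = 0; i < update.length; i++) {
    binary += String.fromCharCode(update[i]);
  }
  return btoa(binary);
};

// Reverse of encodeUpdate: base64 text back to the original bytes.
const decodeUpdate = (encoded: string): Uint8Array => {
  const binary = atob(encoded);
  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i);
  }
  return bytes;
};

// What the crdt-wss side would send to the api server.
const toJsonBody = (docId: string, update: Uint8Array): string => {
  const payload: UpdatePayload = { docId, update: encodeUpdate(update) };
  return JSON.stringify(payload);
};

// What the api server side would do with the received body.
const fromJsonBody = (body: string): { docId: string; update: Uint8Array } => {
  const payload = JSON.parse(body) as UpdatePayload;
  return { docId: payload.docId, update: decodeUpdate(payload.update) };
};
```

Base64 also makes the payload about a third larger than the raw bytes, which adds to the CPU cost mentioned above.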

Should Redis be a service within k8s or should it be an external one?

Redis should be a service like GCP memorystore or AWS elasticache. You are better off with your cloud-provider managing it and providing observability to it.

Which environment variables have to be set?

You can see the environment variables used in this repo in .env.example .. pasting them below for further reference

SERVER_HOST=localhost
SERVER_PORT=3002

DB_HOST=
DB_USER=
DB_PASSWORD=
DB_NAME=

REDIS_HOST=
REDIS_PORT=
REDIS_PREFIX=

In a production app, you wouldn't have the crdt-wss connecting directly to DB, so in that case your .env would look like

SERVER_HOST=
SERVER_PORT=

REDIS_HOST=
REDIS_PORT=
REDIS_PREFIX=

API_ENDPOINT_BASE=
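
One way to read the production variables above at startup and fail fast when one is missing. This is only a sketch: `loadConfig`, `requireVar`, `requirePort` and the `Config` shape are hypothetical names, and the repo may load its environment differently.

```typescript
// Mirrors the production .env above; the field names are this sketch's own.
interface Config {
  serverHost: string;
  serverPort: number;
  redisHost: string;
  redisPort: number;
  redisPrefix: string;
  apiEndpointBase: string;
}

type Env = Record<string, string | undefined>;

// Return the variable's value, or throw if it is unset or empty.
const requireVar = (env: Env, key: string): string => {
  const value = env[key];
  if (value === undefined || value === '') {
    throw new Error(`Missing environment variable: ${key}`);
  }
  return value;
};

// Same as requireVar, but also checks that the value is a valid TCP port.
const requirePort = (env: Env, key: string): number => {
  const parsed = Number(requireVar(env, key));
  if (parsed !== Math.floor(parsed) || parsed < 1 || parsed > 65535) {
    throw new Error(`Invalid port in ${key}`);
  }
  return parsed;
};

const loadConfig = (env: Env): Config => ({
  serverHost: requireVar(env, 'SERVER_HOST'),
  serverPort: requirePort(env, 'SERVER_PORT'),
  redisHost: requireVar(env, 'REDIS_HOST'),
  redisPort: requirePort(env, 'REDIS_PORT'),
  redisPrefix: requireVar(env, 'REDIS_PREFIX'),
  apiEndpointBase: requireVar(env, 'API_ENDPOINT_BASE'),
});
```

In Node this would be called once at boot as `loadConfig(process.env)`.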

Can I scale it with a horizontal pod autoscaler?

Yes, but there is a gotcha when it comes to horizontally scaling websocket-servers. Basically, after the load-balancer has routed a request from a client to a particular websocket-server, all incoming packets from that client need to be routed to the same websocket-server. This is accomplished using a sticky_session load balancing strategy. Socket.io's documentation describes this gotcha quite well - https://socket.io/docs/v4/using-multiple-nodes/

Can I also persist documents to other storage like s3 or does it have to be Postgresql?

Updates to a document come fast in a collaborative editing environment. Let's say 10 users are on the same document, each sending 3 edits per second: that's 30 edits per second on the same document, which means lots of inserts and deletes per second in the db under the persistence logic used in this crdt-wss. S3 won't be able to handle this rate of writes. But yes, S3 can be used as long-term storage for documents, which can be loaded into a fast primary db on demand. In this repo, and in my project, the primary db is PostgreSQL because I have my eye on Yugabyte, which can allow my db to scale horizontally. However, there is a restriction: the max size of a bytea column in PG is 1GB, which puts a hard size limit on my documents. The only way I know around this is to build my own database just for yjs-crdt-documents, but I don't have the need to do that right now.

Now coming to uploading documents to S3, you can use Y.encodeStateAsUpdate(yDoc) to get a Uint8Array representing the document, which you can easily convert to a Buffer and upload to S3.

Can I somehow allow only specific users to read resp. write a document or block them completely?

Yes. It all comes down to being able to pass the auth-token (JWT) of the user to the crdt-wss, storing it against the connection, and using the auth-token in read and write operations to perform access control. I'll paste relevant bits of code from my project's codebase, if you need further help, I'm open to doing a call.

export const getDocIdFromReq = (req: any): string => {
  return req.url.slice(1).split('?')[0] as string;
}
// what's `doc.id` in my project code is `doc.name` in this repo
export const getTokenFromReq = (req: any): string => {
  const [, query] = req.url.split('?');
  const data = qs.parse(query);
  return data.token as string;
}
export const app = express();
export const server = http.createServer(app);
export const wss = new WebSocketServer({noServer: true});

wss.on('connection', async (ws, req) => {
  await setupWSConnection(ws, req);
});

server.on('upgrade', async (req, socket, head) => {
  const token = getTokenFromReq(req);

  try {
    await checkAuth(token);

    wss.handleUpgrade(req, socket as Socket, head, (ws) => {
      wss.emit('connection', ws, req);
    })
  } catch (err) {
    serverLogger.error(err);
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
    socket.destroy();
  }
});
export const connTokens = new Map<WebSocket, string>();
export default async function setupWSConnection(conn: WebSocket, req: http.IncomingMessage): Promise<void> {
  conn.binaryType = 'arraybuffer';
  const docId = getDocIdFromReq(req);
  const token = getTokenFromReq(req); // this token can be used in `getUpdates`, which can call your API server
  connTokens.set(conn, token);

  const [doc, isNew] = getYDoc(docId);
  doc.conns.set(conn, new Set());
...
// rest of the project code is similar to what can be found in this repo
export const closeConn = (doc: WSSharedDoc, conn: WebSocket): void => {
  const controlledIds = doc.conns.get(conn);
  if (controlledIds) {
    doc.conns.delete(conn);
    awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null);

    if (doc.conns.size === 0) {
      doc.destroy();
      docs.delete(doc.id);
    }
  }

  connTokens.delete(conn); // this is the relevant line for managing tokens

  conn.close();
}
export const updateHandler = async (update: Uint8Array, origin: any, doc: WSSharedDoc): Promise<void> => {
  const isOriginWSConn = origin instanceof WebSocket && doc.conns.has(origin)
  let postFailed = false;

  if (isOriginWSConn) {
    try {
      Promise.all([
        pub.publishBuffer(doc.id, Buffer.from(update)),
        pushDocUpdatesToQueue(doc, update)
      ]) // do not await
      const encoder = encoding.createEncoder();
      encoding.writeVarUint(encoder, messageSync);
      syncProtocol.writeUpdate(encoder, update);
      const message = encoding.toUint8Array(encoder);
      doc.conns.forEach((_, conn) => send(doc, conn, message));

      await persistUpdate(doc.id, update, connTokens.get(origin) as string); // notice use of token for persistence here
    } catch {
      postFailed = true;
    }
  }

  if (postFailed && isOriginWSConn) {
    closeConn(doc, origin);
  }
}

Which environment variables are to be set?

I think I have already answered this

How does it connect to Redis?

Not sure what you are looking for here - ioredis is the redis lib used, other than that I have already suggested using managed redis for crdt-wss. From what I remember about k8s, you'd need to expose the memorystore/elasticache URL inside your kubernetes cluster using one of its constructs.

For my personal use-case I would like to ensure at times that only users that are allowed to do so (which can be obtained from a read-out from a Postgresql database) can connect to it when they are Auth0-authenticated. Then they should be able to read and/or write.

Think I have already answered this, let me know if you have more questions

Also is it possible to store Quill documents in S3? How would that have to implemented.

Think I have already answered this.

@junoriosity let me know if you need anything else ... looking forward to that helm-chart 😄

junoriosity commented 2 years ago

Hi @kapv89, many thanks for all your input.

That is a lot of information to digest. Perhaps we should continue our discussion on Slack or something similar, as that might be better suited to the back and forth needed to get the backend the way I need it and to set up a decent helm chart. How does that sound to you?

kapv89 commented 2 years ago

Hey @junoriosity … slack sounds good … I am kapv89 on slack too

junoriosity commented 2 years ago

@kapv89 I could not find you there, so I added your e-mail as an external invite 😀

kapv89 commented 2 years ago

@junoriosity please continue the conversation here. Slack was too intrusive on my personal space.

Just for reference, till now I've answered your questions in this issue, and I've explained several different parts of the architecture you want. And you have added helm charts to this repo, for which I still need to verify whether they use sticky sessions or not

junoriosity commented 2 years ago

@kapv89 Sticky sessions depend on the reverse proxy configuration, which is not part of the helm chart. The helm chart brings your service into kubernetes.

Just for the reference, we agreed on the idea that I would provide the code for the helm chart (which is there already) and you would provide (by 01/11) the required amendments on the backend to not persist in Postgresql, but rather Redis itself plus some API calls to clarify rights of the users and to load the binary into Redis, if not available.

kapv89 commented 2 years ago

I didn't promise a date ... I gave you a clear description of each piece of your architecture, with the last piece related to S3 persistence left a little vague because of time constraints, and then various events during the festive season took my time, during which time you formulated your own approach towards the distributed backend you need and narrowed down the requirements to what you have stated.

Anyways ... It seems like the following pieces are needed for your requirements:

An api server with the following endpoints

  1. Generate token with read or read+write privileges
  2. Accept a yjs update which can be queued up for persistence to s3

A crdt-wss with the following features

  1. Accept a token while establishing connection to a doc, and get access-payload with privilege data for that doc
  2. Apply privilege data to the logic of reads and writes in the crdt-wss
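
The two crdt-wss features above might fit together roughly as follows. This is a sketch under assumptions: the `AccessLevel` values, the `ResolveAccess` callback (standing in for the API call that turns a token into privilege data) and both gate functions are hypothetical and are not taken from this repo. The three sync sub-message constants follow the values used in y-protocols' sync module.

```typescript
type AccessLevel = 'none' | 'read' | 'write';

// Stand-in for the API call that turns a token into privilege data for one doc.
// A missing token models a user who is not logged in.
type ResolveAccess = (docId: string, token?: string) => AccessLevel;

// Sync sub-message types, as defined in y-protocols/sync.
const messageYjsSyncStep1 = 0; // client asks for state it is missing (a read)
const messageYjsSyncStep2 = 1; // client sends state the server is missing (a write)
const messageYjsUpdate = 2; // client sends an incremental update (a write)

// Gate 1: run once while the connection is being established.
// Returns null when the upgrade should be rejected.
const admitConnection = (
  resolve: ResolveAccess,
  docId: string,
  token?: string
): AccessLevel | null => {
  const access = resolve(docId, token);
  return access === 'none' ? null : access;
};

// Gate 2: run for every incoming sync message on an admitted connection.
const mayProcessSyncMessage = (access: AccessLevel, syncType: number): boolean => {
  if (syncType === messageYjsSyncStep1) {
    return access === 'read' || access === 'write';
  }
  if (syncType === messageYjsSyncStep2 || syncType === messageYjsUpdate) {
    return access === 'write';
  }
  return false; // unknown sub-message types are dropped
};
```

With this split, a read-only connection can still request the document state but cannot change it, because its step-2 and update messages are never applied.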

Will it be enough if I only provide the crdt-wss with an api interface that you can fulfill with your own api-server?

kapv89 commented 2 years ago

Probably another api endpoint will be required to fetch the document from s3

junoriosity commented 2 years ago

From my understanding, this is a promise to fulfill it by the end of 01/11

[Attached image: kapil]

Anyway, the idea is to separate the authorization and persistence from your backend.

  1. The rights clarification is as discussed, i.e., after an API call has been made with or without a token, the user has no rights, read rights, or write rights
  2. The backend checks whether it has the document in its own memory.
  3. If not, it tries to get it from Redis and does the syncing
  4. If it is not found in Redis, the backend makes an API call to a service, which should put that document into Redis, and after that you go back to Point 3
  5. The syncing between the websocket-backends is as usual
  6. Perhaps we can even set an upper limit on the size of the documents

That way everyone can provide user-based access to their service through their own API, which your backend calls, and the persistence is decoupled from it as well. That suits a microservice architecture quite well.
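
Points 2 to 4 of the list above can be sketched as a small lookup chain. Everything here is an assumption made for illustration: `UpdateStore` stands in for the Redis access, `HydrationApi` for the external service that loads a document into Redis, and `createDocLoader` is a hypothetical name. The rights check from Point 1 is left out.

```typescript
// Minimal interfaces so the sketch does not depend on a Redis or HTTP client.
interface UpdateStore {
  getUpdates(docId: string): Promise<Uint8Array[]>;
}

interface HydrationApi {
  // Asks the external service to load the document into Redis (Point 4).
  hydrate(docId: string): Promise<void>;
}

const createDocLoader = (store: UpdateStore, api: HydrationApi) => {
  // Documents this backend process already holds (Point 2).
  const inMemory = new Map<string, Uint8Array[]>();

  return async (docId: string): Promise<Uint8Array[]> => {
    const cached = inMemory.get(docId);
    if (cached !== undefined) {
      return cached;
    }

    // Point 3: try Redis first.
    let updates = await store.getUpdates(docId);

    if (updates.length === 0) {
      // Point 4: nothing in Redis, so ask the service to provide it,
      // then read Redis again. Only one retry, to avoid looping forever
      // on a document that is genuinely empty.
      await api.hydrate(docId);
      updates = await store.getUpdates(docId);
    }

    inMemory.set(docId, updates);
    return updates;
  };
};
```

A real backend would also evict a document from memory when its last connection closes, as `closeConn` does earlier in this thread.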

kapv89 commented 2 years ago

Yeah ... that message you shared was after you started acting a bit aggressively after me postponing my slot a few times due to festivities ... I'd call it me making a promise under duress, which I am not good at handling :)

Coming to the ask, I will provide the crdt-wss part only, and cover the interaction with backend via unit tests. The work will be in a branch of this repo. Will point you to the file containing the api client which you'll need to satisfy with an api server.

The festivities ended yesterday, and I have work to do at my job. Will give 2 hours or so tonight to this, and push the progress to a branch, and drop a comment here.

junoriosity commented 2 years ago

I do not see anything aggressive from my side after you postponed things further and further (four or five times, if I recall correctly) and I even narrowed down the efforts to make your life simpler (and your work more generally applicable), and then you promised something.

Anyway, let us focus on the code. We need to clarify a few more points.

  1. We need good documentation of the API endpoints (in which format do you make the requests, and what replies do you expect?). That matters for the following:
     1. Is the binary data stored in a Redis list, or (!) is each binary stored under a separate key, with the keys stored in a list?

  2. Assume we persist the data to Redis and run into issues: Redis might miss some updates or, on the contrary, receive the same update multiple times. The order might also be corrupted, or the Redis primary might crash before the data is replicated to the replica. Could you tell me which of these would pose a problem for obtaining the “real” document, and how large the damage would be, in particular if we proceed with “regular” updates?

kapv89 commented 2 years ago

Have made some progress ... https://github.com/kapv89/yjs-scalable-ws-backend/tree/redis_permissions_s3 ... the wss now communicates with an api-endpoint, which in tests, uses an array for persistence. Redis is still being used but PG is not. These changes are localized to this branch, will probably move them to a separate repo later.

As you can see, the work is quite extensive and will take some time

junoriosity commented 2 years ago

Hi @kapv89 many thanks for sharing your input with me. It looks very good, but I think we can make it a bit easier:

At the current stage we need one API interaction:

The persistence could be done to the corresponding doc_id Redis queue (like here: https://github.com/kapv89/yjs-scalable-ws-backend/blob/main/src/setupWSConnection.ts#L170) without any API call, only with the key value trick as outlined above.

What do you think?

kapv89 commented 1 year ago

https://github.com/kapv89/yjs-scalable-ws-backend/tree/redis_permissions_s3 ... this branch has been updated with read+write and read access level functionality. Please go through the tests and ask questions.

Regarding loading a doc from s3 into redis if it isn't already present there, that is out of the scope of the crdt-wss. That should be handled by the api-server, in its api endpoint GET /documents/{id}/updates. Similarly, POST /documents/{id}/updates should handle the logic of writing updates to redis, and setting up periodic persistence to S3.

The logic to save an update to redis will look something like this:

export const getDocRedisKey = (docId: string) => `doc:${docId}:all_updates`

export const docRedisLifetimeInS = 3600;

export const saveDocUpdateInRedis = async (docId: string, update: Uint8Array) => {
  const key = getDocRedisKey(docId);

  const len = await redis.llen(key);

  if (len > 100) {
    const res = await redis.pipeline()
      .lrangeBuffer(key, 0, 100)
      .rpushBuffer(key, Buffer.from(update))
      .ltrim(key, 101, -1) // drop the 101 entries just read; they are merged and re-added below
      .expire(key, docRedisLifetimeInS)
      .exec()
    ;

    const [, leftUpdates]: [Error | null, Buffer[]] = res[0];

    const doc = new Y.Doc();
    for (const u of leftUpdates) {
      Y.applyUpdate(doc, u);
    }

    const combinedUpdate = Y.encodeStateAsUpdate(doc);
    await redis.lpushBuffer(key, Buffer.from(combinedUpdate));
  } else {
    await redis.pipeline()
      .rpushBuffer(key, Buffer.from(update))
      .expire(key, docRedisLifetimeInS)
      .exec()
    ;
  }
}
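
The intended effect of the compaction in the snippet above can be checked on an in-memory list, without Redis. This is a sketch, not the repo's code: `appendAndCompact` and its `merge` parameter are hypothetical. With Yjs, `merge` could be `Y.mergeUpdates`, or the apply-then-`Y.encodeStateAsUpdate` approach used above.

```typescript
// Combines several stored updates into one equivalent update.
type Merge = (updates: Uint8Array[]) => Uint8Array;

// Appends `update` to the list. Once more than `threshold` entries are already
// stored, the oldest `threshold + 1` entries are replaced by a single merged
// entry at the head, so the list stays short while no update is lost.
const appendAndCompact = (
  list: Uint8Array[],
  update: Uint8Array,
  threshold: number,
  merge: Merge
): Uint8Array[] => {
  if (list.length <= threshold) {
    return list.concat([update]);
  }

  const oldest = list.slice(0, threshold + 1); // what LRANGE key 0 threshold returns
  const rest = list.slice(threshold + 1); // entries that must survive the trim

  return [merge(oldest)].concat(rest, [update]);
};
```

The Redis version is not atomic across its separate calls. Yjs updates can be applied more than once and in any order without changing the result, so an entry that ends up both merged and still in the list should be harmless, but an entry that is dropped is lost.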
junoriosity commented 1 year ago

Ah, that looks truly great. 😀

I have a few questions, could you be a bit more specific on what the three API endpoints should be receiving in terms of format and in terms of reply. So, this would mean

For the two functions getDocUpdates and postDocUpdates we would currently send the entire data as a byte array. However, isn't it perhaps easier to just do something like the following

This would at least remove the load from the API endpoints.

Nevertheless, this brings us a huge leap forward: Now one could build a persistence connector with API endpoints for any storage type and connect it to this websocket backend. 😀

For enforcing a maximum size on the documents, what would have to be done here?

junoriosity commented 1 year ago

@kapv89 It could also be, that a user does not come along with a token and might end up with read or no rights. Is that covered here as well?

kapv89 commented 1 year ago

No. Would you want such a user to establish a connection to your websocket server?

junoriosity commented 1 year ago

@kapv89 In some cases, yes. So it should be two-fold: For users providing a token and users without it (so which are not logged in, but still can see the document)

kapv89 commented 1 year ago

There are 2 levels here ... 1. Non logged in users receiving realtime updates as users with write permission change the document and 2. Non logged in users receive the latest persisted snapshot of the document but don't receive realtime updates

junoriosity commented 1 year ago

Ah, these are already implemented.

I imagine it as follows for non-logged-in users:

They make a request similar to logged-in users.

Depending on the reply (might be different between documents), they

Could you also get back to me regarding my questions on the API endpoints and document size from above?

kapv89 commented 1 year ago

Will probably get back on those sometime early coming week

junoriosity commented 1 year ago

Hi @kapv89 could you get back to me on that matter, please? 😀

junoriosity commented 1 year ago

@kapv89 Do you have some input regarding my questions?

kapv89 commented 1 year ago

Hey @junoriosity ... Caught up with a lot of work at my job. Will try to take out 30 minutes this weekend to answer your questions

junoriosity commented 1 year ago

Hey @kapv89 that would be very helpful.

Also, could you please tell me, whether users with read resp. write rights are by any means differently treated? As far as I can see, there is no difference here.

kapv89 commented 1 year ago

what do you mean by "read resp. write"?

junoriosity commented 1 year ago

I mean this here: https://github.com/kapv89/yjs-scalable-ws-backend/blob/redis_permissions_s3/src/apiClient.ts#L13

kapv89 commented 1 year ago

They are treated differently, otherwise how will the tests work … check for usage of https://github.com/kapv89/yjs-scalable-ws-backend/blob/redis_permissions_s3/src/setupWSConnection.ts

junoriosity commented 1 year ago

@kapv89 That looks terrific, many thanks for it. It would still be great if you could give me some input on my other questions later. 😀

junoriosity commented 1 year ago

@kapv89 Hi Kapil, could you perhaps get back to me on the questions raised above? That would be very helpful.

junoriosity commented 1 year ago

@kapv89 Could you please get back to the questions?