globus / globus-compute

Globus Compute: High Performance Function Serving for Science
https://www.globus.org/compute
Apache License 2.0

Streaming results with WebSockets #410

Closed knagaitsev closed 3 years ago

knagaitsev commented 3 years ago

Currently, no authorization check confirms that a user is allowed to retrieve the results of a particular task. The current design of the WebSocket service is to poll tasks and either time out or send results back to the client once a result/exception is set. This is not ideal: redis has queues (lists), so the forwarder can push finished tasks onto a queue and the WebSocket service can pop them as soon as they are available, removing the need for polling.
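
A minimal sketch of that push/pop idea with redis-py; the key names and helpers are hypothetical, not the service's actual layout:

```python
from typing import Optional

import redis

r = redis.Redis()

# Forwarder side: push a task's id onto a results queue the moment its
# result/exception is set ("finished_tasks" is a hypothetical key).
def on_task_finished(task_id: str) -> None:
    r.lpush("finished_tasks", task_id)

# WebSocket-service side: block until a finished task id arrives,
# instead of re-checking task status on an interval.
def wait_for_finished_task(timeout: int = 30) -> Optional[str]:
    item = r.brpop("finished_tasks", timeout=timeout)  # None on timeout
    if item is None:
        return None
    _key, task_id = item
    return task_id.decode()
```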

Auth proposal

Auth details such as the user and auth group should be stored as part of the redis task. When the WebSocket service needs to check whether a user is authorized to retrieve the results of a task, it sends the task id to the web service, which checks the auth details stored in redis.
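
A minimal sketch of the proposed check, assuming the task's fields live in a redis hash with a hypothetical task_{id} key layout:

```python
import redis

r = redis.Redis()

# Store auth details alongside the task's other fields in its redis hash.
def store_task_auth(task_id: str, user_id: str, auth_group: str) -> None:
    r.hset(f"task_{task_id}", mapping={"user_id": user_id,
                                       "auth_group": auth_group})

# Web-service side: answer the WebSocket service's authorization query
# for a given task id.
def user_may_fetch(task_id: str, user_id: str) -> bool:
    stored = r.hget(f"task_{task_id}", "user_id")
    return stored is not None and stored.decode() == user_id
```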

Queue proposal

Batch objects should be added to redis. Each will have an id, auth details just like tasks, the number of tasks in the batch, the number of completed tasks, and a redis queue that starts out empty and accumulates the ids of completed tasks. Redis tasks would also need a new batch_id field specifying which batch they belong to, so that when a task completes with a result/exception, its id can be added to the correct batch queue and the batch's completed-task count can be incremented.

The WebSocket service would wait to pop tasks off of these queues based on the batch_id that users send to the service. When a new task_id comes off of the queue, the service would retrieve that task, send it back to the user, and delete it from redis. If all tasks in the batch are completed, the batch is also deleted.
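
A minimal sketch of this flow with redis-py; key names are hypothetical, and a real implementation would need the read/delete steps to be atomic:

```python
import redis

r = redis.Redis()

# Forwarder side: when a task gets a result/exception, route its id to
# its batch's queue and bump the batch's completion counter.
def mark_task_complete(task_id: str) -> None:
    batch_id = r.hget(f"task_{task_id}", "batch_id").decode()
    r.rpush(f"batch_{batch_id}_completed", task_id)
    r.hincrby(f"batch_{batch_id}", "completed_count", 1)

# WebSocket-service side: block for the next finished task in a batch,
# fetch and delete it, and drop the batch once everything is retrieved.
def next_result(batch_id: str, timeout: int = 30):
    item = r.blpop(f"batch_{batch_id}_completed", timeout=timeout)
    if item is None:
        return None  # ties into the timeout question below
    task_id = item[1].decode()
    result = r.hget(f"task_{task_id}", "result")
    r.delete(f"task_{task_id}")
    completed = int(r.hget(f"batch_{batch_id}", "completed_count"))
    total = int(r.hget(f"batch_{batch_id}", "task_count"))
    if completed >= total and r.llen(f"batch_{batch_id}_completed") == 0:
        r.delete(f"batch_{batch_id}", f"batch_{batch_id}_completed")
    return task_id, result
```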

Other notes

Should a single task be treated as a batch of 1 task? This would keep the simple single-task path uniform with the batch system, and would let the service wait for the task to finish via the queue, as with batch tasks, rather than polling the task.

How should the timeout work if the WebSocket service is waiting a long time for a task that is not completing, meaning nothing can be popped off of the batch queue? The WebSocket service might also need to watch for broken batches where all tasks are completed but the redis batch did not get deleted (hopefully this doesn't happen, but it's worth thinking ahead).

yadudoc commented 3 years ago

@Loonride Thanks for writing this up. Here are some thoughts on some things to consider here:

(WS = Websocket Service, FWS = funcx-web-service)

  1. We assume that the output of a task can be fetched by a single client; I believe the same principle should apply when we use the WS to stream an arbitrary number of outputs.
  2. Can the outputs be fetched by a different client? I don't think this should be allowed because, if the results are fetched asynchronously by an executor backend, you are bound to end up in a broken state.
  3. We have the notion of fxc.batch_run(batch_of_tasks), which is a mechanism for the user to create and launch a fixed collection of tasks. In a notebook or any interactive environment, we should expect the user to launch many batches, sometimes for convenience, other times for performance or due to batch-size limitations. So if the user launches multiple batches, do we want the backend to request outputs of n batches? Worse still, what happens when the user launches additional batches: do we wait until an existing WS connection finishes, or launch more WS connections? I think a fixed batch concept is not the right match here. Instead, if we create, say, a topic_id (pick a different name if you don't like this) and associate regular tasks or batches with it, we could have the executor backend listen for outputs on that topic. This would simplify things a fair bit.
  4. Security-wise, I'm in favor of keeping the WS simple. What do you think of having the WS query the FWS to check whether a (user, topic_id) pair is valid? That check would be lightweight enough that we could easily put an LRU cache on the FWS (see the sketch after this list). We want to check whether the user can access the pipe, not each item that flows through the pipe.
  5. I think we are sort of implementing the reverse of what we have for tasks going to endpoints. Currently, the FWS sorts tasks into the appropriate endpoint-queue on REDIS. What we need is to have the forwarder sort outputs that it receives into the appropriate topic_id-queues. For this to work, we need every task in the redis hash-set to be associated with a topic_id, and the forwarder can do the rest.
  6. I like that in the batch model you can clean up easily when all tasks in a batch are retrieved. This is going to be an issue in the model described in the previous point, where there's no clear way to know when a topic_id is closed.
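
A minimal sketch for point 4, assuming a hypothetical look_up_topic_owner helper on the FWS side; the cache is only safe if a topic's owner never changes after creation:

```python
from functools import lru_cache

def look_up_topic_owner(topic_id: str) -> str:
    ...  # stand-in: fetch the topic's owning user from the database

# The WS asks the FWS once per (user, topic_id); repeat checks hit the
# cache, so we validate access to the pipe, not each item flowing
# through it.
@lru_cache(maxsize=4096)
def user_may_listen(user_id: str, topic_id: str) -> bool:
    return look_up_topic_owner(topic_id) == user_id
```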

I think your notes at the end are important considerations here: what do we do about broken/partial states? We will probably have to clean up abandoned topic_ids just like abandoned task_ids. I'm not sure what you meant about long wait times; do you mean a task that takes a day to process might tie up a web-socket for an extended period? That is going to be a problem regardless, and I think we'll need some mechanism for back-off.

BenGalewsky commented 3 years ago

Thank you for documenting this, @Loonride !

I'm a big fan of messaging systems to make developers' lives easier. After hearing @yadudoc's warning about REDIS not being a messaging system, and reading this stack overflow article - I wonder if we could make our lives easier by just bringing a small RabbitMQ subchart into our deployment and relying on it.

In that event, the WS winds up being much simpler, just waiting on a Queue and serving up results. There could actually be one queue, with message metadata specifying the batchID. The process would only receive messages related to the batch it is serving.
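
One mechanical note (not part of Ben's proposal, just a sketch of one way to get that behavior): a plain AMQP consumer can't selectively filter a shared queue server-side, so the "only receive messages related to the batch it is serving" behavior is often achieved with a direct exchange and the batchID as the routing key. A minimal pika sketch with hypothetical exchange and queue names:

```python
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
ch.exchange_declare(exchange="results", exchange_type="direct")

# Forwarder side: publish each result under its batch id.
def publish_result(batch_id: str, payload: bytes) -> None:
    ch.basic_publish(exchange="results", routing_key=batch_id, body=payload)

# WS side: bind a private queue to exactly the batch being served.
def serve_batch(batch_id: str) -> None:
    q = ch.queue_declare(queue="", exclusive=True).method.queue
    ch.queue_bind(queue=q, exchange="results", routing_key=batch_id)
    for _method, _props, body in ch.consume(q, auto_ack=True):
        ...  # forward body to the client over the WebSocket
```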

Also, I hear @yadudoc's comment about how people often use multiple batches. If there are problem-specific reasons for doing that, then so be it. If it's because of HTTP message-size limits, then I think we need to improve batch_run to paginate the requests so they fit. There's no reason for users to know about our throttle restrictions.

yadudoc commented 3 years ago

@BenGalewsky It sounds like we want reliable PUB-SUB style messaging? If that's the case, we sort of already implemented that over REDIS with a combination of PUB-SUB and queues: https://github.com/funcx-faas/funcx-forwarder/blob/main/funcx_forwarder/queues/redis/redis_pubsub.py#L23

For a lot of users a single batch will probably work, just as the most common workflow pattern is a bag-of-tasks. I think we want to design for the more interesting cases: 1) the user is interactively launching tasks to explore something, 2) event/trigger-style systems where tasks are launched in response to events, and 3) active/reinforcement-learning-style use cases where there's no easily discernible pattern. Assuming a single fixed-size batch, I don't know how these would fit into that model. Maybe I'm missing something obvious; we should have a quick chat about this.

yadudoc commented 3 years ago

@BenGalewsky I agree that batch size limitations and handling should just happen transparently.

joshbryan-globus commented 3 years ago

A couple of quick thoughts regarding queuing systems vs pub-sub. Redis is not a full-fledged message broker by itself (it's missing metadata and aggregation, and doesn't handle dead-lettering, re-routing, durability, at-most-once or at-least-once semantics, etc.), and making it behave that way requires some work. Celery actually jumps through a lot of hoops to make redis work almost like a true message broker but still falls short when it comes to more complex routing rules. From that perspective, I'd second Ben's vote for something purpose-built for messaging (at least in the long run). I would caveat that with the same recommendation I've made for RDS/Elasticache ... if we can let AWS operate the underlying service, then we are better off from an operations-resource point of view. Amazon MQ does offer both ActiveMQ and RabbitMQ engines.

Regarding topic vs batch ids, my 2 cents would be to plan for something other than batch id unless the lift is too big. By not entangling the purpose of batch id (i.e. an id for a single group of submissions) with the id used to subscribe to events, you enable a lot of interaction patterns. Besides the ones Yadu mentioned, in theory a batch (single submission) could contain tasks with multiple topics and allow results to be waited on by different processes. If as a convenience we set "topic_id" to "batch_id" for a first version, that might limit the changes we need to make to the api and webservice. But from a WS design perspective we may not want to paint ourselves into that corner yet.

yadudoc commented 3 years ago

@joshbryan-globus I didn't know Amazon offered RabbitMQ; that does change things a bit. We can go the same route as we do now with redis and Postgres, and keep easy local deployability. I agree that switching to a message broker instead of a PUBSUB+queue implementation is a better design choice. I've been disappointed with Rabbit's pika client, and I may be projecting some of that onto Rabbit itself.

knagaitsev commented 3 years ago

I'm still a bit confused about what is meant by topic_id. I understand, from the WS perspective, that we want flexible groupings to subscribe to that aren't confined to a single batch or task.

But how would a topic_id be assigned? Would the default behavior be that a batch is assigned a single topic_id, but then the user could customize the topics of tasks within the batch, so that there could be multiple topics in a batch, and a topic could span multiple batches?

Would this then introduce a new value that the user may need to request? For example, a user asks the web service to make a new topic for them that they can listen on for results, and the web service gives them topic_id x. The user submits multiple batches and assigns all these batches the topic_id x. They then tell WS to listen on x and return results to them. (Let me know if I've misunderstood what is meant by topics)

knagaitsev commented 3 years ago

Latest plan

I discussed this last week with Yadu. The plan is as follows:

A user will retrieve a topic_id from the web service. When submitting tasks, the user can then assign any tasks to this topic, whether within a single batch or across batches. The user can then send this topic_id to the WS server and ask it to send back results whenever a new task has results for this topic. The user could potentially make multiple WS connections that all listen on the same topic_id, if they want the results in many places. (It's unclear whether there is a use case, but it should be implemented this way anyway so that there can't be race conditions.)

RabbitMQ will be used as a messaging system to send the WS server task result/exception messages. When a user sends a topic_id to the WS server, the id is first passed along to the web service to confirm the user is authorized to listen on it. Then the WS server subscribes to the RabbitMQ topic (or remains subscribed if it already was). The forwarder will publish updates to RabbitMQ for these topics whenever a task has a new result/exception, and the WS server will receive them. When an update comes in for topic_id x, the WS server will determine all connected clients that are listening on x and send the update to them.
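
A minimal pika sketch of this flow, assuming a topic exchange; the exchange, queue, and handler names are placeholders, not the actual implementation:

```python
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
ch.exchange_declare(exchange="task_updates", exchange_type="topic")

# Forwarder side: publish each result/exception under its topic_id.
def publish_update(topic_id: str, payload: bytes) -> None:
    ch.basic_publish(exchange="task_updates", routing_key=topic_id,
                     body=payload)

# WS-server side: after the web service approves (user, topic_id), bind
# a queue for that topic; on each update, fan out to every connected
# client listening on it.
listeners = {}  # topic_id -> set of open WebSocket connections

def subscribe(topic_id: str) -> None:
    ch.queue_declare(queue=f"ws_{topic_id}")
    ch.queue_bind(queue=f"ws_{topic_id}", exchange="task_updates",
                  routing_key=topic_id)

def on_update(topic_id: str, body: bytes) -> None:
    for client in listeners.get(topic_id, ()):
        client.send(body)  # hypothetical send on the client connection
```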

BenGalewsky commented 3 years ago

I'd like to have a more in-depth discussion of Topic-ID vs Batch-ID. I would think that a batch is a real concept in user space and would be reluctant to add a new concept.

knagaitsev commented 3 years ago

> I'd like to have a more in-depth discussion of Topic-ID vs Batch-ID. I would think that a batch is a real concept in user space and would be reluctant to add a new concept.

Yes, let's discuss on Monday?

knagaitsev commented 3 years ago

Based on what was discussed in the meeting, the plan is to have a single queue between the forwarder and the websocket server onto which task results are put, with enough info in each queue message to filter on (user id, the batch the task belonged to?, group id?). The user will tell the websocket server what filters they want, and the websocket server will pull items off of the queue based on those filters after confirming that the user is authorized to use them.
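
A sketch of what a queue message and the websocket-server-side filter check might look like; the field names are placeholders from this discussion, not a settled schema:

```python
# Hypothetical message shape on the forwarder -> websocket-server queue.
message = {
    "task_id": "...",
    "user_id": "...",
    "batch_id": "...",   # batch the task belonged to, if any
    "group_id": "...",
    "result": "...",     # or an "exception" field
}

# Applied after the web service confirms the user may use these filters.
def matches(msg: dict, filters: dict) -> bool:
    # e.g. filters = {"user_id": "...", "batch_id": "..."}
    return all(msg.get(key) == value for key, value in filters.items())
```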

yadudoc commented 3 years ago

Summarizing from our dev chat today with @BenGalewsky, @ZhuozhaoLi, @yongyanrao, and @Loonride.

Here are some outstanding tasks: