replicate / cog

Containers for machine learning
https://cog.run
Apache License 2.0

Queue worker in Cog #443

Open andreasjansson opened 2 years ago

andreasjansson commented 2 years ago

This is a follow-up from https://github.com/replicate/cog/issues/404, where I've tried to distill the discussion into separate issues and prioritize issues into required and future work.

Required now

Future

sirupsen commented 2 years ago

I'm commenting on the high-level plan here, because this feedback is high-level 👀

I was sitting down with pen & paper thinking about how I'd design this from scratch... The big question I was asking myself: Why do the RPC in AMQP-land, rather than directly with the Cog HTTP server on any of the predictors?

I don't think AMQP is a good RPC-interface to expose to users. It's too idiosyncratic. An HTTP-based API is simpler, and what we already do. HTTP should carry through from your quick local demo to production -- despite the backing mechanism changing with a CLI-flag (memory to Rabbit/Redis).

To alleviate the barrier of entry of implementing an AMQP-based API, we'd likely get into the business of writing clients for all languages (Python, Go, Ruby, ...) and brokers (Redis, AMQP, Postgres, ..)... That's a serious amount of work. 👎🏻 It also means that we have to carefully consider the API design of each broker, backward compatibility, etc. Of course we could make AMQP the standard and turn that into everything else, but I think that's awkward too.

Enforcing the invariant that all communication with Cog goes through HTTP also gives us, and our users, far more leverage moving forward. They can move between Postgres, Rabbit, and Redis as the wind blows. I think we should draw a hard line in the sand that users never talk directly to brokers. Users always talk to the Cog HTTP server.

What I'm proposing:

[diagram: "Untitled 18"]

Now, you may find it awkward to use the HTTP server in the predictors as the API. However, it's a minimal amount of cycles to service API requests rather than predictions. I think it's simple to understand, and I think it has by far the most optionality down the line.

In terms of load-balancing between Cogs for servicing API requests, we'd leave that to the user. If they're using K8s, they can just set up an ingress. If they're not, they can round-robin between them. Given the simplicity of it, even if you fail to round-robin, most likely a single one will be able to handle the load for 99.9% of users. Any Cog container can accept work for any other as it just turns it into a broker message -- however, only the right Cog container can actually perform the work (as you can see in my illustration).
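
To make "any container accepts, only the right container executes" concrete, here's a rough sketch of what the accept-and-enqueue path could look like. Everything here is illustrative -- the route shape, the `enqueue` helper, and the in-memory `BROKER` stand-in are not Cog's actual API; in practice the broker would be Redis/RabbitMQ/Postgres behind a driver.

```python
# Hypothetical sketch: any Cog container accepts the prediction request over
# HTTP and turns it into a broker message; only the container serving that
# model consumes it. Names (enqueue, BROKER) are illustrative, not Cog's API.
import uuid

from fastapi import FastAPI

app = FastAPI()

# Stand-in for whatever broker backend is configured (memory, Redis, AMQP, ...).
BROKER = {}  # model_name -> list of pending messages


def enqueue(model_name: str, message: dict) -> None:
    """Append a message to the queue for the model that can serve it."""
    BROKER.setdefault(model_name, []).append(message)


@app.post("/models/{model_name}/predictions")
def create_prediction(model_name: str, inputs: dict):
    prediction_id = uuid.uuid4().hex
    enqueue(model_name, {"id": prediction_id, "inputs": inputs})
    # The caller gets an ID back immediately; the actual work happens on
    # whichever container consumes this model's queue.
    return {"id": prediction_id, "status": "queued"}
```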

In general, I just really worry that the AMQP-centered approach closes too many doors. The HTTP-based approach, for example, doesn't close the door on a broker-less design either: we could nest Docker runs so that you can ask any Cog worker to execute any workload, and it'll pull in the appropriate container (or have it pre-pulled). That'd be very simple and elegant... Would need to think a bit more about this

CtrlC-Root commented 2 years ago

I was sitting down with pen & paper thinking about how I'd design this from scratch... The big question I was asking myself: Why do the RPC in AMQP-land, rather than directly with the Cog HTTP server on any of the predictors?

I don't think AMQP is a good RPC-interface to expose to users. It's too idiosyncratic. An HTTP-based API is simpler, and what we already do. HTTP should carry through from your quick local demo to production -- despite the backing mechanism changing with a CLI-flag (memory to Rabbit/Redis).

There are a few downsides to an HTTP-based API, depending on whether you tie model inference requests to HTTP requests or not. In the former case (i.e. one HTTP request per inference request) it's harder to implement cancelling a request (maybe by closing the connection?), progress reporting while the request is running (maybe as a streaming response?), and reasonable timeouts (some models take minutes to run, and you may have a maximum timeout imposed on you by cloud infrastructure like LBs or health checks). It's also more awkward to work with the request backlog as a first-class concept (i.e. for autoscaling purposes you may want to know exactly how many of each type of request is in the backlog at any given time). In the latter case (i.e. an HTTP API to create, monitor, and manage multiple requests) you need to store request state somewhere (i.e. requests created, their current status, etc.) and monitor that state over time (i.e. polling). If there's a good way to implement these features through an HTTP API without the drawbacks above then that would be worth considering.
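
To illustrate the "latter case": a minimal sketch of the request state an async-over-HTTP API would have to keep, with the backlog derived from it. The names and fields are made up for illustration, not an existing Cog interface.

```python
# Sketch of the state an async-over-HTTP API has to keep: every request needs
# a stored status, and the backlog is derived by counting queued entries.
# All names here are illustrative.
import uuid
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class RequestState:
    model: str
    status: str = "queued"  # queued -> running -> succeeded | failed | canceled
    logs: list = field(default_factory=list)


REQUESTS: dict[str, RequestState] = {}


def create_request(model: str) -> str:
    request_id = uuid.uuid4().hex
    REQUESTS[request_id] = RequestState(model=model)
    return request_id


def backlog_by_model() -> dict[str, int]:
    """What an autoscaler would want: how many requests are waiting per model."""
    return Counter(r.model for r in REQUESTS.values() if r.status == "queued")
```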

Enforcing the invariant that all communication with Cog goes through HTTP also gives us, and our users, far more leverage moving forward. They can move between Postgres, Rabbit, and Redis as the wind blows. I think we should draw a hard line in the sand that users never talk directly to brokers. Users always talk to the Cog HTTP server.

What I'm proposing:

[diagram: "Untitled 18"]

Now, you may find it awkward to use the HTTP server in the predictors as the API. However, it's a minimal amount of cycles to service API requests rather than predictions. I think it's simple to understand, and I think it has by far the most optionality down the line.

It has the most optionality at the cost of not being a good fit for some use cases. Whether that's a tradeoff worth making or how is a different question.

In terms of load-balancing between Cogs for servicing API requests, we'd leave that to the user. If they're using K8s, they can just set up an ingress. If they're not, they can round-robin between them. Given the simplicity of it, even if you fail to round-robin, most likely a single one will be able to handle the load for 99.9% of users. Any Cog container can accept work for any other as it just turns it into a broker message -- however, only the right Cog container can actually perform the work (as you can see in my illustration).

We had a system like this in place for our production inference. Each instance of our backend ran an HTTP API to enqueue requests to a queue and a worker to process requests from the same queue. We used the queue backlog to autoscale the pool of instances. It can certainly work, but it suffered the pitfalls I listed above: namely, cancellation and progress reporting had to happen out of band. We also still had to consider the impact of changing the queue message format on already-queued messages (i.e. during deploys to production), so we were effectively maintaining two separate protocols which had to be backward compatible anyway.

In general, I just really worry that the AMQP-centered approach closes too many doors. The HTTP-based approach, for example, doesn't close the door on a broker-less design either: we could nest Docker runs so that you can ask any Cog worker to execute any workload, and it'll pull in the appropriate container (or have it pre-pulled). That'd be very simple and elegant... Would need to think a bit more about this

Our internal tool supports an HTTP API which we use for local development and an AMQP API that we use everywhere else. IMO it makes sense to support different protocols for different use cases, unless there's some way to work around the limitations of HTTP that I'm not aware of.

sirupsen commented 2 years ago

@CtrlC-Root thanks for your thoughtful response! Let me try to comment on the critical points here... I think bidirectional HTTP2 streams can accomplish all of this, if I understand you right 👀

implement cancelling a request

I wrote "SSE" in the diagram, which I do think we likely want to implement for simpler clients, but HTTP2 provides bi-directional streams. The client can thus send a frame to the server to terminate, change priority, or whatever other side-channel information we may want long-term

progress reporting while the request is running (maybe as a streaming response?)

Exactly, when the client submits the parameters for the prediction the server will set up a long-running HTTP2 stream and send progress updates back with logging, etc.
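
As a rough sketch of that shape -- using SSE framing over a plain streaming response rather than a raw HTTP2 stream, and a FastAPI-style handler purely for illustration; the route and event payloads are not an existing Cog endpoint:

```python
# Minimal sketch of streaming progress back over a long-lived HTTP response
# (SSE framing). The route and event payloads are illustrative only.
import json
import time

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def run_prediction_with_progress(inputs: dict):
    """Stand-in for real work; yields progress events as they happen."""
    for step in range(1, 4):
        time.sleep(1)  # pretend to do a chunk of work
        yield {"status": "processing", "step": step, "logs": f"step {step} done"}
    yield {"status": "succeeded", "output": "..."}


@app.post("/predictions")
def create_prediction(inputs: dict):
    def event_stream():
        for event in run_prediction_with_progress(inputs):
            # One SSE frame per progress update; the client reads them as they arrive.
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```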

It's also more awkward to work with the request backlog as a first-class concept (i.e. for autoscaling purposes you may want to know exactly how many of each type of request is in the backlog at any given time).

I don't think HTTP or not changes this (and the other concerns you mentioned after this), but I may be misunderstanding you 👂🏻

An auto-scaler can still reach into the broker to see queue sizes or whatever other stats to scale up the necessary models. However, I think this should be a first-class API on each broker (Postgres, Redis, Rabbit, ...) where you can get a Map<ModelName, QueueBacklogSize> or whatever (of course, the implementation of this in e.g. Rabbit might be a bit awkward -- but doable).

Similarly, if there's an API to poll the status of a request, that's part of the broker implementation. When the request reaches the Cog container, it has to register it with the broker in a way that implements getStateFor(id) and getQueueBacklogSizes() (we'd want better names, but these are verbose and illustrative 😅).

So in essence, I think all of what you're asking for is doable over HTTP, encapsulated in the driver.
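
For illustration, the driver surface being sketched might look something like this in Python. The method names follow the getStateFor/getQueueBacklogSizes placeholders above and are not a real Cog interface.

```python
# Sketch of the broker "driver" interface being described: each backend
# (Redis, RabbitMQ, Postgres, in-memory, ...) would implement the same small
# surface, and everything else talks HTTP to Cog. Method names are
# illustrative placeholders from the discussion, not a real API.
from abc import ABC, abstractmethod


class BrokerDriver(ABC):
    @abstractmethod
    def enqueue(self, model_name: str, message: dict) -> str:
        """Queue a prediction request, returning its ID."""

    @abstractmethod
    def get_state_for(self, prediction_id: str) -> dict:
        """Current status/logs/output for a single prediction."""

    @abstractmethod
    def get_queue_backlog_sizes(self) -> dict[str, int]:
        """Map of model name -> number of queued predictions (for autoscalers)."""
```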

@evilstreak asked: "What about logging?"

For logging with this solution, I see two fundamental approaches, assuming that it comes back to the client via the HTTP2 bi-directional stream (which is where you'd also send cancellations, we'd send updates, whatever)

(1) Broker-based. All logging would flow through the broker, in a broker-specific way. For RabbitMQ, it might be a logging-specific queue that the predicting Cog container emits to, and the Cog container you're connected to will pop off that queue and stream it back to the client in the HTTP2 stream. (This is what I've pictured in the comment on the issue)

(2) Proxy-based. We could have the Cog container you connect to use service discovery (in the broker) to locate the Cog container for your model, and then proxy the connection to it. From then on, we know we'd have access to everything -- this is how parts of k8s work (or, at least, used to work)
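
A rough sketch of option (1), assuming RabbitMQ via pika purely for illustration: the predicting container publishes log lines to a per-prediction queue, and the container holding the client connection consumes that queue and relays each line into its open HTTP stream. Queue naming and payloads are invented for the sketch.

```python
# Illustrative sketch of the broker-based logging flow with RabbitMQ (pika).
# Queue naming ("logs.<prediction id>") and payloads are assumptions, not
# anything Cog currently does.
import pika


def publish_log(prediction_id: str, line: str) -> None:
    """Called inside the container actually running the prediction.
    (A real implementation would reuse one connection, not open one per line.)"""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
    channel = connection.channel()
    queue = f"logs.{prediction_id}"
    channel.queue_declare(queue=queue)
    channel.basic_publish(exchange="", routing_key=queue, body=line.encode())
    connection.close()


def relay_logs(prediction_id: str, send_to_client) -> None:
    """Called in the container the client is connected to; pushes each log
    line down the client's HTTP stream via the provided callback."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
    channel = connection.channel()
    queue = f"logs.{prediction_id}"
    channel.queue_declare(queue=queue)

    def on_message(ch, method, properties, body):
        send_to_client(body.decode())

    channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()
```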

bfirsh commented 2 years ago

To alleviate the barrier of entry of implementing an AMQP-based API, we'd likely get into the business of writing clients for all languages (Python, Go, Ruby, ...) and brokers (Redis, AMQP, Postgres, ..)

This is a great point, and was on my mind too. Ideally users would be interacting via a client library, and maintaining that matrix will be horrible.

This is a really interesting idea. I can't think of any fundamental flaws with the approach. What I particularly like is we're describing a single standard API, with a built-in adapter system for running it on top of multiple systems. Much more Docker-like.

Another thing worth noting is that for AMQP, Redis, etc, implementing a client is really fiddly (or much more so than HTTP) so you'll really need client libraries. We'll have to build and maintain those right now. If it's built-in, then it effectively ships with its own client library.

A few things popping into my head:

  1. I wonder how often this would not be run on Kubernetes with an ingress. Service discovery for lots of random workers sounds like a nightmare.
  2. Maybe people actually want a broker to sit in between these two systems, as a form of firewalling? Maybe people want to talk to, e.g., a central Kafka instance instead of going via HTTP?
  3. What happens if we mix gRPC into this? Does that ease any of the issues of dealing with HTTP directly? (The downside being we lose the simplicity of plain HTTP.)

sirupsen commented 2 years ago

I wonder how often this would not be run on Kubernetes with an ingress. Service discovery for lots of random workers sounds like a nightmare.

I mean, even if you're not running on K8s you'll have to route to n pods... even if you're on bare-metal, hand-rolled Chef'ed up Nginx, you just hard-code the IPs. Not a big deal. Heroku, Render, etc. will support it just fine.

If we go the route of letting Cog containers proxy to each other, then it requires a little more thought -- but it's definitely still very doable. We'd just do service discovery in whatever storage mechanism they give us (anything can store a list of hosts)

Maybe people actually want a broker to sit in between these two systems, as a form of firewalling? Maybe people want to talk to, e.g., a central Kafka instance instead of going via HTTP?

If you want to do this, run an instance of Cog as a sidecar to your broker. I don't expect this to be super common.

What happens if we mix gRPC into this? Does that ease any of the issues of dealing with HTTP directly? (The downside being we lose the simplicity of plain HTTP.)

If we think about Cog as a 10-year project (at least), then gRPC / GraphQL / HTTP2 / HTTP3... will all just be endpoints that the Cog HTTP server in the container supports. Doesn't architecturally change anything

Another thing worth noting is that for AMQP, Redis, etc, implementing a client is really fiddly (or much more so than HTTP) so you'll really need client libraries. We'll have to build and maintain those right now. If it's built-in, then it effectively ships with its own client library.

Exactly my point -- HTTP, GraphQL and gRPC are the only modern things where you can get away with no client library

bfirsh commented 2 years ago

It's worth noting this might be something we don't have to do right now. Cleaning up and documenting a queue-based API seems like a noble cause, regardless of whether it's internal or external.

E.g. It might just be that it's initially external, for the sake of getting something shipped, but over time instead of implementing the matrix from hell of client libraries, we add this HTTP-based client to Cog. (It also occurred to me it's a 3 dimensional matrix -- the API needs versioning too! And we probably want to support old Cogs in new clients. Arrrgghh!)

andreasjansson commented 2 years ago

This is a really interesting idea. I particularly like how it abstracts away the queue consumer logic. I've spent countless hours fighting both RabbitMQ and Redis Streams to get that logic right. If we can encapsulate that behind a simple HTTP API, it would be a huge time saver for anyone who wants to build a batch inference system like this. And in terms of maintenance it'll be less work for us than a bunch of Cog client libraries in different languages.

Would it have to use SSE/bidirectional HTTP2, or could we use a more "boring" async API where the prediction endpoint returns an operation ID that could be polled against? That feels like a more familiar design: it would handle connections breaking, would allow us to return progressive output and logs from the poll endpoint, and could support a cancellation endpoint.
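
For illustration, the polling flow might look like this from the client's side. The paths, field names, and port are assumptions for the sketch, not a finalized Cog API.

```python
# Sketch of the "boring" polling flow being proposed: create a prediction,
# get an operation ID back, then poll a status endpoint until it finishes,
# with a cancel endpoint available the whole time. URL paths and response
# fields are illustrative.
import time

import requests

BASE_URL = "http://localhost:5000"  # assumed address of the Cog HTTP server


def predict_and_wait(inputs: dict, poll_interval: float = 1.0) -> dict:
    created = requests.post(f"{BASE_URL}/predictions", json={"input": inputs}).json()
    prediction_id = created["id"]

    while True:
        state = requests.get(f"{BASE_URL}/predictions/{prediction_id}").json()
        print(state.get("logs", ""))  # progressive logs come back from the poll endpoint
        if state["status"] in ("succeeded", "failed", "canceled"):
            return state
        time.sleep(poll_interval)


def cancel(prediction_id: str) -> None:
    requests.post(f"{BASE_URL}/predictions/{prediction_id}/cancel")
```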

Thinking about how we'd fit this into replicate.com's architecture, we'd probably need to run an HTTP proxy in front of Cog's built-in HTTP servers. Currently when we get a prediction request, we not only add a message to a queue, but we also do some custom things, like scale k8s deployments. I can imagine users of Cog wanting to add predictions to a database, route to specific model variants based on A/B tests, log prediction outputs to some custom monitoring system, etc. In those cases they'll probably also add a proxy in front. But with a simple HTTP API that proxy should be easy to implement.

From my reading of the proposal above, it seems like the HTTP server for a particular model worker would only handle requests to that model? In replicate.com's case, we scale model worker deployments down to zero pods. But perhaps the built-in HTTP server could be configured to handle messages to every model that's connected to a broker.

sirupsen commented 2 years ago

Would it have to use SSE/bidirectional HTTP2 or could we use a more "boring" async API where the prediction endpoint returns an operation ID that could be polled against?

Good point -- I laid out the options here. I think there's definitely a case to be made that we only cover the polling API for a while, until HTTP2 is requested (and possibly it never is; polling is great for its simplicity).

need to run an HTTP proxy in front of Cog's built-in HTTP servers.

So by "proxy" here, you don't necessarily mean "nginx" (although it could be, in the early innings and likely for most customers), but "make it really easy to implement the Cog API in something else and just route it through"? Kind of like a super().

I like that. In the replicate.com case, it'd also mean that we could then kill the current HTTP frontend and replace it with a simple Cog proxy with our business logic on top, yeah?

That said, it could also be that we make it easy to somehow inject custom code into your Cog HTTP server...? I don't think we have to figure that out right now; nothing in this design makes that non-reversible -- but for ease of use, it may be easier to allow e.g. mounting a volume where, if you create predictions_post.py with a function called callback, the HTTP server calls that function, without you needing to create a heavy-handed proxy. I hate the sound of exactly that, but it's just to go out on a limb and illustrate a possible way to implement "callbacks" here.
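
Running with that (explicitly hypothetical) example, the user-supplied file might look something like this. The filename and callback entrypoint come from the comment above; the arguments are guessed for illustration.

```python
# predictions_post.py -- what the hypothetical mounted hook described above
# might look like from the user's side. The `callback` entrypoint comes from
# the example; the arguments are assumed.
import logging

logger = logging.getLogger("predictions_post")


def callback(prediction_id: str, inputs: dict) -> None:
    """Custom business logic the Cog HTTP server would invoke on each new
    prediction request: write to a database, pick an A/B variant, notify a
    monitoring system, and so on."""
    logger.info("prediction %s created with inputs %s", prediction_id, inputs)
```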

From my reading of the proposal above, it seems like the HTTP server for a particular model worker would only handle requests to that model? In replicate.com's case, we scale model worker deployments down to zero pods. But perhaps the built-in HTTP server could be configured to handle messages to every model that's connected to a broker.

In the replicate.com case we'd probably need the proxy to signal scaling up the target pod and stall the request until that's up. We have to be careful not to over-specialize Cog for replicate.com -- I think the 0-scaled pod is a use case fairly specific to us, no?

CtrlC-Root commented 2 years ago

When running inference in production for user-facing functionality I think latency is likely to be an important metric. For that reason alone I think preferring push instead of polling semantics is a good idea. This was part of why we opted to move away from a simple HTTP1.1 API to an AMQP API internally. With push notifications to mobile clients and WebSockets/Server Push/HTTP2/etc you can actually avoid polling throughout the entire stack. Hence my question above about whether HTTP2 would allow us to implement an HTTP API without having to resort to polling.

As an example, our internal target from when the user creates an inference request in a user-facing mobile client to when they see the first image is on the order of single seconds. Choosing polling intervals and request timeouts across 2-3 layers of HTTP1.1 APIs to achieve this goal reliably is non-trivial.

evilstreak commented 2 years ago

For now we're going to stick with Redis as our queueing system. I've updated this issue and the linked issues to rename/reword them or close them as appropriate.

Some of the discussion above outlines an approach where we only expose an HTTP API from Cog, and make the queueing system an internal implementation detail. That seems like a good direction to go in, so we're going to make changes to the queueing API we currently use to match the HTTP API, but we aren't going to swap from Redis to AMQP.