onebeyond / rascal

A config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption and channel pooling.
MIT License

question: multi server subscriptions publishing same messages #113

Closed: xenoterracide closed this issue 4 years ago

xenoterracide commented 4 years ago

So I have this problem: we are using GraphQL as our API interface, and for subscriptions it uses websockets. We of course want to have multiple servers backing this API. Currently the subscription we have in place receives a rabbit message and then republishes it to the pubsub (websocket). The problem is that with multiple graph servers, any of them could pick up the rabbit message.

My solution is to create a unique queue name for each server so that each one will have its own queue. Currently I'm appending the hostname to the static queue name. The hostname is derived from the docker container sha or whatever... (it's the default container one), which means it will change if the container's build changes, creating a bunch of queues over time.
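
For illustration, that naming scheme is roughly the sketch below; the helper function is hypothetical, and the base queue name is the one discussed later in the thread.

  import { hostname } from 'os';

  // Hypothetical helper: append the container hostname (the default Docker
  // hostname is the short container id) to a static queue name.
  function instanceQueueName(base: string): string {
    return `${base}.${hostname()}`;
  }

  // e.g. "deliverable.subscription.a1b2c3d4e5f6" inside a container
  const queue = instanceQueueName('deliverable.subscription');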

One of my concerns here is that yesterday (and this is weird) two of my queues, deliverable.subscription and deliverable.subscription.1234 (or something similar), both seemed to be picked up by my handler, and that was causing duplicate publications.

I can't nuke all the subscriptions because celery also has subscriptions (we're migrating away from a django app).

My proposed solution is to delete the previous queue via some pattern match, but I have no idea how I'd accomplish that. Can you tell me how I could do that, or do you have a better idea?

P.S. Thanks for all the help/consideration so far, and for this project; we were losing messages somewhere and migrating to rascal seems to have solved it.

cressie176 commented 4 years ago

Hi @xenoterracide,

Happy to help if I can, but I want to make sure I fully understand the issue. I get how you need to create unique reply queues to route messages back to the correct server, and how you're using the container hostname to do this. We're also discussing a solution based on direct reply-to, but I'm not sure when we'll have it implemented.
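
For reference, direct reply-to at the raw amqplib level works roughly like the sketch below (the connection URL and routing key are placeholders); as noted, Rascal itself had not implemented it at this point.

  import * as amqp from 'amqplib';

  async function requestWithDirectReplyTo(): Promise<void> {
    const connection = await amqp.connect('amqp://localhost'); // placeholder URL
    const channel = await connection.createChannel();

    // The requester must consume from the pseudo-queue in no-ack mode,
    // on the same channel, *before* publishing the request.
    await channel.consume(
      'amq.rabbitmq.reply-to',
      (msg) => {
        if (msg) console.log('reply:', msg.content.toString());
      },
      { noAck: true }
    );

    // The responder publishes its reply to the replyTo address it receives.
    channel.publish('amq.topic', 'device.123.deliverable.example', Buffer.from('{}'), {
      replyTo: 'amq.rabbitmq.reply-to',
    });
  }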

Unfortunately I got a bit lost when you talked about two queues deliverable.subscription and deliverable.subscription.1234. I assume the first queue is for outbound messages, and the second is for replies (with 1234 representing the sha). i.e.

┌─────────────────┐                 ┌─────────────────┐
│                 │                 │                 │
│                 │                 │                 │
│     Browser     │                 │  Graph Server   │
│                 │◀───────────────▶│                 │─────────────────────────────┐
│                 │   Web Socket    │                 │                             │
│                 │                 │                 │                             │
└─────────────────┘                 └─────────────────┘                             │
                                             ▲                                      │
                                             │                                      │
                                             │                                      │
                                             │                                      │
                                             │                                      │
                                             │                                      ▼
                             ┌──────────────────────────────┐       ┌──────────────────────────────┐
                             │                              │       │                              │
                             │                              │       │                              │
                             │           Reply Q            │       │        Command Queue         │
                             │deliverable.subscription.1234 │       │   deliverable.subscription   │
                             │                              │       │                              │
                             │                              │       │                              │
                             └──────────────────────────────┘       └──────────────────────────────┘
                                             ▲                                      │
                                             │                                      │
                                             │                                      │
                                             │                                      │
                                             │                                      │
                                             │                                      │
                                             │        ┌─────────────────┐           │
                                             │        │                 │           │
                                             │        │                 │           │
                                                      │ Backend Server  │           │
                                             └────────│                 │◀──────────┘
                                                      │                 │
                                                      │                 │
                                                      └─────────────────┘

Somehow your graph server handler is consuming messages from both queues, causing it to send two graphql responses back to the websocket-based client. I'm not sure my understanding is correct though, because if this were really what you were seeing then:

  1. both the graph server and the backend server would be competing for messages
  2. the messages from the command queue presumably have a different format to the replies, which would probably cause other issues

You didn't mention either of these problems, so I suspect I have got something muddled. I'm also not sure how this could happen, unless your graph server was explicitly consuming from deliverable.subscription.

I can't nuke all the subscriptions because celery also has subscriptions (we're migrating away from a django app).

What do you mean by subscriptions?

My proposed solution is to delete the previous queue via some pattern match, but I have no idea how I'd accomplish that. Can you tell me how I could do that, or do you have a better idea?

What do you mean by previous?

Sorry for all the questions.

xenoterracide commented 4 years ago

OK, here's the flow; 5 and 6 were not supposed to be happening. For some reason, when it was just deliverable.subscription, I was getting duplicate messages. I deleted deliverable.subscription by hand. Now I'm no longer getting duplicate messages (meaning 5 and 6 aren't happening).

[Screenshot: Screen Shot 2020-08-06 at 3.13.51 PM, showing the numbered message flow]

What do you mean by previous?

When we deploy a new version of the server, with new software, I'm (as expected) getting a new queue. So what I mean by "previous" is the prior server's queue, with a different hash.

What do you mean by subscriptions?

It's a GraphQL term: you subscribe to events over a websocket, and it's one-way.

both the graph server and the backend server would be competing for messages

Oddly, this is exactly what I'm trying to prevent. If we had 2 graph servers, since each graph server hosts its own websockets, each graph server would only get half the messages, because rabbit balances consumers on a shared queue (this happened, so we dropped to 1 server). So with a queue per server we should not have this problem, but now we'll have the ever-growing queue list problem.

Note: I haven't had time to read the direct reply-to stuff yet.

xenoterracide commented 4 years ago

btw, did you use a program to make your diagram?

cressie176 commented 4 years ago

btw, did you use a program to make your diagram?

monodraw

cressie176 commented 4 years ago

to delete the previous

If you set a queue property to auto-delete: true, it should be automatically deleted when the consumer unsubscribes.

If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues using the Delete method as normal.

https://www.rabbitmq.com/amqp-0-9-1-reference.html

The queue may still persist if your application terminates after creating it but before consuming from it, though that should be unlikely. Also, I'm assuming that GraphQL subscriptions don't persist across application restarts; if they did, you would lose any replies that were left unprocessed on the queue when it was deleted.
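
In Rascal configuration terms that would look roughly like the sketch below (the vhost and queue name are placeholders); the options block is passed through to amqplib's assertQueue.

  import { hostname } from 'os';

  // Sketch only, not the poster's actual config.
  const config = {
    vhosts: {
      '/': {
        queues: {
          [`deliverable.subscription.${hostname()}`]: {
            options: {
              autoDelete: true, // removed once the last consumer goes away
              durable: false,   // assumption: replies needn't survive a broker restart
            },
          },
        },
      },
    },
  };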

xenoterracide commented 4 years ago

If you set a queue property to auto-delete: true, it should be automatically deleted when the consumer unsubscribes.

I thought about doing this, but if we just reboot the server/container for some reason, I don't know that I want the queued events to go away. Really this is only a problem at "release" time.

cressie176 commented 4 years ago

There isn't a way with the AMQP protocol to delete queues using a pattern, or to obtain a list of queues. You can do the latter with the RabbitMQ HTTP Management API though (assuming your broker has the management plugin enabled). You could list all the queues (probably specifying a filter), then delete the ones you no longer need.

Rascal has extremely limited support for the Management API - you can only use it to assert vhosts into existence (and delete them afterwards). I added it to help with testing, but it isn't intended to be a production feature, and there's currently no way to use it directly.
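
A rough sketch of that clean-up, assuming Node 18+ (for the global fetch), default management credentials, and the queue naming convention from earlier in the thread:

  // Sketch: list queues via the RabbitMQ Management API, keep only the stale
  // per-instance subscription queues, and delete them. Host, credentials and
  // the naming convention are assumptions.
  const MGMT = 'http://localhost:15672';
  const AUTH = 'Basic ' + Buffer.from('guest:guest').toString('base64');
  const VHOST = encodeURIComponent('/');

  async function deleteStaleSubscriptionQueues(currentSuffix: string): Promise<void> {
    const res = await fetch(`${MGMT}/api/queues/${VHOST}`, {
      headers: { Authorization: AUTH },
    });
    const queues: Array<{ name: string }> = await res.json();

    for (const q of queues) {
      const stale =
        q.name.startsWith('deliverable.subscription.') && !q.name.endsWith(currentSuffix);
      if (stale) {
        await fetch(`${MGMT}/api/queues/${VHOST}/${encodeURIComponent(q.name)}`, {
          method: 'DELETE',
          headers: { Authorization: AUTH },
        });
      }
    }
  }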

cressie176 commented 4 years ago

A couple of things I'm still curious about in your architecture are:

  1. How Django/Celery knows what queue to publish the message to
  2. How the Graph server knows what subscription the message is for

I assume both these bits of information must be passed in the initial HTTP request to Django/Celery?

xenoterracide commented 4 years ago

Well, it uses amq.topic, so they publish to the exchange, and we (the developers) just coordinate on routing keys and the exchange.

cressie176 commented 4 years ago

So Django/Celery publishes to amq.topic, but how does it decide the routing key? Also, do you remember what binding key/pattern you used for the two queues deliverable.subscription and deliverable.subscription.1234?

xenoterracide commented 4 years ago

We just talked about what routing key to use for what before firing the events; it's hard-coded.

  // class fields: per-instance queue name (static name + container hostname)
  queue: string = queueName(this) + '.' + hostname();
  bindingKey = 'device.*.deliverable.*';
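
In Rascal configuration, that binding might look roughly like the sketch below (the names are placeholders; amq.topic already exists on the broker, so it is checked rather than asserted):

  import { hostname } from 'os';

  // Sketch of binding a hostname-suffixed queue to amq.topic with the
  // binding key from the snippet above. Names are placeholders.
  const queueName = `deliverable.subscription.${hostname()}`;

  const vhostConfig = {
    exchanges: {
      'amq.topic': { assert: false, check: true }, // built-in exchange; don't re-declare it
    },
    queues: {
      [queueName]: { options: { autoDelete: true } },
    },
    bindings: {
      deliverable_subscription: {
        source: 'amq.topic',
        destination: queueName,
        destinationType: 'queue',
        bindingKey: 'device.*.deliverable.*',
      },
    },
    subscriptions: {
      deliverable_subscription: { queue: queueName },
    },
  };
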
cressie176 commented 4 years ago

Did both deliverable.subscription and deliverable.subscription.1234 have the same binding key?

xenoterracide commented 4 years ago

yes

cressie176 commented 4 years ago

So that explains how the same message ended up on both queues. However, I still don't see how both messages would have been delivered to the graph server, unless you had called broker.subscribe twice with the respective queue names. There's no way that I'm aware of in Rascal/amqplib/RabbitMQ to call channel.consume once and have it consume from multiple queues.
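
For completeness, a Rascal subscription consumes from a single named queue, roughly like the sketch below (the config and subscription name follow the earlier sketches):

  import { BrokerAsPromised as Broker } from 'rascal';

  async function start(config: any): Promise<void> {
    const broker = await Broker.create(config);
    broker.on('error', console.error);

    // One subscribe call == one queue. Consuming a second queue would need a
    // second, explicit subscription.
    const subscription = await broker.subscribe('deliverable_subscription');
    subscription
      .on('message', (message, content, ackOrNack) => {
        // forward content to the GraphQL pubsub here, then acknowledge
        ackOrNack();
      })
      .on('error', console.error);
  }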

xenoterracide commented 4 years ago

Right, but somehow it happened; I don't understand it either.

cressie176 commented 4 years ago

Unrelated to the issue of how your graph server could have started receiving messages from a queue that it wasn't expected to consume from, I'm curious about whether messages can be delivered from the graph server to the browser after restarts.

I'm no expert on websockets, but my understanding is that they maintain a persistent/stateful connection. This means that if the graph server restarts, the connection will be closed and will not automatically be recovered. To receive any pending replies, the browser would have to establish a new websocket with the same graph server, which would have to consume from the same unique queue.

Furthermore, if the server restarted because of a redeploy, it would get a new container sha. Not only would you have to re-establish connections, but you would also have to either consume from the previous queue, or move messages from the previous queue to the new one.

Is my understanding correct, and is this something you have a solution for? I'd love to hear it if so, as it might be relevant to something one of my colleagues is working on.

xenoterracide commented 4 years ago

I'm curious about whether messages can be delivered from the graph server to the browser after restarts.

Uh... I'm not certain, but I find it highly doubtful given how Apollo Server subscriptions are implemented. However, they do recommend a "production-ready" pubsub, which I'm not using, but maybe there's a way to implement a different one for RabbitMQ that could do this.

Furthermore, if the server restarted because of a redeploy, it would get a new container sha. Not only would you have to re-establish connections, but you would also have to either consume from the previous queue, or move messages from the previous queue to the new one.

Yes, and thinking about it (and the above) and how we've implemented things, I think auto-delete is actually a fine solution; the system also polls every minute (a little frequent if you ask me, but there are reasons).

cressie176 commented 4 years ago

Closing as I don't think there's anything more that I can add. Doesn't sound like a rascal bug.