couchbase / sync_gateway

Manages access and synchronization between Couchbase Lite and Couchbase Server
https://www.couchbase.com/products/sync-gateway

Feature: amqp event hook #2813

Closed RidgeA closed 4 years ago

RidgeA commented 7 years ago

Hello!

https://github.com/couchbase/sync_gateway/issues/2811

In my current project, I need to synchronize changes made in Couchbase to an external service. The external service could be unavailable (reboot, maintenance, etc.), and I must keep all changes and send them as soon as the service becomes available. In my view, the best solution for this is a message queue such as RabbitMQ.

This could be implemented with a webhook, but I prefer to publish messages directly from Sync Gateway.

So I am going to implement this feature and open a PR.

What it will look like:

  1. In the configuration file, the user can add an amqp handler:
    "event_handlers": {
      "document_changed": [{
        "handler": "amqp",
        "url": "amqp://guest:guest@localhost:5672/",
        "amqp": {
          // amqp-related options
        }
      }]
    }
  2. During server startup, the amqp handler will be created and a connection (and channel) to the AMQP server will be established. I'm also going to add watchers for channel and connection state changes to handle the case where the AMQP server is unavailable.
  3. In the HandleEvent method, the event's payload will be published to the queue.
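A minimal Go sketch of how the proposed handler entry could be decoded. The option names inside `amqp` (`exchange`, `queue`, `persistent`) and all struct names are hypothetical, since the proposal leaves them unspecified; they are not Sync Gateway's actual config types. The placeholder comment is replaced with concrete values because `encoding/json` rejects comments.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AMQPOptions holds the handler-specific "amqp" object (hypothetical fields).
type AMQPOptions struct {
	Exchange   string `json:"exchange,omitempty"`   // publish to this exchange if set
	Queue      string `json:"queue,omitempty"`      // otherwise publish to this queue
	Persistent bool   `json:"persistent,omitempty"` // request persistent delivery
}

// EventHandlerConfig mirrors one entry of a document_changed list.
type EventHandlerConfig struct {
	Handler string       `json:"handler"`
	URL     string       `json:"url"`
	AMQP    *AMQPOptions `json:"amqp,omitempty"` // nil for non-amqp handlers
}

type EventHandlersConfig struct {
	DocumentChanged []EventHandlerConfig `json:"document_changed"`
}

type DBConfig struct {
	EventHandlers EventHandlersConfig `json:"event_handlers"`
}

func parseConfig(raw []byte) (DBConfig, error) {
	var cfg DBConfig
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

func main() {
	raw := []byte(`{
  "event_handlers": {
    "document_changed": [{
      "handler": "amqp",
      "url": "amqp://guest:guest@localhost:5672/",
      "amqp": {"queue": "sg-changes", "persistent": true}
    }]
  }
}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		panic(err)
	}
	h := cfg.EventHandlers.DocumentChanged[0]
	fmt.Println(h.Handler, h.URL, h.AMQP.Queue, h.AMQP.Persistent)
}
```

Using a pointer for `AMQP` lets a server-startup switch on `Handler` detect a missing `amqp` block and fall back to defaults.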

Features and options I'm going to implement:

  1. Where to publish a message: to a queue or to an exchange.
  2. Options for the AMQP message: durable, headers, etc.
  3. A feature to declare the AMQP topology (exchange, queue, binding between queue and exchange). Optional, and performed once during server startup.
  4. A feature to keep messages for the AMQP server in memory (in a Go channel) if the AMQP service becomes unavailable. Optional; this can lead to high memory consumption if a large number of messages pile up in the Go channel, but it could help keep messages during a short AMQP server outage and eventually deliver them.

What I'm going to change or add in the codebase:

  1. Add one more case to create the amqp handler in https://github.com/couchbase/sync_gateway/blob/master/rest/server_context.go#L691

  2. Add AMQP-related config options in https://github.com/couchbase/sync_gateway/blob/master/rest/config.go

  3. Add a new file next to https://github.com/couchbase/sync_gateway/blob/master/db/event_manager.go and put all the logic there.

  4. Optional: do a small refactoring of this method https://github.com/couchbase/sync_gateway/blob/master/db/event_handler.go#L72 and move extraction of the event's payload into a separate method. I may also add a new method called Payload() to the Event interface https://github.com/couchbase/sync_gateway/blob/master/db/event.go#L23 to get the event's body as []byte, and implement it for the existing events. It is not strictly necessary, but I believe this will make the code better :-)
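A sketch of the refactoring in item 4, using simplified, hypothetical stand-ins for the real event types. Unlike the proposal's plain `[]byte`, `Payload()` here also returns an error, since JSON marshaling can fail; that signature is an assumption. The helper `deliver` (also hypothetical) shows how any handler (webhook, amqp, ...) could share payload extraction.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a simplified, hypothetical version of the interface in
// db/event.go, with the proposed Payload() method added.
type Event interface {
	String() string
	Payload() ([]byte, error)
}

// DocumentChangeEvent is a stand-in for the document_changed event.
type DocumentChangeEvent struct {
	DocID string
	Body  map[string]interface{}
}

func (e *DocumentChangeEvent) String() string { return "document_changed: " + e.DocID }

// Payload serializes the document body as JSON.
func (e *DocumentChangeEvent) Payload() ([]byte, error) { return json.Marshal(e.Body) }

// DBStateChangeEvent is a stand-in for the db_state_changed event.
type DBStateChangeEvent struct {
	DBName string
	State  string
}

func (e *DBStateChangeEvent) String() string { return "db_state_changed: " + e.DBName }

func (e *DBStateChangeEvent) Payload() ([]byte, error) {
	return json.Marshal(map[string]string{"dbname": e.DBName, "state": e.State})
}

// deliver extracts the payload once and hands it to a handler-specific sender.
func deliver(e Event, send func([]byte) error) error {
	body, err := e.Payload()
	if err != nil {
		return fmt.Errorf("%s: %w", e, err)
	}
	return send(body)
}

func main() {
	events := []Event{
		&DocumentChangeEvent{DocID: "doc1", Body: map[string]interface{}{"_id": "doc1", "type": "order"}},
		&DBStateChangeEvent{DBName: "db", State: "online"},
	}
	for _, e := range events {
		err := deliver(e, func(b []byte) error {
			fmt.Printf("%s -> %s\n", e, b)
			return nil
		})
		if err != nil {
			fmt.Println(err)
		}
	}
}
```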

So there are no significant changes to the existing codebase (except item 4), and this won't touch core functionality.

Of course, I'm going to write as many unit tests as possible.

I want to discuss this feature first and, if it could be useful and a PR for it could eventually be accepted, start working on it.

P.S. Actually, I already have a POC implementation of this feature, and it works great! :-)

tleyden commented 7 years ago

Feature to keep messages for amqp server in memory (in golang channel)

If SG crashes, these will all get lost, right? Is there any mechanism to redeliver?

tleyden commented 7 years ago

Is building Sync Gateway yourself a sustainable option? I ask because the way Caddyserver does build-time plugins might be a good option here.

E.g., there would be a single "hook" through which third-party components receive changes; it is a no-op by default but can be wired up at build time. Everything would then run in the same process (SG + third-party plugin).

The huge downside is that supporting SG then becomes difficult, since the third-party plugin could cause problems.
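A minimal sketch of such a hook, in the style of Go build-time plugins: a plugin package registers itself from `init()` and is compiled in with a blank import. `ChangeHook`, `RegisterChangeHook`, and `notifyChange` are hypothetical names, not existing Sync Gateway APIs, and the plugin is inlined as an `init` function to keep the example in one file.

```go
package main

import (
	"fmt"
	"sync"
)

// ChangeHook receives every document change (hypothetical signature).
type ChangeHook func(docID string, body []byte)

var (
	hookMu     sync.RWMutex
	changeHook ChangeHook = func(string, []byte) {} // no-op by default
)

// RegisterChangeHook is what a third-party plugin would call from init().
func RegisterChangeHook(h ChangeHook) {
	hookMu.Lock()
	defer hookMu.Unlock()
	changeHook = h
}

// notifyChange would be called by the core for each change.
func notifyChange(docID string, body []byte) {
	hookMu.RLock()
	h := changeHook
	hookMu.RUnlock()
	h(docID, body)
}

// In a real build this init() would live in a separate plugin package,
// pulled into the binary with a blank import; it runs before main().
func init() {
	RegisterChangeHook(func(docID string, body []byte) {
		fmt.Printf("plugin got %s: %s\n", docID, body)
	})
}

func main() {
	notifyChange("doc1", []byte(`{"type":"order"}`))
}
```

Because the hook runs in the SG process, a panicking or slow plugin affects SG directly, which is the support downside mentioned above.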

RidgeA commented 7 years ago

@tleyden

If SG crashes, these will all get lost, right? Is there any mechanism to redeliver?

Yes, if they are kept in a Go channel. I think it is possible to make these messages persistent, but to do that I would need to dive deeper into the SG code.

Is building Sync Gateway....

Sorry, but I didn't get what you meant in your second comment. Could you explain it?

tleyden commented 7 years ago

I don't think we'll accept a PR for this in this form, mainly because it's specific to RabbitMQ, and it would be a never-ending exercise to keep supporting more and more queue systems (Kafka, etc.).

One approach I just discussed with @adamcfraser:

This still has some known downsides (no redelivery when SG crashes before flushing events), but the upshot is that it would support any kind of destination.

K-Leon commented 7 years ago

I would love to see something like this land. Maybe incorporate the basic transmission logic (redelivery, ...) into core and provide the option to add the actual transmitter as an external process / plugin?

RidgeA commented 7 years ago

mainly due to the fact that it's specific to RabbitMQ

RabbitMQ is not the only implementation of AMQP (though it is very popular).

One approach I just discussed ....

I think this approach is less flexible than the existing webhook. It requires running a process on the same host as SG and implementing the logic for publishing messages to the destination according to its API. The webhook allows doing this on a separate host.

I would love to see something like this land. Maybe incorporate the basic transmission logic (redelivery, ...) into core and provide the option to add the actual transmitter as an external process / plugin?

Redelivery might not be suitable for all kinds of destinations. E.g., what if someone wants to send messages via UDP for some reason? But the idea of moving the event-handling logic into a plugin is good.

adamcfraser commented 4 years ago

Closing based on age.