ormushq / ormus

Ormus - Customer Data Platform
MIT License

feat(destination): Add the Channel module #82

Closed mohsenHa closed 5 months ago

mohsenHa commented 5 months ago

Channel

I've introduced a new module called "Channel" to manage all communications between services. Here's a breakdown of the communication needs within the destination service:

I've implemented two adapters for the Channel module:

  1. RabbitMQ
  2. Simple

Both adapters provide byte channels for communication. In the RabbitMQ adapter, two channels are declared: one for input and another for output. The adapter connects the input channel to a RabbitMQ publisher and the output channel to a RabbitMQ consumer. This means that when data is pushed to the input channel, the adapter receives it and publishes it to the RabbitMQ server. For the output channel, the adapter consumes from RabbitMQ and, when data is received, writes it to the output channel.
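To make the direction of each channel concrete, here is a minimal sketch of that wiring. It is not the adapter's actual code: the `broker` interface, `memoryBroker`, and `wire` are hypothetical names, and an in-process fake stands in for RabbitMQ so the example runs without a server.

```go
package main

import "fmt"

// broker is a hypothetical abstraction over the message broker. The real
// adapter talks to RabbitMQ through an AMQP client instead.
type broker interface {
	Publish(body []byte) error
	Consume() (<-chan []byte, error)
}

// memoryBroker is an in-process fake used only to make the sketch runnable.
type memoryBroker struct {
	queue chan []byte
}

func (b *memoryBroker) Publish(body []byte) error {
	b.queue <- body
	return nil
}

func (b *memoryBroker) Consume() (<-chan []byte, error) {
	return b.queue, nil
}

// wire starts the two forwarding goroutines and returns the channel pair.
func wire(b broker, bufferSize int) (chan<- []byte, <-chan []byte, error) {
	input := make(chan []byte, bufferSize)
	output := make(chan []byte, bufferSize)

	deliveries, err := b.Consume()
	if err != nil {
		return nil, nil, err
	}

	// Input side: everything the caller sends is published to the broker.
	go func() {
		for body := range input {
			if err := b.Publish(body); err != nil {
				return // a real adapter would retry or reconnect here
			}
		}
	}()

	// Output side: everything consumed from the broker is handed to the caller.
	go func() {
		for body := range deliveries {
			output <- body
		}
	}()

	return input, output, nil
}

func main() {
	in, out, err := wire(&memoryBroker{queue: make(chan []byte, 1)}, 1)
	if err != nil {
		panic(err)
	}
	in <- []byte("event")
	fmt.Println(string(<-out)) // prints "event"
}
```

The caller only ever sees a send-only input channel and a receive-only output channel; the broker round trip stays inside the adapter.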

The Simple adapter uses a plain Go channel on both sides. When data is sent to the input channel, it is written directly to the output channel.
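As a rough illustration of the Simple adapter's forwarding behaviour, the sketch below copies everything from an input channel to an output channel in a goroutine. The type `simpleChannel` and its methods are hypothetical and are not taken from the pull request.

```go
package main

import "fmt"

// simpleChannel is a hypothetical in-memory channel pair: whatever is sent
// on input is forwarded unchanged to output.
type simpleChannel struct {
	input  chan []byte
	output chan []byte
}

// newSimpleChannel creates the pair and starts the forwarding goroutine.
func newSimpleChannel(bufferSize int) *simpleChannel {
	c := &simpleChannel{
		input:  make(chan []byte, bufferSize),
		output: make(chan []byte, bufferSize),
	}
	go func() {
		// Closing input ends the loop and closes output in turn.
		defer close(c.output)
		for msg := range c.input {
			c.output <- msg
		}
	}()
	return c
}

// Input exposes the send-only side of the pair.
func (c *simpleChannel) Input() chan<- []byte { return c.input }

// Output exposes the receive-only side of the pair.
func (c *simpleChannel) Output() <-chan []byte { return c.output }

func main() {
	c := newSimpleChannel(1)
	c.Input() <- []byte("hello")
	fmt.Println(string(<-c.Output())) // prints "hello"
	close(c.input)
}
```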

Here's the adapter interface:

type Adapter interface {
    GetInputChannel(name string) (chan<- []byte, error)
    GetOutputChannel(name string) (<-chan []byte, error)
    GetMode(name string) (Mode, error)
    NewChannel(name string, mode Mode, bufferSize int, numberInstants int)
}
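For orientation, this is how a caller might use the interface. Everything other than the `Adapter` interface itself is an assumption made for the example: `Mode` is not defined in this description, so a string type is assumed, the mode value "both" is invented, and `memoryAdapter` is a hypothetical stand-in that backs each name with a single buffered Go channel. It is not safe for concurrent registration.

```go
package main

import (
	"errors"
	"fmt"
)

// Mode is not defined in the description; a string type is assumed here.
type Mode string

// Adapter is copied from the interface above.
type Adapter interface {
	GetInputChannel(name string) (chan<- []byte, error)
	GetOutputChannel(name string) (<-chan []byte, error)
	GetMode(name string) (Mode, error)
	NewChannel(name string, mode Mode, bufferSize int, numberInstants int)
}

var errNotFound = errors.New("channel not found")

type entry struct {
	mode Mode
	ch   chan []byte
}

// memoryAdapter is a hypothetical stand-in: one buffered channel per name,
// handed out as send-only or receive-only.
type memoryAdapter struct {
	channels map[string]entry
}

func (a *memoryAdapter) NewChannel(name string, mode Mode, bufferSize int, numberInstants int) {
	// numberInstants is ignored in this sketch.
	a.channels[name] = entry{mode: mode, ch: make(chan []byte, bufferSize)}
}

func (a *memoryAdapter) GetInputChannel(name string) (chan<- []byte, error) {
	e, ok := a.channels[name]
	if !ok {
		return nil, errNotFound
	}
	return e.ch, nil
}

func (a *memoryAdapter) GetOutputChannel(name string) (<-chan []byte, error) {
	e, ok := a.channels[name]
	if !ok {
		return nil, errNotFound
	}
	return e.ch, nil
}

func (a *memoryAdapter) GetMode(name string) (Mode, error) {
	e, ok := a.channels[name]
	if !ok {
		return "", errNotFound
	}
	return e.mode, nil
}

func main() {
	var a Adapter = &memoryAdapter{channels: map[string]entry{}}
	a.NewChannel("events", "both", 1, 1) // "both" is a made-up mode value

	in, err := a.GetInputChannel("events")
	if err != nil {
		panic(err)
	}
	out, err := a.GetOutputChannel("events")
	if err != nil {
		panic(err)
	}

	in <- []byte("ping")
	fmt.Println(string(<-out)) // prints "ping"
}
```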

Each adapter implements four methods:

mohsenHa commented 5 months ago

Description

This pull request introduces several updates to the current branch.

Changes Made

Acknowledgement Mechanism

I changed the element type of the output channel from []byte to Message, which is defined as follows:

type Message struct {
    Ack  func(multiple bool) error
    Body []byte
}

With this structure, consumers outside the channel module can acknowledge each message themselves by calling its Ack function, for example once the message has been processed.
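A consumer built on this type might look like the sketch below. The `consume` helper, the handler, and `errBad` are hypothetical. The `Ack(multiple bool)` signature matches `Delivery.Ack` in the amqp091-go client, where `false` acknowledges only the current delivery; whether this module passes the flag straight through is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Message mirrors the struct shown above.
type Message struct {
	Ack  func(multiple bool) error
	Body []byte
}

var errBad = errors.New("cannot process message")

// consume handles each message and acknowledges it only after the handler
// succeeds. It returns the number of acknowledged messages.
func consume(out <-chan Message, handle func([]byte) error) (acked int) {
	for msg := range out {
		if err := handle(msg.Body); err != nil {
			continue // left unacknowledged in this sketch
		}
		if err := msg.Ack(false); err != nil {
			continue
		}
		acked++
	}
	return acked
}

func main() {
	out := make(chan Message, 2)
	ack := func(multiple bool) error { return nil } // stand-in for a broker ack
	out <- Message{Ack: ack, Body: []byte("ok")}
	out <- Message{Ack: ack, Body: []byte("bad")}
	close(out)

	handler := func(body []byte) error {
		if string(body) == "bad" {
			return errBad
		}
		return nil
	}
	fmt.Println("acked:", consume(out, handler)) // prints "acked: 1"
}
```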

Connection Handling

After establishing a connection to RabbitMQ, a goroutine is started to watch for connection closures. When the connection is closed, this goroutine attempts to reconnect. The RabbitMQ adapter also provides a method that channel goroutines call to wait until a connection is available; it uses sync.Cond, and a Broadcast wakes all channels waiting for a RabbitMQ connection.
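The waiting mechanism can be sketched with `sync.Cond` as follows. Only the name `WaitForConnection` comes from this pull request; `connState`, `newConnState`, and `setConnected` are hypothetical, and the real adapter's bookkeeping may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// connState is a hypothetical holder for the connection status.
type connState struct {
	mu        sync.Mutex
	cond      *sync.Cond
	connected bool
}

func newConnState() *connState {
	s := &connState{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// WaitForConnection blocks until the connection is marked as available.
func (s *connState) WaitForConnection() {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Re-check the flag in a loop, as sync.Cond requires.
	for !s.connected {
		s.cond.Wait()
	}
}

// setConnected records the new status and wakes all waiters on (re)connect.
func (s *connState) setConnected(ok bool) {
	s.mu.Lock()
	s.connected = ok
	s.mu.Unlock()
	if ok {
		s.cond.Broadcast()
	}
}

func main() {
	s := newConnState()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.WaitForConnection()
		}()
	}
	s.setConnected(true) // simulates a successful (re)connect
	wg.Wait()
	fmt.Println("all channel goroutines resumed")
}
```

Because the flag is checked under the mutex, a goroutine that starts waiting after the Broadcast still sees `connected == true` and returns immediately.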

Within each channel, we continuously monitor the connection status. If either the connection or the channel is closed, the channel function is restarted after a 10-second delay. On restart, it calls WaitForConnection and blocks until the connection is re-established.
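The restart behaviour could be structured roughly like the loop below. This is a sketch under assumptions, not the module's code: `runWithRestart` and `errClosed` are hypothetical, the `wait` argument stands in for WaitForConnection, and `maxRuns` exists only so the example terminates. The delay is shortened from 10 seconds to keep the example fast.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errClosed = errors.New("connection closed")

// runWithRestart waits for a connection, runs the worker, and if the worker
// reports a failure, sleeps for delay and starts over. It returns the number
// of times the worker was run.
func runWithRestart(wait func(), work func() error, delay time.Duration, maxRuns int) int {
	runs := 0
	for runs < maxRuns {
		wait() // blocks until a connection is available
		runs++
		if err := work(); err == nil {
			return runs
		}
		time.Sleep(delay) // 10 seconds in the description above
	}
	return runs
}

func main() {
	attempts := 0
	work := func() error {
		attempts++
		if attempts < 3 {
			return errClosed // simulate a closed connection or channel
		}
		return nil
	}
	runs := runWithRestart(func() {}, work, 10*time.Millisecond, 5)
	fmt.Println("runs:", runs) // prints "runs: 3"
}
```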

Purpose

These updates aim to enhance the reliability and flexibility of the channel module, allowing for more robust connection handling and improved acknowledgment mechanisms.