rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams

Smart Producer for v2 #219

Open Zerpet opened 1 year ago

Zerpet commented 1 year ago

Description

We have to implement the smart producer, similar to the v1 Producer. However, I'd like to take the chance, since we are doing a major version, to change the API drastically. Some challenges I observe with the current implementation:

Producer is a concrete struct

This makes it difficult to provide different implementations of producers, in particular implementations for "regular" streams and super streams. The Java client provides the producer as an interface, hiding the concrete implementation and returning the right producer (stream vs super stream) based on options from the builder. The .NET client uses a concrete implementation for the producer and changes its behaviour (stream vs super stream) based on the options passed to the constructor.

I'm inclined to implement the v2 producer API in a similar manner to the Java client. I'd like to have a very simple producer interface, and return the appropriate implementation based on the options passed to a create function in Environment.

Producer API is arguably broad

The current API has getters, sends, close, notification registration and a sort of flush. The current implementation exposes too much, IMO. In a smart client, the library should take care of all the low level logic and boilerplate code (e.g. reconnection), so that the user simply sends messages.
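To make "the library takes care of reconnection" concrete, here is a minimal, hypothetical sketch of a sender wrapper that re-dials and retries transparently; none of the names below (rawSender, reconnectingSender, dial) are proposed API, they are illustration only.

package producer

import (
    "errors"
    "time"
)

// rawSender is a stand-in for the low-level wire producer (illustrative only).
type rawSender interface {
    send(payload []byte) error
}

// reconnectingSender wraps a rawSender and transparently re-dials on failure,
// so application code only ever calls Send.
type reconnectingSender struct {
    current rawSender
    dial    func() (rawSender, error) // re-establishes the connection
}

func (r *reconnectingSender) Send(payload []byte) error {
    const maxAttempts = 5
    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if r.current != nil {
            if lastErr = r.current.send(payload); lastErr == nil {
                return nil
            }
        }
        // Connection is broken (or was never established): back off, re-dial
        // and retry the send on the next iteration.
        time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
        if c, err := r.dial(); err == nil {
            r.current = c
        } else {
            lastErr = err
        }
    }
    return errors.Join(errors.New("send failed after retries"), lastErr)
}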

Given the language differences between Go and Java, most notably method overloading, we can't have as small an API as the Java client:

public interface Producer extends AutoCloseable {
  MessageBuilder messageBuilder();
  long getLastPublishingId();
  void send(Message message, ConfirmationHandler confirmationHandler);
  void close();
}

We would have two different send functions: one for the hand-holding, smart send, which accumulates messages and sends them in batches; and another for batch send, a sort of "pro mode", which gives the flexibility to choose how many messages to send per batch. One notable difference between these two send functions is that batch send will send immediately, whilst the smart send will accumulate until a certain threshold or a deadline, whichever happens first.
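As an illustration of that accumulate-then-flush behaviour, here is a hypothetical sketch, not the proposed implementation; pendingMessage, accumulator and flushFn are made-up names for this example:

package producer

import (
    "sync"
    "time"
)

type pendingMessage struct {
    publishingId uint64
    body         []byte
}

type accumulator struct {
    mu       sync.Mutex
    buffer   []pendingMessage
    maxBatch int                    // flush when this many messages are buffered...
    deadline time.Duration          // ...or when this much time has passed
    flushFn  func([]pendingMessage) // sends a batch over the wire
}

// add buffers a message and flushes if the count threshold is reached.
func (a *accumulator) add(m pendingMessage) {
    a.mu.Lock()
    a.buffer = append(a.buffer, m)
    full := len(a.buffer) >= a.maxBatch
    a.mu.Unlock()
    if full {
        a.flush()
    }
}

// loop flushes on the deadline; it runs in its own goroutine.
func (a *accumulator) loop(done <-chan struct{}) {
    ticker := time.NewTicker(a.deadline)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            a.flush()
        case <-done:
            a.flush()
            return
        }
    }
}

// flush hands the buffered messages to flushFn, if there are any.
func (a *accumulator) flush() {
    a.mu.Lock()
    batch := a.buffer
    a.buffer = nil
    a.mu.Unlock()
    if len(batch) > 0 {
        a.flushFn(batch)
    }
}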

Proposed solution

The API could be something as simple as:

type Message interface {
    // ...
}

type Producer interface {
    Send(msg Message) error
    SendBatch(messages []Message) error
    SendWithId(publishingId uint64, msg Message) error
    GetLastPublishedId() uint64
    Close() error
}

Message is an AMQP 1.0 message type; we will provide the AMQP 1.0 implementation. Producer will be a smart producer, prepared to publish to a stream or a super stream, and to use any producer feature, e.g. deduplication or sub-batching.
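As a usage sketch only, assuming the interface above lives in the same package and a placeholder newMessage constructor for AMQP 1.0 messages, the two send styles could look like this:

// publishExample is a hypothetical illustration of the two send styles.
func publishExample(p Producer, newMessage func(body []byte) Message) error {
    // Smart send: the library accumulates messages and flushes them in
    // batches on a threshold or deadline.
    if err := p.Send(newMessage([]byte("hello"))); err != nil {
        return err
    }

    // Batch send ("pro mode"): the caller chooses the batch size and the
    // batch is sent immediately.
    batch := []Message{
        newMessage([]byte("one")),
        newMessage([]byte("two")),
    }
    if err := p.SendBatch(batch); err != nil {
        return err
    }

    return p.Close()
}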

Drawbacks

This new API will break all existing users. It won't be trivial to migrate from v1 to v2, to the point that v2 may only be used for new projects/apps.

The user loses some flexibility, but I think that's acceptable as long as the new implementation provides enough benefits, like automatic connection recovery, logging and potentially metrics.

Zerpet commented 1 year ago

Gabriele and I had a long conversation about the Producer API: what we like from Java + .NET, what we don't like, and what the new API should look like. I'll use this comment to keep a record of the potential API design outcome.

Pros šŸ‘ and Cons šŸ‘Ž of Java + .NET

šŸ‘ for Java:

šŸ‘Ž for Java:

šŸ‘ for .NET

šŸ‘Ž for .NET

Design proposal in next comment.

Zerpet commented 1 year ago

Potential API design for Producer

Some pseudo code:

// Environment is the entry point. It connects to RabbitMQ and does topology
// operations
type Environment struct {
    // func CreateStream()
    // func DeleteStream()
}

// ProducerOpts will capture only optional parameters
type ProducerOpts struct {}

func (e Environment) CreateProducer(streamName string, opts ProducerOpts) Producer {
    // Producer options only has optional parameters
    // Returns the standard producer implementation
}

func (e Environment) CreateDeduplicationProducer(streamName, producerReference string, opts ProducerOpts) Producer {
    // Producer options only has optional parameters
    // Returns the deduplicationProducer implementation
}

func (e Environment) CreateSubBatchingProducer(streamName string, subBatchSize int, opts ProducerOpts) Producer {
    // Producer options only has optional parameters
    // Returns the subBatchingProducer implementation
}

type amqpMessage struct {
    // amqp 1.0 fields
}

type Producer interface {
    Send(msg amqpMessage) error
    SendBatch(messages []amqpMessage) error
    SendWithId(publishingId uint64, msg amqpMessage) error
    GetLastPublishedId() uint64
}

///////////////////////////////////////////////////

// Implementation for producer
type standardProducer struct {}

// Send batches messages and sends them after a pre-defined time, or when N
// messages are accumulated. The max amount of messages accumulated will be
// configurable. Internally keeps track of publishingID and auto-increments it
func (s standardProducer) Send(m amqpMessage) error {}
// SendWithId always returns an error because publishing ID is tracked internally
func (s standardProducer) SendWithId(publishingId uint64, msg amqpMessage) error {}
// SendBatch immediately sends the batch of messages. Assigns publishing IDs from the
// internally tracked publishing ID
func (s standardProducer) SendBatch(batch []amqpMessage) error {}
// GetLastPublishedId returns the last confirmed ID. Likely just informational in
// this implementation
func (s standardProducer) GetLastPublishedId() uint64 {}

///////////////////////////////////////////////////

// Implementation for producer using the message deduplication feature
type deduplicationProducer struct {}

// Send will always return an error because publishing ID is mandatory
func (p deduplicationProducer) Send(m amqpMessage) error {}
// SendBatch will always return an error because publishing ID is mandatory
func (p deduplicationProducer) SendBatch(msg []amqpMessage) error {}
// SendWithId accumulates messages with given ID, and publishes them after a
// pre-defined time, or when there are N messages accumulated. The max amount of
// messages accumulated will be configurable
func (p deduplicationProducer) SendWithId(publishingId uint64, m amqpMessage) error {}
// GetLastPublishedId Returns the last confirmed published ID. This will be
// important for applications to determine where they left off, in case of a
// disconnection
func (p deduplicationProducer) GetLastPublishedId() uint64 {}

///////////////////////////////////////////////////

// Implementation for producer using the sub-batching feature
type subBatchingProducer struct {}

// Send batches messages and sends them after a pre-defined time, or when N
// messages are accumulated. The max amount of messages accumulated will be
// configurable. Internally, it will track a publishing ID, and it will send
// the message sub-batch with the current publishing ID. Compression may be used
// if it was an option in the CreateSubBatchingProducer function
func (p subBatchingProducer) Send(m amqpMessage) error {}
// SendBatch immediately sends a sub-batch of messages with the current
// publishing ID
func (p subBatchingProducer) SendBatch(msg []amqpMessage) error {}
// SendWithId will always return an error because the publishing ID is tracked
// internally
func (p subBatchingProducer) SendWithId(publishingId uint64, m amqpMessage) error {}
// GetLastPublishedId returns the last confirmed published ID. This will be
// important for the internal reconnection/init process, to pick up where it left
// off, in the event of a reconnection or application restart
func (p subBatchingProducer) GetLastPublishedId() uint64 {}
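To illustrate how the deduplication variant could be used, here is a hedged usage sketch assuming the pseudocode above lives in one package; the newAMQPMessage constructor, the stream name and the producer reference are placeholders, not part of the proposal:

// resumePublishing is a hypothetical usage sketch of the deduplication
// producer: the application asks for the last broker-confirmed publishing ID
// and continues from there after a restart or reconnection.
func resumePublishing(env Environment, newAMQPMessage func(body []byte) amqpMessage, bodies [][]byte) error {
    // The producer reference ("invoice-service" here) identifies this
    // producer for deduplication purposes.
    p := env.CreateDeduplicationProducer("invoices", "invoice-service", ProducerOpts{})

    // Pick up where we left off: IDs lower than or equal to the last
    // published ID were already confirmed and will be deduplicated if re-sent.
    nextId := p.GetLastPublishedId() + 1

    for _, body := range bodies {
        if err := p.SendWithId(nextId, newAMQPMessage(body)); err != nil {
            return err
        }
        nextId++
    }
    return nil
}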

Edit: updated API after follow-up conversation with Gabriele

Zerpet commented 1 year ago

Reserved

Zerpet commented 4 months ago

Unassigning since the v2 work stream is paused.