node-ts / bus

A typescript based enterprise service bus framework based on enterprise integration patterns
https://bus.node-ts.com/
MIT License
274 stars 25 forks source link

RabbitMQ transport doesn't allow custom message headers. #202

Open vsabirov opened 1 year ago

vsabirov commented 1 year ago

Hello, this is a very great library! However, there is one small issue regarding the RabbitMQ transport.

Take rabbitmq-delayed-message-exchange plugin for the RabbitMQ server as an example.

Given an up and running RabbitMQ server with the rabbitmq-delayed-message-exchange plugin installed, the task is to simply use it and send a delayed message. For example, notify an external web server with a request after 1 hour, and if it succeeds, send an event down the bus, or else send another notification command down the bus that would be delayed for another 1 hour in case the external server is down or whatever.

This is currently impossible with node-ts/bus, because:

  1. publishMessage builds its own headers from MessageAttributes and doesn't allow user-specified custom headers, so we can't use the x-delay header required for the delayed message.

  2. There is no possible way to create an exchange with a custom type, in our case x-delayed-type is needed. This is because all methods and fields to achieve this are privated out, so there is no intended way.

It would be much more helpful to be able to specify custom message headers and assert custom exchanges.

adenhertog commented 1 year ago

hi @vsabirov, I had a read through the rabbit implementation again. Generally one of the goals of this library is to abstract away transport-specific features so that they're largely interchangeable. With that said, the ability to specify custom headers is understandable and I wanted to confirm some behaviour with you.

If you were to do Bus.publish(yourEvent, { attributes: { 'x-delay': 100 } }) then you'd expect this to be added directly against the header.

Currently per rabbitmq-transport.ts:367 this is being serialized against a different key like attributes: JSON.stringify({ 'x-delay': 100 }) ie:

image

Would this work for you if all attributes were spread onto the headers? ie:

headers: {
  ...messageOptions.attributes
}
vsabirov commented 1 year ago

@adenhertog Perhaps the ability to specify transport-agnostic attributes in a special header should be kept as is. Sure, it will work fine, however i'm afraid that the user can overwrite some special RMQ headers by accident and will be forced to rename their attributes to avoid collisions.

Perhaps something like

const rawHeaders = messageOptions.attributes.rawHeaders

and

headers: {
  attributes: messageOptions.attributes ? JSON.stringify(messageOptions.attributes) : undefined,
  stickyAttributes: messageOptions.stickyAttributes ? JSON.stringify(messageOptions.stickyAttributes) : undefined

  ...rawHeaders
}

And to use that you'll just have to specify a rawHeaders object in the message options that will be additionally spread into the headers.

adenhertog commented 1 year ago

@vsabirov I'm happy with what you've proposed - the reasoning makes a lot of sense. I can't get around to this immediately, but I'm open to accepting PRs if you need this soon

talentumtuum commented 1 year ago

@adenhertog I propose to hide access to internal attributes from regular use with symbols. The user should understand that this is an internal API.

I see a solution to this problem as follows

@node-ts/bus-messages

export kInternalAttributes = Symbol('@node-ts/bus-messages/internal-attributes')

export type InternalAttributes<T> =  T

export interface MessageAttributes<
  AttributesType extends MessageAttributeMap = MessageAttributeMap,
  StickyAttributesType extends MessageAttributeMap = MessageAttributeMap,
  InternalAttributesType = InternalAttributes<unknown>
> 
    correlationId?: Uuid;
    attributes: AttributesType;
    stickyAttributes: StickyAttributesType;
    [kInternalAttributes]: InternalAttributesType
}

@node-ts/bus-rabbitmq

export const kRabbitMQHeaders = Symbol('@node-ts/bus-rabbitmq/rabbitmq-headers')

export interface RabbitMQInternalAttributes = {
  headers: MessageAttributeMap
}

private async publishMessage(
    message: Message,
    messageOptions: MessageAttributes<..., ..., RabbitMQInternalAttributes> = { attributes: {}, stickyAttributes: {}, internalAttributes: {} }
  ): Promise<void> {
    await this.assertExchange(message.$name)
    const payload = this.coreDependencies.messageSerializer.serialize(message)
    const internalAttributes = messageOptions[kInternalAttributes]
    this.channel.publish(message.$name, '', Buffer.from(payload), {
      correlationId: messageOptions.correlationId,
      messageId: uuid.v4(),
      headers: {
        attributes: messageOptions.attributes
          ? JSON.stringify(messageOptions.attributes)
          : undefined,
        stickyAttributes: messageOptions.stickyAttributes
          ? JSON.stringify(messageOptions.stickyAttributes)
          : undefined
        ...internalAttributes.headers
      }
    })
  }

Usage

import { kInternalAttributes } from '@node-ts/bus-messages'

bus.publish(new Message(), {
  [kInternalAttributes]: { // typed
    headers: {
      'x-delay': 5000
    }
  }
})

This is a draft solution, I'm just suggesting a concept