smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

How to publish messages to a topic in Solace broker using AMQP #2751

Closed: elmodeer closed this 2 months ago

elmodeer commented 2 months ago

We have a Quarkus app that uses the smallrye-amqp connector. I have the following code for publishing messages to a topic.

@ApplicationScoped
public class InsightLifecycleObserver {

  public static final String INSIGHTS_DELETED_TOPIC = "insightscentral/insight/deleted";

  @Inject
  @Channel(INSIGHTS_DELETED_TOPIC)
  @OnOverflow(value = Strategy.NONE)
  Emitter<byte[]> insightsEmitter;

  void onInsightRemoved(@Observes(during = AFTER_SUCCESS) InsightDeletedEvent event) {

    final var tenantId = tenantResolver.getTenantId();
    try {
      var insightLifeCycleEvent = convertToLifeCycleEvent(event);
      var eventEnvelope =
          EventEnvelope.builder()
              .withId(UUID.randomUUID())
              .withData(
                  PojoCloudEventData.wrap(insightLifeCycleEvent, objectMapper::writeValueAsBytes))
              .build();

      // send the event to the Insights topic
      insightsEmitter
          .send(eventEnvelope.serialize())
          .whenComplete(
              (result, throwable) -> {

                if (throwable != null) {
                  LOGGER.error("Failed to emit InsightDeleted event", throwable);
                  metricsService.incrementErrorMessageCounter(
                      MessageType.INSIGHT_DELETED_EVENT, tenantId);
                } else {
                  LOGGER.debug(
                      "Published InsightDeleted event for insight with id: {} and sri: {}",
                      event.insight().id(),
                      event.insight().sri());
                  metricsService.incrementMessageCounter(
                      MessageType.INSIGHT_DELETED_EVENT, tenantId);
                }
              });
    } catch (Exception e) {
      LOGGER.error("Failed to publish InsightDeleted event", e);
      metricsService.incrementErrorMessageCounter(MessageType.INSIGHT_DELETED_EVENT, tenantId);
    }
  }
}

When this hook is called, I get the following exception:

io.vertx.core.impl.NoStackTraceThrowable: message rejected (REJECTED): Error{condition=amqp:not-allowed, description='SMF AD ack response error', info={solace.response_code=400, solace.response_text=Queue Not Found}}

What am I doing wrong?

cescoffier commented 2 months ago

Why not use the Solace connector? https://github.com/SolaceLabs/solace-quarkus

elmodeer commented 2 months ago

@cescoffier Because we already use the AMQP connector to connect to the Solace broker, although so far only for consuming.

Are you suggesting the Solace connector because it makes this easier, or because this use case is not supported by the SmallRye AMQP connector?

ozangunalp commented 2 months ago

This is a question about Solace's AMQP 1.0 support. I found this in the Solace documentation: https://docs.solace.com/API/AMQP/Msg-Pub.htm#Specifyi2

A client specifies that messages are to be published to a topic by formatting the destination as follows: topic://topic-string The topic:// prefix is recommended when publishing to a topic and isn't case sensitive. If a prefix isn't part of the destination, and the destination doesn’t start with #, the event broker will assume the message is intended for delivery to a queue, as long as no other setting has been used to specify the destination type.
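
To make the quoted rule concrete, here is a minimal Java sketch of how a destination string would be classified and how the `topic://` prefix could be added. The class and method names (`SolaceDestinations`, `hasTopicPrefix`, `toTopicAddress`, `defaultsToQueue`) are hypothetical helpers written for illustration; they are not part of SmallRye or any Solace API. It only mirrors the wording of the documentation above and ignores the "no other setting" caveat. With the SmallRye AMQP connector, the address defaults to the channel name, so the prefixed value would presumably be supplied through the outgoing channel's `address` attribute rather than computed in code.

```java
import java.util.Locale;

public class SolaceDestinations {

    // Prefix the Solace documentation recommends for publishing to a topic.
    public static final String TOPIC_PREFIX = "topic://";

    // The documentation states the prefix is not case sensitive.
    public static boolean hasTopicPrefix(String destination) {
        return destination.toLowerCase(Locale.ROOT).startsWith(TOPIC_PREFIX);
    }

    // Returns the destination formatted as topic://topic-string.
    public static String toTopicAddress(String destination) {
        return hasTopicPrefix(destination) ? destination : TOPIC_PREFIX + destination;
    }

    // Per the quoted rule: no prefix and no leading '#' means the broker
    // assumes a queue (assuming no other setting specifies the destination type).
    public static boolean defaultsToQueue(String destination) {
        return !hasTopicPrefix(destination) && !destination.startsWith("#");
    }

    public static void main(String[] args) {
        // Channel name from the original post, used as the AMQP address by default.
        String channel = "insightscentral/insight/deleted";
        System.out.println("treated as queue: " + defaultsToQueue(channel));
        System.out.println("topic address:    " + toTopicAddress(channel));
    }
}
```

Under this reading, the unprefixed channel name from the original post is treated as a queue name, which is consistent with the "Queue Not Found" rejection.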

elmodeer commented 2 months ago

Thanks, @ozangunalp. This solved it.