spring-cloud / spring-cloud-stream-binder-rabbit

Apache License 2.0
156 stars 133 forks source link

How to use a SPEL expression that is applied on a POJO for the routingKey #83

Closed SaiprasadKrishnamurthy closed 6 years ago

SaiprasadKrishnamurthy commented 7 years ago

My Source APP:

@InboundChannelAdapter(value = RequestSource.CHANNEL_NAME)
    public Event eventMessageSource() throws Exception {
        final Random r = new Random();
        Event event = generateSampleEvent(EventType.values()[r.nextInt(EventType.values().length - 1)]);
        System.out.println("---- Putting now ==> " + event);
        return event;
    }

My application.properties:

logging.level.=INFO
server.port=0
logging.file=events.log
server.port=0
spring.cloud.stream.bindings.requestChannel.destination=events-exchange
spring.cloud.stream.bindings.requestChannel.content-type=application/json
spring.cloud.stream.bindings.requestChannel.binder=rabbit
spring.cloud.stream.bindings.requestChannel.group=eventconsumersgroup
spring.cloud.stream.rabbit.bindings.requestChannel.producer.routing-key-expression=payload.eventType

Exception is this:

Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'eventType' cannot be found on object of type 'byte[]' - maybe not public?
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:224)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.access$000(PropertyOrFieldReference.java:46)

Should I not use application/json for using a SPEL? Am I missing the right set of properties here?

SaiprasadKrishnamurthy commented 7 years ago

Oops! Posted this question here in SO as it's more appropriate: https://stackoverflow.com/questions/45216818/how-to-use-a-spel-expression-that-is-applied-on-a-pojo-for-the-routingkey

mbogoevici commented 7 years ago

@SaiprasadKrishnamurthy Do you need Rabbit routing or you can use the more generic partitioning support ? http://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_partitioning

SaiprasadKrishnamurthy commented 7 years ago

In this example, I was more specifically trying the rabbit routing. [ADDED] - I'd prefer to use rabbit routing for my use case as it's transparent both in (SCDF and non-SCDF modes) as opposed to generic partitioning where I'd have to set the instance count and instance index (if I don't use SCDF, which is mostly the case for quick protoyping and testing). Also, I don't have any usecase where I'd need the similar requests going to the same instance (perhaps for some stateful operations).

garyrussell commented 7 years ago

spring.cloud.stream.rabbit.bindings.requestChannel.producer.routing-key-expression=payload.eventType

The problem is that the payload has already been serialized by the time the message reaches the channel adapter (where the routing key expression is evaluated).

One solution would be to set a message header to the eventType field, before sending the message, and then use:

spring.cloud.stream.rabbit.bindings.requestChannel.producer.routing-key-expression=headers.eventType.

garyrussell commented 7 years ago

@sabbyanandan @viniciusccarvalho @ilayaperumalg In 2.0, if we restructure the binders (as suggested by @viniciusccarvalho ) and we provide the binder access to the MessageChannel (see the comment in https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/184), we could add an interceptor to the channel to evaluate the routing key expression prior to serialization.

I therefore suggest we move this to a 2.0 milestone.

eutiagocosta commented 3 years ago

I am having the same problem in the new version of SCS 3.1.0, @garyrussell in previous versions, using payload.eventType worked.

@Service
public class DomainEventPublisher {

    @Autowired
    private StreamBridge streamBridge;

    public void publisher(DomainEvent aDomainEvent) {

        final Message<DomainEvent> message = MessageBuilder
            .withPayload(aDomainEvent)
            .build();

        streamBridge.send("publisher-out-0", message);
    }

}

@NoArgsConstructor(access = AccessLevel.PRIVATE, force = true)
@AllArgsConstructor
@Value
@EqualsAndHashCode
public class DomainEvent {

    private String storeId;
    private String serialNumber;
    private OffsetDateTime occurredOn;
    public String eventType = "teste.authorized";

}

My application.yml:

spring:
  cloud:
    function:
      definition: newQrCodeGenerated
    stream:
      source: publisher
      bindings:
        publisher-out-0:
          contentType: application/json
          destination: beblue-service-bus
        newQrCodeGenerated-in-0:
          contentType: application/json
          destination: beblue-service-bus
          group: terminal-service-terminal-created
      rabbit:
        bindings:
          newQrCodeGenerated-in-0:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true
              transacted: true
              #queue-name-group-only: true
              binding-routing-key: terminal-service.terminal.created
          publisher-out-0:
            producer:
              exchange-type: topic
              routing-key-expression: payload.eventType 
              transacted: true
Exception:
 Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'eventType' cannot be found on object of type 'byte[]' - maybe not public?
garyrussell commented 3 years ago

This is a bug https://github.com/spring-cloud/spring-cloud-stream/issues/2145