spring-cloud / spring-cloud-stream-binder-aws-kinesis

Spring Cloud Stream binder for AWS Kinesis
Apache License 2.0

PartitionKeyExpression can't find payload properties #193

Closed rsercano closed 1 year ago

rsercano commented 1 year ago

Hi,

I'm using the configuration below:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          autoCreateStream: false
          kplKclEnabled: true
          autoAddShards: false
      bindings:
        producer-out-0:
          destination: ${KINESIS_DESTINATION:producer}
          content-type: application/json
          producer:
            partitionKeyExpression: payload.id
            headerMode: none

With the producer bean below:

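import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KinesisProducer {

    // Buffer between callers of sendGps() and the Supplier that Spring Cloud Stream polls.
    private final BlockingQueue<MyBean> event = new LinkedBlockingQueue<>();

    @Bean
    public Supplier<MyBean> gpsProducer() {
        // poll() returns null when the queue is empty.
        return this.event::poll;
    }

    public void sendGps(MyBean event) {
        // offer() returns false only if the queue refuses the element.
        if (this.event.offer(event)) {
            log.info("Event sent: {}", event);
        } else {
            log.error("Couldn't manage to send to Kinesis");
        }
    }
}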

And it throws the exception below:

Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'id' cannot be found on object of type 'byte[]' - maybe not public or not valid?

My bean does have an id property. Could this be a timing issue in the binder itself?

rsercano commented 1 year ago

For now I'm building the aws_partitionKey header myself before putting the message into the BlockingQueue:

Message<UserDeviceDto> message = MessageBuilder.withPayload(event)
        .setHeader("aws_partitionKey", event.getId().toString())
        .build();

artembilan commented 1 year ago

I understand your struggle, but the expression cannot be evaluated against the payload's properties, because the payload is serialized as JSON into a byte[] before it reaches that partitionKeyExpression. So the recommended way is indeed to transfer such info via a header.

At least that is how it works in the old version of Spring Cloud Stream.

Try with the recently released Kinesis Binder 4.0.0.

Closing as Works as Designed
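
To make the ordering issue concrete, here is a minimal, stdlib-only Java sketch. It is a simulation under stated assumptions, not the binder's actual code: the Msg and MyBean classes and the serialize, keyFromPayload and keyFromHeader methods are hypothetical names invented for illustration. It only shows that a key derived from the payload is lost once the payload becomes a byte[], while a key carried in a header survives.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/** Stdlib-only simulation; none of these types are Spring or binder classes. */
final class PartitionKeyDemo {

    /** Hypothetical stand-in for a message: a payload plus string headers. */
    static final class Msg {
        final Object payload;
        final Map<String, String> headers;

        Msg(Object payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Hypothetical domain object whose id should become the partition key. */
    static final class MyBean {
        final String id;

        MyBean(String id) {
            this.id = id;
        }
    }

    /** Simulates the conversion step: the bean becomes JSON bytes, headers are kept. */
    static Msg serialize(Msg in) {
        MyBean bean = (MyBean) in.payload;
        byte[] json = ("{\"id\":\"" + bean.id + "\"}").getBytes(StandardCharsets.UTF_8);
        return new Msg(json, in.headers);
    }

    /** Mimics evaluating payload.id: works only while the payload is still a MyBean. */
    static String keyFromPayload(Msg m) {
        if (m.payload instanceof MyBean) {
            return ((MyBean) m.payload).id;
        }
        throw new IllegalStateException(
                "Property 'id' cannot be found on " + m.payload.getClass().getSimpleName());
    }

    /** Reads the key from a header, which payload serialization does not touch. */
    static String keyFromHeader(Msg m) {
        return m.headers.get("aws_partitionKey");
    }

    public static void main(String[] args) {
        MyBean bean = new MyBean("device-42");
        Msg original = new Msg(bean, Collections.singletonMap("aws_partitionKey", bean.id));
        Msg serialized = serialize(original);

        // The header still carries the key after serialization.
        System.out.println(keyFromHeader(serialized));

        // The payload-based lookup fails because the payload is now a byte[].
        try {
            keyFromPayload(serialized);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the application itself, the corresponding change is the workaround described earlier in the thread: build the Message with the aws_partitionKey header before it goes into the queue, so the key no longer depends on the payload type.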

rsercano commented 1 year ago

I see, thanks. To be honest, it still feels weird: there is a configuration property named partitionKeyExpression, but it doesn't do what the name suggests. I'll try the new version ASAP.

Thanks anyway