spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

Missing (custom) headers in KStream using custom SmartMessageConverter #2411

Closed PaulV1990 closed 2 years ago

PaulV1990 commented 2 years ago

Describe the issue

I produce a Message (org.springframework.messaging) in application X. I've implemented a custom MessageConverter that implements SmartMessageConverter. The message is sent to a topic with custom headers, and a CLI consumer shows that the custom headers are present on the topic.

When consuming with a KStream in Java application Y, the custom headers are not available.

@Bean
public Consumer<KStream<String, User>> streamUser() {
    return user -> log.info("Received: " + user);
}

I use the configuration:

spring:
  cloud:
    stream:
      bindings:
        streamUser-in-0:
          destination: user.0
          consumer:
            use-native-decoding: false

With this, I only receive these three (default) headers in the log:

contentType=application/json
id=31f9bed4-5750-fcf8-f5bf-e135e262fc43
timestamp=1652795431177

I verified that the custom MessageConverter is called, but it fails because the custom headers are missing.

public Object fromMessage(@NonNull Message<?> message, @NonNull Class<?> targetClass) { ... }

When using a different consumer like:

@Bean
public Consumer<Message<User>> consumeUser() { ... }

I can see all the headers (custom and Kafka):

deliveryAttempt: 1
custom_header_name: Foo
custom_header_version: Bar
kafka_receivedTopic: user.0
kafka_timestampType: LOG_APPEND_TIME
kafka_offset: 2
kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@12178ee3
kafka_receivedPartitionId: 0
kafka_receivedTimestamp: 1652796124185
kafka_groupId: anonymous.cc23b653-b6d2-4e00-b8aa-a01f2e07bb34
scst_nativeHeadersPresent: true
id: 1200be22-bf11-adef-d14f-9262539d66cc
contentType: application/json
timestamp: 1652796124435

I have also tried setting spring.cloud.stream.bindings.streamUser-in-0.consumer.header-mode, but this didn't fix it.

To Reproduce

Steps to reproduce the behavior:

The message converter encrypts the byte[] after serialisation and decrypts the byte[] before deserialisation. A simplified example of the SmartMessageConverter:

@Component
@RequiredArgsConstructor
public class CustomMessageConverter implements SmartMessageConverter {

private final ObjectMapper mapper;

// Decrypting
@Override
@SneakyThrows
public Object fromMessage(@NonNull Message<?> message, @NonNull Class<?> targetClass) {
    if (message.getHeaders().containsKey("custom_header_foo")) {
        byte[] bytes = decrypt(message.getPayload(), message.getHeaders());
        return mapper.readValue(bytes, targetClass);
    } else {
        throw new CustomException("Something failed");
    }
}

// Encrypting
@Override
@SneakyThrows
public Message<?> toMessage(@NonNull Object payload, MessageHeaders headers) {
    EncryptedPoJo ep = encrypt(payload);
    MessageHeaderAccessor accessor = new MessageHeaderAccessor();
    accessor.copyHeaders(headers);
    ep.getMetadata().forEach(accessor::setHeader);
    return MessageBuilder.createMessage(ep.getPayload(), accessor.getMessageHeaders());
}

@Builder
@Getter
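// static, because Lombok's @Builder is not supported on non-static nested classes
private static class EncryptedPoJo {
    private byte[] payload;
    private HashMap<String, Object> metadata;
}

} // end of CustomMessageConverter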
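To make the role of the headers concrete, here is a minimal, self-contained sketch of the encrypt/decrypt round trip. It is not the converter from this issue: the cipher (AES-GCM), the header name custom_header_iv, and the classes EncryptedEnvelope and EnvelopeCrypto are all assumptions chosen for illustration. It only shows that decryption cannot proceed once the metadata headers are gone.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical counterpart of EncryptedPoJo: ciphertext plus header metadata.
final class EncryptedEnvelope {
    final byte[] payload;
    final Map<String, Object> metadata;

    EncryptedEnvelope(byte[] payload, Map<String, Object> metadata) {
        this.payload = payload;
        this.metadata = metadata;
    }
}

final class EnvelopeCrypto {
    // Assumed header name; the real converter's metadata keys are not shown in the issue.
    static final String IV_HEADER = "custom_header_iv";

    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Encrypts the serialised payload and records the IV as header metadata.
    static EncryptedEnvelope encrypt(byte[] plain, SecretKey key) {
        try {
            byte[] iv = new byte[12];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
            Map<String, Object> metadata = new HashMap<>();
            metadata.put(IV_HEADER, Base64.getEncoder().encodeToString(iv));
            return new EncryptedEnvelope(cipher.doFinal(plain), metadata);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Decrypts the payload; fails fast when the IV header did not arrive.
    static byte[] decrypt(byte[] encrypted, Map<String, Object> headers, SecretKey key) {
        Object iv = headers.get(IV_HEADER);
        if (iv == null) {
            throw new IllegalArgumentException("Missing header: " + IV_HEADER);
        }
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            byte[] ivBytes = Base64.getDecoder().decode((String) iv);
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, ivBytes));
            return cipher.doFinal(encrypted);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] json = "{\"name\":\"example\"}".getBytes(StandardCharsets.UTF_8);
        EncryptedEnvelope envelope = encrypt(json, key);
        byte[] restored = decrypt(envelope.payload, envelope.metadata, key);
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```

Calling decrypt with an empty header map throws immediately, which mirrors the failure described in this issue when the KStream binding delivers only the default headers.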

The Message is sent with a custom KafkaTemplate that has the CustomMessageConverter set and the following method adapted:

public ListenableFuture<SendResult<K, V>> send(Message<?> message) { ... }

Debugging suggests that the cause lies in this class:

class KStreamMapValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {

  public void process(Record<KIn, VIn> record) {
      VOut newValue = KStreamMapValues.this.mapper.apply(record.key(), record.value());
      this.context().forward(record.withValue(newValue));
  }

}

Here the headers of the Record are not used when the GenericMessage (newValue) is created; only record.key() and record.value() are passed to the mapper.
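The effect can be reproduced without Kafka at all. The following sketch uses hypothetical stand-in types (SimpleRecord, SimpleMessage, HeaderLossDemo), not the actual Kafka Streams or Spring classes. It only illustrates the shape of the problem: a mapper that is handed just the key and the value has no way to see the headers, so the message it builds starts with an empty header map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for a stream record: key, value and headers.
final class SimpleRecord<K, V> {
    final K key;
    final V value;
    final Map<String, String> headers;

    SimpleRecord(K key, V value, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

// Hypothetical stand-in for a message: payload plus headers.
final class SimpleMessage<T> {
    final T payload;
    final Map<String, String> headers;

    SimpleMessage(T payload, Map<String, String> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

final class HeaderLossDemo {
    // Same shape as a mapValues-style call: the mapper receives only key and value,
    // so the resulting message cannot carry the record's headers.
    static <K, V> SimpleMessage<V> toMessageWithoutHeaders(SimpleRecord<K, V> record) {
        BiFunction<K, V, SimpleMessage<V>> mapper =
                (key, value) -> new SimpleMessage<>(value, new LinkedHashMap<>());
        return mapper.apply(record.key, record.value);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("custom_header_name", "Foo");
        SimpleRecord<String, String> record = new SimpleRecord<>("user-1", "payload", headers);

        SimpleMessage<String> message = toMessageWithoutHeaders(record);
        System.out.println("record headers:  " + record.headers);
        System.out.println("message headers: " + message.headers);
    }
}
```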

Version of the framework

org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.2.3

Expected behavior

When converting the Record to a GenericMessage, all (custom) headers are copied instead of being ignored or lost.
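As a rough illustration of the expected behavior, the sketch below merges raw record headers into a message-style header map. It is not the binder's actual fix: HeaderCopySketch and mergeHeaders are hypothetical names, and decoding every header value as UTF-8 text is an assumption made to keep the example short.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class HeaderCopySketch {
    // Copies raw record headers (name -> bytes) into a message-style header map.
    // Default entries are kept unless the record carries a header with the same name.
    static Map<String, Object> mergeHeaders(Map<String, byte[]> recordHeaders,
                                            Map<String, Object> defaults) {
        Map<String, Object> merged = new LinkedHashMap<>(defaults);
        recordHeaders.forEach((name, raw) ->
                merged.put(name, new String(raw, StandardCharsets.UTF_8)));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, byte[]> recordHeaders = new LinkedHashMap<>();
        recordHeaders.put("custom_header_name", "Foo".getBytes(StandardCharsets.UTF_8));
        recordHeaders.put("custom_header_version", "Bar".getBytes(StandardCharsets.UTF_8));

        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("contentType", "application/json");

        System.out.println(mergeHeaders(recordHeaders, defaults));
    }
}
```

With headers merged this way, a converter's fromMessage would find both the default entries and the custom ones it needs.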

Screenshots

[Screenshot: Debug information]

Additional context

Original question on StackOverflow, redirected here by Soby Chacko.

sobychacko commented 2 years ago

@PaulV1990 This was a bug. It is now addressed in main (4.0.0-SNAPSHOT) through the commit mentioned above. I will update here when the 3.2.5-SNAPSHOT is ready for you to try. Thanks!

sobychacko commented 2 years ago

@PaulV1990 Both 3.2.5-SNAPSHOT and 4.0.0-SNAPSHOT have the fix now.