spring-cloud / stream-applications

Functions and Spring Cloud Stream Applications for data driven microservices
https://spring.io/projects/spring-cloud-stream-applications

Elasticsearch sink INDEX_NAME header discarded #440

Closed DetlevVanLooyCorilus closed 1 year ago

DetlevVanLooyCorilus commented 1 year ago

The documentation for the index option of the elasticsearch sink says the following:

Name of the index. If set, the INDEX_NAME header value overrides this property on a per message basis.

However, when consuming a message with the INDEX_NAME header set, we get the following exception: Caused by: org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: index is missing;

ElasticsearchConsumerConfiguration.buildIndexRequest correctly tries to retrieve the index from the headers:

        String index = consumerProperties.getIndex();
        if (message.getHeaders().containsKey(INDEX_NAME_HEADER)) {
            index = (String) message.getHeaders().get(INDEX_NAME_HEADER);
        }
        request.index(index);

However, by the time we reach this point, the header is no longer present on the message. Connecting with a remote debugger, I can see that the message is received with the INDEX_NAME header intact. The headers appear to be discarded in the SimpleFunctionRegistry.convertInputMessageIfNecessary method: because the "inputType" of elasticsearchConsumer is java.lang.Object, it simply returns the payload of the message.
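To make the suspected mechanism concrete, here is a minimal, dependency-free sketch. It is a model of the behaviour described above, not the actual SimpleFunctionRegistry code: `Msg`, `dispatch`, and `resolveIndex` are hypothetical stand-ins invented for this illustration, and the real conversion logic may differ in detail.

```java
import java.util.Map;
import java.util.function.Consumer;

// Dependency-free model; Msg, dispatch() and resolveIndex() are hypothetical
// stand-ins for illustration and are not Spring classes or methods.
final class HeaderLossSketch {

    /** Minimal stand-in for a message: a payload plus headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /**
     * Mimics a registry that hands over only the payload when the target
     * declares a plain Object input, and the whole message when the target
     * declares a message input.
     */
    static void dispatch(Consumer<Object> target, Class<?> declaredInputType, Msg message) {
        Object input = Msg.class.isAssignableFrom(declaredInputType) ? message : message.payload;
        target.accept(input);
    }

    /** Header wins over the default, but only if the input still carries headers. */
    static String resolveIndex(Object input, String defaultIndex) {
        if (input instanceof Msg) {
            Msg m = (Msg) input;
            if (m.headers.containsKey("INDEX_NAME")) {
                return (String) m.headers.get("INDEX_NAME");
            }
        }
        return defaultIndex;
    }

    public static void main(String[] args) {
        Msg message = new Msg("{\"fullName\":\"John Doe\"}", Map.of("INDEX_NAME", "foo"));
        String[] seen = new String[2];

        // Declared input type Object: the consumer only ever sees the payload.
        dispatch(in -> seen[0] = resolveIndex(in, null), Object.class, message);
        // Declared input type Msg: headers survive and the override applies.
        dispatch(in -> seen[1] = resolveIndex(in, null), Msg.class, message);

        System.out.println("Object input  -> index = " + seen[0]);
        System.out.println("Message input -> index = " + seen[1]);
    }
}
```

In this model, the first call has no header to read and falls back to the unset default, which corresponds to the "index is missing" validation error, while the second call resolves the index from the header.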

I am using docker:springcloudstream/elasticsearch-sink-kafka version 3.2.1

This can also be reproduced easily by adding the following test to ElasticsearchSinkTests:

    @Test
    public void testElasticSearchSinkIndexHeader() {
        this.contextRunner
                .withPropertyValues("spring.cloud.function.definition=elasticsearchConsumer", "elasticsearch.consumer.id=1",
                        "spring.elasticsearch.rest.uris=http://" + elasticsearch.getHttpHostAddress())
                .run(context -> {

                    final InputDestination inputDestination = context.getBean(InputDestination.class);
                    final String jsonObject = "{\"age\":10,\"dateOfBirth\":1471466076564,"
                            + "\"fullName\":\"John Doe\"}";

                    // No elasticsearch.consumer.index is configured, so the index can only come from the header.
                    inputDestination.send(new GenericMessage<>(jsonObject, Map.of("INDEX_NAME", "foo")));

                    final ElasticsearchClient elasticsearchClient = context.getBean(ElasticsearchClient.class);
                    final GetRequest getRequest = new GetRequest.Builder().index("foo").id("1").build();
                    final GetResponse<JsonData> response = elasticsearchClient.get(getRequest, JsonData.class);
                    assertThat(response.found()).isTrue();
                    assertThat(response.source()).isNotNull();
                    assertThat(response.source().toJson()).isEqualTo(JsonData.fromJson(jsonObject).toJson());
                });
    }

The Spring Cloud Data Flow stream definition I use for testing: http | header-enricher --headers="INDEX_NAME='testing'" | elasticsearch

Full stacktrace

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@5fe32154]; nested exception is org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: index is missing;, failedMessage=GenericMessage [payload=byte[15], headers={content-length=15, http_requestMethod=POST, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, INDEX_NAME=testing, kafka_receivedTopic=testing-debug-els.header-enricher, target-protocol=kafka, accept=/, b3=75bd5577026ad0e2-d0dda3345041b3f6-0, nativeHeaders={}, kafka_offset=0, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dcc14, host=localhost:8183, http_requestUrl=http://localhost:8183/, connection=keep-alive, id=35196aa4-3515-f1d3-d3c0-06ee7e8d1a70, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1678957707093, accept-encoding=gzip, deflate, br, kafka_groupId=testing-debug-els, user-agent=PostmanRuntime/7.29.2, timestamp=1678957715244}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: index is missing;
    at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15)
    at org.elasticsearch.action.support.replication.ReplicationRequest.validate(ReplicationRequest.java:187)
    at org.elasticsearch.action.index.IndexRequest.validate(IndexRequest.java:213)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1698)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1672)
    at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:1029)
    at org.springframework.cloud.fn.consumer.elasticsearch.ElasticsearchConsumerConfiguration.index(ElasticsearchConsumerConfiguration.java:234)
    at org.springframework.cloud.fn.consumer.elasticsearch.ElasticsearchConsumerConfiguration.lambda$indexingHandler$4(ElasticsearchConsumerConfiguration.java:150)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:429)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:656)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:588)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:555)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:544)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
    at com.sun.proxy.$Proxy149.accept(Unknown Source)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:987)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:716)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$andThen$0(SimpleFunctionRegistry.java:634)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:710)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:562)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:790)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:622)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 32 more

onobc commented 1 year ago

We need to expose the consumer as a MessageConsumer that takes a Message<?>, so that the headers reach the handler, as is done in https://github.com/spring-cloud/stream-applications/blob/main/functions/consumer/ftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/ftp/FtpConsumerConfiguration.java
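As a rough illustration of that suggestion, the sketch below declares the consumer's input as a whole message, so the header lookup has something to read. It is dependency-free on purpose: the nested `Message` interface is a hypothetical stand-in that only mirrors the `getPayload()` and `getHeaders()` accessors of Spring's `Message`, and `MessageConsumer`, `elasticsearchConsumer`, and the `indexer` callback are illustrative names for this sketch, not the project's actual code or the eventual fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Dependency-free sketch; every type here is a hypothetical stand-in.
final class MessageConsumerSketch {

    /** Stand-in mirroring the two accessors used from Spring's Message<T>. */
    interface Message<T> {
        T getPayload();
        Map<String, Object> getHeaders();
    }

    /** Named consumer type whose declared input is the whole message, not Object. */
    interface MessageConsumer extends Consumer<Message<?>> { }

    static final String INDEX_NAME_HEADER = "INDEX_NAME";

    /**
     * Builds a consumer that resolves the index per message: the header
     * overrides the configured default. The indexer callback (index, payload)
     * stands in for the real Elasticsearch call.
     */
    static MessageConsumer elasticsearchConsumer(String defaultIndex, BiConsumer<String, Object> indexer) {
        return message -> {
            String index = defaultIndex;
            if (message.getHeaders().containsKey(INDEX_NAME_HEADER)) {
                index = (String) message.getHeaders().get(INDEX_NAME_HEADER);
            }
            if (index == null) {
                throw new IllegalStateException("index is missing");
            }
            indexer.accept(index, message.getPayload());
        };
    }

    /** Small helper to build a stand-in message. */
    static <T> Message<T> message(T payload, Map<String, Object> headers) {
        return new Message<T>() {
            @Override public T getPayload() { return payload; }
            @Override public Map<String, Object> getHeaders() { return headers; }
        };
    }

    public static void main(String[] args) {
        Map<String, Object> indexed = new HashMap<>();
        MessageConsumer consumer = elasticsearchConsumer("default-index", indexed::put);

        consumer.accept(message("doc-with-header", Map.of(INDEX_NAME_HEADER, "foo")));
        consumer.accept(message("doc-without-header", Map.of()));

        System.out.println(indexed);
    }
}
```

With a real Spring `Message<?>` in place of the stand-in, the lookup inside the lambda is the same one that buildIndexRequest already performs; the only change sketched here is the declared input type of the consumer.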