spring-projects / spring-data-elasticsearch

Provide support to increase developer productivity in Java when using Elasticsearch. Uses familiar Spring concepts such as a template classes for core API usage and lightweight repository style data access.
https://spring.io/projects/spring-data-elasticsearch/
Apache License 2.0
2.9k stars 1.33k forks source link

Spring data elasticsearch cannot create index of type Data Stream #2590

Open patpatpat123 opened 1 year ago

patpatpat123 commented 1 year ago

Hello team,

I wanted to reach out with an issue observed while trying to use the method save() and/or saveAll() from spring data elasticsearch, when the index name is on purpose starting with logs-

What I would like to achieve please:

Using the methods save() and saveAll() from spring data elasticsearch, when on purpose, using this

@Document(indexName = "logs-onpurpose-foo")
public record Foo(@JsonProperty(value = "@timestamp") String timestamp, ...

I would expect the documents of type Foo to be saved/created as Data Stream (not just Index) in elasticsearch

Actual:

I am encountering this issue:

defined in @EnableReactiveElasticsearchRepositories declared on MyElasticConfiguration: Failed to instantiate [org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository]: Constructor threw exception
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800)
    at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:245)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1352)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1189)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:560)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:941)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:733)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:435)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1305)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1294)
    at com.MyApplication.main(MyApplication.java:18)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'myElasticRepository' defined in com.repository.elastic.MyElasticRepository defined in @EnableReactiveElasticsearchRepositories declared on MyElasticConfiguration: Failed to instantiate [org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository]: Constructor threw exception
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1417)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1337)
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:888)
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791)
    ... 18 common frames omitted
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository]: Constructor threw exception
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:224)
    at org.springframework.data.repository.core.support.RepositoryFactorySupport.lambda$instantiateClass$5(RepositoryFactorySupport.java:571)
    at java.base/java.util.Optional.map(Optional.java:260)
    at org.springframework.data.repository.core.support.RepositoryFactorySupport.instantiateClass(RepositoryFactorySupport.java:571)
    at org.springframework.data.repository.core.support.RepositoryFactorySupport.getTargetRepositoryViaReflection(RepositoryFactorySupport.java:536)
    at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactory.getTargetRepository(ReactiveElasticsearchRepositoryFactory.java:94)
    at org.springframework.data.repository.core.support.RepositoryFactorySupport.getRepository(RepositoryFactorySupport.java:317)
    at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.lambda$afterPropertiesSet$5(RepositoryFactoryBeanSupport.java:279)
    at org.springframework.data.util.Lazy.getNullable(Lazy.java:245)
    at org.springframework.data.util.Lazy.get(Lazy.java:114)
    at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.afterPropertiesSet(RepositoryFactoryBeanSupport.java:285)
    at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactoryBean.afterPropertiesSet(ReactiveElasticsearchRepositoryFactoryBean.java:102)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1816)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1766)
    ... 29 common frames omitted
Caused by: org.springframework.data.elasticsearch.UncategorizedElasticsearchException: [es/indices.create] failed: [illegal_argument_exception] cannot create index with name [logs-foo-one], because it matches with template [logs] that creates data streams only, use create data stream api instead
    at org.springframework.data.elasticsearch.client.elc.ElasticsearchExceptionTranslator.translateExceptionIfPossible(ElasticsearchExceptionTranslator.java:102)
    at org.springframework.data.elasticsearch.client.elc.ElasticsearchExceptionTranslator.translateException(ElasticsearchExceptionTranslator.java:63)
    at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:7099)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:122)
    at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:71)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at co.elastic.clients.transport.rest_client.RestClientTransport$1.onSuccess(RestClientTransport.java:185)
    at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:676)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:399)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:393)
    at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.base/java.lang.Thread.run(Thread.java:833)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Mono.block(Mono.java:1710)
        at org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository.createIndexAndMappingIfNeeded(SimpleReactiveElasticsearchRepository.java:68)
        at org.springframework.data.elasticsearch.repository.support.SimpleReactiveElasticsearchRepository.<init>(SimpleReactiveElasticsearchRepository.java:60)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
        at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211)
        at org.springframework.data.repository.core.support.RepositoryFactorySupport.lambda$instantiateClass$5(RepositoryFactorySupport.java:571)
        at java.base/java.util.Optional.map(Optional.java:260)
        at org.springframework.data.repository.core.support.RepositoryFactorySupport.instantiateClass(RepositoryFactorySupport.java:571)
        at org.springframework.data.repository.core.support.RepositoryFactorySupport.getTargetRepositoryViaReflection(RepositoryFactorySupport.java:536)
        at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactory.getTargetRepository(ReactiveElasticsearchRepositoryFactory.java:94)
        at org.springframework.data.repository.core.support.RepositoryFactorySupport.getRepository(RepositoryFactorySupport.java:317)
        at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.lambda$afterPropertiesSet$5(RepositoryFactoryBeanSupport.java:279)
        at org.springframework.data.util.Lazy.getNullable(Lazy.java:245)
        at org.springframework.data.util.Lazy.get(Lazy.java:114)
        at org.springframework.data.repository.core.support.RepositoryFactoryBeanSupport.afterPropertiesSet(RepositoryFactoryBeanSupport.java:285)
        at org.springframework.data.elasticsearch.repository.support.ReactiveElasticsearchRepositoryFactoryBean.afterPropertiesSet(ReactiveElasticsearchRepositoryFactoryBean.java:102)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1816)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1766)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
        at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
        at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1417)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1337)
        at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:888)
        at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791)
        at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:245)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1352)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1189)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:560)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
        at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:941)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:733)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:435)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1305)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1294)
        at com.MyApplication.main(MyApplication.java:18)
Caused by: co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/indices.create] failed: [illegal_argument_exception] cannot create index with name [logs-foo-one], because it matches with template [logs] that creates data streams only, use create data stream api instead
    at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:334)
    at co.elastic.clients.transport.rest_client.RestClientTransport.access$200(RestClientTransport.java:70)
    at co.elastic.clients.transport.rest_client.RestClientTransport$1.onSuccess(RestClientTransport.java:181)
    ... 18 common frames omitted

Process finished with exit code 1

And as a result, the document does not end up as Data Stream.

Justification:

It can be interesting for spring data elastic search to save data as data streams. From official elasticsearch documentation, data streams are well suited for metrics, and logs type of data. But also, for some specific time series data.

If nothing from above, having a custom LogsPojo object to be saved as a data stream should be beneficial.

Issue:

Would it be possible to help on this issue, to allow saving to the data stream (not index) logs-foo?

Thank you

sothawo commented 1 year ago

Spring Data Elasticsearch does not support the creation of data streams. And using a name like <prefix>-* to define that a data stream should be created is not feasible, as these names are valid for normal Elasticsearch indices as well.

To add support for data streams, we'd need additional parameters in the @Document annotation and in addition to that, the bulk/save methods would probably need to be extended to handle the data stream case.

patpatpat123 commented 1 year ago

Thank you @sothawo for the clear explanation.

Agreed, some sort of @Document(indexName = "myindex", datastream = true) would be really nice. I hope this enhancement request will see the light one day. Good day

patpatpat123 commented 1 year ago

Update two months after:

It is confirmed Spring data elasticsearch cannot create index of type data stream.

I did another test. Even if the data stream is created beforehand, meaning, the data stream is created prior to any spring data elasticsearch interaction, via the Index Management WebUI, or via curl, etc , spring data elasticsearch still cannot write into a data stream created (even if the data stream is already there)

reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.elasticsearch.BulkFailureException: Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [{null=only write ops with an op_type of create are allowed in data streams}]
Caused by: org.springframework.data.elasticsearch.BulkFailureException: Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [{null=only write ops with an op_type of create are allowed in data streams}]
    at org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchTemplate.checkForBulkOperationFailure(ReactiveElasticsearchTemplate.java:261)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:125)
    at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:71)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at co.elastic.clients.transport.rest_client.RestClientTransport$1.onSuccess(RestClientTransport.java:183)
    at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:676)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:399)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:393)
    at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)

Just wanted to add this observation to this enhancement request.

Thank you