Aiven-Open / opensearch-connector-for-apache-kafka

Aiven's OpenSearch® Connector for Apache Kafka®
Apache License 2.0

Data Stream Support #85

Closed · forrestrice closed this issue 2 years ago

forrestrice commented 2 years ago

I had been using kafka-connect-opensearch to ingest Kafka data into an OpenSearch data stream. Looking at the code, I see that this support was removed in this project. Are there plans to support data streams in the future? I am open to submitting a PR, but figured there might be a reason they were removed in the first place.

willyborankin commented 2 years ago

Hi @forrestrice, yes, we are planning to add data stream support. It was removed because its implementation is completely different from the ES connector's.

LucasStinchcombe commented 2 years ago

+1 for Datastream support.

@willyborankin, would it be possible to provide an outline of the changes that need to be made? In the meantime, the OpenSearch equivalent of "Manage timeseries data without data streams" looks like a workable solution.
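For reference, that approach boils down to an ISM-managed rollover alias. A rough sketch of the setup with the OpenSearch low-level REST client follows; the cluster address, policy id, template name, and index names are made up for illustration, and this is not something the connector does itself:

```java
import org.apache.http.HttpHost;
import org.opensearch.client.Request;
import org.opensearch.client.RestClient;

public class RolloverAliasSetup {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // ISM policy with a rollover action, attached to matching indices via ism_template.
            Request policy = new Request("PUT", "/_plugins/_ism/policies/my-index-rollover");
            policy.setJsonEntity("{\"policy\":{\"description\":\"rollover\",\"default_state\":\"rollover\","
                    + "\"states\":[{\"name\":\"rollover\",\"actions\":[{\"rollover\":{\"min_index_age\":\"1d\"}}],"
                    + "\"transitions\":[]}],\"ism_template\":{\"index_patterns\":[\"my-index-*\"],\"priority\":100}}}");
            client.performRequest(policy);

            // Index template so every new backing index knows which alias to roll over.
            Request template = new Request("PUT", "/_index_template/my-index-template");
            template.setJsonEntity("{\"index_patterns\":[\"my-index-*\"],\"template\":{\"settings\":"
                    + "{\"plugins.index_state_management.rollover_alias\":\"my-index\"}}}");
            client.performRequest(template);

            // Bootstrap the first backing index and point the write alias at it.
            Request bootstrap = new Request("PUT", "/my-index-000001");
            bootstrap.setJsonEntity("{\"aliases\":{\"my-index\":{\"is_write_index\":true}}}");
            client.performRequest(bootstrap);

            // Producers write through the alias only; ISM rotates the backing indices underneath.
            Request doc = new Request("POST", "/my-index/_doc");
            doc.setJsonEntity("{\"@timestamp\":\"2022-08-24T00:00:00Z\",\"message\":\"hello\"}");
            client.performRequest(doc);
        }
    }
}
```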

willyborankin commented 2 years ago

If you take a look at the example workflow here: https://opensearch.org/docs/latest/opensearch/data-streams/ it is not like in ES. What we can do as a first step:

and as a second step:
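For reference, the basic workflow from those docs looks roughly like the sketch below, written against a local cluster with the low-level REST client; the template and stream names are made up, and this is only the plain OpenSearch API, not the connector changes being discussed:

```java
import org.apache.http.HttpHost;
import org.opensearch.client.Request;
import org.opensearch.client.RestClient;

public class DataStreamWorkflow {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // An index template with a data_stream section marks matching names as data streams.
            Request template = new Request("PUT", "/_index_template/logs-template");
            template.setJsonEntity("{\"index_patterns\":[\"logs-*\"],\"data_stream\":{}}");
            client.performRequest(template);

            // Create the stream explicitly (indexing a document would also auto-create it).
            client.performRequest(new Request("PUT", "/_data_stream/logs-app"));

            // Documents are appended to the stream name; OpenSearch routes them to hidden backing indices.
            Request doc = new Request("POST", "/logs-app/_doc");
            doc.setJsonEntity("{\"@timestamp\":\"2022-08-24T00:00:00Z\",\"message\":\"hello\"}");
            client.performRequest(doc);
        }
    }
}
```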

LucasStinchcombe commented 2 years ago

Sounds very good, the first step is the use case I had in mind.

Btw, I was just trying to use the connector to write to an alias with an ISM rollover policy to mimic a data stream, as done in "Sample policy with ISM template for auto rollover", but this results in a connector task failure because it tries to create an index whose name already exists as an alias. I confirmed that POSTing directly via the OpenSearch Dev Tools works and rolls over correctly.

So it seems that the temporary solution I had in mind won't work; do you have any ideas?

Stack trace from crashed connector task:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to create index my-index after total of 1 attempt(s)
    at io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:137)
    at io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:105)
    at io.aiven.kafka.connect.opensearch.OpensearchClient.withRetry(OpensearchClient.java:244)
    at io.aiven.kafka.connect.opensearch.OpensearchClient.createIndex(OpensearchClient.java:119)
    at io.aiven.kafka.connect.opensearch.OpensearchSinkTask.ensureIndexExists(OpensearchSinkTask.java:161)
    at io.aiven.kafka.connect.opensearch.OpensearchSinkTask.tryWriteRecord(OpensearchSinkTask.java:110)
    at io.aiven.kafka.connect.opensearch.OpensearchSinkTask.put(OpensearchSinkTask.java:100)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
Caused by: [my-index] OpenSearchStatusException[OpenSearch exception [type=invalid_index_name_exception, reason=Invalid index name [my-index], already exists as alias]]
    at org.opensearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:202)
    at org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2075)
    at org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2052)
    at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1775)
    at org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1745)
    at org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1709)
    at org.opensearch.client.IndicesClient.create(IndicesClient.java:160)
    at io.aiven.kafka.connect.opensearch.OpensearchClient.lambda$createIndex$3(OpensearchClient.java:123)
    at io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:119)
    ... 17 more
    Suppressed: org.opensearch.client.ResponseException: method [PUT], host [https://<MY_INSTANCE>.aivencloud.com:28071], URI [/my-index?master_timeout=30s&timeout=30s], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [my-index], already exists as alias","index":"my-index","index_uuid":"_na_"}],"type":"invalid_index_name_exception","reason":"Invalid index name [my-index], already exists as alias","index":"my-index","index_uuid":"_na_"},"status":400}
        at org.opensearch.client.RestClient.convertResponse(RestClient.java:344)
        at org.opensearch.client.RestClient.performRequest(RestClient.java:314)
        at org.opensearch.client.RestClient.performRequest(RestClient.java:289)
        at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1762)
        ... 22 more
nathanMiniovich commented 2 years ago

+1 for Datastream support

@willyborankin Any thoughts on a workaround for the time being? Is @LucasStinchcombe on the right track in https://github.com/aiven/opensearch-connector-for-apache-kafka/issues/85#issuecomment-1225168694?

willyborankin commented 2 years ago

Hi @nathanMiniovich, there is no workaround so far, sorry, since data streams use a different REST API. Regarding the exception: the connector checks for the index via the index REST API, not the alias API. It behaves the same way as the existing ES connector for Kafka.
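Roughly, the difference looks like the sketch below. This is not the connector's actual code, just an illustration with the OpenSearch high-level REST client of how an alias-aware existence check would differ; the index name mirrors the one in the stack trace above:

```java
import org.apache.http.HttpHost;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;

public class AliasAwareIndexCheck {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            String name = "my-index";

            // Existence check via the index API, which is what the comment above describes.
            boolean indexExists = client.indices()
                    .exists(new GetIndexRequest(name), RequestOptions.DEFAULT);

            // An alias-aware variant would also ask the alias API before creating anything.
            boolean aliasExists = client.indices()
                    .existsAlias(new GetAliasesRequest(name), RequestOptions.DEFAULT);

            if (!indexExists && !aliasExists) {
                // Creating "my-index" while an alias of that name exists is exactly what
                // raises the invalid_index_name_exception seen in the stack trace above.
                client.indices().create(new CreateIndexRequest(name), RequestOptions.DEFAULT);
            }
        }
    }
}
```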