confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

Support elastic-search routing for kafka-connect-elasticsearch sink. #223

Open rkalluri-clgx opened 6 years ago

rkalluri-clgx commented 6 years ago

I would like to know whether the connector supports routing records to specific Elasticsearch shards, as explained below.

https://www.elastic.co/guide/en/elasticsearch/reference/6.2/mapping-routing-field.html

It looks like we need to be able to specify the routing parameter in the Elasticsearch URL, and its value can vary from message to message. So this needs to be dynamic, with an SMT of some sort picking the routing value from the message, just as we pick the index. Either that, or use a per-partition route from Kafka for performance reasons. Just wanted to kickstart the discussion.

DevonPeroutky commented 6 years ago

+1 Would love to see this.

robgryn commented 6 years ago

+1 Would also benefit from this

maxsel commented 6 years ago

+1 This feature would enable parent-child relationship in target Elasticsearch cluster, otherwise I have no idea how to achieve this.

matpersonne commented 6 years ago

+1 I really need this feature. I changed all my ES mappings to join/routing, and I'm now stuck because Kafka Connect cannot add routing={id}.

mikelsanvi commented 6 years ago

+1 This would be really useful

fubhy commented 5 years ago

I also need this feature for defining parent/child join relations: https://www.elastic.co/guide/en/elasticsearch/reference/6.3/parent-join.html

MottiniMauro commented 5 years ago

+1 Having issues finding a way to implement parent-child relationship without this

frankkoornstra commented 5 years ago

+1 Would be great to make our cluster more efficient

dgthomugo commented 4 years ago

@levzem Is there a plan to support this feature? We really need a way to flush parent-child relationship records through kafka-connect-elasticsearch.

mIkhail-zaretsky commented 4 years ago

Is there any workaround??

arungitan commented 4 years ago

same here. Would like to be able to insert parent-child records via elasticsearch connector. Any updates on this?

Arsennikum commented 4 years ago

Can we achieve that by using a custom ID?
In the formula shard_num = hash(_routing) % num_primary_shards,
Elasticsearch uses the document id as _routing by default,
so we can pass our own id to get something like custom routing, can't we?
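
To make the formula above concrete, here is a minimal sketch of how the routing value decides the shard. It is only an illustration: `shard_for` is a hypothetical helper, and `zlib.crc32` is a stand-in for Elasticsearch's own hash function, so the actual shard numbers on a real cluster will differ.

```python
import zlib
from typing import Optional


def shard_for(doc_id: str, num_primary_shards: int, routing: Optional[str] = None) -> int:
    """Illustrates shard_num = hash(_routing) % num_primary_shards.

    Assumption: zlib.crc32 stands in for Elasticsearch's real hash.
    """
    # Without an explicit routing value, the document id is used as _routing.
    key = routing if routing is not None else doc_id
    return zlib.crc32(key.encode("utf-8")) % num_primary_shards


# A child document (id "3") routed by its parent's id ("1") lands on the
# same shard as the parent, whatever its own id is.
parent_shard = shard_for("1", num_primary_shards=5)
child_shard = shard_for("3", num_primary_shards=5, routing="1")
print(parent_shard == child_shard)
```

With an explicit routing value the document id no longer has to be chosen to match a shard, which is the difference between real routing support and the custom-id trick.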

frankkoornstra commented 4 years ago

That leaks details into Kafka Connect, like how the shard number is calculated and how many shards the target index has. Besides, it would involve storing state about which ids are still available and creating an algorithm to come up with the next id for a targeted shard... all things that shouldn't be necessary if routing were supported.

arungitan commented 4 years ago

For those still trying to solve this, I found this cool workaround (if you have some control on the Elasticsearch side): use an ingest script processor!
https://www.elastic.co/guide/en/elasticsearch/reference/current/script-processor.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/accessing-data-in-pipelines.html
The line of interest is this: "The following metadata fields are accessible by a processor: _index, _type, _id, _routing". So not only _routing but even _id and _index itself can be conveniently scripted.
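
A rough sketch of what such a pipeline definition could look like, built here as a Python dict so it can be serialized and sent to the ingest pipeline API. The helper `routing_pipeline` and the field name `parent_id` are assumptions for illustration only; adapt the script to your own documents.

```python
import json


def routing_pipeline(source_field: str) -> dict:
    """Builds an ingest pipeline body that copies a document field into _routing.

    Assumption: the field named by source_field exists in every document.
    """
    return {
        "description": "Set _routing from a field of the document",
        "processors": [
            {
                "script": {
                    "lang": "painless",
                    # ctx exposes the document fields plus metadata such as _routing.
                    "source": "ctx._routing = ctx[params.field]",
                    "params": {"field": source_field},
                }
            }
        ],
    }


# Body for a request like: PUT _ingest/pipeline/<pipeline-name>
print(json.dumps(routing_pipeline("parent_id"), indent=2))
```

The pipeline still has to be applied to the indexing requests, for example through the `index.default_pipeline` index setting on Elasticsearch versions that support it.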

janpetr11 commented 3 years ago

The workaround is fine for the index operation, but it doesn't help with the delete (tombstone) operation, because ES does not run the ingest pipeline for update and delete operations. Correct me if I'm wrong. In bulk operations, each entry must carry its routing value in the routing field. Therefore, it would be great if the connector supported it.
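
For illustration, this is roughly what per-entry routing looks like in a bulk request body. It is a sketch, not the connector's code: `bulk_lines` is a hypothetical helper, a record without a source stands in for a Kafka tombstone, and older Elasticsearch versions may additionally require a `_type` in the action metadata.

```python
import json
from typing import List, Optional


def bulk_lines(index: str, doc_id: str, routing: str, source: Optional[dict] = None) -> List[str]:
    """Returns the NDJSON lines for one bulk entry, with routing in the action metadata."""
    meta = {"_index": index, "_id": doc_id, "routing": routing}
    if source is None:
        # Tombstone: a delete action is a single line and has no source document.
        return [json.dumps({"delete": meta})]
    # Index action: the metadata line is followed by the document itself.
    return [json.dumps({"index": meta}), json.dumps(source)]


lines = bulk_lines("my-index-000001", "3", "1", {"text": "This is an answer"})
lines += bulk_lines("my-index-000001", "3", "1")  # later tombstone for the same key
print("\n".join(lines) + "\n")  # a bulk body must end with a newline
```

Because the delete entry needs the same routing value as the original index entry, the routing has to be derivable from the tombstone record itself (its key, for example), since the value is null.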

frankkoornstra commented 3 years ago

Does anyone have an idea of the work involved? Maybe it can be picked up by a few people, but someone with a good overview guiding the work could help immensely.

hartmut-co-uk commented 3 years ago

Hi, I had also been planning to evaluate the routing option and do some testing. There's an old PR that was closed but didn't look so bad at a quick glance: https://github.com/confluentinc/kafka-connect-elasticsearch/pull/156

frankkoornstra commented 3 years ago

Good find! I commented in the PR. Maybe the contributor is still around.

hartmut-co-uk commented 3 years ago

Note: I'm about to try implementing this feature against the current state. I will create a PR if this works out.

hartmut-co-uk commented 3 years ago

Update: going well. The implementation was relatively simple, and today I was also able to manually test ingesting Avro data from a Kafka topic into a defined ES index with a mapping and parent-join.

Though I noticed that for the parent-join use case, in addition to adding the routing, the payload also potentially needs to be enriched with my_join_field.name and my_join_field.parent (for children). https://www.elastic.co/guide/en/elasticsearch/reference/current/parent-join.html

PUT my-index-000001/_doc/3?routing=1&refresh 
{
  "my_id": "3",
  "text": "This is an answer",
  "my_join_field": {
    "name": "answer", 
    "parent": "1" 
  }
}

I wonder if it would be worth also building this natively into the connector, instead of forcing the user to enrich data upfront or build and require a custom SMT.

Note: I tried the InsertField SMT, but since it only supports flat fields, it's impossible to enrich the struct for children. https://docs.confluent.io/platform/current/connect/transforms/insertfield.html
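
To show the kind of enrichment meant here, a small sketch that nests the join field into a flat record before it reaches the connector. `with_join_field` is a hypothetical helper and not part of the connector or of any SMT; the field name `my_join_field` is taken from the Elasticsearch example above.

```python
from typing import Optional


def with_join_field(record: dict, relation: str, parent_id: Optional[str] = None,
                    field: str = "my_join_field") -> dict:
    """Returns a copy of record with a nested join field added.

    Parents only get a name; children also get the id of their parent.
    """
    join = {"name": relation}
    if parent_id is not None:
        join["parent"] = parent_id
    enriched = dict(record)  # leave the caller's record untouched
    enriched[field] = join
    return enriched


child = with_join_field({"my_id": "3", "text": "This is an answer"}, "answer", parent_id="1")
print(child)
```

The nested object is exactly what a flat-field transform cannot produce, so the enrichment has to happen either upstream, in a custom SMT, or in the connector.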