confluentinc / kafka-connect-elasticsearch

Kafka Connect Elasticsearch connector

topic to index mapping #12

Closed: dmikenz closed this issue 8 years ago

dmikenz commented 8 years ago

Hi there,

First of all thanks for the work on this connector - looking forward to getting it up and running.

My Kafka topics have uppercase names, so the ES sink connector fails on startup as shown below. That's OK, I understand why:

[2016-07-30 10:49:31,717][DEBUG][action.admin.indices.create] [Mandrill] [REP-SOE.CUSTOMERS] failed to create
[REP-SOE.CUSTOMERS] InvalidIndexNameException[Invalid index name [REP-SOE.CUSTOMERS], must be lowercase]

So I tried to map the topic to the index as follows:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=REP-SOE.CUSTOMERS
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
topic.index.map=rep-soe.customers

But I get this error:

 (io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig:178)
[2016-07-30 10:53:17,941] ERROR Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.ArrayIndexOutOfBoundsException: 1

Can you please explain how to configure this mapping properly, i.e. from the topic REP-SOE.CUSTOMERS to the index rep-soe.customers?

Note also that I have not created this index in Elasticsearch, because in my initial tests the sink connector created it automatically. Would it still create the index when a mapping like this is used?

Thanks for your help here.

Ishiihara commented 8 years ago

@dmikenz I think for the topic.index.map config you need something like topic.index.map=REP-SOE.CUSTOMERS:rep-soe.customers. The value is a list of topic:index pairs, so it can map multiple topics at once; for example: topic.index.map=topic1:index1, topic2:index2.
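A minimal Java sketch of how a value in this "topic:index, topic:index" format could be parsed. This is not the connector's actual code; the class and method names are hypothetical. It also illustrates a likely (unconfirmed) cause of the ArrayIndexOutOfBoundsException above: an entry with no colon, such as rep-soe.customers, splits into a single element, so there is no index part to read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of parsing "topic1:index1, topic2:index2".
// NOT the connector's real implementation.
class TopicIndexMapSketch {

    static Map<String, String> parse(String value) {
        Map<String, String> map = new HashMap<>();
        for (String entry : value.split(",")) {
            // Each entry should be "topic:index"; trim spaces after commas.
            String[] parts = entry.trim().split(":");
            // An entry without a colon yields a one-element array,
            // so parts[1] throws ArrayIndexOutOfBoundsException.
            map.put(parts[0], parts[1]);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> ok = parse("REP-SOE.CUSTOMERS:rep-soe.customers");
        System.out.println(ok.get("REP-SOE.CUSTOMERS")); // rep-soe.customers

        try {
            parse("rep-soe.customers"); // the value from the original config
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("Caught " + e.getClass().getSimpleName());
        }
    }
}
```

With the topic:index form, each topic name becomes a key and the index name its value; the single-value form from the original config fails at parse time rather than at index creation.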

rmoff commented 8 years ago

@Ishiihara thanks - this syntax worked for me.

aseroj commented 7 years ago

@Ishiihara worked for me as well. Saved me two days of head-banging. THANKS!

dcdebug commented 6 years ago

@Ishiihara Where is the config file that includes the topic.index.map option? My English is very poor. Thank you!

sslavian812 commented 5 years ago

Hi. I have the same issue as the original poster. However, I have many Kafka topics with camelCase names. Is there a way to automatically convert topic names to lowercase instead of manually specifying a mapping for every topic, as in:

topic.index.map=TopicOne:topicone, TopicTwo:topictwo, ... ,TopicHundred:topichundred

rmoff commented 5 years ago

What version are you using? I believe this is now done automatically, at least in 5.1.

sslavian812 commented 5 years ago

I'm using a snapshot of master that I built in December; it's version 5.1. You are right, I've found the place in the code that does the lowercasing:

final String indexOverride = topicToIndexMap.get(topic);
String index = indexOverride != null ? indexOverride : topic.toLowerCase();
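The two lines quoted above can be wrapped in a small self-contained method to show the behavior: an explicit topic.index.map entry wins, otherwise the topic name is lowercased. The class and method names here are hypothetical, and Map.of needs Java 9 or later.

```java
import java.util.Map;

// Hypothetical wrapper around the lookup quoted above:
// explicit override first, lowercase topic name as the fallback.
class IndexResolverSketch {

    static String resolveIndex(String topic, Map<String, String> topicToIndexMap) {
        final String indexOverride = topicToIndexMap.get(topic);
        return indexOverride != null ? indexOverride : topic.toLowerCase();
    }

    public static void main(String[] args) {
        Map<String, String> overrides = Map.of("TopicTwo", "custom-two");
        System.out.println(resolveIndex("TopicOne", overrides)); // topicone
        System.out.println(resolveIndex("TopicTwo", overrides)); // custom-two
    }
}
```

So for camelCase topics with no overrides, no topic.index.map entries are needed at all.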

I'd like my index names to have a prefix, for example myTopic -> kafka.mytopic. I was thinking about using the elasticsearch.index.prefix property, but it seems to have been deprecated and removed. What should I use instead?

rmoff commented 5 years ago

If you want to amend the topic name, a Single Message Transform (SMT) is perfect for that. Check out some examples in https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/ and the docs for the RegexRouter here: https://docs.confluent.io/current/connect/transforms/regexrouter.html#regexrouter
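For the myTopic -> kafka.mytopic case, a RegexRouter configuration along the lines of transforms=addPrefix, transforms.addPrefix.type=org.apache.kafka.connect.transforms.RegexRouter, transforms.addPrefix.regex=(.*) and transforms.addPrefix.replacement=kafka.$1 should work (the alias addPrefix is an arbitrary, hypothetical name). The Java sketch below only mimics the renaming step to show the result; it is not the RegexRouter source, and it assumes the sink then applies the lowercasing fallback quoted earlier to the renamed topic.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of a RegexRouter-style rename: if the regex matches
// the whole topic name, apply the replacement; otherwise keep the topic.
class RegexRouterSketch {

    static String route(String topic, Pattern regex, String replacement) {
        Matcher matcher = regex.matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        Pattern regex = Pattern.compile("(.*)");
        String routed = route("myTopic", regex, "kafka.$1");
        // Assumed: the sink lowercases the renamed topic to get the index.
        String index = routed.toLowerCase();
        System.out.println(routed); // kafka.myTopic
        System.out.println(index);  // kafka.mytopic
    }
}
```

Because the SMT runs before the sink sees the record, the prefix and the lowercasing compose without any topic.index.map entries.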

sslavian812 commented 5 years ago

That helped, thank you @rmoff !

rishabhk09 commented 1 year ago

@rmoff @sslavian812 It looks like the topic.index.map property is not working with the latest version of kafka-connect-elasticsearch. If it is no longer supported, is there a workaround?

rmoff commented 1 year ago

@rishabhk09 this is a closed issue from over four years ago. Please open a new ticket with your issue and/or go to https://www.confluent.io/en-gb/community/ask-the-community/.