idarlington opened this issue 7 years ago
@idarlington `topic+partition+offset` should be unique within a Kafka cluster -- that is how you uniquely identify a record, which is why the connector uses it. You could have conflicting IDs if you're pulling data from two Kafka clusters where the same topic name is used. If this is the cause of your data loss, you could add a `RegexRouter` transformation to your connector to prepend a cluster name to the topic name, giving you globally unique topic names.
Currently the ID is controlled by the `key.ignore` setting. If it is `true`, you get the `topic+partition+offset` format. If it is `false`, the ID will be the Kafka key, which allows you to update documents in Elasticsearch. I don't think there's a way to just use Elasticsearch-generated IDs currently, though I think that's the behavior you would see if your data doesn't have keys.
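The ID selection described above can be sketched as follows (illustrative Python, not the connector's actual Java code; the function name and signature are made up for this sketch):

```python
def document_id(topic, partition, offset, key, key_ignore):
    """Mimic how the connector picks the Elasticsearch _id (illustrative only)."""
    if key_ignore or key is None:
        # key.ignore=true (or no key present): synthesize a unique ID per Kafka record
        return f"{topic}+{partition}+{offset}"
    # key.ignore=false: reuse the Kafka key so later records update the same document
    return key

print(document_id("gritServer", 0, 199211, None, True))  # gritServer+0+199211
```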
Thanks @ewencp.
I am currently operating a single cluster. I noticed that the ID is now `topic+partition+logSize`. From the offset checker, it seems the offset is not updating:
```
Group     Topic       Pid  Offset  logSize  Lag    Owner
my_group  gritServer  0    81848   142559   60711  none
```
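The columns relate simply: Lag is logSize minus the committed Offset, so a stuck Offset with a growing logSize means growing Lag. A quick check of the row above:

```python
# Offset-checker columns from the reported row
offset, log_size = 81848, 142559

# Lag = logSize - Offset
lag = log_size - offset
print(lag)  # 60711, matching the Lag column
```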
Also, if the server is restarted, does the offset value change? I currently have my `log.dir` in `/tmp`. Finally, can you point me to examples of the `RegexRouter` transformation? Thank you.
Interesting that the offset does not seem to be correct. I'm looking at the code where that ID is generated, and it's definitely using the Kafka offset for the last part. That wouldn't rule out an issue in the framework, but I don't think anything has changed there. Are the `logSize` and `Lag` from that command increasing? If the offset isn't, that would imply that offsets are not being successfully committed. Could you check your Connect logs for any errors or messages about offset commits failing?
I think it's probably not your problem, since you seem to have tracked the issue down to the offset, but although the docs don't have an example of the regex router, here is a quick one:
```
transforms=Rename
transforms.Rename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.Rename.regex=(.*)
transforms.Rename.replacement=prefix-$1
```
That will prefix the topic names with a constant prefix before the connector processes them.
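The transform's behavior is essentially a full-match regex substitution on the topic name; a rough Python equivalent (illustrative only, not the actual Java implementation):

```python
import re

def route(topic, regex=r"(.*)", replacement=r"prefix-\1"):
    # Rename the topic only when the whole name matches the pattern,
    # mirroring how RegexRouter applies its regex/replacement pair.
    m = re.fullmatch(regex, topic)
    return m.expand(replacement) if m else topic

print(route("gritServer"))  # prefix-gritServer
```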
Also, note that `log.dir` in `/tmp` is a bad idea. It works fine when you're just developing/testing locally and don't care about the data, but depending on how your `/tmp` is managed, files could seemingly randomly be deleted and data would disappear.
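For example, pointing the broker at a persistent directory instead (the path below is just an example) in `etc/kafka/server.properties`:

```properties
# Store Kafka data somewhere durable rather than under /tmp
log.dirs=/var/lib/kafka-logs
```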
Thanks @ewencp
Yes, the `logSize` and `Lag` are increasing and the `Offset` isn't. BTW, I am using Confluent 3.1.2 with ES 2.0.0. I can't find any errors in the logs. These are mostly the contents:
[2017-07-10 18:09:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:10:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:11:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:12:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:13:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:14:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:15:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:16:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:17:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:18:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
I will update the `log.dir` then; I hope that moving the contents of the current dir and updating its value in `server.properties` will suffice.
I'm also wondering if you possibly got into a bad state for the topic such that the consumer cannot make progress. If you increase the log level, you should see more messages indicating the sink task's progress. Increasing it all the way to TRACE, even if only for WorkerSinkTask, would log both the messages as they are consumed (including the key and value) and the offsets that are actually being committed.
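Concretely, scoping TRACE to just the sink task can be done in `etc/kafka/connect-log4j.properties` with something like:

```properties
# Raise only WorkerSinkTask to TRACE, leaving the root logger at its default
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=TRACE
```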
You might also just want to check that all the files we'd expect to be in the Kafka directory are there, i.e. that something didn't get deleted.
@ewencp I have increased the log level to TRACE; this is a snippet:
[2017-07-11 07:39:54,670] TRACE Executing batch 450 of 1 records (io.confluent.connect.elasticsearch.bulk.BulkProcessor:347)
[2017-07-11 07:39:54,671] DEBUG POST method created based on client request (io.searchbox.client.http.JestHttpClient:99)
[2017-07-11 07:39:54,671] DEBUG Request method=POST url=http://localhost:9200/_bulk (io.searchbox.client.http.JestHttpClient:84)
[2017-07-11 07:39:54,671] DEBUG CookieSpec selected: default (org.apache.http.client.protocol.RequestAddCookies:122)
[2017-07-11 07:39:54,672] DEBUG Auth cache not set in the context (org.apache.http.client.protocol.RequestAuthCache:76)
[2017-07-11 07:39:54,672] DEBUG Connection request: [route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:249)
[2017-07-11 07:39:54,672] DEBUG Connection leased: [id: 0][route: {}->http://localhost:9200][total kept alive: 1; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:282)
[2017-07-11 07:39:54,672] DEBUG http-outgoing-0: set socket timeout to 3000 (org.apache.http.impl.conn.DefaultManagedHttpClientConnection:90)
[2017-07-11 07:39:54,673] DEBUG Executing request POST /_bulk HTTP/1.1 (org.apache.http.impl.execchain.MainClientExec:255)
[2017-07-11 07:39:54,673] DEBUG Target auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:260)
[2017-07-11 07:39:54,673] DEBUG Proxy auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:266)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> POST /_bulk HTTP/1.1 (org.apache.http.headers:135)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> Content-Length: 2216 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Host: localhost:9200 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Connection: Keep-Alive (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111) (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Accept-Encoding: gzip,deflate (org.apache.http.headers:138)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "POST /_bulk HTTP/1.1[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Length: 2216[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Host: localhost:9200[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111)[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "Accept-Encoding: gzip,deflate[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "{"index":{"_id":"gritServer+0+199211","_index":"grits","_type":"docs"}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,678] DEBUG http-outgoing-0 >> "{"data":[{"costSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","costSinceLast":3.03,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","powerSinceLast":141371.1,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","powerSinceLast":252830.4,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"energyTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","energyToday":1111338.2,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","energyToday":2078056.5,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"activeSource":[{"sourceName":"Sunhive_4","sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"timeTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","timeToday":1.3630964,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","timeToday":0.0,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"current":[{"sourceName":"Sunhive_4","value":[227.49,215.29,189.65],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[227.49,227.49,227.49,227.49,227.49],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"voltage":[{"sourceName":"Sunhive_4","value":[233.744,232.89,231.452],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[233.744,233.744,233.744,233.744,233.744],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerfactor":[{"sourceName":"Sunhive_4","value":[0.984,0.981,0.977],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[0.984,0.984,0.984,0.984,0.984],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"time":"2017-07-11T08:39:46.893+0100","Type":"1","id":"70:B3:D5:43:09:E6","costTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","costToday"
:27783.455,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}]}],"master":{"id":"70:B3:D5:43:09:E6","time":"2017-07-11T08:39:46.893+0100","configuration_IDs":["3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"]}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Length: 181[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "{"took":7,"errors":false,"items":[{"index":{"_index":"grits","_type":"docs","_id":"gritServer+0+199211","_version":7,"_shards":{"total":2,"successful":1,"failed":0},"status":200}}]}" (org.apache.http.wire:86)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << HTTP/1.1 200 OK (org.apache.http.headers:124)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG http-outgoing-0 << Content-Length: 181 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG Connection can be kept alive indefinitely (org.apache.http.impl.execchain.MainClientExec:284)
[2017-07-11 07:39:54,687] DEBUG Connection [id: 0][route: {}->http://localhost:9200] can be kept alive indefinitely (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:314)
[2017-07-11 07:39:54,688] DEBUG Connection released: [id: 0][route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:320)
[2017-07-11 07:39:54,688] DEBUG Bulk operation was successfull (io.searchbox.core.Bulk:143)
`tree` output:
├── bin
│ ├── camus-config
│ ├── camus-run
│ ├── confluent-rebalancer
│ ├── connect-distributed
│ ├── connect-standalone
│ ├── control-center-3_0_0-reset
│ ├── control-center-3_0_1-reset
│ ├── control-center-console-consumer
│ ├── control-center-reset
│ ├── control-center-run-class
│ ├── control-center-set-acls
│ ├── control-center-start
│ ├── kafka-acls
│ ├── kafka-avro-console-consumer
│ ├── kafka-avro-console-producer
│ ├── kafka-configs
│ ├── kafka-console-consumer
│ ├── kafka-console-producer
│ ├── kafka-consumer-groups
│ ├── kafka-consumer-offset-checker
│ ├── kafka-consumer-perf-test
│ ├── kafka-mirror-maker
│ ├── kafka-preferred-replica-election
│ ├── kafka-producer-perf-test
│ ├── kafka-reassign-partitions
│ ├── kafka-replay-log-producer
│ ├── kafka-replica-verification
│ ├── kafka-rest-run-class
│ ├── kafka-rest-start
│ ├── kafka-rest-stop
│ ├── kafka-rest-stop-service
│ ├── kafka-run-class
│ ├── kafka-server-start
│ ├── kafka-server-stop
│ ├── kafka-simple-consumer-shell
│ ├── kafka-streams-application-reset
│ ├── kafka-topics
│ ├── kafka-verifiable-consumer
│ ├── kafka-verifiable-producer
│ ├── schema-registry-run-class
│ ├── schema-registry-start
│ ├── schema-registry-stop
│ ├── schema-registry-stop-service
│ ├── support-metrics-bundle
│ ├── windows
│ ├── zookeeper-security-migration
│ ├── zookeeper-server-start
│ ├── zookeeper-server-stop
│ └── zookeeper-shell
├── etc
│ ├── camus
│ ├── confluent-common
│ ├── confluent-control-center
│ ├── confluent-rebalancer
│ ├── kafka
│ ├── kafka-connect-elasticsearch
│ ├── kafka-connect-hdfs
│ ├── kafka-connect-jdbc
│ ├── kafka-connect-replicator
│ ├── kafka-rest
│ ├── rest-utils
│ └── schema-registry
`etc/kafka`:
.
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-standalone.properties
├── consumer.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
└── zookeeper.properties
@idarlington OK, so it looks like it's definitely making requests. We might want a larger snippet, though, because the TRACE and DEBUG output is so verbose -- most of that is from the underlying library making the request and doesn't include much output from Connect itself. We'd probably want more lines that include `io.confluent` or `org.apache.kafka` logs.
@idarlington I'm not seeing any duplicates in that log? In fact it looks like requests are working as expected. I've grepped out a bit to help show what the connector is doing:
$ grep _id log | grep '>>' | cut -f 6 -d ' '
"{"index":{"_id":"gritServer+0+199576","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199577","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199578","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199579","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199580","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199581","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199582","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199583","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199584","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199585","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199586","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199587","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199588","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199589","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199590","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199591","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199592","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199593","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199594","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199595","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199596","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199597","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199598","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199599","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199600","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199601","_index":"grits","_type":"docs"}}[\n]"
"POST
"{"index":{"_id":"gritServer+0+199603","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199604","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199605","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199606","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199607","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199608","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199609","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199610","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199611","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199612","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199613","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199614","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199615","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199616","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199617","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199618","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199619","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199620","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199621","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199622","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199623","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199624","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199625","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199626","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199627","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199628","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199629","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199630","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199631","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199632","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199633","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199634","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199635","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199636","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199637","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199638","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199639","_index":"grits","_type":"docs"}}[\n]"
(It's missing one and has POST in there because the log is a bit mangled.) So for gritServer-0, we're seeing what we expect -- each message processed once, and in order.
Looking at the offset commits:
[2017-07-11 07:43:40,673] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} (io.confluent.connect.elasticsearch.ElasticsearchSinkT metadata=''}} (org.apache.kafka.clients.consumer.KafkaConsumer:1160)
[2017-07-11 07:43:40,675] TRACE Sending offset-commit request with {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} to coordinator 45.76.39.129:9092 (id: 2147483647 rack: null) for group connect-elasticsearch-sink (org.aconnect-elasticsearch-sink committed offset 199607 for partition gritServer-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:640)
...snip...
[2017-07-11 07:43:58,194] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199640, metadata=''}} (io.confluent.connect.SinkTask:130)
[2017-07-11 07:43:58,194] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
You can see that the offset being committed for gritServer-0 is definitely increasing. What's notable is that dailyData-0 is not increasing and none of the IDs reported above have that in their name, which indicates no messages are being seen for it. Is there actually any data flowing into that topic partition? The log clearly shows that fetch requests are being sent:
[2017-07-11 07:43:29,127] TRACE Added fetch request for partition dailyData-0 at offset 0 (org.apache.kafka.clients.consumer.internals.Fetcher:639)
but it seems they are not receiving any data in response.
It looks like things are running fine, there just isn't data in one of the topic partitions.
@ewencp I want to add another input to this thread. Using our own predefined `_id` actually degrades index time and indexing performance. Elastic also recommends using their generated IDs instead of ours, so that each bulk insert does not have to check whether the `_id` already exists.
I understand the design, but in some cases, like basic logs, we don't need the `_id` validation (and the resulting degradation); we just want to index at a lower performance cost.
> I don't think there's a way to just use Elasticsearch IDs currently, though I think that's the behavior you would see if your data doesn't have keys.

The way to do that is to send `null` as the document ID in the indexing/bulk request, as opposed to forcing it to be a string as the connector does now.
In the logging use case, it is usually okay to have some messages added twice (or even dropped entirely), so IMO there should be an opt-in configuration that allows relying on ES-generated IDs. This would result in much faster inserts into ES.
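Concretely, letting Elasticsearch auto-generate IDs just means omitting `_id` from the bulk action line. A sketch of the two action shapes (plain Python/JSON, independent of any client library; the helper function is made up for illustration):

```python
import json

def bulk_action(index, doc_type, doc_id=None):
    # With doc_id=None the action carries no "_id", so Elasticsearch
    # auto-generates one; otherwise the caller-supplied ID is used.
    action = {"index": {"_index": index, "_type": doc_type}}
    if doc_id is not None:
        action["index"]["_id"] = doc_id
    return json.dumps(action)

print(bulk_action("grits", "docs"))                         # ES picks the ID
print(bulk_action("grits", "docs", "gritServer+0+199211"))  # connector-style ID
```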
See the note I added on issue #139 for a very minor code change I made to support allowing null document keys and thus auto-generating the ids in Elasticsearch. https://github.com/confluentinc/kafka-connect-elasticsearch/issues/139#issuecomment-461819978 I've been running this for several months now at an average rate of 2.5 billion messages per day with no issues.
I noticed that documents indexed in Elasticsearch have their IDs in the following format: `topic+partition+offset`. I would prefer to use IDs generated by Elasticsearch. It seems `topic+partition+offset` is not always unique, so I am losing data. How can I change that?