confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)
Other
11 stars 329 forks source link

Data being regularly written to S3 but offsets not being committed to Kafka #93

Closed niksajakovljevic closed 6 years ago

niksajakovljevic commented 7 years ago

I believe that this problem is not specific to this particular connector but would like to confirm that. I've been using version 3.2.2 for quite some time in production and bumped into quite sever problem recently. Discovered following log lines:

WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=1000, metadata=''} -- partition not assigned
WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=2000, metadata=''} -- partition not assigned
WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=3000, metadata=''} -- partition not assigned
WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=4000, metadata=''} -- partition not assigned

As you can see from the log lines the offset is increasing on every commit attempt and data is being written to S3 (although the files that were written didn't really follow the defined flush.size) . Once I restarted the service it started from the last committed offset (1000) but then due to files written before not enforcing flush.size the process is not idempotent as the offset (which is part of the filename) is different then it was before (due to rule of flush.size now being enforced). However the state of S3 is now corrupted.

Looking at WorkerSinkTask:337 and surrounding code which looks like:

 final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
            final TopicPartition partition = taskProvidedOffsetEntry.getKey();
            final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
            if (commitableOffsets.containsKey(partition)) {
                if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
                    commitableOffsets.put(partition, taskProvidedOffset);
                } else {
                    log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
                }
            } else {
                log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);
            }
        }

It seems funny that somehow taskProvidedOffsets contains TopicPartition which is not in commitableOffsets. What is even stranger is that this was running in this state for couple of days with files being written but offset not committed. Important to note is that this problem appeared about the same time when the few of worker tasks died. I run 4 instances of Kafka-connect-storage-cloud and each with 8 worker tasks, so I guess some rebalancing happened and possibly corrupted some internal WorkerSinkTask state. Even with offsets not being committed to Kafka if the flush.size was respected this would not be such a huge problem as reprocessing would overwrite the files in S3 - however I somehow experienced this non deterministic behavior.

ewencp commented 7 years ago

This sounds a lot like https://issues.apache.org/jira/browse/KAFKA-5567, do you have any transformations for this connector?

niksajakovljevic commented 6 years ago

@ewencp Thanks for getting back. As I don't have any transformations on records I don't think that could be the case. However in the meanwhile I've discovered that I had some bug in the custom partitioner code that led to system not being idempotent in the given scenario. Anyways I am still puzzled with the offsets not being committed and files being written / which lasted for couple of days: WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=1000, metadata=''} -- partition not assigned

I've upgraded to latest Kafka 0.11.0.1 and will see if problem happens again.

kkonstantine commented 6 years ago

@niksajakovljevic I'm suspecting the issues with offset commit might be related with this fix (https://github.com/confluentinc/kafka-connect-storage-cloud/pull/72) that made it to 3.3.0 but not 3.2.2 and is related to a bug affecting some partitioners.

Will appear in a forthcoming 3.2.3. But upgrading to 3.3.0 S3 connector is compatible with Confluent platform 3.2.2. I'd go ahead with a connector upgrade if I were you and try again with the custom partitioner.

kkonstantine commented 6 years ago

To the best of my knowledge, this has been fixed. Closing.