confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
1.01k stars 953 forks source link

Timeout when reading offset of config.storage.topic #1370

Open sabancihan opened 7 months ago

sabancihan commented 7 months ago

2023-11-14 05:11:42,432 WARN || Timeout while reading log to end for topic 'connect_configs'. Retrying automatically. This may occur when brokers are unavailable or unreachable. Reason: Timed out while waiting to get end offsets for topic 'connect_configs' on brokers at broker-1:9092 [org.apache.kafka.connect.util.KafkaBasedLog]

This is the error I get, but broker seems to be available and reachable at the time, after multiple tries of accessing offset of connect_configs it becomes accessible after around 3 minutes.

I traced where error message comes from and found that it is from endoffsets method of TopicAdmin.class

    public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions) {
        if (partitions == null || partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<TopicPartition, OffsetSpec> offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
        ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));
        // Get the individual result for each topic partition so we have better error messages
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition partition : partitions) {
            try {
                ListOffsetsResultInfo info = resultFuture.partitionResult(partition).get();
                result.put(partition, info.offset());
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                String topic = partition.topic();
                if (cause instanceof AuthorizationException) {
                    String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new ConnectException(msg, e);
                } else if (cause instanceof UnsupportedVersionException) {
                    // Should theoretically never happen, because this method is the same as what the consumer uses and therefore
                    // should exist in the broker since before the admin client was added
                    String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers);
                    throw new UnsupportedVersionException(msg, e);
                } else if (cause instanceof TimeoutException) {
                    String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new TimeoutException(msg, e);
                } else if (cause instanceof LeaderNotAvailableException) {
                    String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new LeaderNotAvailableException(msg, e);
                } else if (cause instanceof org.apache.kafka.common.errors.RetriableException) {
                    throw (org.apache.kafka.common.errors.RetriableException) cause;
                } else {
                    String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new ConnectException(msg, e);
                }
            } catch (InterruptedException e) {
                Thread.interrupted();
                String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), bootstrapServers);
                throw new RetriableException(msg, e);
            }
        }
        return result;
    }

I did not get this timeout error when using a kafka cluster instead of single broker but I am not sure if it is really related to issue, broker seems to be up when offset read happens.

What may cause this timeout error?

parvezDevIT commented 5 months ago

Check the size of topic created by config.storage.topic . If size is big, delete and re-create topic and check if it solves the issue.