apache / pulsar-adapters

Apache Pulsar Adapters
https://pulsar.apache.org/
24 stars 31 forks source link

kafka adaptor can not handle non-partitioned topic #38

Open casuallc opened 2 years ago

casuallc commented 2 years ago

Reproduce

error image

probable reason PulsarKafkaConsumer -> poll

public ConsumerRecords<K, V> poll(long timeoutMillis) {
        try {
            QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
            }

            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();

            int numberOfRecords = 0;

            while (item != null) {
                TopicName topicName = TopicName.get(item.consumer.getTopic());
                String topic = topicName.getPartitionedTopicName();
                int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                Message<byte[]> msg = item.message;
                MessageId msgId = msg.getMessageId();
                if (msgId instanceof TopicMessageIdImpl) {
                    msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
                }
                long offset = MessageIdUtils.getOffset(msgId);

                TopicPartition tp = new TopicPartition(topic, partition);
                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                    resetOffsets(tp);
                }

               // .. other code

            // If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
            return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0; This code can not discriminate partitioned-topic or non-paritioned-topic.

casuallc commented 2 years ago

37