Open martin-frydl opened 4 years ago
consumeStrings
is a convenience method that makes a lot of assumptions so it probably won't suit some use cases. You can create your own consumer by calling createStringConsumer
or createConsumer
with the desired configuration and use that instead.
But that method is practically unusable when trying to read less message than there are in topic. The rest will be always lost. And if that is desired behavior, it should be noted in javadoc since I spent more than hour trying to find out what's going on. The same applies to consume() method itself. When I pass consumer with autocommit into it, it will discard some messages. Also should have some note in documentation.
It's not desired behaviour but rather an unfortunate side effect. The consume*
helpers are something I regret in hindsight so it's probably time to deprecate them. They make a lot of assumptions about their usage and haven't aged well.
I am not sure it's possible to document all possible ways in which they could go wrong but I'd be happy to accept a PR if you have any suggestions.
I'd suggest using the following pattern as an immediate workaround for your problem:
KafkaConsumer<String, String> consumer = kafkaRule.helper().createStringConsumer(kafkaRule.helper().consumerConfig(false));
List<String> result = kafkaRule.helper().consume("my-test-topic", consumer,3).get().stream().map(ConsumerRecord::value).collect(Collectors.toList());
When there are multiple messages in topic and consumeStrings() is called with number of messages less than number present in there, the extra messages will be discarded.
Example:
Here I send two string messages into topic and then try to read one-by-one using consumeStrings(). The first one will succeed and return "a" while the second one will block forever.
The reason is enable.auto.commit set to true. The consumer will receive both messages in one poll() and commit only one (RecordConsumer.call(), line 311). This is correct but since autocommit is enabled, both message were already committed. The problem is that I'm not able to override properties passed to KafkaConsumer or set autocommit to false in consumerConfig() - as these are both called automatically from consumeStrings().
I think the solution could be to use consumerConfig(false) for creating consumer in consumeStrings(). But I haven't tested it.