reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

The number of groups can't be got after groupBy in some cases. #931

Closed chang-chao closed 6 years ago

chang-chao commented 6 years ago

Expected behavior

After groupBy, the number of groups can be retrieved using the count() method.

Actual behavior

In the reproducing code below, when the number of words is small, count().block() returns as expected. But when the number of words is larger (say, about 600 words), count().block() gets stuck and never returns.

Steps to reproduce

The code below reproduces the issue.

// read a text file line by line
Flux<String> lines = fromPath(Paths.get(TopFreqWordsApplication.INPUT_TXT_FILE));

// convert to words
Flux<String> words = lines.filter(it -> StringUtils.isNotBlank(it)).flatMapIterable(WordUtils::extractWords);

// group by words
Flux<GroupedFlux<String, String>> wordGroups = words.groupBy(it -> it);

// !!the line below gets blocked forever!!
log.info(wordGroups.count().block());

The code above can be found here.

Code doing the same using RxJava works as expected:

Observable<String> lines = Observable.fromIterable(stream::iterator);

// convert to words
Observable<String> words = lines.filter(it -> StringUtils.isNotBlank(it))
        .flatMapIterable(WordUtils::extractWords);

// group by words
Observable<GroupedObservable<String, String>> wordGroups = words.groupBy(it -> it);

Long groupCount = wordGroups.count().blockingGet();
log.info("groupCount=" + groupCount);

The code above can be found here.

The input text file for reproducing the issue can be found here.

Reactor Core version

3.1.0

JVM version (e.g. java -version)

1.8.0_131

OS version (e.g. uname -a)

Windows 10

akarnokd commented 6 years ago

When you flatMap groups, the concurrency level should be at least the number of expected groups; otherwise the flow hangs because not all of its groups are being consumed. In RxJava, the default concurrency level of flatMap is 128, so you happen to have fewer groups than that. For Reactor, it is 32, which is apparently not enough.

simonbasle commented 6 years ago

@chang-chao as @akarnokd said, groupBy needs continuous consumption of the groups, which can be impeded if e.g. a flatMap doesn't have a sufficient concurrency level. This is hinted at in the javadoc of groupBy. Note that the level can be tuned using the appropriate flatMap overload.
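As a rough illustration of the point about the flatMap overload, here is a minimal sketch that consumes every group through flatMap(mapper, concurrency) and then counts the groups. The class name GroupCountSketch, the method countGroups, the parameter maxGroups and the value 1024 are hypothetical and chosen only for this example; the sketch assumes an upper bound on the number of distinct keys is known in advance.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical sketch: count groups while keeping every group consumed.
final class GroupCountSketch {

    // maxGroups is an assumed upper bound on the number of distinct words.
    static long countGroups(List<String> words, int maxGroups) {
        return Flux.fromIterable(words)
                .groupBy(word -> word)
                // Subscribe to up to maxGroups groups at once so that no group
                // is left unconsumed; each inner count() drains its group.
                .flatMap(group -> group.count(), maxGroups)
                // One element is emitted per group, so this is the group count.
                .count()
                .block();
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        // 600 distinct words, each appearing twice, roughly mirroring the report.
        for (int i = 0; i < 600; i++) {
            words.add("w" + i);
            words.add("w" + i);
        }
        System.out.println("groupCount=" + countGroups(words, 1024));
    }
}
```

If the number of distinct keys can exceed the chosen concurrency, this approach can stall in the same way as the original report, so it only fits inputs with a known bound.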

That said, maybe groupBy is not the ideal tool for this job. Consider using Flux.collect(Supplier, BiConsumer), where the container is e.g. a Map<String, Integer> and the BiConsumer increments the Integer for a word each time it encounters that word.
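A minimal sketch of that suggestion might look like the following. The class name WordCountSketch and the method countWords are hypothetical, and the sample words are made up for illustration; the number of distinct words is then simply the size of the resulting map.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

// Hypothetical sketch: count word occurrences with Flux.collect instead of groupBy.
final class WordCountSketch {

    static Map<String, Integer> countWords(List<String> words) {
        return Flux.fromIterable(words)
                .collect(
                        // Supplier: one mutable container per subscription.
                        () -> new HashMap<String, Integer>(),
                        // BiConsumer: add 1 to the word's counter, starting from 1.
                        (counts, word) -> counts.merge(word, 1, Integer::sum))
                .block();
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                countWords(Arrays.asList("a", "b", "a", "c", "b", "a"));
        System.out.println("distinctWords=" + counts.size());
        System.out.println("counts=" + counts);
    }
}
```

Because everything is accumulated into a single container, no per-group subscription is involved, so the concurrency concern discussed above does not arise here.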

chang-chao commented 6 years ago

@akarnokd @simonbasle Thanks for the explanation and suggestion.