spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

The flux created from bodyToFlux does not work with counting after groupBy [SPR-16148] #20696

Closed: spring-projects-issues closed this issue 7 years ago

spring-projects-issues commented 7 years ago

chao chang opened SPR-16148 and commented

I'm using WebClient to retrieve a text file and then calculate the top N most frequent words. The lines Flux created by the bodyToFlux() method does not seem to work: the consumer of topFreqWords is never called. If the lines Flux is instead created from a String array, the program works properly. Also, if I subscribe directly to the lines Flux created by bodyToFlux(), the consumer does get called, and if I subscribe to the wordGroups Flux, the consumer is called as well. But if I subscribe to wordCounts, the consumer is not called.

Below is the code snippet.

  // get the content of the text file line by line
  Flux<String> lines = client.get().retrieve().bodyToFlux(String.class);

  // TODO: if the lines Flux is created using a String array,
  // the whole program does work
  // So I think this is related to WebClient.
  // lines = Flux.fromArray(new String[] {
  //     "That was in the time of Burke and Fox and Rodney.",
  //     " Spain and France and Holland had combined,",
  //     " and in one great battle threatened to crush" });

  // convert to words
  Flux<String> words = lines.filter(it -> StringUtils.isNotBlank(it)).flatMapIterable(this::extractWords);

  // group by words
  Flux<GroupedFlux<String, String>> wordGroups = words.groupBy(it -> it);

  // count the occurrences of each word
  Flux<Pair<String, Long>> wordCounts = wordGroups.flatMap(gr -> gr.count().map(cnt -> Pair.of(gr.key(), cnt)));

  // get top N words
  Flux<Pair<String, Long>> topFreqWords = wordCounts.sort((p1, p2) -> Long.compare(p2.getRight(), p1.getRight()))
    .take(top);

  // print the top N words
  topFreqWords.subscribe(log::info);

Affects: 5.0.1

Reference URL: https://github.com/chang-chao/top-freq-words/blob/master/src/main/java/me/changchao/reactive/topfreqwords/TopFreqWordCounter.java

spring-projects-issues commented 7 years ago

Brian Clozel commented

This tracker is meant for issues or enhancement requests; for questions, please create a new question on StackOverflow.

In this case, Flux.fromArray will emit each array element as a value in the resulting Flux.

You're assuming that asking the WebClient for a Flux<String> will split the body on new lines, but that's not the case. It will split on arbitrary character boundaries, depending on what is read from the network and on the demand on the reactive stream itself. You can compose that Flux with another operator to split on new lines (see Spring's StringDecoder), or you can configure the client codecs to split text on new lines; there's an option for that.
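To make the "compose with another operator" suggestion concrete, here is a minimal sketch in plain Java (no Reactor dependency) of what splitting on new lines has to do when chunk boundaries are arbitrary: keep the unterminated tail of each chunk and prepend it to the next one. `LineReassembler` and its `feed`, `flush` and `chunk` methods are hypothetical names for illustration, not Spring or Reactor API, and fixed-size chunks merely stand in for network reads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Spring/Reactor API): rebuilds text lines from
// chunks whose boundaries do not line up with line breaks.
public class LineReassembler {

    // Unterminated tail carried over from the previous chunk(s).
    private final StringBuilder pending = new StringBuilder();

    // Feeds one chunk; returns every line it completes, without the terminator.
    public List<String> feed(String chunk) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (c == '\n') {
                // Drop a preceding '\r' so "\r\n" works even if split across chunks.
                int len = pending.length();
                if (len > 0 && pending.charAt(len - 1) == '\r') {
                    pending.setLength(len - 1);
                }
                lines.add(pending.toString());
                pending.setLength(0);
            } else {
                pending.append(c);
            }
        }
        return lines;
    }

    // Call once at end of input: emits the last line if it had no trailing newline.
    public List<String> flush() {
        List<String> rest = new ArrayList<>();
        if (pending.length() > 0) {
            rest.add(pending.toString());
            pending.setLength(0);
        }
        return rest;
    }

    // Cuts text at fixed offsets to mimic arbitrary network read boundaries.
    public static List<String> chunk(String text, int size) {
        List<String> chunks = new ArrayList<>();
        for (int i = 0; i < text.length(); i += size) {
            chunks.add(text.substring(i, Math.min(text.length(), i + size)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        String body = "That was in the time of Burke and Fox and Rodney.\n"
                + " Spain and France and Holland had combined,\n"
                + " and in one great battle threatened to crush";
        // The first chunk is "That wa": a chunk is not a line, and words get cut.
        List<String> chunks = chunk(body, 7);
        LineReassembler reassembler = new LineReassembler();
        List<String> lines = new ArrayList<>();
        for (String piece : chunks) {
            lines.addAll(reassembler.feed(piece));
        }
        lines.addAll(reassembler.flush());
        lines.forEach(System.out::println);
    }
}
```

One possible way to wire this into the reporter's pipeline (an assumption, not necessarily how StringDecoder does it) is to create one reassembler per subscription: `Flux.defer(() -> { LineReassembler r = new LineReassembler(); return chunks.concatMapIterable(r::feed).concatWith(Flux.defer(() -> Flux.fromIterable(r.flush()))); })`. Creating the state inside `defer` gives each subscriber its own buffer, which matters because the helper is stateful.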