Philippus / elastic4s

🔍 Elasticsearch Scala Client - Reactive, Non Blocking, Type Safe, HTTP Client
Apache License 2.0

Streaming more than 10 results may not complete #358

Closed: dspasojevic closed this issue 9 years ago

dspasojevic commented 9 years ago

Hi,

I'm investigating an issue we are seeing where streamed results never complete.

I've run ScrollPublisherIntegrationTest locally, and it seems to have the same issue: many more results than expected are returned. After adding some debug logging to com.sksamuel.elastic4s.streams.PublishActor#PublishActor, I can see that the scroll id flips between null, non-null, null, and so on (see the output at the bottom).

I think this is because no keep alive is specified on the PublishActor's scroll request. That seems to cause org.elasticsearch.action.search.SearchScrollRequest#writeTo (v1.6.0) to write the scroll flag as false. SearchScrollRequest needs both a keep alive and a scroll id to continue the scroll and return a new scroll id. Without the keep alive, the next page is still returned but with no scroll id, so the publisher starts a fresh search from the first page again.
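The failure mode above can be illustrated with a small, self-contained simulation. This is not elastic4s or Elasticsearch code: `FakeScrollServer`, `drain` and the 10-hit page size are hypothetical, and the fake server simply hard-codes the reported behaviour that a scroll request without a keep alive returns hits but no new scroll id. The loop mimics the publisher by issuing a fresh search whenever it has no scroll id.

```scala
// Hypothetical simulation of the scroll loop; not the real PublishActor.
object ScrollSimulation {

  final case class Page(scrollId: Option[String], hits: Seq[String])

  // Mimics the reported Elasticsearch 1.6 behaviour: a scroll request without
  // a keep alive still returns the next page, but no new scroll id.
  class FakeScrollServer(docs: Seq[String], pageSize: Int = 10) {
    private var contexts = Map.empty[String, Int] // scroll id -> next offset
    private var counter = 0

    private def newId(offset: Int): String = {
      counter += 1
      val id = s"scroll-$counter"
      contexts += id -> offset
      id
    }

    // Initial search request: always opens a scroll context.
    def search(): Page = Page(Some(newId(pageSize)), docs.take(pageSize))

    // Scroll request: a new scroll id is only issued when a keep alive is given.
    def scroll(scrollId: String, keepAlive: Option[String]): Page = {
      val offset = contexts(scrollId)
      contexts -= scrollId
      val hits = docs.slice(offset, offset + pageSize)
      Page(keepAlive.map(_ => newId(offset + pageSize)), hits)
    }
  }

  // With no scroll id, issue a fresh search; otherwise scroll. Stop on the
  // first empty page, or after maxRequests so the broken case terminates.
  // Returns the hits received and whether the stream actually completed.
  def drain(docs: Seq[String], keepAlive: Option[String], maxRequests: Int = 10): (Seq[String], Boolean) = {
    val server = new FakeScrollServer(docs)
    var scrollId: Option[String] = None
    val received = Seq.newBuilder[String]
    var requests = 0
    var completed = false
    while (!completed && requests < maxRequests) {
      requests += 1
      val page = scrollId match {
        case None     => server.search()
        case Some(id) => server.scroll(id, keepAlive)
      }
      if (page.hits.isEmpty) completed = true
      else {
        received ++= page.hits
        scrollId = page.scrollId
      }
    }
    (received.result(), completed)
  }
}
```

With 19 documents and `keepAlive = None`, the sketch alternates between a 10-hit search and a 9-hit scroll, collecting 95 hits in 10 requests without ever completing, much like the repeating Augustus to Titus pages in the output. With `keepAlive = Some("1m")` it returns exactly the 19 documents and completes on the first empty page.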

Is this a bug in elastic4s? If so, I'm happy to submit a pull request that sets the keep alive to a value supplied to the subscriber. I think it would also be worth checking the result of the .await in the tests.
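On why checking the `.await` result matters: assuming the test waits for completion on a `java.util.concurrent.CountDownLatch` (an assumption; the test code is not shown here), `await(timeout, unit)` returns `false` on timeout rather than throwing, so a stream that never completes still lets the test pass unless that Boolean is asserted. The helper below, `completesWithin`, is hypothetical and only illustrates the pattern.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical helper, not part of elastic4s or its tests.
object AwaitCheck {

  // Runs `produce` on a daemon thread, handing it a callback that signals
  // completion. Returns true only if completion was signalled before the
  // timeout; this Boolean is what a test should assert on.
  def completesWithin(timeoutMillis: Long)(produce: (() => Unit) => Unit): Boolean = {
    val completed = new CountDownLatch(1)
    val worker = new Thread(new Runnable {
      def run(): Unit = produce(() => completed.countDown())
    })
    worker.setDaemon(true) // never keep the JVM alive for a hung producer
    worker.start()
    // CountDownLatch.await returns false on timeout instead of throwing.
    completed.await(timeoutMillis, TimeUnit.MILLISECONDS)
  }
}
```

In a test this would read as `assert(AwaitCheck.completesWithin(10000) { done => ... })`, so a never-completing stream fails after the timeout instead of passing silently.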

Thanks, -Dan

Output:

[IntelliJ IDEA 14 ScalaTest runner invocation of com.sksamuel.elastic4s.streams.ScrollPublisherIntegrationTest on JDK 1.8.0_45, with elasticsearch-1.6.0 and Scala 2.11 on the classpath; full classpath omitted]
Testing started at 9:35 PM ...
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[17-Jul 21:35:37:963] INFO  [ScalaTest-run                           ] node.<init> - [Cornelius van Lunt] version[1.6.0], pid[6049], build[cdd3ac4/2015-06-09T13:36:34Z]
[17-Jul 21:35:37:966] INFO  [ScalaTest-run                           ] node.<init> - [Cornelius van Lunt] initializing ...
[17-Jul 21:35:37:969] INFO  [ScalaTest-run                           ] plugins.<init> - [Cornelius van Lunt] loaded [], sites []
[17-Jul 21:35:38:017] INFO  [ScalaTest-run                           ] env.maybeLogPathDetails - [Cornelius van Lunt] using [1] data paths, mounts [[/ (/dev/disk1)]], net usable_space [251.6gb], net total_space [464.7gb], types [hfs]
[17-Jul 21:35:38:825] WARN  [ScalaTest-run                           ] bootstrap.<clinit> - JNA not found. native methods will be disabled.
[17-Jul 21:35:39:013] WARN  [ScalaTest-run                           ] script.processDisableDynamicDeprecatedSetting - [Cornelius van Lunt] deprecated setting [script.disable_dynamic] is set, replace with fine-grained scripting settings (e.g. script.inline, script.indexed, script.file)
[17-Jul 21:35:39:132] INFO  [ScalaTest-run                           ] node.<init> - [Cornelius van Lunt] initialized
[17-Jul 21:35:39:132] INFO  [ScalaTest-run                           ] node.start - [Cornelius van Lunt] starting ...
[17-Jul 21:35:39:134] INFO  [ScalaTest-run                           ] transport.doStart - [Cornelius van Lunt] bound_address {local[1]}, publish_address {local[1]}
[17-Jul 21:35:39:142] INFO  [ScalaTest-run                           ] discovery.doStart - [Cornelius van Lunt] elasticsearch/EJ3NosLDR-uBMWfFQ_0IwQ
[17-Jul 21:35:39:143] INFO  [an Lunt][clusterService#updateTask][T#1]] service.run - [Cornelius van Lunt] master {new [Cornelius van Lunt][EJ3NosLDR-uBMWfFQ_0IwQ][put-it-in-the-vault.local][local[1]]{http.enabled=true, local=true}}, removed {[Cornelius van Lunt][z6ixnz-XRrO_uvsE6gjKEQ][put-it-in-the-vault.local][local[1]]{http.enabled=true, local=true},}, reason: local-disco-initial_connect(master)
[17-Jul 21:35:39:228] INFO  [an Lunt][clusterService#updateTask][T#1]] gateway.clusterStateProcessed - [Cornelius van Lunt] recovered [0] indices into cluster_state
[17-Jul 21:35:39:243] INFO  [ScalaTest-run                           ] http.doStart - [Cornelius van Lunt] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[/10.0.1.104:9200]}
[17-Jul 21:35:39:244] INFO  [ScalaTest-run                           ] node.start - [Cornelius van Lunt] started
[17-Jul 21:35:39:406] INFO  [an Lunt][clusterService#updateTask][T#1]] metadata.execute - [Cornelius van Lunt] [scrollpubint] creating index, cause [api], templates [], shards [1]/[0], mappings []
[17-Jul 21:35:39:783] INFO  [an Lunt][clusterService#updateTask][T#1]] metadata.processIndexMappingTasks - [Cornelius van Lunt] [scrollpubint] update_mapping [emperors] (dynamic)
Queue is empty, querying with scroll id [null].
Response: [{
  "_scroll_id" : "cXVlcnlBbmRGZXRjaDsxOzE6RUozTm9zTERSLXVCTVdmRlFfMEl3UTswOw==",
  "took" : 34,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 19,
    "max_score" : 1.0,
    "hits" : [ Augustus -> Titus ]
  }
}].
Got response, switching scroll id from [null] to [cXVlcnlBbmRGZXRjaDsxOzE6RUozTm9zTERSLXVCTVdmRlFfMEl3UTswOw==]
AU6bzGGaa8_2uf2LEVko - {"name":"Augustus"}
AU6bzGGaa8_2uf2LEVkp - {"name":"Tiberius"}
AU6bzGGaa8_2uf2LEVkq - {"name":"Caligua"}
AU6bzGGaa8_2uf2LEVkr - {"name":"Claudius"}
AU6bzGGaa8_2uf2LEVks - {"name":"Nero"}
AU6bzGGaa8_2uf2LEVkt - {"name":"Galba"}
AU6bzGGaa8_2uf2LEVku - {"name":"Otho"}
AU6bzGGaa8_2uf2LEVkv - {"name":"Vitellius"}
AU6bzGGaa8_2uf2LEVkw - {"name":"Vespasian"}
AU6bzGGaa8_2uf2LEVkx - {"name":"Titus"}
Queue is empty, querying with scroll id [cXVlcnlBbmRGZXRjaDsxOzE6RUozTm9zTERSLXVCTVdmRlFfMEl3UTswOw==].
Response: [{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 19,
    "max_score" : null,
    "hits" : [ Domitian ->  Diocletion ]
  }
}].
Got response, switching scroll id from [cXVlcnlBbmRGZXRjaDsxOzE6RUozTm9zTERSLXVCTVdmRlFfMEl3UTswOw==] to [null]
AU6bzGGaa8_2uf2LEVky - {"name":"Domitian"}
AU6bzGGaa8_2uf2LEVkz - {"name":"Nerva"}
AU6bzGGaa8_2uf2LEVk0 - {"name":"Trajan"}
AU6bzGGaa8_2uf2LEVk1 - {"name":"Hadrian"}
AU6bzGGaa8_2uf2LEVk2 - {"name":"Antoninus Pius"}
AU6bzGGba8_2uf2LEVk3 - {"name":"Marcus Aurelius"}
AU6bzGGba8_2uf2LEVk4 - {"name":"Commodus"}
AU6bzGGba8_2uf2LEVk5 - {"name":"Pertinax"}
AU6bzGGba8_2uf2LEVk6 - {"name":"Diocletion"}
Queue is empty, querying with scroll id [null].
Response: [{
  "_scroll_id" : "cXVlcnlBbmRGZXRjaDsxOzI6RUozTm9zTERSLXVCTVdmRlFfMEl3UTswOw==",
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 19,
    "max_score" : 1.0,
    "hits" : [ Augustus -> Titus ]
  }
}].
Got response, switching scroll id from [null] to [cXVlcnlBbmRGZXRjaDsxOzI6RUozTm9zTERSLXVCTVdmRlFfMEl3UTswOw==]
sksamuel commented 9 years ago

Good spot, @dspasojevic. Absolutely, the search scroll id should have a further keep alive on it.

sksamuel commented 9 years ago

PR most welcome!

sksamuel commented 9 years ago

@dspasojevic how did you find the streams implementation overall?

sksamuel commented 9 years ago

I've published 1.6.6, which has this fix and another relating to streams.