knockdata / spark-highcharts

Support Highcharts in Apache Zeppelin
Apache License 2.0

spark-highcharts not plotting with kinesis stream #31

Open plamen-paskov opened 7 years ago

plamen-paskov commented 7 years ago

I'm using Zeppelin with a Kinesis stream like this:

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}

spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/test")

val df = spark
  .readStream
  .format("kinesis")
  .option("streams", "bla")
  .option("endpointUrl", "kinesis.us-west-2.amazonaws.com")
  .option("initialPositionInStream", "earliest")
  .option("format", "csv")
  .schema(
    StructType(
      StructField("device_id", StringType) ::
      StructField("temperature", IntegerType) ::
      Nil
    ))
  .load

val query = highcharts(df.seriesCol("device_id").series("y" -> col("temperature")), z, "append")

In the next Zeppelin paragraph I have:

StreamingChart(z)

but the chart is not displayed. When I inspect the browser's console there are some JavaScript errors. I'm trying to automatically update the chart when new data arrives in the stream. Any suggestions on what I'm doing wrong?

Infrastructure:

Screenshot: https://i.stack.imgur.com/HWIn0.png

rockie-yang commented 7 years ago

Have you included the Highcharts JS files in Zeppelin?

You can first test with a normal plot instead of streaming.

plamen-paskov commented 7 years ago

I forgot to include the Highcharts JS files. Actually I expected spark-highcharts to include them. Now the JS errors are gone, but the call to StreamingChart(z) displays just "null".

When I try this sample code it works just fine:

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
import sqlContext.implicits._

val Tokyo = Seq(7.0, 6.9, 9.5, 14.5, 18.2, 21.5, 25.2, 26.5, 23.3, 18.3, 13.9, 9.6)
    .map(("Tokyo", _))
val NewYork = Seq(-0.2, 0.8, 5.7, 11.3, 17.0, 22.0, 24.8, 24.1, 20.1, 14.1, 8.6, 2.5)
  .map(("New York", _))
val Berlin = Seq(-0.9, 0.6, 3.5, 8.4, 13.5, 17.0, 18.6, 17.9, 14.3, 9.0, 3.9, 1.0)
  .map(("Berlin", _))
val London = Seq(3.9, 4.2, 5.7, 8.5, 11.9, 15.2, 17.0, 16.6, 14.2, 10.3, 6.6, 4.8)
  .map(("London", _))

val dataFrame = (Tokyo ++ NewYork ++ Berlin ++ London).toDF("city", "temperature")

dataFrame.show()

val chart = highcharts(dataFrame
  .seriesCol("city")
  .series("y" -> col("temperature")))

chart.plot()

rockie-yang commented 7 years ago

Are you using spark-highcharts:0.6.5?

plamen-paskov commented 7 years ago

Yes

rockie-yang commented 7 years ago

It works on the sample data. I just tested it using the zeppelin-highcharts container.

You can replace StreamingChart(z) with these two lines.

import org.apache.zeppelin.interpreter.InterpreterContext
z.get(InterpreterContext.get().getParagraphId)

It should print a <div> and a <script>.

plamen-paskov commented 7 years ago

I replaced the call to StreamingChart(z) with the lines you gave, but it returns:

import org.apache.zeppelin.interpreter.InterpreterContext
res111: Object = null
rockie-yang commented 7 years ago

The content is written during the execution of the streaming query. If it's empty, it means nothing has been set yet.

z.get(InterpreterContext.get().getParagraphId) 

Please check that you're using spark-highcharts:0.6.5; one of the previous versions has an issue.

You can test with the zeppelin-highcharts container.

plamen-paskov commented 7 years ago

Here is what I tested:

paragraph 1:

%angular
<script type="text/javascript">
    $(function () {
        if (typeof Highcharts == "undefined") {
            $.getScript("http://code.highcharts.com/highcharts.js")
              .done(function( script, textStatus ) {
                console.log( "load http://code.highcharts.com/highcharts.js " + textStatus );
              })
              .fail(function(jqxhr, settings, exception ) {
                 console.log("load http://code.highcharts.com/highcharts.js " + exception);
              });
        } else {
            console.log("highcharts already loaded");
        }
    });
</script>

paragraph 2:

import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val ctx = spark.sqlContext

case class NuclearStockpile(country: String, stockpile: Int, year: Int)

val input = MemoryStream[NuclearStockpile]

spark.conf.set("spark.sql.streaming.checkpointLocation","/tmp/test3")

val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
  1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
  27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
  26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
  24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
  22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
  10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
    zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))

val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
  4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
  15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
  33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
  35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
  21000, 20000, 19000, 18000, 18000, 17000, 16000).
    zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))

input.addData(USA.take(30) ++ USSR.take(30))
val structureDataFrame = input.toDF

paragraph 3:

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._

val query = highcharts(
  structureDataFrame.seriesCol("country")
    .series("x" -> "year", "y" -> "stockpile")
    .orderBy(col("year")), z, "append")

paragraph 4:

StreamingChart(z)

paragraph 5 (to update the chart):

input.addData(USA.drop(30) ++ USSR.drop(30))

The result when I run paragraph 4 is: null

I looked at src/main/scala/com/knockdata/spark/highcharts/CustomSinkProvider.scala and found these lines:

z.put(chartParagraphId, plotData)
println(s"run $chartParagraphId")
z.run(chartParagraphId)

Correct me if I'm wrong, but z.put() sets a variable, not the paragraph text?

rockie-yang commented 7 years ago

z.put sets the data in a ZeppelinContext variable. It will be used by StreamingChart(z).
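
Roughly, the hand-off looks like this (a sketch based on the CustomSinkProvider lines you quoted and the z.get call above; it assumes a Zeppelin Spark paragraph where z is the ZeppelinContext):

import org.apache.zeppelin.interpreter.InterpreterContext

// Sink side (see CustomSinkProvider.scala): store the generated chart markup under the
// chart paragraph's id, then ask Zeppelin to re-run that paragraph:
//   z.put(chartParagraphId, plotData)
//   z.run(chartParagraphId)

// Chart paragraph side: StreamingChart(z) essentially reads back whatever the sink stored
// for *this* paragraph and prints it, something like:
val plotData = z.get(InterpreterContext.get().getParagraphId)
println(plotData) // a <div> plus <script> once the sink has written; null before the first batch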

I tested with the same code in a Docker environment.

Can you share a screenshot showing the spark-highcharts version?

plamen-paskov commented 7 years ago

Sure, here it is: https://i.imgur.com/8fc9Vvx.png. I downloaded it from: http://central.maven.org/maven2/com/knockdata/spark-highcharts/0.6.5/spark-highcharts-0.6.5.jar

Question: do you run awaitTermination() in your tests? When I call query.awaitTermination I can see that the new incoming data is written to the sink, but the paragraph that prints the angular code to update the chart is stuck in the pending state and never finishes. Also, I cannot cancel the paragraph that is writing to the custom sink. I found an issue on the Zeppelin board that describes exactly what I'm experiencing: https://issues.apache.org/jira/browse/ZEPPELIN-2563. I see an exception in the log file as well:


WARN [2017-10-03 12:03:04,386] ({Thread-21} RemoteInterpreterProcess.java[releaseClient]:106) - exception occurred during releasing thrift client
java.lang.IllegalStateException: Object has already been returned to this pool or is invalid
        at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:599)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.releaseClient(RemoteInterpreterProcess.java:104)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller.progressRemoteZeppelinControlEvent(RemoteInterpreterEventPoller.java:325)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller.run(RemoteInterpreterEventPoller.java:231)

rockie-yang commented 7 years ago

No. It does not need awaitTermination.

I have extracted the same code and it works. It will be really hard to know what the exact problem is without seeing your real environment.

I attached the notebook here, which is working for me.

streaming.json.zip

plamen-paskov commented 7 years ago

I removed the awaitTermination and it seems to be working fine now. I found something else related to the streaming charts. What I'm trying to achieve is a continuously updated streaming chart (import the attached notebook to see how I'm doing it).

streaming_chart_kinesis.json.zip

The problem I encountered with this requirement is that when the sink receives the second batch of data, the chart paragraph does not run and there is an InterpreterException("Can not run current Paragraph"). You will not see this exception anywhere in the logs; you have to manually attach an onFailure callback to the Future in ZeppelinContextHolder (src/main/scala/com/knockdata/spark/highcharts/ZeppelinContextHolder.scala). It looks like the first time you run the paragraph containing the highcharts() call, the sink is able to successfully run the chart paragraph, but when new data arrives later on the stream, the current context paragraph id has changed to the paragraph id of the last running paragraph (in this case the paragraph id of the first sink call). I inspected the Zeppelin code and didn't find a z.run method overload where you can disable the paragraph id validation. A possible solution is to update ZeppelinContextHolder.scala and replace:

Future(z.run(paragraphId))

with something like

    Future {
      val context = z.getInterpreterContext
      val noteId = context.getNoteId

      val runners = z.getInterpreterContextRunner(noteId, paragraphId, context)
      if (runners.size <= 0) {
        throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size)
      } else {
        val it = runners.iterator
        while (it.hasNext) {
          val r: InterpreterContextRunner = it.next
          r.run()
        }
      }
    }

which will do the job until the Zeppelin devs handle this somehow. Without this I will not be able to complete my requirement. If you have another solution to the problem, I would appreciate it if you shared it with me.

Thanks

rockie-yang commented 7 years ago

Great to hear it works for you.

The method highcharts only needs to be invoked once. The internal sink will be triggered when new data comes, and a new chart will be rendered.
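
For example, with the MemoryStream notebook above, the flow is roughly this (a condensed sketch of paragraphs 3-5, assuming input and structureDataFrame from paragraph 2 are in scope):

// Paragraph 3: invoke highcharts once; this starts the streaming query and registers the sink.
val query = highcharts(
  structureDataFrame.seriesCol("country")
    .series("x" -> "year", "y" -> "stockpile")
    .orderBy(col("year")), z, "append")

// Paragraph 4: the render target; the sink re-runs this paragraph on every new micro-batch.
// StreamingChart(z)

// Paragraph 5, run as often as you like: just push new data; no further highcharts() call
// is needed -- the sink picks it up and refreshes the chart paragraph.
input.addData(USA.drop(30) ++ USSR.drop(30))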

plamen-paskov commented 7 years ago

Consider the following scenario: I replaced

Future(z.run(paragraphId))

with

    import java.io.{File, PrintStream}

    val f = Future(z.run(paragraphId))

    f.onFailure({
      case err: Throwable => {
        // write the otherwise-swallowed exception to a file so it can be inspected
        val file = new File("/path/to/file.log")
        val ps = new PrintStream(file)
        err.printStackTrace(ps)
        ps.close()
      }
    })

The following exception was written to the log file:

org.apache.zeppelin.interpreter.InterpreterException: Can not run current Paragraph
        at org.apache.zeppelin.spark.ZeppelinContext.run(ZeppelinContext.java:332)
        at org.apache.zeppelin.spark.ZeppelinContext.run(ZeppelinContext.java:321)
        at com.knockdata.spark.highcharts.ZeppelinContextHolder$$anonfun$1.apply$mcV$sp(ZeppelinContextHolder.scala:17)
        at com.knockdata.spark.highcharts.ZeppelinContextHolder$$anonfun$1.apply(ZeppelinContextHolder.scala:17)
        at com.knockdata.spark.highcharts.ZeppelinContextHolder$$anonfun$1.apply(ZeppelinContextHolder.scala:17)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The exception above is caused by the following validation:

if(paragraphId.equals(context.getParagraphId())) {
    throw new InterpreterException("Can not run current Paragraph");
}

I guess it might be caused by the fact that the context paragraph id was changed to match the id of paragraph 3 after the first event was received by Zeppelin. Note: send the new data via a console script and not from the Zeppelin GUI!

One more thing related to streaming data charts: currently StreamingChart(z) prints a call to the jQuery Highcharts plugin, which causes the chart to "blink" when it's refreshed. To avoid this effect you could add functionality that prints a JavaScript call to addSeries(), which would not reload the whole chart but just append the new data.
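
As a rough illustration of the idea (a hypothetical helper, not part of the spark-highcharts API; it uses Series.addPoint, a close relative of the addSeries() call I mentioned, and assumes the chart's container div id is known):

// Hypothetical sketch: emit a small %angular snippet that appends points to the existing
// chart instead of re-creating it, so the chart does not blink on refresh.
// chartId, seriesIndex and newPoints are illustrative names.
def incrementalUpdate(chartId: String, seriesIndex: Int, newPoints: Seq[(Int, Double)]): Unit = {
  val addPoints = newPoints
    .map { case (x, y) => s"chart.series[$seriesIndex].addPoint([$x, $y], false);" }
    .mkString("\n    ")
  println(s"""%angular
<script type="text/javascript">
(function () {
    // assumes the chart lives in a div with id "$chartId" and jQuery + Highcharts are loaded
    var chart = $$("#$chartId").highcharts();
    $addPoints
    chart.redraw(); // one redraw per batch instead of reloading the whole chart
})();
</script>""")
}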

rockie-yang commented 7 years ago

The exception indicates that you cannot run the current paragraph from within the current paragraph.

This is also why we need a separate paragraph containing just StreamingChart(z). You can create a separate paragraph, get its id, and then run it. If you want to run the next paragraph, check the code, which has logic to find the next paragraph id.
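
For example (a sketch using only calls that already appear in this thread; chartParagraphId is just an illustrative name):

import org.apache.zeppelin.interpreter.InterpreterContext

// Run this once inside the paragraph that contains StreamingChart(z)
// to discover that paragraph's id:
val chartParagraphId = InterpreterContext.get().getParagraphId

// From any other paragraph (or from the sink) you can then trigger a re-render:
//   z.run(chartParagraphId)
// Calling z.run on the paragraph that is currently executing is what raises
// InterpreterException("Can not run current Paragraph").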

That's right, there is a blink when new data comes. I had thought about adding the solution you mentioned, but have had no time to work on it. I'm wondering if you have some time to create a PR?