qubole / streaminglens

Qubole Streaminglens tool for tuning Spark Structured Streaming Pipelines
http://www.qubole.com
Apache License 2.0

StreamingLens Insights always showing "Streaming Query State: NONEWBATCHES" in Logs. #5

Open rpatid10 opened 3 years ago

rpatid10 commented 3 years ago

Hi All,

I am using StreamingLens in my Spark Structured Streaming application, but it always shows the same logs. The BatchId is updated, but "Streaming Query State: NONEWBATCHES" stays the same. Can someone suggest why the state and recommendations are not updating in the logs?

    ** StreamingLens Insights **
    BatchId: 344
    Analysis Time: 00s 000ms
    Expected Micro Batch SLA: 120s 000ms
    Batch Running Time: 00s 000ms
    Critical Time: 00s 000ms
    Streaming Query State: NONEWBATCHES

21/10/01 15:50:04 WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription(e68c3c2c-6d5f-469e-864a-)

Spark Submit Command:

    spark-submit \
      --verbose \
      --name SparkStreamingLens \
      --num-executors 1 \
      --conf streamingLens.reporter.intervalMinutes=1 \
      --jars /home/abc/jars/spark-streaminglens_2.11-0.5.3.jar,\
    /home/abc/jars/kafka-clients-0.10.2.1.jar,\
      --master yarn \
      --deploy-mode cluster \
      --driver-cores 1 --driver-memory 2G --executor-cores 1 --executor-memory 2G \
      --supervise --class com.data.datalake.SparkStreamingLens \
      /home/abc/jar/SparkStreamingLens-spark-utility_2.11-1.0.jar

@abhishekd0907 @itsvikramagr @shubhamtagra @jsensarma @mjose007 @akumarb2010 @itsvikramagr @Indu-sharma @akumarb2010 @iamrohit @beriaanirudh @mayurdb @michaelmior @rishitesh @emlyn @vrajat @fdemesmaeker @indit-qubole Kindly Suggest.

Kindly advise whether anything needs to change here.

https://github.com/qubole/streaminglens/blob/master/src/main/scala/com/qubole/spark/streaminglens/common/results/AggregateStateResults.scala

https://github.com/qubole/streaminglens/blob/master/src/main/scala/com/qubole/spark/streaminglens/common/results/StreamingCriticalPathResults.scala

In the project, com.qubole.spark.streaminglens.QueryInsightsManager uses the code below to print the insights.

    StreamingLens Inisights
    BatchId: ${results.batchId}
    Analysis Time: ${pd(results.analysisTime)}
    Expected Micro Batch SLA: ${pd(results.streamingCriticalPathResults.expectedMicroBatchSLA)}
    Batch Running Time: ${pd(results.streamingCriticalPathResults.batchRunningTime)}
    Critical Time: ${pd(results.streamingCriticalPathResults.criticalTime)}
    Streaming Query State: ${results.streamingCriticalPathResults.streamingQueryState.toString}
    """.stripMargin)

All of these details are taken from streamingCriticalPathResults, and the only state-specific code available there is for the **NONEWBATCHES** state:

case class StreamingCriticalPathResults(expectedMicroBatchSLA: Long = 0, batchRunningTime: Long = 0, criticalTime: Long = 0, streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

Also, AggregateStateResults.scala in com.qubole.spark.streaminglens.common.results contains the code below.

    package com.qubole.spark.streaminglens.common.results

    case class AggregateStateResults(state: String = "NO NEW BATCHES",
                                     recommendation: String = "Streaming Query State: NO NEW BATCHES<br>")

Kindly suggest.

rpatid10 commented 3 years ago

Could someone kindly help, or suggest whether any other support channel (e.g. a Slack channel) is available for this?

rpatid10 commented 3 years ago

@abhishekd0907 kindly help.

abhishekd0907 commented 3 years ago

Can you attach full driver logs?

rpatid10 commented 3 years ago

@abhishekd0907 I have attached the log. Kindly check.

@abhishekd0907 let me know if I need to share any other logs.

Thanks.

abhishekd0907 commented 3 years ago

Streaminglens was built and tested with Spark 2.4, but the application is using Spark 2.2. Some internal Spark APIs that Streaminglens uses changed between Spark 2.2 and Spark 2.4, so the present code does not work with Spark 2.2 and leads to the following error.

21/10/01 15:50:04 WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription(e68c3c2c-6d5f-469e-864a-5d353d6e4bc2,2)

rpatid10 commented 3 years ago

@abhishekd0907 but this warning does not appear when I remove the checkpoint location. All other details are the same when I remove the checkpoint location. I am attaching fresh logs with a new checkpoint location. Kindly suggest.

abhishekd0907 commented 3 years ago

In the new logs2.txt without checkpoint location, new batches are not even being created. I can see only

21/10/03 12:44:24 INFO HoodieStreamingSink: Micro batch id=0 succeeded

but no logs for micro batches 1, 2, 3, and so on. This is because the checkpoint location was removed. Since new batches are not being created, Streaminglens has nothing to analyze, so there are no logs.

rpatid10 commented 3 years ago

@abhishekd0907 Okay, thanks a lot. So there is no other way to use StreamingLens with Spark 2.2, right? Or is there any workaround that would let us use StreamingLens with Spark 2.2? I am trying to get it working, and I will share more logs by tomorrow morning if any new logs show a micro batch id value without the warning "WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription".

rpatid10 commented 3 years ago

Hi @abhishekd0907 ,

I was able to remove this warning. I was getting it because my application's batch interval was 4 min, which is less than StreamingLens's default analysis interval of 5 min. So the condition currentTime - lastAnalyzedTimeMills >= streamingLensConfig.analysisIntervalMinutes * 60 * 1000 evaluated to false, and the line logWarning(s"Streaming Lens failed " + e.getMessage) produced the warning.
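A minimal sketch of that interval check, to show how a 4 min gap compares against a 5 min analysis interval. The TriggerCheck object and its method signature are assumptions made for illustration; this is a simplified stand-in, not the actual Streaminglens source.

```scala
// Hypothetical, simplified model of the analysis trigger condition quoted above.
object TriggerCheck {
  /** True once at least `analysisIntervalMinutes` have passed since the last analysis. */
  def shouldTriggerAnalysis(currentTime: Long,
                            lastAnalyzedTimeMills: Long,
                            analysisIntervalMinutes: Long): Boolean =
    currentTime - lastAnalyzedTimeMills >= analysisIntervalMinutes * 60 * 1000

  def main(args: Array[String]): Unit = {
    val lastAnalyzed = 0L
    val fourMinutes = 4L * 60 * 1000
    // 4 minutes elapsed against a 5-minute interval: the check is false.
    println(shouldTriggerAnalysis(lastAnalyzed + fourMinutes, lastAnalyzed, 5))
    // Same elapsed time against a 2-minute interval: the check is true.
    println(shouldTriggerAnalysis(lastAnalyzed + fourMinutes, lastAnalyzed, 2))
  }
}
```

Under this model, lowering the analysis interval below the batch interval is what lets the check pass on every batch.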

I was debugging the issue and observed the points below.

  1. In QueryInsightsManager.scala.

        if (insights.streamingCriticalPathResults.streamingQueryState.equals(StreamingState.ERROR)) {
          throw new SparkException("Unexpected Error or Timeout occurred during Analysis")
        }
        streamingLensResultsBuffer.enqueue(insights)
        eventsReporter.foreach(_.sendEvent())
      }
    }

Here insights.streamingCriticalPathResults.streamingQueryState is always NONEWBATCHES, because StreamingCriticalPathResults only contains code specific to the NONEWBATCHES state. It always reports no new batches and takes the default values (expectedMicroBatchSLA: Long = 0, batchRunningTime: Long = 0, criticalTime: Long = 0, streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES) from StreamingCriticalPathResults.

case class StreamingCriticalPathResults(expectedMicroBatchSLA: Long = 0, batchRunningTime: Long = 0, criticalTime: Long = 0, streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

    private def startStreamingAnalysis(queryProgress: QueryProgress): Unit = {
      val currentTime = System.currentTimeMillis()
      if (shouldTriggerAnalysis(currentTime)) {
        val insights = streamingQueryAnalyzer.analyze(queryProgress)
        logResultsIfNecessary(insights)
        println("Insides form startStreamingAnalysis method of QueryInsightManager" +insights)
        lastAnalyzedTimeMills = currentTime
        println("insights.streamingCriticalPathResults.streamingQueryState value : " +insights.streamingCriticalPathResults.streamingQueryState)
        if (insights.streamingCriticalPathResults.streamingQueryState.equals(StreamingState.ERROR)) {
          throw new SparkException("Unexpected Error or Timeout occurred during Analysis")
        }
        streamingLensResultsBuffer.enqueue(insights)
        eventsReporter.foreach(_.sendEvent())
      }
    }

This block always takes its value from StreamingCriticalPathResults.streamingQueryState, which is NONEWBATCHES.
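One point that may be worth ruling out here: in Scala, the "= value" parts of a case class parameter list are only defaults, used when the caller omits an argument. The sketch below uses stand-in types to show this. The DefaultsDemo wrapper and the two-value enumeration are assumptions (only NONEWBATCHES and ERROR appear in this thread), and the explicit values are purely illustrative.

```scala
// Stand-ins for the Streaminglens types, not the real source. The enumeration
// is deliberately incomplete: only states mentioned in this thread are listed.
object DefaultsDemo {
  object StreamingState extends Enumeration {
    val NONEWBATCHES, ERROR = Value
  }

  case class StreamingCriticalPathResults(
      expectedMicroBatchSLA: Long = 0,
      batchRunningTime: Long = 0,
      criticalTime: Long = 0,
      streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

  // No arguments: every field falls back to its default.
  val defaults = StreamingCriticalPathResults()

  // Explicit arguments override the defaults (illustrative values only).
  val explicit = StreamingCriticalPathResults(120000L, 13243L, 9000L, StreamingState.ERROR)

  def main(args: Array[String]): Unit = {
    println(defaults)
    println(explicit)
  }
}
```

If that reading is right, seeing NONEWBATCHES with zeroed timings would suggest the analyzer is returning a result built mostly from defaults, rather than the case class being unable to hold any other state.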

Kindly see the logs below and suggest.

QueryInsightsManager.analysis block results:

    value of queryProgress : QueryProgress(22,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:07:43.483Z,3,0.22653477308766895)
    lastProgress.batchId=22
    lastProgress.id=12900529-7063-4446-a4b8-b58a94194e89
    lastProgress.timestamp=2021-10-04T21:07:43.483Z
    lastProgress.numInputRows=3
    lastProgress.processedRowsPerSecond=0.22653477308766895
    currentTime : 1633381676786
    analysisIntervalMinutes time: 300000,
    value of default interval in milliseconds this should be <= application batch interval time in milliseconds : 300000

StreamingQueryAnalyzer.scala Results :

    queryProgress : QueryProgress(22,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:07:43.483Z,3,0.22653477308766895)
    lastAnalyzedBatchId : -1
    batchStartAndEndTimes : (1633381663483,1633381676726)
    batchStartAndEndTimes._1 : 1633381663483
    batchStartAndEndTimes._2 : 1633381676726
    batchRunningTime : 13243
    batchDescription : BatchDescription(12900529-7063-4446-a4b8-b58a94194e89,22)

QueryInsightsManager.analysisTask() results:

    value of queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:10:00.000Z,0,0.0)
    lastProgress.batchId=23
    lastProgress.id=12900529-7063-4446-a4b8-b58a94194e89
    lastProgress.timestamp=2021-10-04T21:10:00.000Z
    lastProgress.numInputRows=0
    lastProgress.processedRowsPerSecond=0.0
    currentTime : 1633381800115
    analysisIntervalMinutes time: 300000
    value of default interval in milliseconds this should be <= application batch interval time in milliseconds : 300000


Note: here too, insights are printed only when the batch running time is 0 (no data is available to process); the result is then StreamingCriticalPathResults(300000,0,0,NONEWBATCHES) and the logs are generated. When the batch running time is greater than 0 (data is available to process), no insights are printed.

StreamingQueryAnalyzer.scala Results :

    queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:10:00.000Z,0,0.0)
    lastAnalyzedBatchId : -1
    batchStartAndEndTimes : (1633381800000,0)
    batchStartAndEndTimes._1 : 1633381800000
    batchStartAndEndTimes._2 : 0
    insightsStreamingLensResults(23,0,StreamingCriticalPathResults(300000,0,0,NONEWBATCHES))
    Hello I am Inside QueryInsightsManager.scala Try Block
    Streaming Query Analyzer Results :
    queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:10:00.000Z,0,0.0)
    lastAnalyzedBatchId : -1
    batchStartAndEndTimes : (1633381800000,0)
    batchStartAndEndTimes._1 : 1633381800000
    batchStartAndEndTimes._2 : 0
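A small sketch of how the timing values in these logs relate to each other, assuming the batch running time is simply end minus start and that an end timestamp of 0 means the batch has no recorded end. BatchTiming and its method are hypothetical helpers written for this comparison, not Streaminglens code.

```scala
// Hypothetical helper: derives a running time from a (start, end) pair of epoch millis.
object BatchTiming {
  /** Running time in ms, or None when the end timestamp is missing (logged as 0). */
  def batchRunningTime(startAndEnd: (Long, Long)): Option[Long] = {
    val (start, end) = startAndEnd
    if (end > 0 && end >= start) Some(end - start) else None
  }

  def main(args: Array[String]): Unit = {
    // Batch 22 from the logs: both timestamps are present.
    println(batchRunningTime((1633381663483L, 1633381676726L)))
    // Batch 23 from the logs: the end timestamp is 0.
    println(batchRunningTime((1633381800000L, 0L)))
  }
}
```

For batch 22 this gives the 13243 ms shown in the log; for batch 23 there is nothing to measure, which is consistent with the zeroed result.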

    Insides form startStreamingAnalysis method of QueryInsightManager StreamingLensResults(23,0,StreamingCriticalPathResults(120000,0,0,NONEWBATCHES))
    insights.streamingCriticalPathResults.streamingQueryState value : NONEWBATCHES

value of queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:15:00.000Z,0,0.0)

    lastProgress.batchId=23
    lastProgress.id=12900529-7063-4446-a4b8-b58a94194e89
    lastProgress.timestamp=2021-10-04T21:15:00.000Z
    lastProgress.numInputRows=0
    lastProgress.processedRowsPerSecond=0.0
    currentTime : 1633382100114
    lastAnalyzedTimeMills: 1633381800123
    analysisIntervalMinutes time: 120000
    value of default interval in milliseconds this should be <= application batch interval time in milliseconds : 300000

StreamingQueryAnalyzer.scala Results :

    queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:15:00.000Z,0,0.0)
    lastAnalyzedBatchId : -1
    batchStartAndEndTimes : (1633382100000,0)
    batchStartAndEndTimes._1 : 1633382100000
    batchStartAndEndTimes._2 : 0
    insightsStreamingLensResults(23,0,StreamingCriticalPathResults(300000,,0,0,NONEWBATCHES))
    Hello I am Inside Try Block
    Streaming Query Analyzer Results :
    queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:15:00.000Z,0,0.0)
    lastAnalyzedBatchId : -1
    batchStartAndEndTimes : (1633382100000,0)
    batchStartAndEndTimes._1 : 1633382100000
    batchStartAndEndTimes._2 : 0

    Insides form startStreamingAnalysis method of QueryInsightsManager.scala QueryInsightManagerStreamingLensResults(23,0,StreamingCriticalPathResults(300000,0,0,NONEWBATCHES))
    insights.streamingCriticalPathResults.streamingQueryState value : NONEWBATCHES


  2. While printing the insights from QueryInsightsManager.scala.

    StreamingLens Inisights
    BatchId: ${results.batchId}
    Analysis Time: ${pd(results.analysisTime)}
    Expected Micro Batch SLA: ${pd(results.streamingCriticalPathResults.expectedMicroBatchSLA)}
    Batch Running Time: ${pd(results.streamingCriticalPathResults.batchRunningTime)}
    Critical Time: ${pd(results.streamingCriticalPathResults.criticalTime)}
    Streaming Query State: ${results.streamingCriticalPathResults.streamingQueryState.toString}
      """.stripMargin)

    logInfo(logOutput.toString())

expectedMicroBatchSLA, batchRunningTime, criticalTime and streamingQueryState are taken from streamingCriticalPathResults, where they are hardcoded for NONEWBATCHES.
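For reading the printed values, here is a rough sketch of a duration formatter that reproduces the "120s 000ms" and "00s 000ms" strings seen in the logs. This is a guess at what pd does, based only on that output; the real implementation in Streaminglens may differ.

```scala
// Hypothetical stand-in for `pd`, inferred from the log output in this thread.
object DurationFormat {
  /** Formats milliseconds as "<seconds>s <millis>ms", zero-padded to 2 and 3 digits. */
  def pd(millis: Long): String =
    "%02ds %03dms".format(millis / 1000, millis % 1000)

  def main(args: Array[String]): Unit = {
    println(pd(120000L)) // expectedMicroBatchSLA from the first log
    println(pd(0L))      // batchRunningTime when there is no new batch
    println(pd(13243L))  // batchRunningTime computed for batch 22
  }
}
```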

Kindly suggest if any changes are required.

abhishekd0907 commented 2 years ago

Have you made the changes I suggested?

rpatid10 commented 2 years ago

@abhishekd0907 yes