qubole / streaminglens

Qubole Streaminglens tool for tuning Spark Structured Streaming Pipelines
http://www.qubole.com
Apache License 2.0

Not able to run streaminglens in intellij idea #1

Open mjose007 opened 4 years ago

mjose007 commented 4 years ago

I am running my code locally in IntelliJ IDEA with the streaminglens Maven dependency. I am getting the error below and no output. Please let me know what I am doing wrong here.


package com.manu.sstreaming;

import com.qubole.spark.streaminglens.StreamingLens;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.streaming.Trigger;
import scala.Predef;
import scala.collection.JavaConverters;

/**
 * @author Manu Jose
 * create on : 16/04/20
 */
public class SStreamingNC {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 9999;
        //int port = Integer.parseInt(args[0]);

        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local")
                .getOrCreate();

        Map<String, String> options = new HashMap<>();
        options.put("streamingLens.reporter.intervalMinutes", "1");

        scala.collection.immutable.Map<String, String> scalaMap = JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(
                Predef.conforms());
        StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
        streamingLens.registerListeners();

        // Create DataFrame representing the stream of input lines from connection to host:port
        spark.sql("SET spark.sql.streaming.metricsEnabled=true");
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        // Split the lines into words
        Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();

        // Start running the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .queryName("Query_name")
                .trigger(Trigger.ProcessingTime(2 * 1000))
                .start();

        spark.streams().awaitAnyTermination();

    }

}
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@11ddc5d8
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@c1c5bf
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@62726145
mjose007 commented 4 years ago

Any updates?

abhishekd0907 commented 4 years ago

@mjose007

Sorry for the delayed response.

Are you able to see any streaming lens related logs (even errors or failures) in your spark driver logs?

Also, you don't need to register the listeners explicitly, so this line is not needed: streamingLens.registerListeners();

This alone should be sufficient: StreamingLens streamingLens = new StreamingLens(spark, scalaMap);

My guess is that it is throwing an exception, since you're trying to register the listeners twice, based on this code:


    try {
      sparkSession.sparkContext.addSparkListener(streamingAppListener)
      logDebug("Successfully registered Spark Listener")
    } catch {
      case e: Exception =>
        throw new SparkException("Error in registering Spark Listener " +
          "Won't report StreamingLens Insights" + e.getMessage)
    }
    try {
      sparkSession.streams.addListener(queryProgressListener)
      logDebug("Successfully registered StreamingQuery Listener")
    } catch {
      case e: Exception =>
        sparkSession.sparkContext.removeSparkListener(streamingAppListener)
        throw new SparkException("Error in registering StreamingQuery Listener " +
          "Won't report StreamingLens Insights" + e.getMessage)
    }

  }
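
On the question of StreamingLens logs above: the success messages in this snippet are emitted with logDebug, so they will not appear at the INFO level seen in the posted driver output. A minimal sketch of a log4j.properties file that might surface them, assuming the application uses the log4j 1.x setup that ships with Spark 2.x and that the file is placed on the classpath (for example under src/main/resources in a Maven project). The logger name is taken from the package in the import statement of the posted code; everything else mirrors Spark's default console configuration and is an assumption about this setup:

```properties
# Keep the usual INFO output on the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Raise verbosity only for StreamingLens classes (package assumed from the import above)
log4j.logger.com.qubole.spark.streaminglens=DEBUG
```

If the listeners register correctly, the two "Successfully registered" debug messages from the snippet should then show up in the driver log; if registration fails, the SparkException message should appear instead.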
rpatid10 commented 3 years ago

@mjose007 Could you kindly offer a suggestion on https://github.com/qubole/streaminglens/issues/5?

For me, the state always shows the same value: NONEWBATCH