Open yyyyb opened 5 years ago
spark-submit的使用 使用spark-submit来提交我们的spark应用程序运行的脚本(生产) ./spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount --name NetworkWordCount /App/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999 使用spark-shell来提交(测试) ./spark-shell --master local[2]
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("hadoop000", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
工作原理:粗粒度 spark streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。 工作原理:细粒度
spark streaming核心 核心概念: StreamingContext def this(sparkContext: SparkContext, batchDuration: Duration) = { this(sparkContext,null,batchDuration)} def this(conf:SparkConf, batchDuration: Duration) = { this(StreamingContext.createNewSparkContext(conf),null,batchDuration)} batch interval可以根据你的应用程序需求的延迟要求以及集群可用的资源情况来设置
Discretized Streams(DStreams) Internally, a DStream is represented by a continuous series of RDDs. Each RDD in a DStream contains data from a certain inerval. 对DStream操作算子,比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所构成的。
Input DStream and Receivers Every input DStream (except file stream, discussed later in this section) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing. local[]模式下一定要设置local[]中的数字大于receiver的数量。
Transformations on DStreams 在DStreams上做的一些具体操作,具体有比如map,flatMap,filter等 Output Operations on DStreams 就是将DStream上经过Transformations之后的结果输出出来,有比如print,saveAsTextFiles等操作
updateStateByKey算子 需求:统计到目前为止累积出现的单词的个数(需要保持住以前的状态) 如果使用了stateful的算子,必须配置checkpoint
ForeachRDD: 需求:将统计结果写入到MySQL中 create table wordcount( word varchar(50) default null, wordcount int(10) default null ); 通过该sql将统计结果写入到Mysql insert into wordcount(word,wordcount) values('" + record._1 + "', " + record._2 + “) 存在的问题 1)对于已有的数据没有做更新,而是所有的数据均为insert 改进思路: a)插入数据前先判断单词是否存在,如果存在就update,不存在就insert b)工作中:HBase/Redis
window:定时地进行一个时间段内的数据处理
window length - The duration of the window (3 in the figure).窗口长度
sliding interval - The interval at which the window operation is performed (2 in the figure).窗口间隔
这两个参数和我们的batch size有关系:整数倍的关系
特点: 低延时 能从错误中高效的恢复 能够运行在成百上千的节点 能够将批处理、机器学习、图计算等子框架和spark streaming综合起来使用 安装:One stack to rule them all :一栈式 github:https://github.com/apache/spark