Open bjkonglu opened 6 years ago
在处理一个广告业务时,需要同时处理广告曝光日志和广告点击日志,同时需要知道一段时间内的广告曝光点击率。所以,需要对广告曝光日志和广告点击日志进行join操作。在调研过程中,发现spark-2.3.1版本里的Structured Streaming支持stream-stream joins.
在实现stream-stream的join首先要解决的问题就是避免无限增长的状态,所以在使用spark的stream-stream joins时需要定义watermark,告诉引擎什么时候可以清除老的状态;
同时定义一个事件时间约束,这样引擎那些老的记录将不会再被需求去配置另一个流中的记录。这个时间约束可以用下面两种方式定义:
Time range join conditions (e.g. ...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR),
Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow).
实现stream-stream joins的代码示例如下:
import org.apache.spark.sql.functions.expr val impressions = spark.readStream. ... val clicks = spark.readStream. ... // Apply watermarks on event-time columns val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours") val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours") // Join with event-time constraints impressionsWithWatermark.join( clicksWithWatermark, expr(""" clickAdId = impressionAdId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 1 hour """) )
背景
在处理一个广告业务时,需要同时处理广告曝光日志和广告点击日志,同时需要知道一段时间内的广告曝光点击率。所以,需要对广告曝光日志和广告点击日志进行join操作。在调研过程中,发现spark-2.3.1版本里的Structured Streaming支持stream-stream joins.
stream-stream joins操作
在实现stream-stream的join首先要解决的问题就是避免无限增长的状态,所以在使用spark的stream-stream joins时需要定义watermark,告诉引擎什么时候可以清除老的状态;
同时定义一个事件时间约束,这样引擎那些老的记录将不会再被需求去配置另一个流中的记录。这个时间约束可以用下面两种方式定义:
Time range join conditions (e.g. ...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR),
Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow).
实现stream-stream joins的代码示例如下: