lw-lin / CoolplaySpark

酷玩 Spark: Spark 源代码解析、Spark 类库等
3.46k stars 1.41k forks source link

[SS]《3.1 Structured Streaming 之状态存储解析》讨论区 #33

Open lw-lin opened 7 years ago

lw-lin commented 7 years ago

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

junhero commented 7 years ago

@lw-lin 如果计算count distinct这种算uv的场景statestore方式不能做吧?

lw-lin commented 7 years ago

@junhero

这个跟数据集大小有关。如果数据集非常小,如 user id 的空间很小,那么 statestore 是没有问题的。如果 user id 的空间很大,但每天的 distinct user id 很小,那么 statestore 也是没有问题的。但如果 user id 空间很大,每天的 distinct user id 又很多,那 statestore 就有问题了。可以考虑其它方法如 hyperloglog 等。

junhero commented 7 years ago

谢谢

KevinZwx commented 7 years ago

您好,我想请教一下stateStore里具体存储的是什么内容?我看到在statefulOperators里的一些对state的put操作如下:

val thisIsScalaCode
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
...
while (iter.hasNext) {
                val row = iter.next().asInstanceOf[UnsafeRow]
                val key = getKey(row)
                store.put(key, row)
                numUpdatedStateRows += 1
              }
lw-lin commented 7 years ago

@KevinZwx 是 UnsafeRow;key 和 value 都是 UnsafeRow。UnsafeRow 在 SparkSQL 模块里相当于 Object 在 Java 里的作用。UnsafeRow 里包含各种类型(数值、字符串等)的具体数据。

KevinZwx commented 7 years ago

好的谢谢

LinMingQiang commented 5 years ago

您好,我想请教下,是不是每次批次的数据在做状态更新的时候都要去hdfs拉一遍对应的stateStore,然后更新完之后再放回hdfs。

lecssmi commented 4 years ago

请问一个可能不算是state的问题。在structured streaming中,两个流之间Join, 但是两个流join的时间范围比较大,比如几个小时。那这部分缓存数据,如果内存存不下,会溢写到磁盘吗?