allwefantasy / spark-binlog

A library for querying Binlog with Apache Spark structure streaming, for Spark SQL , DataFrames and [MLSQL](https://www.mlsql.tech).
Apache License 2.0
154 stars 54 forks source link

HBase WAL Support Doc Design #7

Closed allwefantasy closed 4 years ago

allwefantasy commented 4 years ago

spark-binlog 基本工作机制以及要求

spark-binlog通过如下两个步骤来实现将任意数据源转换为Spark可使用的数据源:

  1. 启动异步消费线程,消费原始事件,将其转换格式,并写入spark-binlog-WAL(目前放在HDFS上)
  2. spark-binlog-WAL作为底层数据源,对外提供服务。

该工作模式,需要原始事件有一个唯一的递增ID,该ID需要满足两个目标:

  1. 唯一标记一个原始事件
  2. 可以体现事件的发生先后顺序

spark-binlog-WAL也支持分区存储,如果事件要求能够稳定的映射到某个分区,则事件ID仅需满足在本分区内唯一递增,无需全局。

HBase WAL

HBase WAL(HLog) 记录了HBase写入相关的事件。对HBase WAL我们核心是需要给他找到每个event的递增id.

目前来看,似乎难以生成该针对每个event的递增id.但是,考虑到WAL 本质上是记录每个regioin对应的事件,并且该事件有一个基于region 内递增的sequenceId,这意味着,假设我们有10个region,那么我们读取这些region的HLog并且将其写入10个spark-binlog-WAL,这样每个文件只包含了对应region的event,sequenceId在这个文件里便是单调递增的。 这意味着,spark可以记录多个(startoffset,endffset)对,并且在必要的时候重复读取。

缺点也很明显,就是如果region数足够多,会导致spark-binlog 文件数过多。

逻辑图:

HBase HLog In HDFS -> spark-binlog ->
                                              events in region 0  --start/end range mapping-->  spark partition 0
                                              events in region 1    -s/e range mapping->  spark partition 1
                                              events in region 2   -s/e range mapping-->  spark partition 2
                                               。。。。。
                                              events in region n     -s/e range mapping-->  spark partition 3

Region变化

HBase会根据情况动态分裂出新的Region。这意味着spark-binlog-WAL分区数会根据Region数动态调整。

HBase WAL 容灾

HBase WAL会将已经flush掉的HLog file 从WALs 转移到oldWALs,并且最后在oldWALs中会被转化为pv2-xxxxxxxxxxxxxnumber.log 文件。

spark-binlog 会在内存中记录当前已经commit到自己WAL的所有region的最新sequenceId,以及没有commit到自己WAL还在内存里所有region的最新sequenceId。会进行比较,避免重复提交。如果在读取一个hlog时,发现文件已经被移动,那么放弃读取,然后重新到oldWAL寻找文件,并且重新读取,读取过程中,会比较还未commit到WAL中的regionzuixin sequenceId,如果小于则会丢弃,大于则更新最新sequenceid.

假设此时executor节点挂掉,那么当我们消费时,我们会先到WAL进行消费,得到所有region的最新sequenceId,然后基于此进行读取。

支持版本

目前我们只考虑hbase 2.x,并且第一阶段我们只支持一次订阅一个表。避免用户误用订阅了过多表导致region过多产生太多WAL文件。

allwefantasy commented 4 years ago

HBaseWALClient 设计

类似MySQL里的BinaryLogClient, 我们也需要实现HBaseWALClient用以获取HLog的事件。HBaseWALClient 的设计总体会采用异步回调,他会不断的读取HLog里的Event(WALKey/WALEdit),并且触发一个回调函数。需要注册自己的函数:

hbaseWALClient.registerEventListener(new HBaseWALClient.EventListener() {
def onEvent(event: HBaseWALEvent): Unit = {
  val walKey = event.getHeader()
  val walEdit = event.getBody()
   ........  
}

onEvent里的事件可以重复发生,即保证至少发生一次。

之后在使用HBaseWALClient时,我们需要维护两个Map

committedRegionToOffsetMap 该map持有region -> 已经持久化到了spark-binlog-WAL最新sequecneId 的映射关系。
regionToOffsetMap 该map持有region -> 从HBaseWALClient接受到的最新sequecneId 的映射关系。

通过这两个map实现failover 以及避免重复消费。

HBaseWALClient应该接收构造函数(hbaseWALHDFSPath:String,startTime:Long). 每次重新消费HLog时,我们会通过startTime来控制从哪里开始重新消费,外部通过committedRegionToOffsetMap来保证在此之后的数据才会真正被处理。

aidahz commented 4 years ago

@allwefantasy 疑问1:spark-binlog-WAL 最终存储的格式是什么? 疑问2: 对源数据的binlog,启动异步消费线程,这个是一直守护,实时解析的还是怎样的逻辑?

allwefantasy commented 4 years ago

@allwefantasy 疑问1:spark-binlog-WAL 最终存储的格式是什么? 疑问2: 对源数据的binlog,启动异步消费线程,这个是一直守护,实时解析的还是怎样的逻辑?

针对疑问1:

spark-binlog-WAL 是一个标准Write Ahead Log实现, 二进制存储,存在在HDFS上。适合顺序读取,支持按时间过滤文件。

针对疑问2:

消费最原始数据源的线程是一直守护,持续读取的。譬如这里我们会通过HBaseWALClient 持续轮训HBase WAL目录,然后读取新文件,解析文件里的事件,通过回调调用用户的代码实现。

wypb commented 4 years ago

总体设计挺好的。

不过 HBase WAL 是 RegionServer 级别的,一个 RegionServer 对应一个 WAL,一个 RegionServer 包含 N 个 Region,这 N 个 Region 都是往同一个 WAL 写数据的。

WAL 里面一条记录存储了 HBase 里面的一列,比如 HBase 里面一行数据有 m 列,那么最终解析出来的WAL 会有 m 行的数据与之对应。对于一张频繁变化的表,这可能会导致 spark-binlog-WAL 最终存储到 HDFS 上的数据很大呢。

allwefantasy commented 4 years ago

总体设计挺好的。

不过 HBase WAL 是 RegionServer 级别的,一个 RegionServer 对应一个 WAL,一个 RegionServer 包含 N 个 Region,这 N 个 Region 都是往同一个 WAL 写数据的。

WAL 里面一条记录存储了 HBase 里面的一列,比如 HBase 里面一行数据有 m 列,那么最终解析出来的WAL 会有 m 行的数据与之对应。对于一张频繁变化的表,这可能会导致 spark-binlog-WAL 最终存储到 HDFS 上的数据很大呢。

因为我们是以region 为分区,里面的sequenceIid为编号,所以多个RegionServer影响不大。我们需要读取多个RegionServer下的文件即可。对于数据大的,我们应该需要对spark-binlog-WAL控制保留多久的数据。比如只保留最近一小时的spark-binlog-WAL,用于方便后面做回溯。本质上 spark-binlog-WAL其实就是起了Kafka的作用。