Open Roiocam opened 2 years ago
在 Akka 的 Projection 中,以从事件溯源中投影的典型用例为例子。有如下结构图:
从整个架构图上,可以看出 Akka Projection 中,Projection 提供了数据库和 EventHandler
之间的桥梁,而 ProjectionBehavior 承担着 运行时(这个 Actor 运行时提供了隔离、分片、通信的能力)的作用。
相较于 Phoenix 的 EventPublish 的实现,其实 ShardingEventPublishActor
也提供了分片运行的能力,但是 EventPublish 的并行度只能根据数据源实现,这是因为分片的思路不同。Akka Projection 提供并行能力的关键点在于 SouredProvider
接口,SouredProvider 定义了数据的来源(包括 “如何”
在 “某个”
数据存储查询 “哪些”
数据)
因此,深入了解一下 Akka Projection 如何在事件存储上查询事件并存储查询进度(偏移量)的。
Akka Projection 在 EventSoured 的模式下支持两种模式:
Akka Persitence 为事件打上标签的流程如下:
如下图所示,Akka 增加了两张表,在不修改原表结构上,通过增加一张表的形式增加事件属性,并通过一张 Projection 表存储 Projection 的偏移量。
从实现上看,Akka Projection 使用 Tag 方式标记 Event,再后续中查询,以此提供了“并行”的能力,而 Phoenix 这边更像是 EventBySlice 的做法,后续再看一下 EventBySlice 的方式如何实现并行。
ProjectionBehavior 的整体结构如下:
从上面的结构来看,ProjectionBehavior 的能力比较简单,更多的是提供一个 Actor 的能力(异步通信、分片)
通过 Actor 能力,Akka Projection 为 Projection 实现了 Management 功能,也就是开发者可以通过 Command 的方式来查询,控制 以达到管理 Projection 的目的。
Projection 实际上是 Akka Projection 核心的 CQRS 实现部分
以 JDBCProjection 为例子,其 API 如下
从精确一次和最少一次的实现上看,只有两个区别:
那么 akka 是如何使用这两者的,从整个 projection 的原理透视。主要是在 Akka Projection 运行的 Stream 中。
其中 OffsetStrategy 决定了在什么时候保存 offset(例如 atLeastOnce 在 handler 处理完之后,保存 offset;atExactlyOnce 则是在 handler 的事务中已经保存)
HandlerStrategy(也就是 AdaptedJdbcHandler) 决定了如何处理事件. 例如 atExactlyOnce 在 jdbc 事务中先保存 offset 再处理
在前面已经分析了一波 Tag 下的事件读取,现在来看看整个事件读取的抽象。
整个 EventStored 的事件读取模块十分简单,只有 2 个类,并且 API 也不多:
eventByTag 的核心接口是 akka-persistence-query 中的 EventTagQuery
. 目前只有 JDBC、Cassandra 实现
同理,eventBySlice 的实现基于 EventSliceQuery
, 其实现只支持 R2DBC 和 gRPC
Akka 对 handler 也做了一些抽象和实现,并对不同的 Handler 提供了一个 Adapter。
下面是 akka 支持的 handler 模式:
CompletionStage
表示完成其实更像是 JDBC 和 R2DBC 的区别。(目前 r2dbc 模块只支持 postgreSQL)
R2DBC 的场景相对而言是低延迟解决方案的场景,因为整个 R2DBC 的设计理念也是基于 Reactive Programming 的,而 gRPC 则是相较于 Kafka 作为两个微服务事件 CQRS 通信的替代方案。(gRPC 因为直接是 RPC 通信,因此延迟更低,但 Projection-gRPC 也实现了背压机制,基于 Consumer Pull/Request )
最关键的低延迟其实不是 R2DBC,而是在 R2DBC 体系下,akka-projection 支持 broker-less 的 Pub/Sub. 在 R2DBC 的体系下,支持直接将 Event 发布到集群中,然后通过 R2DBC 轮询数据库做为可靠交付的最终保证(虽然仍有轮询数据库,但频率更低)。代价就是更高的 CPU 负载和网络带宽
由 Projection JDBC 可知, Projection 基本的运行都是相同的,重要的是 Projection 的实现,官方 API 中,读取实现如下:
// 定义切片范围(并行度实现)
List<Pair<Integer, Integer>> sliceRanges = Persistence.get(system).getSliceRanges(numberOfProjectionInstances);
// 定义 gRPC 查询插件
GrpcReadJournal eventsBySlicesQuery = GrpcReadJournal.create(system,List.of(protobufFileDescriptors));
// 定义数据源, 使用 gRPC 查询
SourceProvider<Offset, EventEnvelope<Object>> sourceProvider = EventSourcedProvider.eventsBySlices(
system,
eventsBySlicesQuery,
eventsBySlicesQuery.streamId(),
sliceRange.first(),
sliceRange.second());
// 定义 R2dbcProjection (这里用的还是 R2JDBC,用于存储 offset)
Projection projection = R2dbcProjection.atLeastOnceAsync(
projectionId,
Optional.empty(),
sourceProvider,
() -> new EventHandler(projectionId),
system);
// 返回 ProjectionActor
return ProjectionBehavior.create(projection);
因为 gRPC 是 RPC 模型,并且从 gRPC Service 中读取,因此一定有一个事件生产的生产者(这里并不是从数据库中读取,R2DBC 只用于 offset), 下面来看官方的示例:
// 将内部 Event 转换为 Protobuf message
Transformation transformation =
Transformation.empty()
.registerMapper(ShoppingCart.ItemAdded.class, event -> Optional.of(transformItemAdded(event)))
.registerMapper(ShoppingCart.ItemQuantityAdjusted.class, event -> Optional.of(transformItemQuantityAdjusted(event)))
.registerMapper(ShoppingCart.ItemRemoved.class, event -> Optional.of(transformItemRemoved(event)))
.registerMapper(ShoppingCart.CheckedOut.class, event -> Optional.of(transformCheckedOut(event)));
// 定义事件生产者
EventProducerSource eventProducerSource = new EventProducerSource(
"ShoppingCart", // entityType
"cart", // streamID, 暂不清楚这里的作用
transformation, // 转换
EventProducerSettings.apply(system)
);
// 返回 grpc Handler(处理 http 请求和其他)
return EventProducer.grpcServiceHandler(system, eventProducerSource);
上面的代码中,EventProducerSource 和 EventProducer.grpcServiceHandler()
的内部原理不清楚,因此先深入后者。
从 scala 代码上看比较简单,主要是两个内容:
在深入前者, 可以看到 gRPC service 提供了三个 API 的实现,这里暂不解析用法,只看 eventsBySlices 方法的实现。(基本上就是:runEventsBySlices, 其他的都是异常处理),这里源码的内容太多,只放一些流程
上面分析了生产者和消费者的用法,以及生产者代码的内部实现,下面就看一下消费者的代码的实现原理,因为消费者代码比较少,还有一些 R2DBC 的内容,从文档和注释来看这一部分主要是 offset 的内容。因此在这里深入生产者中缺失的 EventSliceQuery
的实现, 这里选用 gRPC 的实现
streamId
的实现比较简单,直接是 settings.streamId
eventsBySlices
实现的源码比较长,这里不展示,而是写一下流程:
看完源代码比较混乱,为什么这里又回到了 gRPC, 从头图来看则明白了,这里的 eventsBySlices
实际上是 Consumer 微服务的,在 Producer 微服务中,使用的 eventsBySlices
的 ReadJournal 实现则不是 GrpcReadJournal
, 而还是 R2DBC 的实现。因为实际上 Event 还是放在数据库中.这里暂不展开,放在下面统一对 R2DBC 展开
这里主要是针对 gRPC 模块中 Producer 端查询事件能力以及 projection 能力的解析。
这里直接用源码添加注释来分析,可以看到此方法主要实现是合并了数据查询流和 brokerLess 的 pub/sub 订阅流. 和投影区别中文档的低延迟能力对应上了
DAO 查询实现的源码特别复杂,大量用了 AkkaStream 的内容,而且还不是简单的流,是 Graph 流
这里暂时就不展开源码具体分析,而是简单描述一下。
bySlice 返回一个 BySliceQuery,传入(QueryDao, 解析数据库 Row 到 Event 的转换,提取 Offset 的方法)。BySliceQuery 有三个方法
实际上发生查询的地方,这里返回的是一个 ContinuousQuery,这里定义了如何用 Stream 的方式读取数据库,比较复杂,ContinuousQuery 拼接了大量方法,如当前状态是什么(offset),下一次的状态是什么(nextOffset),查询之前做什么(beforeQuery查一下总数)等
最终查库使用的是 BySliceQuery 参数中的 QueryDao.rowsBySlices(entityType,minSlice,maxSlice,fromTimestamp,toTimestamp,behindCurrentTime)
这里的实现则比较简单,是 JDBC 的内容.
这个 SQL 并不复杂,和 Phoenix 中查询 event_store 基本是一样的,不过用了 entity_type 过滤了一下,避免整表查询(挺好的,也能提供并行度)
看完上面之后,了解了 R2DBC 是如何做查询的,但是低延迟的 pub/sub 只看到了订阅,没有看到发布过程。
根据配置,定位到 R2dbcJournal
实现,如果开启,则会提供一个 Pub/Sub
然后在 publish 方法中使用了此 Pub/Sub
最终在事件写入时,调用了 publish 方法
与 JDBC 版本对比原理则更明显,因此低延迟方案只能在 R2DBC 模式下运行