AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark
Apache License 2.0

Move CheckpointOffsetManager logic to Reader and Writer #130

Closed kevinwallimann closed 4 years ago

kevinwallimann commented 4 years ago

The StreamManager has two methods with signatures

  def configure(streamReader: DataStreamReader, configuration: Configuration): DataStreamReader

  def configure(streamWriter: DataStreamWriter[Row], configuration: Configuration): DataStreamWriter[Row]

It's hard to imagine any use case other than the checkpoint location that would justify the concept of a stream manager which configures only the reader and the writer (but not, for example, the transformer). In fact, the implementation of the CheckpointOffsetManager can hardly be reused for a different data source, since the concept of starting offsets is tightly coupled to Kafka. Moreover, for the reader config, the checkpoint location is only needed to determine the starting offsets, which is an implementation detail. It may also surprise developers that the starting offsets are set not in the KafkaStreamReader but in the CheckpointOffsetManager. For all these reasons, I believe the extra indirection of a StreamManager concept is not justified.

manager.checkpoint.base.location can be replaced by reader.kafka.checkpoint.base.location, writer.kafka.base.location and writer.parquet.base.location. Another possibility is to replace it with a "top-level" property, spark.ingestor.checkpoint.base.location, since the checkpoint location is mandatory for every structured streaming query (see https://github.com/apache/spark/blob/695cb617d42507eded9c7e50bc7cd5333bbe6f83/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L259).

Decision

Replace manager.checkpoint.base.location with writer.common.checkpoint.base.location, because the checkpoint location is a config property of the DataStreamWriter per the Spark documentation. Adding it as a writer property here as well is the least surprising choice. Unfortunately, the reader will have to depend on this property too. Also, rename checkpoint.base.location to checkpoint.location, since the concept of a base location was given up in #85.
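To illustrate why the reader still depends on the writer property, here is a minimal sketch of how a reader could derive its starting offsets from writer.common.checkpoint.location. The object and method names are hypothetical. Checking a local path with java.nio is a stand-in for whatever file system access the real implementation uses, and the rule "set startingOffsets to earliest only when no checkpoint exists yet" is an assumption about the intended behaviour, not a copy of the CheckpointOffsetManager code.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper, not part of hyperdrive.
object ReaderOffsetsSketch {
  // Property name taken from the decision above.
  val CheckpointLocationKey = "writer.common.checkpoint.location"

  /** Returns the extra Kafka source options the reader would set.
    * Assumption: the checkpoint location is a local path.
    */
  def startingOffsetOptions(config: Map[String, String]): Map[String, String] = {
    val location = config.getOrElse(
      CheckpointLocationKey,
      throw new IllegalArgumentException(s"Missing property: $CheckpointLocationKey"))

    if (Files.exists(Paths.get(location))) {
      // A checkpoint exists: Spark resumes from it, so no starting offsets are set.
      Map.empty
    } else {
      // First run: no checkpoint yet, so start from the earliest offsets.
      Map("startingOffsets" -> "earliest")
    }
  }
}
```

In a real reader, the returned options would be passed on to the DataStreamReader, while the writer would pass the same location to the DataStreamWriter via its checkpointLocation option.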

Migration for Trigger

Replace

manager.checkpoint.base.location

with

writer.common.checkpoint.location

Replace

"component.manager=za.co.absa.hyperdrive.ingestor.implementation.manager.checkpoint.CheckpointOffsetManager",

with (empty string)
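The two replacements above can be sketched as a small function. This assumes the job configuration is held as a list of "key=value" strings, and it interprets "replace with empty string" as dropping the component.manager entry. The object name, the method name and the example values in the usage note are made up for illustration.

```scala
// Hypothetical migration helper, not part of hyperdrive or the trigger.
object CheckpointConfigMigration {
  // Property names taken from the migration notes above.
  val OldLocationKey = "manager.checkpoint.base.location"
  val NewLocationKey = "writer.common.checkpoint.location"
  val ManagerComponentKey = "component.manager"

  /** Migrates a list of "key=value" entries (an assumed representation). */
  def migrate(entries: Seq[String]): Seq[String] =
    entries.flatMap { entry =>
      // Split only on the first '=' so that values containing '=' stay intact.
      entry.split("=", 2) match {
        case Array(ManagerComponentKey, _) => None // drop the stream manager component
        case Array(OldLocationKey, value)  => Some(s"$NewLocationKey=$value") // rename the key
        case _                             => Some(entry) // keep everything else unchanged
      }
    }
}
```

For example, migrating the entries "component.manager=za.co.absa.hyperdrive.ingestor.implementation.manager.checkpoint.CheckpointOffsetManager" and "manager.checkpoint.base.location=/tmp/checkpoints/my-topic" leaves a single entry, "writer.common.checkpoint.location=/tmp/checkpoints/my-topic".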