davecromberge opened this issue 9 months ago
@davecromberge Does `FlinkDeltaConnector` emit a CDC (change data capture) stream? How is it going to handle record updates or deletes? In other words, what would be the strategy for syncing data between Delta Lake and Pinot using the Flink Delta Connector? To sync with Delta Lake using the Flink Delta Connector, we have two ways: `FlinkDeltaConnector` can emit a CDC-like stream, and we can ingest the data by enabling upsert mode.

@snleee we do not plan to support CDC directly through this interface. This issue is more about using Flink to build segments and upload them directly to the controller, with the intention of giving the user more control over the ingestion process.
Whilst a Delta lake might support the operations you mention, our connector currently supports only INSERT/append. In some sense we are tackling this problem in increments: eventually we will have to consider the Delta Lake semantics and synchronise the state between Pinot and the Delta Lake by one of the two methods you describe. However, that is out of scope for this issue, which is narrowed to Flink and Pinot. It is entirely possible that this requirement could fall away if there were enough segment / input-file lineage to make the sync a reality, which could even be done via a Spark job that optimises the Delta Lake and applies mutations. For our use case, having the building blocks in place allows us to replay the INSERT operations from the Delta Lake from a given checkpoint / version into Pinot, should we need to rebuild a table from scratch.
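For illustration, a minimal sketch of that replay path, assuming the delta-flink `DeltaSource` builder API; the table path, version number, and job wiring are placeholders, and the hand-off to the Pinot sink is left as a comment:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

import io.delta.flink.source.DeltaSource;

// Sketch: replay a Delta table as of a known version into a Flink job that
// would feed the Pinot sink to rebuild a table from scratch.
public class DeltaReplayJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded read of the table snapshot at a known version.
        DeltaSource<RowData> source = DeltaSource
                .forBoundedRowData(new Path("s3://bucket/tables/events"), new Configuration())
                .versionAsOf(42L)
                .build();

        DataStream<RowData> rows =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "delta-replay");

        // `rows` would then be converted and handed to PinotSinkFunction
        // to generate and upload segments.
        env.execute("delta-to-pinot-replay");
    }
}
```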
This PR bumps the Flink version to 1.19.0. There are two more enhancements that can be considered:
What needs to be done?
**Upgrade Flink version** The version of Flink in the Pinot project POM should be updated from 1.14.6 to the latest release, 1.18.1.
**Authentication** The `org.apache.pinot.controller.helix.ControllerRequestClient` does not currently accept an `org.apache.pinot.spi.auth.AuthProvider`. Similarly, the `org.apache.pinot.connector.flink.sink.PinotSinkFunction<T>` does not accept an `AuthProvider`.

**Error handling** Verify that the underlying `SegmentWriter` handles errors when appending records to a segment and that errors propagate correctly to the Flink runtime. Check that the `SegmentUploader` handles transmission errors and propagates these correctly. (A sketch covering both points follows below.)
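To make the authentication and error-handling points concrete, here is a minimal, hypothetical sketch; today's `PinotSinkFunction` has neither the `AuthProvider` constructor argument nor this class name, so everything below illustrates the proposal rather than current API:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.pinot.spi.auth.AuthProvider;

// Hypothetical sink that carries an AuthProvider so segment uploads to a secured
// controller can attach credentials, and that rethrows append failures so the
// Flink runtime can fail the task and apply its restart strategy.
public class AuthAwarePinotSink<T> extends RichSinkFunction<T> {

    private final AuthProvider authProvider;

    public AuthAwarePinotSink(AuthProvider authProvider) {
        this.authProvider = authProvider;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // The AuthProvider would be handed to the SegmentUploader / controller
        // client here, so that every upload request carries the auth headers.
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            // Append `value` to the in-progress segment via the SegmentWriter.
        } catch (Exception e) {
            // Propagate rather than swallow: surfacing the failure to the Flink
            // runtime is the behaviour the error-handling item asks to verify.
            throw new RuntimeException("Failed to append record to segment", e);
        }
    }
}
```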
**Schema and Data Type Mapping** Correctly convert Flink timestamps with time zones to the Pinot equivalent; see this comment. Double / decimal conversions: see this comment.
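A minimal sketch of one plausible mapping, assuming timestamp values arrive as `java.time.Instant` and that the target Pinot columns are LONG (epoch milliseconds) and BIG_DECIMAL; the class and method names are illustrative, not connector code:

```java
import java.math.BigDecimal;
import java.time.Instant;

// Illustrative converters. Pinot has no timezone-aware timestamp type, so a
// common approach is to normalise to UTC epoch milliseconds in a LONG column.
public final class PinotTypeConversions {

    private PinotTypeConversions() {}

    /** Flink TIMESTAMP_LTZ (an Instant) -> epoch millis for a Pinot LONG column. */
    public static long timestampLtzToEpochMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    /** Flink DECIMAL -> plain string, avoiding lossy double rounding; Pinot's
     *  BIG_DECIMAL type (or a STRING column) can ingest this losslessly. */
    public static String decimalToPinotValue(BigDecimal value) {
        return value.toPlainString();
    }
}
```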
Nice to haves:
**Segment size parameter** Currently `segmentFlushMaxNumRecords` controls when the segment is flushed, based on the number of ingested rows. A dual to this could be `desiredSegmentSize`, used to flush segments when the number of bytes approaches or exceeds a size threshold (see the sketch below).

**Checkpoint support** Understand the limitations behind supporting only batch-mode execution in Flink. Can the current segment writer be serialized, and is there support for resuming from the serialized state? (A sketch of what this could look like appears after the next item.)
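A minimal sketch of the size-based flush idea, assuming a hypothetical `desiredSegmentSizeBytes` setting alongside the existing record-count trigger; nothing below reflects current connector code:

```java
// Hypothetical flush policy combining the existing record-count trigger
// (segmentFlushMaxNumRecords) with a byte-size trigger (desiredSegmentSizeBytes,
// proposed above).
final class SegmentFlushPolicy {
    private final long maxRecords;
    private final long desiredSegmentSizeBytes;
    private long recordCount;
    private long approximateBytes;

    SegmentFlushPolicy(long maxRecords, long desiredSegmentSizeBytes) {
        this.maxRecords = maxRecords;
        this.desiredSegmentSizeBytes = desiredSegmentSizeBytes;
    }

    /** Records one ingested row; returns true when the segment should be flushed. */
    boolean onRecord(long serializedSizeBytes) {
        recordCount++;
        approximateBytes += serializedSizeBytes;
        return recordCount >= maxRecords || approximateBytes >= desiredSegmentSizeBytes;
    }

    /** Resets the counters after a flush. */
    void reset() {
        recordCount = 0;
        approximateBytes = 0;
    }
}
```

Note that the byte count is necessarily approximate: it sums serialized record sizes, whereas the final segment size after encoding and compression is only known at build time.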
**Connector assembly** Packaging the connector as a single assembly with shaded dependencies so that it can be used within the Flink SQL environment. This is done in other connectors such as Delta Lake, Google BigQuery, etc.
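Returning to the checkpoint-support question above: a sketch of what resumable state could look like if the in-progress segment writer state were serializable. This is exploratory; the connector is batch-only today, and none of these names exist in it:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Exploratory sketch: a sink that snapshots the (hypothetically serializable)
// in-progress segment writer state so a streaming job could resume mid-segment.
public class CheckpointablePinotSink<T> extends RichSinkFunction<T>
        implements CheckpointedFunction {

    private transient ListState<byte[]> writerState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        writerState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("segment-writer-state", byte[].class));
        // On restore, an in-progress segment would be rebuilt from writerState here.
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        writerState.clear();
        // writerState.add(serializeWriter()); // requires a serializable SegmentWriter
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        // Append the record to the in-progress segment.
    }
}
```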
Other questions:
Why the feature is needed
Our particular use case involves pre-aggregating data with Apache DataSketches before ingestion into Pinot. The sketches are serialized as binary, can be on the order of megabytes, and are appended to a Delta Lake table. The idea is to stream records continuously from the Delta Lake using the Flink Delta Connector and to have fine-grained control over Pinot segment generation; these segments are then uploaded directly to Pinot. Our Pinot controllers are secured using Basic Authentication.
It would be possible to clone and modify the existing connector, but some of these enhancements might benefit other users, so discussing them here seems better.
Initial idea/proposal
Discuss the points above and collaborate on the implementation.