ppawel closed this issue 1 month ago.
Thanks for reporting, @ppawel! I can work on that.
.take-issue
@ppawel Is the 'idleDurationThreshold' the only parameter you'd like to control, or do you want to be able to override the entire WatermarkPolicy object?
I can either:
1) expose the withWatermarkIdleDurationThreshold() configuration through SolaceIO.read().withWatermarkIdleDurationThreshold(), or
2) expose the WatermarkPolicy, in which case you would have to override the getWatermark() and update() methods if you opted for your own implementation. You would then set it via SolaceIO.read().withWatermarkPolicy(...).
I'm leaning towards option 1, which gives the user one simple setting to tweak and seems consistent with other connectors.
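To make the two options concrete, here is a minimal, stdlib-only Java sketch. All names (WatermarkPolicySketch, IdleThresholdPolicy, ReadConfigSketch) are hypothetical stand-ins, not the Beam API; the sketch uses java.time instead of Joda-Time, a mutable builder instead of Beam's immutable transforms, and an injectable clock so it can be tested. The built-in policy assumes an idle-threshold behavior: the watermark is held at the last message's timestamp and, once nothing has arrived for longer than the threshold, advances to now minus the threshold.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical stand-in for the package-private WatermarkPolicy (option 2):
// a custom implementation has to provide both methods.
interface WatermarkPolicySketch {
  // Records that a message with the given timestamp was received.
  void update(Instant recordTime);

  // Returns the current watermark; implementations must never move it backwards.
  Instant getWatermark();
}

// Hypothetical built-in policy driven by a single idle-duration threshold.
final class IdleThresholdPolicy implements WatermarkPolicySketch {
  private final Duration idleThreshold;
  private final Supplier<Instant> clock;
  private Instant lastRecordTime = Instant.EPOCH;
  private Instant lastUpdate;
  private Instant watermark = Instant.EPOCH;

  IdleThresholdPolicy(Duration idleThreshold, Supplier<Instant> clock) {
    this.idleThreshold = idleThreshold;
    this.clock = clock;
    this.lastUpdate = clock.get();
  }

  @Override
  public void update(Instant recordTime) {
    lastRecordTime = recordTime;
    lastUpdate = clock.get();
  }

  @Override
  public Instant getWatermark() {
    Instant idleBoundary = clock.get().minus(idleThreshold);
    // Idle for longer than the threshold: advance to now - threshold.
    // Otherwise: hold at the timestamp of the last received message.
    Instant candidate = idleBoundary.isAfter(lastUpdate) ? idleBoundary : lastRecordTime;
    if (candidate.isAfter(watermark)) {
      watermark = candidate; // only ever moves forward
    }
    return watermark;
  }
}

// Hypothetical read configuration exposing both options from the thread.
final class ReadConfigSketch {
  static final Duration DEFAULT_IDLE_THRESHOLD = Duration.ofSeconds(30);

  private Duration idleThreshold = DEFAULT_IDLE_THRESHOLD;
  private WatermarkPolicySketch customPolicy; // null means "use the built-in policy"

  // Option 1: a single knob; the built-in policy is kept.
  ReadConfigSketch withWatermarkIdleDurationThreshold(Duration threshold) {
    this.idleThreshold = threshold;
    return this;
  }

  // Option 2: the caller replaces the whole policy.
  ReadConfigSketch withWatermarkPolicy(WatermarkPolicySketch policy) {
    this.customPolicy = policy;
    return this;
  }

  WatermarkPolicySketch buildPolicy(Supplier<Instant> clock) {
    return customPolicy != null ? customPolicy : new IdleThresholdPolicy(idleThreshold, clock);
  }
}
```

With option 1 the caller only picks a Duration and keeps the built-in logic; option 2 hands over responsibility for both methods, including the rule that the watermark must never move backwards.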
@bzablocki For me personally, either one is fine, though option 1 seems cleaner and should be enough for my needs :+1:
Waiting for a review in #32109
@bzablocki I guess this can be closed now? We are using this new parameter and it seems to be working fine.
Yes, feel free to close this. Thank you!
What would you like to happen?
Currently, there is no public API in SolaceIO to configure watermark behavior - classes like WatermarkPolicy and WatermarkParameters are package-private.
In effect, the user is required to use the default, hardcoded watermark idle threshold of 30 seconds (see org.apache.beam.sdk.io.solace.read.WatermarkParameters#STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD). This is too long for my use case, because a stuck watermark causes downstream windows to be stuck as well.
Example: I have a 10-second session window right after the SolaceIO.Read transform, which needs the watermark to advance more often than every 30 seconds. Otherwise, when no Solace messages are arriving, elements do not move through the whole pipeline as expected.
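The effect of the threshold on the 10-second session window can be estimated with a back-of-the-envelope simulation. This stdlib-only Java sketch assumes that (1) the watermark is held at the last message's timestamp until the source has been idle for longer than the threshold, and then follows now minus the threshold; (2) event time equals arrival time; and (3) a session window fires as soon as the watermark reaches its end, ignoring millisecond-level boundary details and runner latency. SessionFlushSimulation and firstFlushSecond are hypothetical names, not part of SolaceIO.

```java
// Hypothetical helper, not part of SolaceIO: whole-second simulation of an
// idle-threshold watermark after the last message of a session.
final class SessionFlushSimulation {
  private SessionFlushSimulation() {}

  // Returns the first wall-clock second at which the watermark reaches the end
  // of the session containing the last message, assuming no further messages.
  static long firstFlushSecond(long lastMessageSec, long sessionGapSec, long idleThresholdSec) {
    long sessionEnd = lastMessageSec + sessionGapSec;
    long watermark = lastMessageSec; // held at the last message's timestamp
    for (long now = lastMessageSec; ; now++) {
      // Once idle for longer than the threshold, the watermark tracks now - threshold.
      if (now - lastMessageSec > idleThresholdSec) {
        watermark = Math.max(watermark, now - idleThresholdSec);
      }
      if (watermark >= sessionEnd) {
        // For a positive gap this is lastMessageSec + sessionGapSec + idleThresholdSec.
        return now;
      }
    }
  }
}
```

Under these assumptions, a message arriving at second 0 closes its 10-second session at second 40 with the 30-second default (firstFlushSecond(0, 10, 30)), but at second 15 with a 5-second threshold (firstFlushSecond(0, 10, 5)), i.e. roughly gap plus threshold after the last message.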
CC @bzablocki
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components