apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Support for reading Kafka topics from any startReadTime in Java #21610

Open damccorm opened 2 years ago

damccorm commented 2 years ago

https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198

 

Right now the 'startReadTime' config for KafkaIO.Read looks up, for every topic partition, an offset whose timestamp is newer than or equal to the given timestamp. The problem is that if the timestamp is so new that there is no newer/equal message in a partition, the code fails with an exception. In certain cases this makes no sense, because we could actually make it work.

If we don't get an offset back from calling consumer.offsetsForTimes, we should call endOffsets and use the returned offset, as that is actually the offset we will have to read next time.

Even if endOffsets can't return an offset, we could still use 0 as the offset to read from.
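As an illustration of the proposal, here is a minimal sketch against the plain Kafka consumer API (not existing Beam code; the class and method names are made up):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class StartOffsetFallbackSketch {

    /** Resolves the offset to start reading from for a single partition. */
    static long resolveStartOffset(Consumer<?, ?> consumer, TopicPartition tp, long startReadTimeMillis) {
        // Current behaviour: find the earliest offset whose timestamp is >= startReadTime.
        Map<TopicPartition, OffsetAndTimestamp> found =
                consumer.offsetsForTimes(Collections.singletonMap(tp, startReadTimeMillis));
        OffsetAndTimestamp offsetAndTimestamp = found.get(tp);
        if (offsetAndTimestamp != null) {
            return offsetAndTimestamp.offset();
        }
        // Proposed fallback: no newer/equal message exists, so use the end offset,
        // i.e. the offset the next produced message will get.
        Long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
        // If even the end offset is unavailable, read from the beginning of the partition.
        return endOffset != null ? endOffset : 0L;
    }
}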

 

Am I missing something here? Is it okay to contribute this?

Imported from Jira BEAM-14518. Original Jira may contain additional context. Reported by: bnemeth.

johnjcasey commented 2 years ago

I'm not sure that simply using endOffsets would work in all cases. It may make sense to instead have the caller of this method try to handle getting a correct offset.

nbali commented 2 years ago

So basically the ability to support a provider that calculates the offset to use when one is not found, with the default provider being the current implementation that throws an exception?
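In code terms the idea could look roughly like this (hypothetical interface and names, sketch only, just to make the question concrete):

import java.io.Serializable;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical pluggable strategy, invoked only when offsetsForTimes finds no offset for a partition.
public interface MissingOffsetProvider extends Serializable {

    long offsetFor(Consumer<?, ?> consumer, TopicPartition topicPartition, long timestampMillis);

    // Default provider: keep today's behaviour and fail.
    MissingOffsetProvider FAILING = (consumer, tp, ts) -> {
        throw new RuntimeException("No offset with timestamp >= " + ts + " in " + tp);
    };

    // Alternative provider: fall back to the end offset of the partition (null handling omitted).
    MissingOffsetProvider END_OFFSET = (consumer, tp, ts) ->
            consumer.endOffsets(Collections.singletonList(tp)).get(tp);
}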

johnjcasey commented 2 years ago

Essentially. This method is public, so we shouldn't change the general pattern here. I would add a new method that calls this one but falls back to the end offset if it gets the exception, and then use that utility in the relevant places in the Kafka readers.
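A rough sketch of such a wrapper, assuming ConsumerSpEL#offsetForTime keeps its current signature and its behaviour of throwing when no newer/equal message exists (the wrapper name is hypothetical, not an actual Beam API):

// Would live next to offsetForTime in ConsumerSpEL; usual Kafka client and joda-time imports assumed.
long offsetForTimeOrEndOffset(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {
    try {
        // Existing public method: throws if no message with timestamp >= time exists.
        return offsetForTime(consumer, topicPartition, time);
    } catch (RuntimeException e) {
        // Fall back to the end offset, i.e. start/stop at whatever is produced next.
        Long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
        return endOffset != null ? endOffset : 0L;
    }
}

The Kafka readers would then call this wrapper instead of offsetForTime where the fallback is wanted, leaving the public method untouched.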

nbali commented 2 years ago

I'm not sure I will have time in the near future to implement this, but given how slowly the discussion has been going, I created a working solution for reading the whole Kafka stream in a batch pipeline. So for whoever needs a quicker workaround that is even more customizable:

/**
 * Using {@link KafkaIO.Read#withStopReadTime(org.joda.time.Instant)} will try to acquire an offset for the given timestamp.<br>
 * If there are only offsets older than the provided timestamp, the default implementation fails with an exception.<br>
 * This implementation falls back to the newest available offset instead - essentially reading up to the newest available message.
 */
@Slf4j
public class MyKafkaConsumer<K, V> extends KafkaConsumer<K, V> {

    public MyKafkaConsumer(Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        final Map<TopicPartition, OffsetAndTimestamp> result = super.offsetsForTimes(timestampsToSearch, timeout);

        final List<TopicPartition> topicPartitionsWithoutProperOffset =
                result.keySet().stream()
                        .filter(topicPartition -> result.get(topicPartition) == null)
                        .collect(Collectors.toList());

        endOffsets(topicPartitionsWithoutProperOffset).forEach((topicPartition, endOffset) -> {
            final Long timestampToSearch = timestampsToSearch.get(topicPartition);
            log.warn("Offset for topicPartition: {}, timestamp: {} was not found, replaced by endOffset: {}",
                    topicPartition, timestampToSearch, endOffset);
            result.put(topicPartition, new OffsetAndTimestamp(endOffset, timestampToSearch));
        });

        return result;
    }

}
KafkaIO.readBytes()
    .withStopReadTime(Instant.now())
    .withConsumerFactoryFn(MyKafkaConsumer::new) // required for stopReadTime
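For context, a fuller wiring could look something like this (the broker address, topic name and surrounding pipeline are placeholders, not part of the workaround; usual Beam and Kafka imports assumed):

Pipeline pipeline = Pipeline.create(options);

PCollection<KafkaRecord<byte[], byte[]>> records =
        pipeline.apply(
                KafkaIO.readBytes()
                        .withBootstrapServers("localhost:9092")       // placeholder broker
                        .withTopic("my-topic")                        // placeholder topic
                        .withConsumerFactoryFn(MyKafkaConsumer::new)  // plug in the fallback consumer
                        .withStopReadTime(Instant.now()));            // read everything up to "now", then stop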