dehora / nakadi-java

🌀 Client library for the Nakadi Event Broker (examples: http://bit.ly/njc-examples, site: https://dehora.github.io/nakadi-java/)
MIT License
30 stars 19 forks source link

Can we have a blocking stream processor for the native API? #319

Closed olayinkasf closed 5 years ago

olayinkasf commented 6 years ago

I am currently writing a offset search using binary search and it is quite painful to write. See below for the snippet I wrote just to read one event at a particular offset. Also I am not sure how the thread is managed which makes the processor#stop painful to call. Having a cyclic reference between the processor and the observer is also not clean to write.

private OffsetDateTime findOccurredAt(String eventName, String partition, String offset) {
    StreamConfiguration streamConfig = new StreamConfiguration().eventTypeName(eventName)
                                                                .batchBufferCount(1)
                                                                .batchLimit(1)
                                                                .cursor(new Cursor(partition, offset));
    FirstEventMetadataObserver observer = new FirstEventMetadataObserver(eventName, partition, offset);
    StreamObserverProvider<String> observerProvider = new StreamObserverProvider<String>() {
        @Override
        public StreamObserver<String> createStreamObserver() {
            return observer;
        }

        @Override
        public TypeLiteral<String> typeLiteral() {
            return TypeLiterals.OF_STRING;
        }
    };
    StreamProcessor streamProcessor = resources.streamBuilder(streamConfig)
                                               .streamObserverFactory(observerProvider)
                                               .build();
    observer.setListener(streamProcessor::stop);
    streamProcessor.start();
    resilient(10).until(() -> observer.getEventMetadata() != null);//approx 1 minute wait
    observer.setListener(null);
    EventMetadata eventMetadata = observer.getEventMetadata();
    return eventMetadata.getOccurredAt();
}