reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Questionable blocking behavior in KafkaSender #71

Closed Rabajaba closed 3 years ago

Rabajaba commented 5 years ago

While using KafkaSender I noticed unusual thread behavior that may lead to unexpected blocking on threads you do not expect to be blocked.

First concern

To simplify the case, let's use plain test code:

sender.send(Flux.just(SenderRecord.create(new ProducerRecord<>(existingTopicName, testKey, testValue), testKey)))

The inner Flux here is subscribed on the "main" thread. Looking at the implementation of reactor.kafka.sender.internals.DefaultKafkaSender#send, one sees the following:

@Override
public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records) {
    return producerMono.flatMapMany(producer -> new Flux<SenderResult<T>>() {
            @Override
            public void subscribe(CoreSubscriber<? super SenderResult<T>> s) {
                Flux<SenderRecord<K, V, T>> senderRecords = Flux.from(records);
                senderRecords.subscribe(new SendSubscriber<>(producer, s, senderOptions.stopOnError()));
            }
        }
        .doOnError(e -> log.trace("Send failed with exception {}", e))
        .publishOn(senderOptions.scheduler(), senderOptions.maxInFlight()));
}

This means that "records" is actually subscribed, and each record it emits is handed to SendSubscriber. Its method reactor.kafka.sender.internals.DefaultKafkaSender.AbstractSendSubscriber#onNext calls org.apache.kafka.clients.producer.KafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback), which is actually blocking code. Following the stack trace leads here:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        ...

An even better example of blocking code sits lower in the stack trace, with wait() and synchronized:

public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0)
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");

    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while ((this.version <= lastVersion) && !isClosed()) {
        AuthenticationException ex = getAndClearAuthenticationException();
        if (ex != null)
            throw ex;
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
    if (isClosed())
        throw new KafkaException("Requested metadata update after close");
}

All of the above means that blocking code may be executed on a user thread, inside the stream of records, with unexpected consequences.
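The effect described above can be reproduced without Kafka or Reactor. The following is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces. CallerThreadDemo, SyncPublisher, and the Thread.sleep that stands in for the blocking KafkaProducer#send are hypothetical names made up for illustration, not code from reactor-kafka.

```java
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

class CallerThreadDemo {

    /** Hypothetical synchronous publisher: emits all items on the thread that calls request(). */
    static final class SyncPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        SyncPublisher(List<T> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    done = true;
                    // Simplification: the requested amount n is ignored and everything is emitted at once.
                    for (T item : items) subscriber.onNext(item);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes on the calling thread and returns the name of the thread that ran onNext. */
    static String threadThatRanOnNext() {
        AtomicReference<String> seen = new AtomicReference<>();
        new SyncPublisher<>(List.of("record-1")).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                // Stand-in for a blocking call such as KafkaProducer#send waiting on metadata.
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                seen.set(Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller        = " + Thread.currentThread().getName());
        System.out.println("onNext ran on = " + threadThatRanOnNext());
    }
}
```

With a synchronous source, both lines print the same thread name: whatever blocks inside onNext blocks the thread that subscribed.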

Second concern

Taking a look at the implementation of

@Override
public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records) {
    return producerMono.flatMapMany(producer -> new Flux<SenderResult<T>>() {
            @Override
            public void subscribe(CoreSubscriber<? super SenderResult<T>> s) {
                Flux<SenderRecord<K, V, T>> senderRecords = Flux.from(records);
                senderRecords.subscribe(new SendSubscriber<>(producer, s, senderOptions.stopOnError()));
            }
        }
        .doOnError(e -> log.trace("Send failed with exception {}", e))
        .publishOn(senderOptions.scheduler(), senderOptions.maxInFlight()));
}

of reactor.kafka.sender.internals.DefaultKafkaSender, it looks rather odd to use publishOn instead of subscribeOn (which would solve the first issue completely by moving the subscription onto some unbounded, reusable thread pool). When KafkaProducer responds with data from network I/O, it does so on its own thread pool, named kafka-producer-network-thread, so switching from it onto SenderOptions.scheduler() doesn't bring much value: it only changes the thread on which you receive the RecordMetadata object.

Summary:

  1. Correct me if I'm wrong, but you usually don't expect a library you use to do something unexpected on your thread.
  2. KafkaProducer's blocking code should be moved onto some other scheduler so that blocking calls do not run on the user's threads.

Am I missing something?

Version: Both 1.1.0.RELEASE and latest master

dwen77 commented 4 years ago

I have a similar question. Could anyone please help us? Thanks!

Kamalakannangv commented 3 years ago

With the BlockHound tool, we can see the blocking call in the send() method of reactor.kafka.sender.KafkaSender.

reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait
    at java.base/java.lang.Object.wait(Object.java)
    at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)
    at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1029)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:883)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at reactor.kafka.sender.internals.DefaultKafkaSender$AbstractSendSubscriber.onNext(DefaultKafkaSender.java:276)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
    at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:469)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2152)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:435)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:243)

Is there any better approach than publishing/subscribing the send call on a separate thread pool?

bsideup commented 3 years ago

This is a known issue of kafka-clients. It is possible to provide a custom scheduler to work around it.
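The general shape of such a workaround is to hop onto a thread reserved for blocking work before making the call. The following is a minimal sketch of that idea using only the JDK. OffloadedSendDemo, blockingSend, sendOffloaded, newSendExecutor, and the thread name "blocking-send" are hypothetical, and the sleep only stands in for a KafkaProducer#send that waits on metadata.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadedSendDemo {

    /** Hypothetical stand-in for a send that may block; returns the name of the thread it ran on. */
    static String blockingSend(String record) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    /** Creates a single daemon thread dedicated to the blocking call. */
    static ExecutorService newSendExecutor() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "blocking-send");
            t.setDaemon(true);
            return t;
        });
    }

    /** Schedules the blocking send on the given executor and returns immediately. */
    static CompletableFuture<String> sendOffloaded(String record, ExecutorService sendExecutor) {
        return CompletableFuture.supplyAsync(() -> blockingSend(record), sendExecutor);
    }

    public static void main(String[] args) {
        ExecutorService sendExecutor = newSendExecutor();
        try {
            CompletableFuture<String> result = sendOffloaded("record-1", sendExecutor);
            System.out.println("caller      = " + Thread.currentThread().getName());
            System.out.println("send ran on = " + result.join());
        } finally {
            sendExecutor.shutdown();
        }
    }
}
```

In Reactor terms, the analogous step would be applying subscribeOn or publishOn with a scheduler intended for blocking work, such as Schedulers.boundedElastic(), to the records publisher before passing it to send. Whether that is the custom scheduler meant in the comment above is an assumption.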