apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Deadlock on Pulsar Java client side #19784

Closed ThomasTaketurns closed 1 year ago

ThomasTaketurns commented 1 year ago

Version

The broker version is 2.10.1 (https://pulsar.apache.org/release-notes/versioned/pulsar-2.10.1/) and the client version is 2.10.3 (https://javadoc.io/doc/org.apache.pulsar/pulsar-client-api/2.10.3/index.html).

Minimal reproduce step

We are currently facing what seems to be a deadlock in our testing environment. We are working with the Java SDK. In my thread dump I can see 24 threads stuck on this line:

final Transaction txn = this.getClient().newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();

where getClient returns an instance of PulsarClient. The stack trace for these threads shows:

at jdk.internal.misc.Unsafe.park(java.base@17.0.2/Native Method)
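The blocking call above, `.build().get()`, waits with no upper bound, so a future that never completes parks the calling thread indefinitely. Below is a minimal sketch of a bounded wait that uses only JDK classes. `BoundedWait` and `awaitWithTimeout` are hypothetical names, and the never-completed future is a stand-in, not a Pulsar call. The sketch does not explain why a future might fail to complete; it only shows how a caller could surface a hang as a timeout rather than a stuck thread.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWait {

    /**
     * Waits for the future for at most the given time.
     * Returns the value if the future completed in time, or empty if the wait timed out.
     * Note: a future that completes with null is also reported as empty.
     */
    public static <T> Optional<T> awaitWithTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (TimeoutException e) {
            // The future is still pending; the caller decides whether to retry, log, or fail.
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for a future that is never completed.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        Optional<String> result = awaitWithTimeout(neverCompleted, 200, TimeUnit.MILLISECONDS);
        System.out.println("completed: " + result.isPresent()); // prints "completed: false"
    }
}
```

With a bounded wait, a thread in this situation would return after the timeout and could log the failure, which makes the condition visible without a thread dump.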

What did you expect to see?

No deadlock

What did you see instead?

A deadlock

Anything else?

Here is the thread dump: dump-2023-03-08.txt. Please tell me if you need anything else.

Thanks for your help,

Thomas
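The count of 24 stuck threads was read from a thread dump. As a rough sketch, the same kind of count can be taken from inside a running JVM with `Thread.getAllStackTraces()`. `StuckThreadCounter` and `countThreadsIn` are hypothetical names, and matching on the frame `java.util.concurrent.CompletableFuture.get` is an assumption about what the waiting threads have on their stacks, not something taken from the attached dump.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class StuckThreadCounter {

    /** Counts live threads whose current stack trace contains the given class and method. */
    public static long countThreadsIn(String className, String methodName) {
        long count = 0;
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getClassName().equals(className) && frame.getMethodName().equals(methodName)) {
                    count++;
                    break; // count each thread at most once
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical demo: one daemon thread blocks on a future that never completes.
        CompletableFuture<Void> never = new CompletableFuture<>();
        Thread waiter = new Thread(() -> {
            try {
                never.get();
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-waiter");
        waiter.setDaemon(true);
        waiter.start();

        // Wait until the demo thread is actually parked before sampling stacks.
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        System.out.println("threads in CompletableFuture.get: "
                + countThreadsIn("java.util.concurrent.CompletableFuture", "get"));
    }
}
```

Sampling this count periodically would show whether the number of waiting threads grows over time, which can help narrow down when the condition starts.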

lifepuzzlefun commented 1 year ago

@ThomasTaketurns Hi, can you provide some example code for your case?

ThomasTaketurns commented 1 year ago

@lifepuzzlefun, hi, we are still trying to find the right race condition or scenario to reproduce the issue, so I do not have code that reproduces it systematically.

I can still share parts of our code though :

This is where the 24 threads are waiting (first line) :

final Transaction txn = this.getClient().newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
        .build().get();
for (final TakeTurnsEvent<E, DomainEvent> event : events) {
    @SuppressWarnings("rawtypes")
    final TypedMessageBuilder message = this.getProducer().newMessage(txn);
    message.eventTime(System.currentTimeMillis());
    message.key(event.getEntityId().toString());
    message.value(event.toPulsarEvent(this.schemaRegistry));
    message.property("traceId", Span.current().getSpanContext().getTraceId());
    message.property("spanId", Span.current().getSpanContext().getSpanId());
    log.info("Sending event to Pulsar : {} ", event);
    message.sendAsync();
}
txn.commit();

This is the only place where I found our code in the stack traces.

Let me know if I can share anything else.

Thanks,

Thomas

lifepuzzlefun commented 1 year ago

@ThomasTaketurns thanks for providing the code. We also need the code that creates the PulsarClient and the Producer. Also, do you use a schema to send messages?

ThomasTaketurns commented 1 year ago

@lifepuzzlefun , thanks for getting back to me.

Regarding pulsar client :

public void initPulsarClient() throws TakeTurnsTechnicalException {
    try {
        log.info("Creating client to Pulsar with url: {}", this.pulsarConfig.getUrl());
        this.client = PulsarClient.builder()
                .serviceUrl(this.pulsarConfig.getUrl())
                .enableTlsHostnameVerification(false)
                .allowTlsInsecureConnection(true)
                .enableTransaction(true)
                .listenerThreads(this.pulsarConfig.getConsumerThreads())
                .build();
    } catch (final PulsarClientException e) {
        throw new TakeTurnsTechnicalException(e.getMessage(), e);
    }
}

And for producers :

public void connectPulsarProducer() throws TakeTurnsTechnicalException {
    if (this.pulsarConfig.getProducer().booleanValue()) {
        if (this.producer != null && this.producer.isConnected()) {
            throw new TakeTurnsTechnicalException("A Pulsar producer is already connected");
        }
        try {
            log.info("Creating producer to Pulsar topic: {}", this.getFullTopicReference());
            this.producer = this.getClient().newProducer(SCHEMA).autoUpdatePartitions(true)
                    .topic(this.getFullTopicReference()).blockIfQueueFull(true).enableBatching(false)
                    .sendTimeout(0, TimeUnit.MILLISECONDS).intercept(oTelProducerInterceptor).create();
            log.info("Producer connected to topic : {}", this.getFullTopicReference());
        } catch (final PulsarClientException e) {
            throw new TakeTurnsTechnicalException(e.getMessage(), e);
        }
    }
}

We do use schemas.

private static final AvroSchema SCHEMA = AvroSchema.of(PulsarEvent.class);