streamnative / pulsar-archived

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org
Apache License 2.0
73 stars 26 forks source link

ISSUE-12508: [Java client] Deadlock in Message.getValue() if executed inside a Pulsar async callback, due to blocking download of the Schema #3189

Open sijie opened 2 years ago

sijie commented 2 years ago

Original Issue: apache/pulsar#12508


Describe the bug In case of a AVRO message Message.getValue() needs to download the schema from the Registry performing a blocking operation that involves a request to the Broker itself.

If you call Message.getValue() on the completion of a CompletableFuture returned by Pulsar Client, like Reader.readNextAsync() then there is a good chance to create a deadlock.

This is my code

private CompletableFuture<?> readNextMessageIfAvailable(Reader<Op> reader, Consumer<Message> consumer) {
        return reader
                .hasMessageAvailableAsync()
                .thenCompose(hasMessageAvailable -> {
                    if (hasMessageAvailable == null
                            || !hasMessageAvailable) {
                        return CompletableFuture.completedFuture(null);
                    } else {
                        CompletableFuture<Message<Op>> opMessage = reader.readNextAsync();
                        return opMessage.thenCompose(msg -> {
                            Op value = msg.getValue();       // <------ DEADLOCK !!!
                            consumer.accept(value);
                            return readNextMessageIfAvailable(reader);
                        });
                    }
                });
    }

In this code msg.getValue() is executed on completion of readNextAsync() or of hasMessageAvailableAsync() and this may happen in the ClientCnx thread.

This is the stacktrace:

"pulsar-client-io-54-1" #152 prio=5 os_prio=31 cpu=89.65ms elapsed=7.12s tid=0x00007fa8bee81000 nid=0x29e03 waiting on condition  [0x000070000f2fa000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
    - parking to wait for  <0x000000079d15a990> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
    at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.11/CompletableFuture.java:1796)
    at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.11/ForkJoinPool.java:3128)
    at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.11/CompletableFuture.java:1823)
    at java.util.concurrent.CompletableFuture.get(java.base@11.0.11/CompletableFuture.java:1998)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:119)
    at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    - locked <0x000000079d10d5e0> (a com.google.common.cache.LocalCache$StrongAccessEntry)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90)
    at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67)
    at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:471)
    at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:449)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage.lambda$readNextMessageIfAvailable$0(PulsarSchemaStorage.java:143)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage$$Lambda$1421/0x0000000800abd840.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@11.0.11/CompletableFuture.java:1106)
    at java.util.concurrent.CompletableFuture.thenCompose(java.base@11.0.11/CompletableFuture.java:2235)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage.lambda$readNextMessageIfAvailable$1(PulsarSchemaStorage.java:142)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage$$Lambda$1369/0x0000000800a99840.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(java.base@11.0.11/CompletableFuture.java:1072)
    at java.util.concurrent.CompletableFuture.postComplete(java.base@11.0.11/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete(java.base@11.0.11/CompletableFuture.java:2073)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$hasMessageAvailableAsync$48(ConsumerImpl.java:1925)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$1367/0x0000000800a99040.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(java.base@11.0.11/CompletableFuture.java:714)
    at java.util.concurrent.CompletableFuture.postComplete(java.base@11.0.11/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete(java.base@11.0.11/CompletableFuture.java:2073)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalGetLastMessageIdAsync$53(ConsumerImpl.java:2042)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$1364/0x0000000800a98440.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(java.base@11.0.11/CompletableFuture.java:714)
    at java.util.concurrent.CompletableFuture.postComplete(java.base@11.0.11/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete(java.base@11.0.11/CompletableFuture.java:2073)
    at org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(ClientCnx.java:491)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:298)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)

There is a workaround, that is to use "thenComposeAsync" instead of "thenCompose", but apart from this specific problem, the real problem is that Message.getValue() is a blocking operation that performs a network request (that is piled up on the same eventloop that triggered the call).

Fixing this is not trivial, as Message.getValue() triggers the Schema Implementation, in this case AbstractStructSchema, that in turn needs to download the Schema, so it finally depends on the Schema Implementation.

Originally the problem was reported by @vroyer in the context of Pulsar IO, my case is different, but in any case this problem affects any kind of trial of implementing a fully non blocking API for Pulsar

To Reproduce Use the sample code

Expected behavior No deadlock

Proposal One possibility is to force loading of the Schema inside the readNextAsync() method and let the CompletableFuture be completed only when the schema is fully loaded, this way the Message.getValue() won't block anymore.

This change will need to change the Schema API in order to support asynchronous decoding or at least asynchronous pre-fetching of the schema.

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.