apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] function Interrupted for unknown reasons #21803

Open graysonzeng opened 10 months ago

graysonzeng commented 10 months ago

Search before asking

Version

Pulsar & Function 3.1.1

Minimal reproduce step

Run a function with the effectively-once processing guarantee.

What did you expect to see?

The function should handle InterruptedException without getting stuck.

What did you see instead?

While the function is running, the following exception occasionally occurs, after which the function stops both reading and writing.

2023-12-19T23:55:03,134+0800 [pulsar-external-listener-6-1] ERROR org.apache.pulsar.client.impl.ConsumerBase - [persistent://pulsar/log_Production/alltables-partition-13][sink-v2] Message listener error in processing message: 777380:54372:13
java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.pulsar.functions.source.PushPulsarSource.consume(PushPulsarSource.java:64) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:3.0.1.3]
    at org.apache.pulsar.functions.source.MultiConsumerPulsarSource.received(MultiConsumerPulsarSource.java:81) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:3.0.1.3]
    at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:1153) ~[io.streamnative-pulsar-client-original-3.0.1.3.jar:3.0.1.3]
    at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$10(ConsumerBase.java:1119) ~[io.streamnative-pulsar-client-original-3.0.1.3.jar:3.0.1.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.94.Final.jar:4.1.94.Final]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638) ~[?:?]
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:343) ~[?:?]
    at org.apache.pulsar.functions.source.PushPulsarSource.consume(PushPulsarSource.java:62) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:3.0.1.3]
    ... 7 more
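
The trace above shows `LinkedBlockingQueue.put` being interrupted inside `PushPulsarSource.consume`, with the `InterruptedException` rethrown as a `RuntimeException`. To illustrate what handling the interruption could look like, here is a minimal JDK-only sketch. `InterruptAwareBuffer` and `offerOrStop` are hypothetical names, not Pulsar classes, and this is not a proposed patch. It only shows the common pattern of restoring the interrupt flag and returning a status instead of wrapping the exception.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a push-style source that buffers records in a bounded queue. */
final class InterruptAwareBuffer<T> {
    private final BlockingQueue<T> queue;

    InterruptAwareBuffer(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Enqueues a record, blocking while the queue is full.
     * Returns false (instead of throwing) if the calling thread is interrupted.
     */
    boolean offerOrStop(T record) {
        try {
            queue.put(record);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can still observe the interrupt.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int size() {
        return queue.size();
    }
}
```

A caller can check the return value and stop feeding records once it sees `false`, rather than letting an unchecked exception escape into the message listener.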

According to the log, the sink's close method was called and the client was closed.

2023-12-27T10:01:14,985+0800 [Thread-3] INFO  function-functions-pro - start to close all the producers
2023-12-27T10:01:14,988+0800 [Thread-3] INFO  function-functions-pro - finish to close all the producers
2023-12-27T10:01:14,988+0800 [Thread-3] INFO  org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://21.24.16.111:6650
2023-12-27T10:01:14,988+0800 [pulsar/log_Production/functions-pro-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Encountered exception in sink write:
org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
     org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1031) ~[java-instance.jar:?]
     org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:89) ~[io.streamnative-pulsar-client-original-3.0.1.3.jar:3.0.1.3]
     com.tencent.DynamicTopicSink.getProducer(DynamicTopicSink.java:164) ~[8Ia5QqpY9lE3mwskma2zRA/:?]
     com.tencent.DynamicTopicSink.write(DynamicTopicSink.java:142) ~[8Ia5QqpY9lE3mwskma2zRA/:?]
     org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:429) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:?]
     org.apache.pulsar.functions.instance.JavaInstanceRunnable.handleResult(JavaInstanceRunnable.java:391) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:?]
     org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:331) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:?]
     java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.InterruptedException
     java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:386) ~[?:?]
     java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
     org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:87) ~[io.streamnative-pulsar-client-original-3.0.1.3.jar:3.0.1.3]
     ... 6 more
2023-12-27T10:01:14,989+0800 [pulsar-client-io-17-5] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/EquipInfo-partition-3] [sink-EquipInfo] Closed Producer
2023-12-27T10:01:14,989+0800 [Thread-3] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/CombatEffectivenessFlow-partition-4] [sink-CombatEffectivenessFlow] Closed Producer (not connected)
2023-12-27T10:01:14,990+0800 [Thread-3] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/CombatEffectivenessFlow-partition-1] [sink-CombatEffectivenessFlow] Closed Producer (not connected)
2023-12-27T10:01:14,990+0800 [Thread-3] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/CombatEffectivenessFlow-partition-3] [sink-CombatEffectivenessFlow] Closed Producer (not connected)
2023-12-27T10:01:14,990+0800 [Thread-3] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/CombatEffectivenessFlow-partition-2] [sink-CombatEffectivenessFlow] Closed Producer (not connected)
2023-12-27T10:01:14,990+0800 [pulsar-client-io-17-5] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/shenqibianhua-partition-4] [sink-shenqibianhua] Closed Producer
2023-12-27T10:01:14,990+0800 [pulsar-client-io-17-5] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/shenqibianhua-partition-1] [sink-shenqibianhua] Closed Producer
....
2023-12-27T10:01:14,991+0800 [pulsar-client-io-17-9] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/EquipInfo-partition-0] [sink-EquipInfo] Closed Producer
2023-12-27T10:01:14,991+0800 [pulsar-client-io-17-7] INFO  org.apache.pulsar.client.impl.PartitionedProducerImpl - [persistent://pulsar/log_Production/shenqibianhua] Closed Partitioned Producer
2023-12-27T10:01:14,991+0800 [pulsar-client-io-17-9] INFO  org.apache.pulsar.client.impl.PartitionedProducerImpl - [persistent://pulsar/log_Production/EquipInfo] Closed Partitioned Producer
2023-12-27T10:01:14,991+0800 [pulsar/log_Production/functions-pro-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [pulsar/log_Production/functions-pro:0] Uncaught exception in Java Instance
java.lang.RuntimeException: Failed to process message: 1306487:441847:0
     org.apache.pulsar.functions.source.PulsarSource.lambda$buildRecord$6(PulsarSource.java:155) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:3.0.1.3]
     org.apache.pulsar.functions.source.PulsarRecord.fail(PulsarRecord.java:133) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:3.0.1.3]
     org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:434) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:?]
     org.apache.pulsar.functions.instance.JavaInstanceRunnable.handleResult(JavaInstanceRunnable.java:391) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:?]
     org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:331) ~[io.streamnative-pulsar-functions-instance-3.0.1.3.jar:?]
     java.lang.Thread.run(Thread.java:833) ~[?:?]
2023-12-27T10:01:14,991+0800 [pulsar-client-io-17-7] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/log_Production/ShiMenMission-partition-3] [sink-ShiMenMission] Closed Producer
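
In the sink-write trace above, the `InterruptedException` originates in `CompletableFuture.get`, called from `ProducerBuilderImpl.create`. The JDK-only sketch below reproduces that mechanism in isolation: a thread blocked in `get()` on a future that never completes receives an `InterruptedException` when it is interrupted. `InterruptedGetDemo` is a hypothetical name, and the log does not show which component issued the interrupt, so the `worker.interrupt()` call is only a stand-in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

final class InterruptedGetDemo {
    /** Returns the simple name of the exception seen by a thread interrupted while in get(). */
    static String interruptWhileWaiting() {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("none");

        Thread worker = new Thread(() -> {
            try {
                // Blocks here, like a synchronous call waiting on an async result.
                neverCompleted.get();
            } catch (InterruptedException e) {
                seen.set(e.getClass().getSimpleName());
            } catch (ExecutionException e) {
                seen.set(e.getClass().getSimpleName());
            }
        });
        worker.start();
        worker.interrupt(); // stand-in for whatever interrupted the instance thread
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```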

I overrode the sink's close method as follows:

    @Override
    public void close() throws Exception {
        logger.info("start to close all the producers");
        closeProducer();
        closeClient();
    }

    public void closeProducer() {
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producerMap.size());
        try {
            for (Map.Entry<String, Producer<byte[]>> entry : producerMap.entrySet()) {
                Producer<byte[]> producer = entry.getValue();
                closeFutures.add(producer.closeAsync());
                producerMap.clear();
            }
            org.apache.pulsar.common.util.FutureUtil.waitForAll(closeFutures);
            logger.info("finish to close all the producers");
        } catch (Exception e) {
            logger.warn("Failed to close all the producers", e);
        }
    }

    public void closeClient() {
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>(10);
        try {
            closeFutures.add(pulsarClient.closeAsync());
            org.apache.pulsar.common.util.FutureUtil.waitForAll(closeFutures);
            logger.info("finish to close all the client");
        } catch (Exception e) {
            logger.warn("Failed to close all the producers", e);
        }
    }
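
Two details of the snippet above may be worth checking. `producerMap.clear()` runs inside the `for` loop, so later entries may be skipped or the iteration may fail, depending on the map implementation. Also, `FutureUtil.waitForAll` appears to return a `CompletableFuture` rather than block, and its result is not joined here. That would be consistent with the log, where "finish to close all the producers" is printed 3 ms after the start message while `Closed Producer` lines keep arriving afterwards. The sketch below shows one way to wait for all closes using only the JDK. `AsyncCloseable` and `ProducerRegistry` are hypothetical stand-ins, not Pulsar types, and this is not claimed to be the fix for the interruption itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for anything with an asynchronous close, such as a producer. */
interface AsyncCloseable {
    CompletableFuture<Void> closeAsync();
}

final class ProducerRegistry {
    private final Map<String, AsyncCloseable> producers = new ConcurrentHashMap<>();

    void register(String topic, AsyncCloseable producer) {
        producers.put(topic, producer);
    }

    int size() {
        return producers.size();
    }

    /** Starts closing every producer, clears the map once, then blocks until all closes finish. */
    void closeAll() {
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size());
        for (AsyncCloseable producer : producers.values()) {
            closeFutures.add(producer.closeAsync());
        }
        // Clear after the loop so every entry has been visited.
        producers.clear();
        // allOf(...) only builds a combined future; join() is what actually waits.
        CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).join();
    }
}
```

With this shape, a "finished" log line placed after `closeAll()` is only reached once every close future has completed.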

Anything else?

No response

Are you willing to submit a PR?

error.log

Technoboy- commented 10 months ago

@jiangpengcheng could you help take a look?