streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] DistributedClusterTest CI failed but local tests passed #184

Open BewareMyPower opened 4 years ago

BewareMyPower commented 4 years ago

See #183

The DistributedClusterTest failed in CI on GitHub Actions:

2020-09-23T08:54:33.4397994Z [INFO] Running io.streamnative.pulsar.handlers.kop.DistributedClusterTest
2020-09-23T08:54:55.9560950Z [ERROR] Tests run: 3, Failures: 3, Errors: 0, Skipped: 0, Time elapsed: 22.468 s <<< FAILURE! - in io.streamnative.pulsar.handlers.kop.DistributedClusterTest
2020-09-23T08:54:55.9563904Z [ERROR] testMutiBrokerAndCoordinator(io.streamnative.pulsar.handlers.kop.DistributedClusterTest)  Time elapsed: 0.225 s  <<< FAILURE!
2020-09-23T08:54:55.9565737Z java.lang.AssertionError: expected [2] but found [1]
2020-09-23T08:54:55.9566540Z    at org.testng.Assert.fail(Assert.java:96)
2020-09-23T08:54:55.9567449Z    at org.testng.Assert.failNotEquals(Assert.java:776)
2020-09-23T08:54:55.9568858Z    at org.testng.Assert.assertEqualsImpl(Assert.java:137)
2020-09-23T08:54:55.9569889Z    at org.testng.Assert.assertEquals(Assert.java:118)
2020-09-23T08:54:55.9570766Z    at org.testng.Assert.assertEquals(Assert.java:652)
2020-09-23T08:54:55.9571619Z    at org.testng.Assert.assertEquals(Assert.java:662)
2020-09-23T08:54:55.9575151Z    at io.streamnative.pulsar.handlers.kop.DistributedClusterTest.testMutiBrokerAndCoordinator(DistributedClusterTest.java:293)
2020-09-23T08:54:55.9578305Z    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-09-23T08:54:55.9580208Z    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-09-23T08:54:55.9582275Z    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-09-23T08:54:55.9584712Z    at java.lang.reflect.Method.invoke(Method.java:498)
2020-09-23T08:54:55.9586249Z    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
2020-09-23T08:54:55.9588259Z    at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
2020-09-23T08:54:55.9720313Z    at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
2020-09-23T08:54:55.9721872Z    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2020-09-23T08:54:55.9723042Z    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2020-09-23T08:54:55.9724563Z    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2020-09-23T08:54:55.9726322Z    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2020-09-23T08:54:55.9727406Z    at java.lang.Thread.run(Thread.java:748)
2020-09-23T08:54:55.9727744Z 
2020-09-23T08:54:55.9730248Z [ERROR] testMutiBrokerUnloadReload(io.streamnative.pulsar.handlers.kop.DistributedClusterTest)  Time elapsed: 0.044 s  <<< FAILURE!
2020-09-23T08:54:55.9732012Z java.lang.AssertionError: expected [true] but found [false]
2020-09-23T08:54:55.9732737Z    at org.testng.Assert.fail(Assert.java:96)
2020-09-23T08:54:55.9733507Z    at org.testng.Assert.failNotEquals(Assert.java:776)
2020-09-23T08:54:55.9734362Z    at org.testng.Assert.assertTrue(Assert.java:44)
2020-09-23T08:54:55.9735153Z    at org.testng.Assert.assertTrue(Assert.java:54)
2020-09-23T08:54:55.9737809Z    at io.streamnative.pulsar.handlers.kop.DistributedClusterTest.testMutiBrokerUnloadReload(DistributedClusterTest.java:420)
2020-09-23T08:54:55.9740912Z    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-09-23T08:54:55.9742561Z    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-09-23T08:54:55.9744684Z    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-09-23T08:54:55.9746119Z    at java.lang.reflect.Method.invoke(Method.java:498)
2020-09-23T08:54:55.9747641Z    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
2020-09-23T08:54:55.9749630Z    at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
2020-09-23T08:54:55.9751318Z    at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
2020-09-23T08:54:55.9752691Z    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2020-09-23T08:54:55.9753764Z    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2020-09-23T08:54:55.9755166Z    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2020-09-23T08:54:55.9756774Z    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2020-09-23T08:54:55.9757768Z    at java.lang.Thread.run(Thread.java:748)
2020-09-23T08:54:55.9758097Z 
2020-09-23T08:54:55.9759376Z [ERROR] testOneBrokerShutdown(io.streamnative.pulsar.handlers.kop.DistributedClusterTest)  Time elapsed: 0.033 s  <<< FAILURE!
2020-09-23T08:54:55.9760907Z java.lang.AssertionError: expected [true] but found [false]
2020-09-23T08:54:55.9761550Z    at org.testng.Assert.fail(Assert.java:96)
2020-09-23T08:54:55.9762438Z    at org.testng.Assert.failNotEquals(Assert.java:776)
2020-09-23T08:54:55.9763223Z    at org.testng.Assert.assertTrue(Assert.java:44)
2020-09-23T08:54:55.9764044Z    at org.testng.Assert.assertTrue(Assert.java:54)
2020-09-23T08:54:55.9766312Z    at io.streamnative.pulsar.handlers.kop.DistributedClusterTest.testOneBrokerShutdown(DistributedClusterTest.java:469)
2020-09-23T08:54:55.9768865Z    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-09-23T08:54:55.9770995Z    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-09-23T08:54:55.9773023Z    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-09-23T08:54:55.9774565Z    at java.lang.reflect.Method.invoke(Method.java:498)
2020-09-23T08:54:55.9776208Z    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
2020-09-23T08:54:55.9779338Z    at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
2020-09-23T08:54:55.9782708Z    at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
2020-09-23T08:54:55.9784098Z    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2020-09-23T08:54:55.9785176Z    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2020-09-23T08:54:55.9786582Z    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2020-09-23T08:54:55.9788211Z    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2020-09-23T08:54:55.9789285Z    at java.lang.Thread.run(Thread.java:748)

But it succeeded in my local environment:

image

Let's merge this PR first.

BewareMyPower commented 4 years ago

The CI environment is Ubuntu 18.04, while the local tests passed on macOS. So I also ran DistributedClusterTest on CentOS 7, which is another Linux system.

Run:

mvn test '-Dtest=DistributedClusterTest' -pl tests

The error log:

[ERROR] testMutiBrokerUnloadReload(io.streamnative.pulsar.handlers.kop.DistributedClusterTest)  Time elapsed: 30.003 s  <<< FAILURE!
org.testng.internal.thread.ThreadTimeoutException: Method io.streamnative.pulsar.handlers.kop.DistributedClusterTest.testMutiBrokerUnloadReload() didn't finish within the time-out 30000
    at io.streamnative.pulsar.handlers.kop.DistributedClusterTest.testMutiBrokerUnloadReload(DistributedClusterTest.java:433)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
    at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
    at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

This is a little different from the CI test result, which often failed just because the 2 namespace bundles were both owned by one broker. Here the test timed out instead. From the log of testMutiBrokerUnloadReload, we can see:

21:40:09.107 [TestNG-method=testMutiBrokerUnloadReload-1:io.streamnative.pulsar.handlers.kop.DistributedClusterTest@445] INFO  io.streamnative.pulsar.handlers.kop.DistributedClusterTest - Re Publish / Consume again.
21:40:10.588 [TestNG-method=testMutiBrokerUnloadReload-1:io.streamnative.pulsar.handlers.kop.DistributedClusterTest@234] DEBUG io.streamnative.pulsar.handlers.kop.DistributedClusterTest - kConsumer kopMutiBrokerUnloadReload10consumer-group-1 start poll message: 0
/* repeat nearly 30 times */
21:40:37.614 [TestNG-method=testMutiBrokerUnloadReload-1:io.streamnative.pulsar.handlers.kop.DistributedClusterTest@234] DEBUG io.streamnative.pulsar.handlers.kop.DistributedClusterTest - kConsumer kopMutiBrokerUnloadReload10consumer-group-1 start poll message: 0

https://github.com/streamnative/kop/blob/d684f4bd295ed686aa6c6f371864f29f91c02e00/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java#L444-L448

https://github.com/streamnative/kop/blob/d684f4bd295ed686aa6c6f371864f29f91c02e00/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java#L226-L263

We can see that after the unload operation, these two consumers can no longer consume messages: every poll returns 0 messages until the 30-second timeout.

But if we close these two consumers and create new ones with the same group ids:

        kConsumer1.close();
        kConsumer2.close();
        kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
        kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");

then the newly created consumers work well.
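
To make the workaround above concrete, here is a minimal sketch of the "close and recreate after repeated empty polls" pattern. It is only an illustration, not the fix for this issue: `PollingConsumer`, `consume`, and the parameters `maxEmptyPolls` / `maxRecreations` are hypothetical names introduced here, and `PollingConsumer` is a stand-in for the test's `KConsumer`, not the real class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class RecreateOnEmptyPoll {

    /** Hypothetical stand-in for the test's KConsumer; NOT the real KoP class. */
    public interface PollingConsumer {
        /** Returns the messages fetched by one poll; empty if nothing arrived. */
        List<String> poll();

        void close();
    }

    /**
     * Polls until `expected` messages are received. If `maxEmptyPolls`
     * consecutive polls return nothing, the consumer is closed and a new one
     * is created from `factory` (assumed to reuse the same group id).
     * Gives up after `maxRecreations` recreations and returns what it has.
     */
    public static List<String> consume(Supplier<PollingConsumer> factory,
                                       int expected,
                                       int maxEmptyPolls,
                                       int maxRecreations) {
        List<String> received = new ArrayList<>();
        PollingConsumer consumer = factory.get();
        int emptyPolls = 0;
        int recreations = 0;
        try {
            while (received.size() < expected) {
                List<String> batch = consumer.poll();
                if (!batch.isEmpty()) {
                    received.addAll(batch);
                    emptyPolls = 0; // progress was made, reset the counter
                    continue;
                }
                emptyPolls++;
                if (emptyPolls >= maxEmptyPolls) {
                    if (recreations >= maxRecreations) {
                        break; // still stuck after recreating: give up
                    }
                    // Same step as the manual workaround: close, then recreate.
                    consumer.close();
                    consumer = factory.get();
                    recreations++;
                    emptyPolls = 0;
                }
            }
        } finally {
            consumer.close();
        }
        return received;
    }
}
```

In the test, the factory would be something like `() -> new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1")` wrapped in an adapter. This only hides the symptom; why the original consumers stop receiving messages after the unload still needs to be investigated.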