apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Failing Test]: JmsIOIT.testPublishingThenReadingAll highly flaky #26175

Open Abacn opened 1 year ago

Abacn commented 1 year ago

What happened?

Affecting https://ci-beam.apache.org/view/PostCommit/job/beam_PreCommit_Java_Jms_IO_Direct_Cron/

java.lang.AssertionError: actual number of records 14220 smaller than expected: 100000.
    at org.junit.Assert.fail(Assert.java:89)
    at org.junit.Assert.assertTrue(Assert.java:42)
    at org.apache.beam.sdk.io.jms.JmsIOIT.testPublishingThenReadingAll(JmsIOIT.java:195)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)

We should probably decrease the number of elements when the test runs locally. There is also likely a problem with elements not being emitted on time.
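A minimal sketch of the first suggestion, assuming the record count is made configurable with a smaller local default. The class `RecordCountSketch`, the system property `jmsio.numberOfRecords`, and the 10,000 local default are hypothetical and not part of JmsIOIT; only the expected 100000 records and the assertion message come from the failure above.

```java
/**
 * Hypothetical helper: chooses how many records the IT publishes.
 * The property name "jmsio.numberOfRecords" is an assumption, not an existing JmsIOIT option.
 */
class RecordCountSketch {
  static final int CI_DEFAULT = 100_000;   // the count the failing run expected
  static final int LOCAL_DEFAULT = 10_000; // hypothetical smaller count for local runs

  static int numberOfRecords(boolean runningLocally) {
    String override = System.getProperty("jmsio.numberOfRecords");
    if (override != null) {
      return Integer.parseInt(override); // an explicit override always wins
    }
    return runningLocally ? LOCAL_DEFAULT : CI_DEFAULT;
  }

  /** Mirrors the failing check: reading fewer records than published is an error. */
  static void checkCount(long actual, long expected) {
    if (actual < expected) {
      throw new AssertionError(
          "actual number of records " + actual + " smaller than expected: " + expected + ".");
    }
  }
}
```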

Issue Failure

Failure: Test is flaky

Issue Priority

Priority: 2 (backlog / disabled test but we think the product is healthy)

Issue Components

Abacn commented 1 year ago

Related to #25945. CC: @Amraneze

Amraneze commented 1 year ago


I made a workaround to cancel the pipeline because it was running for more than 30 minutes even though all messages were published and received. What do you think would be the best approach?
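The idea of the workaround (wait for a fixed time budget, then cancel whatever is still running) can be sketched in plain Java, with a `Future` standing in for the running pipeline. `BoundedWaitSketch` is hypothetical and not necessarily how the existing workaround is written; in Beam the corresponding calls are `PipelineResult.waitUntilFinish(Duration)` followed by `PipelineResult.cancel()`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {
  /**
   * Waits at most `timeout` for the job. Returns true if it finished within the budget,
   * false if it was still running and had to be cancelled.
   */
  static boolean waitOrCancel(Future<?> job, long timeout, TimeUnit unit) {
    try {
      job.get(timeout, unit);
      return true; // finished on its own
    } catch (TimeoutException e) {
      job.cancel(true); // budget exceeded: interrupt the still-running job
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      job.cancel(true);
      return false;
    } catch (ExecutionException e) {
      throw new IllegalStateException("job failed", e.getCause());
    }
  }
}
```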

Amraneze commented 1 year ago

And I can see in the logs of the failing test that there is an issue with the connection.


We have some ghost connections, and JmsIO keeps creating new connections, but for reading rather than publishing (JmsIO$UnboundedJmsReader.closeConnection(JmsIO.java:649)). It looks like a connection leak, even though the broker is down. Maybe we could just try to reconnect, and/or force-close the connection and session.
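A minimal sketch of "force-close the connection and session", assuming the goal is that the connection is always closed even when closing the session fails (for example because the broker is down). `ForceCloseSketch` is hypothetical; it takes `AutoCloseable` parameters because in JMS 2.0 `Connection` and `Session` implement `AutoCloseable`.

```java
/**
 * Hypothetical helper: closes a session and then its connection.
 * The connection is closed even if closing the session throws.
 */
class ForceCloseSketch {
  /** Returns the first failure (later ones attached as suppressed), or null if both closed cleanly. */
  static Exception closeBoth(AutoCloseable session, AutoCloseable connection) {
    Exception failure = null;
    try {
      if (session != null) {
        session.close();
      }
    } catch (Exception e) {
      failure = e; // remember it, but still try to close the connection
    }
    try {
      if (connection != null) {
        connection.close();
      }
    } catch (Exception e) {
      if (failure == null) {
        failure = e;
      } else {
        failure.addSuppressed(e);
      }
    }
    return failure; // the caller decides whether to log or rethrow
  }
}
```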

Abacn commented 1 year ago

@Amraneze From the log you linked, lots of connections get created. This is because there can be many DoFn instances in streaming. Deferring the connection until the first element is received may mitigate the ghost connection issue. A connection pool is the long-term solution.
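A minimal sketch of deferring the connection to the first element, written without Beam or JMS dependencies. `LazyConnectingWriter` and the `Supplier` that stands in for connection creation are hypothetical; in a real DoFn the null check would sit in the `@ProcessElement` method and the close in `@Teardown`, so an instance that never receives an element never opens a connection.

```java
import java.util.function.Supplier;

/**
 * Hypothetical writer mirroring a DoFn lifecycle (processElement, teardown).
 * It opens its connection on the first element instead of at setup time.
 */
class LazyConnectingWriter {
  private final Supplier<AutoCloseable> connect; // stands in for creating a JMS connection
  private AutoCloseable connection;              // stays null until the first element arrives

  LazyConnectingWriter(Supplier<AutoCloseable> connect) {
    this.connect = connect;
  }

  void processElement(String element) {
    if (connection == null) {
      connection = connect.get(); // first element: connect now
    }
    // ... publish `element` using `connection` (omitted) ...
  }

  void teardown() {
    if (connection == null) {
      return; // this instance never saw an element, so it never connected
    }
    try {
      connection.close();
    } catch (Exception e) {
      throw new RuntimeException("failed to close connection", e);
    } finally {
      connection = null;
    }
  }
}
```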

Amraneze commented 1 year ago


Yeah, the DoFn instance is created over and over because in the code we throw the exception and the DoFn catches it to run the Teardown function. But I'm not sure the connection is actually closed. I'm trying to find time to work on the connection pool in the next few weeks. We also use the finalize method, which is deprecated, and we already call doClose in the overridden close method of UnboundedReader. I guess it's better to remove it. What do you think?
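If finalize is removed, one way to keep cleanup safe is to make close idempotent, so that close and any other cleanup path can both call it without double-closing. A minimal sketch, assuming a reader whose cleanup lives in a `doClose` method as described above; `IdempotentReaderSketch` and its counter are hypothetical. `Object.finalize()` has been deprecated since Java 9.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical reader resource: close() is idempotent, so no finalize() override
 * is needed as a safety net and repeated close() calls are harmless.
 */
class IdempotentReaderSketch implements AutoCloseable {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  int doCloseCalls = 0; // for illustration only: how often cleanup actually ran

  @Override
  public void close() {
    // compareAndSet guarantees doClose() runs at most once, even across threads.
    if (closed.compareAndSet(false, true)) {
      doClose();
    }
  }

  private void doClose() {
    doCloseCalls++;
    // ... close consumer, session and connection here ...
  }

  boolean isClosed() {
    return closed.get();
  }
}
```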

Abacn commented 1 year ago

I have opened #26179 to see if it works. The integration test passed locally, but on Jenkins it is more likely to fail. This may be because the CI environment has more nodes and therefore a higher chance of connection issues.

Amraneze commented 1 year ago

I will run it repeatedly with Gradle until the test fails, to see if I can reproduce it.