GoogleCloudPlatform / DataflowTemplates

Cloud Dataflow Google-provided templates for solving in-Cloud data tasks
https://cloud.google.com/dataflow/docs/guides/templates/provided-templates
Apache License 2.0

[Bug]: Flaky test: StreamingDataGeneratorIT#testFakeMessagesToKafka #1752

Status: Open. Polber opened this issue 1 month ago.


Related Template(s)

StreamingDataGenerator

Template Version

HEAD

What happened?

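StreamingDataGeneratorIT#testFakeMessagesToKafka is flaky because of connection issues with the Kafka broker on self-hosted GitHub Actions workers. In the log below, the broker's forwarding requests to its controller time out, so INIT_PRODUCER_ID requests from the test's producers fail ("Timed out waiting for next producer ID block"). Later, in tearDown, the admin client also times out while deleting the test topic.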

Relevant log output

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,854] ERROR [KafkaApi-1] Unexpected error handling request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-6, correlationId=64) -- InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) with context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-6, correlationId=64), connectionId='172.17.0.17:9093-10.128.0.16:37716-248', clientAddress=/10.128.0.16, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.7.0), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@61ef1d46]) (kafka.server.KafkaApis)

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for next producer ID block

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] INFO [BrokerToControllerChannelManager broker=1 name=forwarding] Disconnecting from node 1 due to request timeout. (org.apache.kafka.clients.NetworkClient)

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] INFO [BrokerToControllerChannelManager broker=1 name=forwarding] Cancelled in-flight API_VERSIONS request with correlation id 169 due to node 1 being disconnected (elapsed time since creation: 30030ms, elapsed time since send: 30030ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient)

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use node b010d8cb7964:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] WARN [RPC ProducerId Manager 1]: Timed out when requesting AllocateProducerIds from the controller. (kafka.coordinator.transaction.RPCProducerIdManager)

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] ERROR [KafkaApi-1] Unexpected error handling request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-16, correlationId=64) -- InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) with context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-16, correlationId=64), connectionId='172.17.0.17:9093-10.128.0.16:37736-248', clientAddress=/10.128.0.16, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.7.0), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@3bd007d2]) (kafka.server.KafkaApis)

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: org.apache.kafka.common.errors.TimeoutException: The request timed out.

[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,984] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use node b010d8cb7964:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
...
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.clients.admin.internals.AdminMetadataManager - [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
[pool-1-thread-11] ERROR org.apache.beam.it.kafka.KafkaResourceManager - Failed to delete kafka topic.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: deleteTopics
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.beam.it.kafka.KafkaResourceManager.cleanupAll(KafkaResourceManager.java:197)
    at org.apache.beam.it.common.utils.ResourceManagerUtils.cleanResources(ResourceManagerUtils.java:153)
    at com.google.cloud.teleport.v2.templates.StreamingDataGeneratorIT.tearDown(StreamingDataGeneratorIT.java:99)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.RunAfters.invokeMethod(RunAfters.java:46)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:410)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: deleteTopics
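The tearDown failure above is a TimeoutException wrapped in an ExecutionException, which is the kind of transient error a bounded retry can sometimes absorb. Below is a minimal, hypothetical sketch of such a retry with exponential backoff. `CleanupRetry` and `runWithRetries` are illustrative names and are not part of DataflowTemplates, Apache Beam, or the Kafka client. The sketch assumes that repeating the cleanup is safe and that giving up after a few attempts is acceptable. It uses only the JDK.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration only; not an existing DataflowTemplates/Beam API. */
public final class CleanupRetry {

  private static final long MAX_BACKOFF_MILLIS = 30_000L;

  private CleanupRetry() {}

  /**
   * Runs {@code action} up to {@code maxAttempts} times, doubling the pause between attempts
   * (capped at 30 s). Returns true on the first success, or false if every attempt threw or
   * the thread was interrupted while waiting.
   */
  public static boolean runWithRetries(
      Callable<?> action, int maxAttempts, long initialBackoffMillis) {
    if (maxAttempts < 1 || initialBackoffMillis < 0) {
      throw new IllegalArgumentException("maxAttempts must be >= 1 and backoff must be >= 0");
    }
    long backoffMillis = initialBackoffMillis;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        action.call();
        return true;
      } catch (Exception e) {
        // For example, an ExecutionException wrapping a Kafka TimeoutException, as in the log.
        System.err.printf("Cleanup attempt %d/%d failed: %s%n", attempt, maxAttempts, e);
        if (attempt == maxAttempts) {
          break; // no pause after the final attempt
        }
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
          return false;
        }
        backoffMillis = Math.min(backoffMillis * 2, MAX_BACKOFF_MILLIS);
      }
    }
    return false;
  }
}
```

In a test, the action would wrap whatever cleanup call the tearDown method makes, for example `CleanupRetry.runWithRetries(() -> { /* cleanup call */ return null; }, 3, 1_000L)`. Returning a boolean keeps cleanup best-effort rather than failing the test a second time. Note that retrying only tolerates slow responses. It does not address why the broker-to-controller requests time out on these workers.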