GoogleCloudPlatform / pubsub

This repository contains open-source projects managed by the owners of Google Cloud Pub/Sub.
Apache License 2.0
246 stars 146 forks source link

kafka-connector not flushing messages (ack) #277

Open nicodds opened 3 years ago

nicodds commented 3 years ago

I recently synced the repository with my local copy, in order to test the new advancements. I noticed that the connector became unstable and is incurring in a serious problem, since it is not flushing some messages from the subscription. Consequently I'm experiencing frequent duplicates: the same messages are pulled several times from the PubSub subscription because the ack deadline expires).

The errors in the logs are as follow:

[2021-05-14 16:25:46,674] WARN Failed to acknowledge message: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes. (com.google.pubsub.kafka.source.CloudPu
bSubSourceTask:366)

.......

[2021-05-14 16:26:21,905] ERROR WorkerSourceTask{id=aaaaa_schedule-2} Failed to flush, timed out while waiting for producer to flush outstanding 440 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:492)
[2021-05-14 16:26:21,906] ERROR WorkerSourceTask{id=aaaaa_schedule-2} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:113)
[2021-05-14 16:26:21,906] INFO WorkerSourceTask{id=aaaaa_schedule-3} flushing 1294 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-14 16:26:22,943] INFO WorkerSourceTask{id=aaaaa-settlement-3} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-14 16:26:22,944] ERROR WorkerSourceTask{id=aaaaa-settlement-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)   
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRUCT
        at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:55)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 11 more
May 14, 2021 4:26:22 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=70, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
        at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
        at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
        at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
        at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:518)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:327)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1700(InstantiatingGrpcChannelProvider.java:74)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:220)
        at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:227)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:210)
        at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
        at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:272)
        at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:314)
        at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:157)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
        at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
        at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
nicodds commented 3 years ago

feel free to write for any further detail