sanjuthomas / kafka-connect-gcp-bigtable

Kafka Sink Connect to GCP Bigtable - https://www.confluent.io/hub/sanjuthomas/kafka-connect-gcp-bigtable
http://sanjuthomas.com
MIT License

Work with latest Kafka (i.e., without ZooKeeper) #21

Closed ee07dazn closed 2 years ago

ee07dazn commented 2 years ago

Hi, thanks for this great contribution to the community! Looking at the prerequisites, ZooKeeper is listed as one of them. I am wondering whether the connector can still function without ZooKeeper. Kind regards, Karan

sanjuthomas commented 2 years ago

Karan - thanks for reaching out. The connector was tested against a Kafka version that requires ZooKeeper. I will test it against the latest version of Confluent Kafka and let you know.

ee07dazn commented 2 years ago

Hi @sanjuthomas

Thanks

I have given this a go, and I think I have reached the final step of insertion into Bigtable, but it fails at that point with the following error:

2022-11-22 09:31:33,080 ERROR [bigtable-connector|task-0] Some mutations failed to apply (com.sanjuthomas.gcp.bigtable.writer.BigtableWriter) [task-thread-bigtable-connector-0]
com.google.cloud.bigtable.data.v2.models.MutateRowsException: Some mutations failed to apply
    at com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptCallable.handleAttemptSuccess(MutateRowsAttemptCallable.java:290)
    at com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptCallable.access$000(MutateRowsAttemptCallable.java:87)
    at com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptCallable$2.apply(MutateRowsAttemptCallable.java:121)
    at com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptCallable$2.apply(MutateRowsAttemptCallable.java:118)
    at com.google.api.core.ApiFutures$GaxFunctionToGuavaFunction.apply(ApiFutures.java:204)
    at com.google.common.util.concurrent.AbstractTransformFuture$TransformFuture.doTransform(AbstractTransformFuture.java:243)
    at com.google.common.util.concurrent.AbstractTransformFuture$TransformFuture.doTransform(AbstractTransformFuture.java:233)
    at com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:118)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:957)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:726)
    at com.google.common.util.concurrent.AbstractCatchingFuture.run(AbstractCatchingFuture.java:93)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:957)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:726)
    at com.google.api.core.AbstractApiFuture$InternalSettableFuture.set(AbstractApiFuture.java:90)
    at com.google.api.core.AbstractApiFuture.set(AbstractApiFuture.java:73)
    at com.google.api.gax.rpc.SpoolingResponseObserver$MyFuture.set(SpoolingResponseObserver.java:84)
    at com.google.api.gax.rpc.SpoolingResponseObserver.onCompleteImpl(SpoolingResponseObserver.java:72)
    at com.google.api.gax.rpc.StateCheckingResponseObserver.onComplete(StateCheckingResponseObserver.java:74)
    at com.google.api.gax.tracing.TracedResponseObserver.onComplete(TracedResponseObserver.java:109)
    at com.google.api.gax.grpc.ExceptionResponseObserver.onCompleteImpl(ExceptionResponseObserver.java:89)
    at com.google.api.gax.rpc.StateCheckingResponseObserver.onComplete(StateCheckingResponseObserver.java:74)
    at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:144)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:510)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:630)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:518)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:692)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:681)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.cloud.bigtable.data.v2.BigtableDataClient.bulkMutateRows(BigtableDataClient.java:921)
        at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.execute(BigtableWriter.java:103)
        at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:81)
        at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:65)
        at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.lambda$put$0(BigtableSinkTask.java:60)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.put(BigtableSinkTask.java:56)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        ... 3 more
2022-11-22 09:31:33,114 ERROR [bigtable-connector|task-0] Failed to save the batch to Bigtable and batch size was 1 (com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask) [task-thread-bigtable-connector-0]
com.sanjuthomas.gcp.bigtable.exception.BigtableWriteFailedException: Failed to save the batch to Bigtable and batch size was 1
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.execute(BigtableWriter.java:108)
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:81)
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:65)
    at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.lambda$put$0(BigtableSinkTask.java:60)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.put(BigtableSinkTask.java:56)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2022-11-22 09:31:33,114 ERROR [bigtable-connector|task-0] WorkerSinkTask{id=bigtable-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Failed to save the batch to Bigtable and batch size was 1 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-bigtable-connector-0]
com.sanjuthomas.gcp.bigtable.exception.BigtableWriteFailedException: Failed to save the batch to Bigtable and batch size was 1
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.execute(BigtableWriter.java:108)
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:81)
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:65)
    at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.lambda$put$0(BigtableSinkTask.java:60)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.put(BigtableSinkTask.java:56)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2022-11-22 09:31:33,115 ERROR [bigtable-connector|task-0] WorkerSinkTask{id=bigtable-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-bigtable-connector-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.sanjuthomas.gcp.bigtable.exception.BigtableWriteFailedException: Failed to save the batch to Bigtable and batch size was 1
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.execute(BigtableWriter.java:108)
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:81)
    at com.sanjuthomas.gcp.bigtable.writer.BigtableWriter.flush(BigtableWriter.java:65)
    at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.lambda$put$0(BigtableSinkTask.java:60)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at com.sanjuthomas.gcp.bigtable.sink.BigtableSinkTask.put(BigtableSinkTask.java:56)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more

My value message in the Kafka topic is:

[Screenshot: Kafka topic message value, 2022-11-22 09:35:54]

My config.files.location points to test-bigtable1.yml, which has the following settings:

keyFile: /opt/kafka/external-configuration/bq-connector/kafka-bq-sa.json
project: ****
instance: ***
table: ***
families:
- data
familyQualifiers:
- data:
  - name

The error itself does not give a clear pointer, but I wanted to share it in case you can spot something obviously wrong here.

Any input will be much appreciated 🙏

sanjuthomas commented 2 years ago

Can you check whether you can write to the table using test code from your local machine, without Kafka/Connect?

Ex: https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/wiki/How-to-produce-test-messages%3F

https://github.com/sanjuthomas/kafka-connect-gcp-bigtable/blob/master/src/test/java/com/sanjuthomas/kafka/utils/MessageProducer.java
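For reference, a minimal standalone write check along those lines could look like the sketch below. It uses the same `com.google.cloud.bigtable.data.v2` client that appears in the stack trace; the project, instance, table, row key, and cell values are placeholders (not from the original thread), and credentials are assumed to come from `GOOGLE_APPLICATION_CREDENTIALS` pointing at the same service-account JSON as the connector's keyFile:

```java
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;

public class BigtableWriteSmokeTest {

  public static void main(String[] args) throws Exception {
    // Placeholder identifiers; replace with the values from test-bigtable1.yml.
    String projectId = "my-project";
    String instanceId = "my-instance";
    String tableId = "my-table";

    // The client picks up credentials from GOOGLE_APPLICATION_CREDENTIALS.
    try (BigtableDataClient client = BigtableDataClient.create(projectId, instanceId)) {
      // Write one cell into the "data" family under the "name" qualifier,
      // mirroring the families/familyQualifiers mapping in the config above.
      RowMutation mutation =
          RowMutation.create(tableId, "smoke-test-row")
              .setCell("data", "name", "hello-bigtable");
      client.mutateRow(mutation); // throws if the write is rejected
      System.out.println("Write succeeded");
    }
  }
}
```

If this direct write fails with a similar MutateRowsException, the problem is likely on the Bigtable side (for example, a missing column family or insufficient service-account permissions) rather than in the connector itself.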

ee07dazn commented 2 years ago

Thanks for your response.

I have got it working now using Redpanda (a Kafka-compatible streaming platform that does not use ZooKeeper).

Thank you for your contribution 🙏