googleapis / java-pubsub-group-kafka-connector

Apache License 2.0

Connector `cps.endpoint` config does not target emulator #290

Open Axelcouty opened 7 months ago

Axelcouty commented 7 months ago

Hi,

I'm trying to use this connector locally against the Pub/Sub emulator (`google-cloud-cli:454.0.0-emulators`).

I am able to create a topic and a subscription in the emulator with the Java client. And while I could make the connector work with an existing topic and subscription in my real GCP project, I'm not able to do so with the emulator.

Both the emulator and Kafka Connect are started as part of the same docker-compose setup (the Pub/Sub service is included at the end of this post).

I tried the following source connector configuration:

{
  "cps.subscription": "vmchIIQhNG",
  "metadata.publish": "false",
  "name": "test-connector-5800cc7b-a616-4bec-a7b5-b4d5128dbc3b",
  "kafka.partition.count": "1",
  "cps.endpoint": "pubsub:8085",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "cps.project": "my-project",
  "kafka.topic": "test-363210ef-c078-462b-8c46-bc9213a03e2e",
  "kafka.record.headers": "true",
  "headers.publish": "true",
  "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
}

When I check the connector's status, I see the following exception:

org.apache.kafka.connect.errors.ConnectException: Error verifying the subscription vmchIIQhNG for project eixample-dev
    at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:314)
    at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:146)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
    at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
    at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
    at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
    at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=vmchIIQhNG).
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    ... 3 more
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
        at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:312)
        at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:146)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
        at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
        at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        ... 3 more
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=vmchIIQhNG).
    at io.grpc.Status.asRuntimeException(Status.java:539)
    ... 17 more

It seems I can't figure out which configuration is required for Kafka Connect to reach the local emulator.


Additional info:

My use case is simply to be able to write tests locally while developing SMTs (Single Message Transforms).

Docker Compose file, Pub/Sub service:

  pubsub:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:454.0.0-emulators
    command: 
      - bash
      - -c
      - "gcloud beta emulators pubsub start --host-port 0.0.0.0:8085"
    ports:
      - "8085:8085"

Thanks for your help.
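Because the compose file publishes port 8085, one way to rule out a genuinely missing resource is to ask the emulator directly whether the subscription exists under the configured project. The emulator also serves the Pub/Sub REST API over plain HTTP, so a minimal sketch with the JDK's `java.net.http.HttpClient` (Java 11+) is enough. The class and method names below are hypothetical; `my-project` and `vmchIIQhNG` are taken from the connector config above. From inside the compose network the emulator should be reachable as `pubsub:8085`, and from the host machine as `localhost:8085`.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Hypothetical helper: checks whether a subscription exists in a local
 * Pub/Sub emulator by calling its REST API over plain HTTP (no TLS, no auth).
 */
public class EmulatorSubscriptionCheck {

    /** Builds http://HOST:PORT/v1/projects/PROJECT/subscriptions/SUBSCRIPTION. */
    static URI subscriptionUri(String hostPort, String project, String subscription) {
        return URI.create(
            "http://" + hostPort + "/v1/projects/" + project + "/subscriptions/" + subscription);
    }

    /** Returns true only if the emulator answers HTTP 200 for the subscription. */
    static boolean subscriptionExists(HttpClient client, URI uri)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() == 200;
    }

    public static void main(String[] args) throws Exception {
        // Pass "pubsub:8085" inside the compose network or "localhost:8085" from the host.
        if (args.length == 0) {
            System.out.println("usage: java EmulatorSubscriptionCheck HOST:PORT");
            return;
        }
        URI uri = subscriptionUri(args[0], "my-project", "vmchIIQhNG");
        System.out.println("GET " + uri);
        System.out.println("exists: " + subscriptionExists(HttpClient.newHttpClient(), uri));
    }
}
```

If the check reports `exists: true` while the connector still fails with `NOT_FOUND`, that points at the connector not talking to the emulator at all rather than at the subscription setup.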

samarthsingal commented 6 months ago

We had not anticipated the use case of running the pubsub-group-kafka-connector against the Cloud Pub/Sub (CPS) emulator. To support it, we will need to change the grpcTransportChannelProvider-based channel construction in verifySubscription so that it respects the configured cps.endpoint, and possibly make other changes as well. We can treat this as a feature request (FR).

Is this blocking deployment of the connector for you?
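For illustration only, one way the change described above might look is to read `cps.endpoint` from the connector properties and derive the gRPC target and transport security from it, instead of always using the default production channel. Everything here is hypothetical (`EndpointResolver`, `target`, `usePlaintext`), the default value is assumed, and treating any port other than 443 as an emulator is an assumption of this sketch, not the connector's behaviour. In an actual fix, the plaintext branch would correspond to what Google's emulator documentation shows for the Java client: a channel built with `ManagedChannelBuilder.forTarget(...).usePlaintext()`, wrapped in a `FixedTransportChannelProvider`, together with `NoCredentialsProvider`.

```java
import java.util.Map;

/**
 * Hypothetical sketch: derive the gRPC target and transport security for the
 * admin client used by verifySubscription from the cps.endpoint property.
 */
public class EndpointResolver {

    // Assumed default of cps.endpoint: the production Cloud Pub/Sub endpoint.
    static final String DEFAULT_ENDPOINT = "pubsub.googleapis.com:443";

    /** The host:port the channel should target, falling back to the default. */
    static String target(Map<String, String> props) {
        String endpoint = props.get("cps.endpoint");
        return (endpoint == null || endpoint.isBlank()) ? DEFAULT_ENDPOINT : endpoint.trim();
    }

    /**
     * Assumption for this sketch: any port other than 443 is an emulator,
     * which speaks plaintext gRPC and needs no credentials.
     */
    static boolean usePlaintext(Map<String, String> props) {
        return !target(props).endsWith(":443");
    }

    public static void main(String[] args) {
        Map<String, String> emulator = Map.of("cps.endpoint", "pubsub:8085");
        Map<String, String> production = Map.of();
        System.out.println(target(emulator) + " plaintext=" + usePlaintext(emulator));
        System.out.println(target(production) + " plaintext=" + usePlaintext(production));
    }
}
```

With the configuration from this issue, `target` yields `pubsub:8085` and `usePlaintext` yields `true`; with no `cps.endpoint` set, it falls back to TLS against the production endpoint. An explicit flag (or honouring `PUBSUB_EMULATOR_HOST`) would be a more robust design than inferring plaintext from the port.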

Axelcouty commented 5 months ago

Thanks for having a look :eyes:.

It is not blocking our deployment, but it does prevent us from avoiding the creation of GCP resources when testing locally or in our CI pipelines. We implement custom SMTs for Kafka Connect and test them with different Kafka connectors; we use Testcontainers quite a lot, and I was trying to integrate this connector into that setup.

Since a subscription's messages are shared among everyone consuming from it, it can be cumbersome to have to create new subscriptions for each developer, and to provide tooling or processes so that GCP resources created for testing are not forgotten.

With the emulator, we could simply use an isolated environment instead.