braghettos closed this issue 6 years ago
Unfortunately, the information in the log doesn't help much.
A couple of things come to mind.
Did you create the topic "wimt_collaudo" before running the connector? Is "http://webservice:8080/count" running and reachable in your environment?
I'd need more information about how you run it to find the issue.
Thanks,
Hi @llofberg, yes, the topic was created before running the connector. The web service is also reachable:
root@eb4631b752d0:/# curl -X POST "http://webservice:8080/count"
{"timestamp":"2018-06-14T13:33:55.004+0000","status":400,"error":"Bad Request","message":"Required request body is missing: public void hello.GreetingController.count(java.lang.Object)","path":"/count"}
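The 400 response above shows that the endpoint is reachable but rejects a POST that has no body. As a rough sketch of a more representative check, the snippet below builds a POST request carrying a JSON body and the same `Content-Type` header the connector is configured with. The `{"count": 1}` payload and the `build_count_request` helper are hypothetical; the actual shape the web service expects is not shown in this thread.

```python
import json
import urllib.request

# URL taken from the connector configuration in this thread.
COUNT_URL = "http://webservice:8080/count"


def build_count_request(payload, url=COUNT_URL):
    """Build (but do not send) a POST request with a JSON body.

    `payload` is a hypothetical example body, not the service's real contract.
    """
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_count_request({"count": 1})
    print(request.get_method(), request.full_url, request.data)
```

To actually send it, pass the request to `urllib.request.urlopen` from a container attached to the same Docker network, so that the `webservice` host name resolves.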
This is the error log:
echo "===> Launching ... "
+ echo '===> Launching ... '
exec /etc/confluent/docker/launch
+ exec /etc/confluent/docker/launch
[2018-06-14 14:21:28,430] DEBUG expanded subtype org.apache.kafka.connect.connector.Task -> org.apache.kafka.connect.sink.SinkTask (org.reflections.Reflections)
[2018-06-14 14:21:28,430] DEBUG expanded subtype org.apache.kafka.connect.connector.Connector -> org.apache.kafka.connect.sink.SinkConnector (org.reflections.Reflections)
[2018-06-14 14:21:28,475] DEBUG expanded subtype org.apache.kafka.connect.connector.Task -> org.apache.kafka.connect.sink.SinkTask (org.reflections.Reflections)
[2018-06-14 14:21:28,475] DEBUG expanded subtype org.apache.kafka.connect.connector.Connector -> org.apache.kafka.connect.sink.SinkConnector (org.reflections.Reflections)
[2018-06-14 14:21:28,494] DEBUG expanded subtype org.apache.kafka.connect.connector.Task -> org.apache.kafka.connect.sink.SinkTask (org.reflections.Reflections)
[2018-06-14 14:21:28,494] DEBUG expanded subtype org.apache.kafka.connect.connector.Connector -> org.apache.kafka.connect.sink.SinkConnector (org.reflections.Reflections)
Jun 14, 2018 2:21:31 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2018-06-14 14:21:31,347] DEBUG Updating configuration for connector RestSinkConnector configuration: {connector.class=com.tm.kafka.connect.rest.RestSinkConnector, rest.sink.method=POST, rest.sink.payload.converter.schema=true, tasks.max=1, topics=wimt_collaudo, rest.sink.properties=Content-Type:application/json, name=RestSinkConnector, rest.sink.url=http://webservice:8080/count, rest.sink.payload.converter.class=com.tm.kafka.connect.rest.converter.JsonPayloadConverter} (org.apache.kafka.connect.storage.KafkaConfigBackingStore)
[2018-06-14 14:21:31,347] DEBUG Storing new config for task RestSinkConnector-0 this will wait for a commit message before the new config will take effect. New config: {connector.class=com.tm.kafka.connect.rest.RestSinkConnector, task.class=com.tm.kafka.connect.rest.RestSinkTask, rest.sink.method=POST, rest.sink.payload.converter.schema=true, tasks.max=1, topics=wimt_collaudo, rest.sink.properties=Content-Type:application/json, name=RestSinkConnector, rest.sink.url=http://webservice:8080/count, rest.sink.payload.converter.class=com.tm.kafka.connect.rest.converter.JsonPayloadConverter} (org.apache.kafka.connect.storage.KafkaConfigBackingStore)
[2018-06-14 14:21:34,862] DEBUG WorkerConnector{id=RestSinkConnector} Initializing connector RestSinkConnector with config {connector.class=com.tm.kafka.connect.rest.RestSinkConnector, rest.sink.method=POST, rest.sink.payload.converter.schema=true, tasks.max=1, topics=wimt_collaudo, rest.sink.properties=Content-Type:application/json, name=RestSinkConnector, rest.sink.url=http://webservice:8080/count, rest.sink.payload.converter.class=com.tm.kafka.connect.rest.converter.JsonPayloadConverter} (org.apache.kafka.connect.runtime.WorkerConnector)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;sink-record-read (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;sink-record-send (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;sink-record-active-count (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;partition-count (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;offset-seq-number (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;offset-commit-completion (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;offset-commit-completion-skip (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:34,868] DEBUG Added sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;put-batch-time (org.apache.kafka.common.metrics.Metrics)
rest.sink.method = POST
rest.sink.payload.converter.class = class com.tm.kafka.connect.rest.converter.JsonPayloadConverter
rest.sink.payload.converter.schema = true
rest.sink.properties = [Content-Type:application/json]
rest.sink.retry.backoff.ms = 5000
rest.sink.url = http://webservice:8080/count
rest.sink.velocity.template = rest.vm
rest.sink.method = POST
rest.sink.payload.converter.class = class com.tm.kafka.connect.rest.converter.JsonPayloadConverter
rest.sink.payload.converter.schema = true
rest.sink.properties = [Content-Type:application/json]
rest.sink.retry.backoff.ms = 5000
rest.sink.url = http://webservice:8080/count
rest.sink.velocity.template = rest.vm
[2018-06-14 14:21:37,961] DEBUG Stopping sink task, setting client to null (com.tm.kafka.connect.rest.RestSinkTask)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;partition-count (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;offset-commit-completion-skip (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;sink-record-send (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;sink-record-active-count (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;offset-commit-completion (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;offset-seq-number (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;put-batch-time (org.apache.kafka.common.metrics.Metrics)
[2018-06-14 14:21:37,998] DEBUG Removed sensor with name connect-sensor-group: sink-task-metrics;connector=RestSinkConnector;task=0;sink-record-read (org.apache.kafka.common.metrics.Metrics)
This is how I launch the container:
docker run -d \
--name=kafka-connect \
--net=confluent \
-e CONNECT_PRODUCER_INTERCEPTOR_CLASSES=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor \
-e CONNECT_CONSUMER_INTERCEPTOR_CLASSES=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor \
-e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="docker-connect-group" \
-e CONNECT_CONFIG_STORAGE_TOPIC="docker-connect-configs" \
-e CONNECT_OFFSET_STORAGE_TOPIC="docker-connect-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="docker-connect-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="kafka-connect" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
-e CONNECT_LOG4J_LOGGERS=org.reflections=DEBUG \
-e CONNECT_PLUGIN_PATH=/jars \
-e CONNECT_REST_HOST_NAME="kafka-connect" \
-v /home/U370780/kafka-connect-rest/target:/jars \
-v /home/U370780/kafka-connect-rest/examples/spring/config:/config \
confluentinc/cp-kafka-connect:4.1.0
Did you have a matching Kafka version (4.1.0) in pom.xml when you built the connector?
Hi @llofberg, in the default pom.xml the Kafka version is 1.1.0 and confluent.version is 4.1.1. Which property should I change?
I found the error in the task of the connector:
root@03a30e469d6d:/# curl 10.0.13.86:8083/connectors/RestSinkConnector/status
{"name":"RestSinkConnector","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n","id":0,"worker_id":"kafka-connect:8083"}],"type":"sink"}
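The trace says that, with schemas enabled, the JsonConverter only accepts messages that are a JSON object with exactly the two fields "schema" and "payload". The sketch below mirrors that rule in Python so a sample message from the topic can be checked by hand. It is an illustration of the rule quoted in the error message, not the converter's own code, and the `count` field in the example envelope is hypothetical.

```python
import json

# Example of the envelope form; the "count" field is a made-up illustration.
EXAMPLE_ENVELOPE = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [{"field": "count", "type": "int32", "optional": False}],
    },
    "payload": {"count": 3},
}


def has_schema_envelope(raw):
    """Return True if `raw` is a JSON object with exactly 'schema' and 'payload'."""
    try:
        value = json.loads(raw)
    except ValueError:
        # Not valid JSON (or not decodable), so it cannot be an envelope.
        return False
    return isinstance(value, dict) and set(value) == {"schema", "payload"}


if __name__ == "__main__":
    print(has_schema_envelope(b'{"count": 3}'))                  # plain JSON
    print(has_schema_envelope(json.dumps(EXAMPLE_ENVELOPE).encode("utf-8")))
```

If the topic holds plain JSON, the error message itself suggests setting schemas.enable=false on the converter. The `docker run` command in this thread maps worker properties to `CONNECT_`-prefixed variables (for example `CONNECT_VALUE_CONVERTER`), so following the same pattern the setting would presumably be `-e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false`, and likewise for the key converter; treat that variable name as an assumption to verify against the image documentation.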
Hi @llofberg,
I'm trying to use your sink connector, but I'm running into this issue and can't work out what is causing it:
Could you please help me understand what's wrong in my configuration?
Many thanks,
Diego