a0x8o / kafka

A high-throughput, distributed, publish-subscribe messaging system
Apache License 2.0

Kafka Connect | Inconsistent behaviour with REST API in Distributed Mode #50

Open 1riatsila1 opened 1 year ago

1riatsila1 commented 1 year ago

When running the following request against a Kafka Connect cluster in distributed mode (2 worker pods) on k8s, the following can be observed:

curl http://kafka-connect:8083/connectors/test -X DELETE
{"error_code":500,"message":"Error trying to forward REST request: Connector test not found"}

curl http://kafka-connect:8083/connectors/test -X DELETE
{"error_code":500,"message":"Error trying to forward REST request: Connector test not found"}

curl http://kafka-connect:8083/connectors/test -X DELETE
{"error_code":404,"message":"Connector test not found"}

Here, the connector named test does not exist.

The requests are sent in quick succession. As you can see, the server sometimes responds with a 500 and other times with a 404. The working theory is that when a follower worker receives the request, it forwards the request to the leader, receives a 404 back, and then returns a 500 to the client. Is this the desired behaviour, or should the follower simply echo the leader's response back? I would expect to see a 404 in all cases.

I am not sure if this is related, but the behaviour was first noticed after upgrading to cp-kafka-connect 7.2.1
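Until the behaviour is clarified, a client could treat both responses shown above as "connector not found". The following is only a client-side sketch of that idea, not a fix for the worker. The class and method names are hypothetical, and the message prefix is copied from the 500 responses in this report, so it may differ in other versions.

```java
// Hypothetical helper: classifies a Kafka Connect REST response as "connector not found".
final class ConnectResponses {
    // Prefix taken from the 500 responses quoted in this issue (an assumption for other versions).
    private static final String FORWARD_PREFIX = "Error trying to forward REST request: ";

    private ConnectResponses() {}

    /**
     * @param status    HTTP status code, or the "error_code" field of the JSON error body
     * @param message   the "message" field of the JSON error body (may be null)
     * @param connector the connector name used in the request
     */
    static boolean isConnectorNotFound(int status, String message, String connector) {
        if (status == 404) {
            return true; // direct answer, no forwarding involved
        }
        String notFound = "Connector " + connector + " not found";
        // A 500 whose message is exactly the wrapped not-found error is treated as a 404.
        return status == 500 && (FORWARD_PREFIX + notFound).equals(message);
    }
}
```

With the responses above, `isConnectorNotFound(500, "Error trying to forward REST request: Connector test not found", "test")` and `isConnectorNotFound(404, "Connector test not found", "test")` both return true, while any other 500 is still reported as a server error.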

moisescastellano commented 1 year ago

Hi, we have a similar issue:

Most of it has been temporarily mitigated by scaling the Kafka Connect pods down from 3 replicas to 1, so this looks like a problem with balancing requests between workers. However, a single replica is not acceptable for a production environment, and we are still searching for the root cause.

Our architecture needs a Kafka connector that provisions a topic whenever there are changes in Cloudant. We also have to create a connector in real time when a user logs in for the first time.

We have been able to create the connectors via the Kafka Connect REST API (https://docs.confluent.io/platform/current/connect/references/restapi.html), but the call randomly fails with "java.net.SocketTimeoutException: Connect Timeout".

Even worse, after the first failure, subsequent POST creation requests keep failing with the SocketTimeoutException, even though the Kafka Connect API is up and responds correctly to other requests such as GET connectors.

We have searched for this issue ("SocketTimeoutException kafka connect api"). A few people have hit it, but no clear solution is available beyond obvious ones like changing timeouts.

The POST request to the Kafka Connect API:

    {
      "name": "xxxxxxxxxxx",
      "config": {
        "connector.class": "com.ibm.cloudant.kafka.connect.CloudantSourceConnector",
        "cloudant.db.url": "https://xxxxxxxxxx.cloudantnosqldb.appdomain.cloud/xxxxxxxxx",
        "cloudant.db.username": "xxxxxxxxxxxx",
        "cloudant.db.password": "xxxxxxxxxxxx",
        "topics": "mapis-dev-cloudant-test-provisioning-connector",
        "connection.timeout.ms": 5000,
        "read.timeout.ms": 5000
      }
    }
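Since the name, URL, and credentials in this body come from runtime values, a quote or backslash in any of them would produce invalid JSON if the body were assembled by plain string concatenation. As a minimal sketch, assuming no JSON library is available, the body could be built with explicit escaping. `ConnectorJson`, `quote`, and `body` are hypothetical names, and all config values are written as JSON strings here, which is an assumption about what the connector accepts.

```java
import java.util.Map;

// Hypothetical helper: builds a connector creation body with JSON string escaping.
final class ConnectorJson {
    private ConnectorJson() {}

    // Wraps a value in double quotes and escapes characters that JSON does not allow raw.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Produces {"name":"...","config":{"key":"value",...}} in the map's iteration order.
    static String body(String name, Map<String, String> config) {
        StringBuilder sb = new StringBuilder("{\"name\":").append(quote(name)).append(",\"config\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }
}
```

A `LinkedHashMap` keeps the config keys in insertion order if a stable layout matters for logging or comparison.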

Our code just builds and sends the request above:

    URL apiUrl = new URL(kafkaConnectApiUrl.get());
    HttpsURLConnection http = (HttpsURLConnection) apiUrl.openConnection();
    http.setRequestMethod("POST");
    http.setDoOutput(true);
    http.setRequestProperty("Accept", "application/json");
    http.setRequestProperty("Content-Type", "application/json");

    String data = "{\n \"name\": \"" + dbName + "\""
            + ",\n \"config\": {"
            + "\n \t \"connector.class\": \"com.ibm.cloudant.kafka.connect.CloudantSourceConnector\""
            + ",\n \t \"cloudant.db.url\": \"" + cloudantUrl.get() + "/" + dbName + "\""
            + ",\n \t \"cloudant.db.username\": \"" + username.get() + "\""
            + ",\n \t \"cloudant.db.password\": \"" + password.get() + "\""
            + ",\n \t \"topics\": \"" + kafkaTopic.get() + "\""
            + ",\n \t \"connection.timeout.ms\": 5000"
            + ",\n \t \"read.timeout.ms\": 5000"
            + "\n \t}"
            + "\n}";

    byte[] out = data.getBytes(StandardCharsets.UTF_8);
    OutputStream stream = http.getOutputStream();
    stream.write(out);
    stream.flush();
    stream.close();

    int responseCode = http.getResponseCode();
    http.disconnect();
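Note that the two timeout entries in the JSON body are part of the connector configuration sent to Kafka Connect; they do not set any timeout on the HTTP connection that makes the REST call. As a hedged sketch of the "changing timeouts" idea, the client below sets explicit connect and request timeouts and retries with a capped backoff when the call times out. It uses `java.net.http.HttpClient` (Java 11+) instead of `HttpsURLConnection`. The class name `ConnectorCreator`, the 5 s and 30 s values, and the backoff schedule are assumptions for illustration, and this does not address the root cause on the Connect side.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;

// Hypothetical client: POSTs a connector definition with explicit timeouts and retries.
final class ConnectorCreator {
    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5)) // time allowed to establish the connection
            .build();

    // Delay before the next attempt: 500 ms, 1 s, 2 s, 4 s, then capped at 8 s.
    static long backoffMillis(int attempt) {
        return Math.min(8000L, 500L << Math.min(attempt, 4));
    }

    // Returns the HTTP status code of the first attempt that gets a response.
    int create(URI connectorsUri, String jsonBody, int maxAttempts)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(connectorsUri)
                .timeout(Duration.ofSeconds(30)) // time allowed for the response to arrive
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();

        IOException last = new IOException("no attempts made");
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                return response.statusCode();
            } catch (HttpTimeoutException e) {
                // Covers both connect timeouts and request timeouts.
                last = e;
                Thread.sleep(backoffMillis(attempt));
            }
        }
        throw last;
    }
}
```

Because a timed-out POST may still have been processed, a retried creation request can come back with a conflict status if the first attempt actually succeeded, so callers would probably want to handle that case as well.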

joelmin93 commented 2 weeks ago

Hi @moisescastellano, we have the same issue with Connect Timeout. Have you figured out a solution for this?