mgirard772 opened this issue 4 years ago
Kafka version 2.2.1 (Amazon MSK)
vs
"broker": "localhost:9092",
This doesn't add up I think, are you sure you're looking at the right broker?
I omitted the actual host for security
Are you seeing this error consistently or intermittently? If it's constant, that would point to you not having configured ACLs correctly, so that your producer doesn't have the right to access the topic, and as such wouldn't be able to produce to it.
Ubuntu 14.04.6 LTS
As an aside, you should really upgrade that. 14.04 has been EOL for over a year now. 😅
Everything seems to work fine up to a point, then an onslaught of these errors start coming (and don't stop) even though the metadata says everything is fine. The kicker is that the errors are for topics we've confirmed exist and are writable by the producer. Something changes somewhere down the line.
Things I've tried so far, but to no avail:
Things I'm still looking into:
- JavaCompatiblePartitioner
Thanks for bringing up the Ubuntu version, I'll have to mention it to our SRE team.
Any thoughts on what I might be missing? I'd really prefer to stick with our current implementation, but I've been trying to debug this for 2 weeks now and am starting to get pretty frustrated.
The key and the partitioner should be innocent in this. Regardless of which partition a message is assigned to, the mismatch is between the broker address and the resolved IP, or the metadata response and reality.
Under the hood, KafkaJS uses net/tls.connect in order to connect to the brokers. DNS resolution is done via dns.lookup by default (this is all happening in Node, it's not something that we explicitly configure), which in turn just delegates to the OS. If your OS was caching DNS entries, that could mean that we connect to the wrong broker if the IPs change. This would explain why this suddenly happens at some point and doesn't stop. What's unlikely about it though is that clearly a broker is responding, just not the right one. You could verify this theory by implementing a custom socket factory (search the docs for this; there's an example in there) which uses dns.resolve instead of dns.lookup - this is an option you can pass into net/tls.connect. dns.resolve does not use the same mechanism as lookup, and should bypass whatever caching the OS may have in place. At the very least you could log the hostname and the resolved IP to see if this is changing over time.
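For reference, a minimal sketch of such a socket factory. The socketFactory client option is documented in KafkaJS; the lookup override and the logging here are illustrative assumptions, not the library's default behavior:

  const net = require('net')
  const tls = require('tls')
  const dns = require('dns')

  // Resolve via dns.resolve4 (a direct DNS query) instead of dns.lookup
  // (getaddrinfo, which is subject to OS-level caching), and log every
  // resolution so broker IP changes become visible over time.
  const resolveLookup = (hostname, options, callback) => {
    dns.resolve4(hostname, (err, addresses) => {
      if (err) return callback(err)
      console.log(`DNS: ${hostname} -> ${addresses.join(', ')}`)
      if (options && options.all) {
        // Newer Node versions may ask for all addresses at once
        return callback(null, addresses.map(address => ({ address, family: 4 })))
      }
      callback(null, addresses[0], 4)
    })
  }

  const socketFactory = ({ host, port, ssl, onConnect }) => {
    const socket = ssl
      ? tls.connect({ host, port, servername: host, lookup: resolveLookup, ...ssl }, onConnect)
      : net.connect({ host, port, lookup: resolveLookup }, onConnect)
    socket.setKeepAlive(true, 30000)
    return socket
  }

  // Pass it to the client: new Kafka({ brokers, socketFactory, ... })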
If that's not the issue, then it would have to be something with the metadata response being incorrect. We have seen some odd stuff from MSK in the past, but I can't remember if it was related.
Another thing you could do, just to rule it out, is to try using the beta version of KafkaJS instead of 1.12.0. It's possible there was some related bug fixed in there, but I would have to go through the commits to be sure.
Does the broker list have to be in a particular order? For example, does broker 1 have to be the first in the list, or are hostnames/IPs used to match appropriately based on what's in the metadata? The metadata responses I've seen simply give numbers.
I've noticed that the AWS Boto3 API in Python gives the broker hostnames out of order, 3,2,1 or 3,1,2, while the AWS console gives them as 1,2,3.
No, order is irrelevant. It's just a mapping between node ids and hostnames.
Ditching MSK and using another Kafka implementation
We have seen some odd stuff from MSK in the past, but I can't remember if it was related.
Mostly as a data point: We're running on MSK (right now Kafka 2.4.1), and have been since the preview. Sometimes there are glitches/oddnesses, but in general MSK does work and we haven't seen this particular problem.
Maybe this is also related to this issue: #803
I experienced the same problems and errors as described by the author here.
Interesting.
We ended up switching over to node-rdkafka and stopped experiencing the issue. There must be some issue in the way kafkajs communicates with the brokers that's causing it to throw these errors unnecessarily.
I have a similar issue using NestJS with Kafka Confluent Cloud: when the topic doesn't exist it fails, even though I have allowAutoTopicCreation set to true in both Consumer and Producer:
[NestWinston] Error 2021-6-18 16:06:17 [ClientKafka] ERROR [Connection] Response Metadata(key: 3, version: 6) {"timestamp":"2021-06-18T20:06:17.495Z","logger":"kafkajs","broker":"pkc-4nym6.us-east-1.aws.confluent.cloud:9092","clientId":"nestjs-consumer-client","error":"This server does not host this topic-partition","correlationId":24,"size":2569} - {"stack":[""]}
Client Config
{
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers,
      sasl,
      ssl,
    },
    consumer: {
      groupId: 'consumer',
      allowAutoTopicCreation: true,
    },
  },
}
@titobundy check if the auto.create.topics.enable config is set to true; I've had that issue on AWS MSK
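If you don't have access to the broker console, you can read that setting through kafkajs itself. A sketch, where describeConfigs and ConfigResourceTypes are part of the kafkajs admin API but the broker address and node id '1' are placeholders to adjust for your cluster:

  const { Kafka, ConfigResourceTypes } = require('kafkajs')

  const admin = new Kafka({ brokers: ['localhost:9092'] }).admin()

  const checkAutoCreate = async () => {
    await admin.connect()
    const { resources } = await admin.describeConfigs({
      includeSynonyms: false,
      resources: [
        {
          type: ConfigResourceTypes.BROKER,
          name: '1', // broker node id, as a string
          configNames: ['auto.create.topics.enable'],
        },
      ],
    })
    console.log(resources[0].configEntries) // [{ configName, configValue, ... }]
    await admin.disconnect()
  }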
I use the Kafka Confluent Cloud free version; it seems that in this version Confluent Control Center doesn't allow access to all broker configuration options, but the docs indicate that it is set to true by default.
When you use the createTopics method, you should try adding { topic: 'topic_name', numPartitions: 1, replicationFactor: 3 }
I get the error message when I try to run
const [createError, createResult] = await to(
  admin.createTopics({
    validateOnly: true,
    waitForLeaders: true,
    timeout: 5000,
    topics: [
      {
        topic: '123',
      },
    ],
  })
);
@joerg-walter-de thanks, waitForLeaders: true solved the type: 'NOT_CONTROLLER' error in some cases for me :+1:
That is so strange. I'm trying to run the kafkaJS integration tests against a different kafka (not the kafka configured by the kafkaJS integration tests), and when I use waitForLeaders: true I can get the src/admin/__tests__/createTopics.spec.js tests working but get a NOT_CONTROLLER error on the src/admin/__tests__/deleteTopics.spec.js tests. When I change to waitForLeaders: false I get the same error on the createTopics.spec.js tests.
Running 2.0.1 here and still seeing this bug running against Confluent Cloud. It doesn't matter how many partitions the topic has, tried with 1, 3 and 6. Is there a workaround for this yet?
With the following code:
const consumeMessage = async ({ topic, partition, message, heartbeat }) => {
  console.log('Received message', topic, partition, message, heartbeat)
  return processEntry(message.key.toString(), JSON.parse(message.value.toString()))
}

const startConsumer = async () => {
  const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID })
  await consumer.connect()
  await consumer.subscribe({ topics: [process.env.KAFKA_TOPIC] })
  return await consumer.run({ eachMessage: consumeMessage })
}
I never see my console.log. I only see the following logged, every time a message is published on that topic:
Got request 5e39bdb0-c9ec-11ec-8a73-cd9f4c2cfe13 { ts: 1653574734217 }
{"level":"ERROR","timestamp":"2022-06-01T07:54:50.826Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":3,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:51.109Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":4,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:51.582Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":5,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:52.364Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":6,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:54.193Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":7,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:57.138Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":8,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:55:00.023Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNonRetriableError: This server does not host this topic-partition","groupId":"alex-consumer","stack":"KafkaJSNonRetriableError: This server does not host this topic-partition\n at /Users/alex/Projects/revend/gtmadapter-node/node_modules/kafkajs/src/retry/index.js:55:18\n at runMicrotasks (<anonymous>)\n at processTicksAndRejections (node:internal/process/task_queues:96:5)"}
{"level":"INFO","timestamp":"2022-06-01T07:55:00.091Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"alex-consumer"}
Are there any settings on the topic or broker that I need to change in Confluent Cloud in order to make it compatible with this library?
Thanks in advance!
+1 facing the same issue
I had the same issue. There is a bug in refreshMetadata. It is not able to handle external (outside of kafkajs) removal of the topic properly.
Problem is related to refreshMetadata code which is trying to refresh metadata for each topic that is in this.targetTopics Set (from admin/cluster/index.js)
Easy way to reproduce this bug:
1. create topic1 and topic2
2. get offsets for topic1 and topic2 with kafkajs
3. manually delete topic1 without use of kafkajs
4. try to get offset for topic2 with kafkajs. It will fail with "This server does not host this topic-partition" exception even if topic2 exists and we only deleted topic1.
So if one of the topics from this.targetTopics was externally removed from kafka (without use of kafkajs) - you will not be able to perform metadata refresh for any other topic and it will not be able to recover from this error.
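A sketch of that reproduction using the kafkajs admin API (broker address and topic names are placeholders):

  const { Kafka } = require('kafkajs')

  const admin = new Kafka({ brokers: ['localhost:9092'] }).admin()

  const reproduce = async () => {
    await admin.connect()
    await admin.createTopics({ topics: [{ topic: 'topic1' }, { topic: 'topic2' }] })

    // Both topics are now tracked in the cluster's internal targetTopics set:
    await admin.fetchTopicOffsets('topic1')
    await admin.fetchTopicOffsets('topic2')

    // ...now delete topic1 OUTSIDE of kafkajs, e.g.:
    //   kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic1

    // This fails even though topic2 still exists, because refreshMetadata
    // keeps requesting metadata for the deleted topic1 as well:
    await admin.fetchTopicOffsets('topic2') // "This server does not host this topic-partition"
  }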
"error":"This server does not host this topic-partition"
I am experiencing this issue with NestJS with the subscribeToResponseOf() function even though I know the topic exists... I have searched everywhere for a resolution, and tried many code changes.
Any guidance from anyone on how to fix this issue? Is it simply a Kafka BUG??
Update from my side: my issue was that we had the same broker for prod and dev, and the app instances were using the same group id (e.g. app dev instance connecting to dev_ topics, app prod instance connecting to prod_ topics).
It all started working fine when we added the env to the group id.
Hope it helps someone.
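A minimal sketch of that fix; the environment variable name is an assumption, the point is just namespacing the group id per environment:

  // Prefix the consumer group id with the deployment environment so dev and
  // prod instances sharing a broker never join the same consumer group.
  const consumer = kafka.consumer({
    groupId: `${process.env.APP_ENV}-my-consumer-group`, // e.g. "dev-my-consumer-group"
  })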
We have also faced a similar issue. Steps to reproduce:
"This server does not host this topic-partition"" "originalError\":{\"name\":\"KafkaJSProtocolError\",\"retriable\":true,\"type\":\"UNKNOWN_TOPIC_OR_PARTITION\",\"code\":3}
The library should not fail to publish data to an EXISTING topic even if there is an attempt to publish to a NON-EXISTING topic in some other part of the code.
I'm seeing the same issue as @vadiml. We use Kafka topics as webhook queues, along with a permanent topic for create/update/delete of the webhooks. When a webhook is deleted, if we delete the webhook topic, there's a race between the queue readers unsubscribing due to the webhook delete message and hitting this issue. To be clear, we do use kafkajs to delete the topic, but it's happening in another process. It would also be a big help if the error had more context, like which topic-partition is not found.
+1 I'm having the same experience when trying to delete topics
Oddly I'm having the same issue with confluentinc/cp-kafka:7.0.4, but if I try (and fail) to delete the topic before disconnecting, when I reconnect and list topics, the topic... was somehow created (and accepts messages).
There seems to be some sort of latency issue, or the promise returns before the actual creation process is complete, because any subsequent logic that expects the topics to be there fails, even with long (multi-second) sleeps beforehand.
If I let the process end the connection and restart, however, the topic is there, successfully created, the next time I run admin.listTopics().
I'm having the same issue. I'm running MSK Serverless Kafka in AWS. When destroying our AWS CloudFormation stack, we have a lambda that gets called to delete all known topics. These deletion requests run in parallel, and usually one out of 5 will fail with this exception. Rerunning the deletion makes the topic get deleted properly. I suspect that, at least in my case, the problem is related to the parallel deletion.
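If parallel deletion really is the trigger, a simple mitigation is to serialize the calls. A sketch under that assumption (deleteTopics is the kafkajs admin API; the timeout is arbitrary):

  // Delete topics one at a time instead of firing all requests in parallel.
  const deleteAllTopics = async (admin, topics) => {
    for (const topic of topics) {
      await admin.deleteTopics({ topics: [topic], timeout: 30000 })
    }
  }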
Having the same issue running NestJS with Confluent Kafka: it works if I run it in a sample file, but fails when running on NestJS.
@tekpriest Hi I have the same issue running Kafka on NestJs. Happens only in the stage/prod environment. When I use local Kafka this issue does not exist.
Yeah running locally doesn't cause errors, but I have been having issues running kafka on docker, anyone got a reference?
I had the same issue and I hope this will help to fix it, or at least make the error message clearer. When working in a cluster environment with a proxy gate: when trying to subscribe to topics that are not created in the cluster, this error occurs (This server does not host this topic-partition). Side note: auto.create.topics.enable is set to true and still does not solve this.
Two options solved this error:
1. Define the brokers property when creating the new Kafka instance directly with one of the brokers' IPs (one is enough). Then it has no problem subscribing to non-existing topics.
2. Create the Kafka instance with the proxy gate and don't subscribe to non-existing topics.
Hope this will help someone.
have the same issue:
KafkaJSNonRetriableError
Caused by: KafkaJSProtocolError: This server does not host this topic-partition
at createErrorFromCode (node_modules/kafkajs/src/protocol/error.js:581:10)
at Object.parse (node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:56:11)
at Connection.send (node_modules/kafkajs/src/network/connection.js:333:35)
at Broker.[private:Broker:sendRequest] (node_modules/kafkajs/src/broker/index.js:947:14)
at Broker.metadata (node_modules/kafkajs/src/broker/index.js:225:12)
at node_modules/kafkajs/src/cluster/brokerPool.js:161:25
at Cluster.refreshMetadata (node_modules/kafkajs/src/cluster/index.js:135:5)
at Cluster.addMultipleTargetTopics (node_modules/kafkajs/src/cluster/index.js:193:11)
at node_modules/kafkajs/src/admin/index.js:277:9
kafkajs: 1.16.0, kafka: 3.3.1
I had the same issue. There is a bug in refreshMetadata. It is not able to handle external (outside of kafkajs) removal of the topic properly.
Problem is related to refreshMetadata code which is trying to refresh metadata for each topic that is in this.targetTopics Set (from admin/cluster/index.js)
Easy way to reproduce this bug:
1. create topic1 and topic2
2. get offsets for topic1 and topic2 with kafkajs
3. manually delete topic1 without use of kafkajs
4. try to get offset for topic2 with kafkajs. It will fail with "This server does not host this topic-partition" exception even if topic2 exists and we only deleted topic1.
So if one of the topics from this.targetTopics was externally removed from kafka (without use of kafkajs) - you will not be able to perform metadata refresh for any other topic and it will not be able to recover from this error.
Same issue here. Topics created by one kafkajs instance are removed by a different kafkajs instance. After that, the only workaround I came up with is to restart the creator service.
Installation:
If you create a new topic and start a consumer without waiting for the producer to finish, then 'KafkaJSProtocolError: This server does not host this topic-partition' will be produced and the consumer won't subscribe (say, on nodejs instance n1). The topic is created, though, so running the same again from a different nodejs instance (say, n2) will work (the producer in n2 will queue a message, the consumer in n2 will read it).
To fix it, make sure to await the producer. Change:
producer.send({
  topic: domainId,
  messages: [
    { value: JSON.stringify(msgBody) },
  ],
});

await consumer.connect();
await consumer.subscribe({ topic: topic, fromBeginning: true });
await consumer.run({
  eachMessage: consumerCb,
});
to:
await producer.send({
  topic: domainId,
  messages: [
    { value: JSON.stringify(msgBody) },
  ],
});

await consumer.connect();
await consumer.subscribe({ topic: topic, fromBeginning: true });
await consumer.run({
  eachMessage: consumerCb,
});
Was anyone able to figure out a temporary fix until the above PR is merged?
The PR above didn't fix the issue for me. I added it manually into the lib and still face the error This server is not the leader for that topic-partition.
I sort of solved this problem for my case. My scenario is this: I use NestJS with Kafka to consume messages from topics that do not exist, and I see that the service fails to run because of KafkaJSProtocolError: This server does not host this topic-partition, but the service is able to create the topics anyway. This happens when I run Kafka using the Confluent Platform images, using the version with KRaft without ZooKeeper:
broker1:
  image: confluentinc/cp-kafka:7.4.0
  hostname: broker1
  container_name: broker1
  depends_on:
    - controller
  ports:
    - "9092:9092"
    - "9101:9101"
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_PROCESS_ROLES: 'broker'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
    KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
    KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
    KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    # KAFKA_METRIC_REPORTERS: 'io.confluent.metrics.reporter.ConfluentMetricsReporter'
    # KAFKA_CONFLUENT_METRIC_REPORTER_BOOTSTRAP_SERVER: 'broker1:9092,controller:9092'
    CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

controller:
  image: confluentinc/cp-kafka:7.4.0
  hostname: controller
  container_name: controller
  ports:
    - "9093:9093"
    - "9102:9102"
  environment:
    KAFKA_NODE_ID: 2
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9102
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_PROCESS_ROLES: 'controller'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
    KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
    KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
    KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
    # KAFKA_METRIC_REPORTERS: 'io.confluent.metrics.reporter.ConfluentMetricsReporter'
    # KAFKA_CONFLUENT_METRIC_REPORTER_BOOTSTRAP_SERVER: 'broker1:9092,controller:9092'
    # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
    # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
    CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
My nest config:
{
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'nest-consumer-group',
    },
  },
}
Now, the solution was to use the Kafka version with ZooKeeper:
zookeeper:
  image: confluentinc/cp-zookeeper:7.4.0
  hostname: zookeeper
  container_name: zookeeper
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

broker:
  image: confluentinc/cp-server:7.4.0
  hostname: broker
  container_name: broker
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
    - "9101:9101"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
This is not a complete solution, but it is some progress at least
For me, setting waitForLeaders = false solved the problem. The error message is key: 3, version: 5, and according to the Kafka documentation, key 3 means it is the Metadata API (Key: 3). So it is probably KafkaJS sending some update operation (such as creating a topic) to Kafka node A and then fetching the metadata from Kafka node B, but the data was not yet synced to B at that time, so the error is raised. That may be related to Kafka's sync mechanism, which can be configured. It seems this only happens in Kafka clusters that use KRaft: I hit this problem in Kafka 3.5 with KRaft, but it is OK in Kafka 2.8.
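For completeness, a sketch of that setting. waitForLeaders is a documented admin.createTopics option; the topic name and partition counts are placeholders:

  await admin.createTopics({
    waitForLeaders: false, // don't block on leader election, per the workaround above
    topics: [{ topic: 'my-topic', numPartitions: 3, replicationFactor: 1 }],
  })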
Any fix for this? We are facing the same issue. Please let me know if someone fixed this @youth7
@mithiridi No solution, just an ugly workaround. Currently I have to retry in our code whenever the This server does not host this topic-partition error happens, which means adding lots of try/catch code to handle it. I expected Kafka to be linearizable, but it seems it is not. It is ridiculous that I can use createTopic() to create topic A, then use listTopic() to check whether A was created, and even when listTopic() says A exists, I can still get an error when I use subscribe() to read messages from A later.
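A sketch of such a retry wrapper; the helper name, attempt count, and backoff are illustrative, but kafkajs protocol errors do carry a type field you can match on:

  // Retry a kafkajs call while it keeps failing with UNKNOWN_TOPIC_OR_PARTITION,
  // instead of scattering try/catch blocks around every call site.
  const retryOnMissingPartition = async (fn, attempts = 5, delayMs = 500) => {
    for (let i = 1; ; i++) {
      try {
        return await fn()
      } catch (err) {
        const missingPartition =
          err.type === 'UNKNOWN_TOPIC_OR_PARTITION' ||
          /does not host this topic-partition/.test(err.message || '')
        if (!missingPartition || i >= attempts) throw err
        await new Promise(resolve => setTimeout(resolve, delayMs * i)) // linear backoff
      }
    }
  }

  // usage:
  // await retryOnMissingPartition(() => consumer.subscribe({ topic: 'topic-a', fromBeginning: true }))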
Any update? I am using Kafka Upstash and NestJS.
Having the same issue running NestJS with Confluent Kafka: it works if I run it in a sample file, but fails when running on NestJS.

Were you able to solve it? I have the same problem: it works in a sample but not on NestJS.
Update from my side: my issue was that we had the same broker for prod and dev, and the app instances were using the same group id (e.g. app dev instance connecting to dev_ topics, app prod instance connecting to prod_ topics). It all started working fine when we added the env to the group id. Hope it helps someone.
First of all, thank you for sharing.
I've changed my Kafka variables following your solution, adding a stage (environment: dev, prod, etc.) suffix to the end of the topic.
export const KafkaVariables = {
  brokers: [process.env.KAFKA_BROKER],
  clientId: `broker_${process.env.STAGE}`,
  groupId: `broker_${process.env.STAGE}`,
  globalTopic: `broker_${process.env.STAGE}`,
  producerConnectionRetryTimeAsMS: 10_000,
  consumerConnectionRetryTimeAsMS: 10_000
};
and it worked for me.
Describe the bug
I receive lots of error messages like the following:
However, a check of the topic metadata (using this topic as an example) speaks to the contrary:
This happens across multiple topics.
Code
Expected behavior
Messages get sent and acknowledged by at least the topic-partition leader, unless an error occurs.
Observed behavior
The KafkaJS producer throws the above error claiming This server does not host this topic-partition, when it obviously does. It's possible there's another issue, but the logic throws this error instead.

Environment:
Additional context
Any pointers on what might be wrong with my code, or wrong with the library, would be helpful.