This looks like a Kafka configuration issue. Subscribing to a nonexistent topic is supposed to fail in Kafka by default. However, like you said, you should be able to change that default behavior by enabling topic auto-creation. Can you make sure that the variable is set correctly? Depending on what Kafka Docker image you're using, it may have a different name.
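For example, with the confluentinc/cp-kafka image it is spelled like this in docker-compose (a sketch; other images may spell it differently):

environment:
  KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'  # enables broker-side topic auto-creation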
I'll try some more and report back. Thanks
Well, no luck...
dbos-app | 2024-05-07 20:05:40 [info]: DBOS Server is running at http://localhost:3000
dbos-app | 2024-05-07 20:05:40 [info]: DBOS Admin Server is running at http://localhost:3001
broker | [2024-05-07 20:05:40,116] INFO Sent auto-creation request for Set(test-topic) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
dbos-app | {"level":"ERROR","timestamp":"2024-05-07T20:05:40.119Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"broker:9092","clientId":"dbos-app","error":"This server does not host this topic-partition","correlationId":1,"size":101}
dbos-app | 2024-05-07 20:05:40 [error]: This server does not host this topic-partition
dbos-app | at createErrorFromCode (/home/node/app/node_modules/kafkajs/src/protocol/error.js:581:10)
dbos-app | at Object.parse (/home/node/app/node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:55:11)
dbos-app | at Connection.send (/home/node/app/node_modules/kafkajs/src/network/connection.js:433:35)
dbos-app | at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
dbos-app | at async [private:Broker:sendRequest] (/home/node/app/node_modules/kafkajs/src/broker/index.js:904:14)
dbos-app | at async Broker.metadata (/home/node/app/node_modules/kafkajs/src/broker/index.js:177:12)
dbos-app | at async /home/node/app/node_modules/kafkajs/src/cluster/brokerPool.js:158:25
dbos-app | at async /home/node/app/node_modules/kafkajs/src/cluster/index.js:111:14
dbos-app | at async Cluster.refreshMetadata (/home/node/app/node_modules/kafkajs/src/cluster/index.js:172:5)
dbos-app | at async Cluster.addMultipleTargetTopics (/home/node/app/node_modules/kafkajs/src/cluster/index.js:230:11)
dbos-app | [nodemon] app crashed
broker | [2024-05-07 20:05:40,149] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test-topic-0) (kafka.server.ReplicaFetcherManager)
broker | [2024-05-07 20:05:40,156] INFO [LogLoader partition=test-topic-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
broker | [2024-05-07 20:05:40,158] INFO Created log for partition test-topic-0 in /tmp/kraft-combined-logs/test-topic-0 with properties {} (kafka.log.LogManager)
broker | [2024-05-07 20:05:40,159] INFO [Partition test-topic-0 broker=1] No checkpointed highwatermark is found for partition test-topic-0 (kafka.cluster.Partition)
broker | [2024-05-07 20:05:40,160] INFO [Partition test-topic-0 broker=1] Log loaded for partition test-topic-0 with initial high watermark 0 (kafka.cluster.Partition)
As you can see from the logs, the auto-creation request is made, but asynchronously: the consumer's metadata request fails at 20:05:40.119, before the broker finishes creating the topic at 20:05:40.149.
Anyway, as a workaround I solved it by setting restart: unless-stopped on my app container and changing nodemon to nodemon --exitcrash, so at least I can have it running.
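A minimal compose sketch of that workaround (the service name dbos-app and the nodemon entry point are assumptions based on my setup):

services:
  dbos-app:
    restart: unless-stopped  # Docker restarts the container whenever it exits
    # --exitcrash makes nodemon exit on crash so Docker can restart the container;
    # the entry point here is hypothetical.
    command: npx nodemon --exitcrash dist/main.js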
If you could provide me with a working combination of Kafka image and config (like the one you use for testing), I would be able to investigate this further. Otherwise, go ahead and close this. :)
Thanks for your time
Here's the docker-compose.yml that I use for testing. I can't get it to reproduce the problem, which makes me suspect it's a Kafka configuration issue.
version: "3.7"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
Here's a tiny app I use for testing:
import { KafkaConfig, KafkaMessage } from "kafkajs";
import { Workflow, WorkflowContext, Kafka, KafkaConsume } from '@dbos-inc/dbos-sdk';

const kafkaConfig: KafkaConfig = {
  brokers: [`localhost:9092`],
};

@Kafka(kafkaConfig)
export class KafkaExample {
  @KafkaConsume("dbos-topic")
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
Thanks. I had an app already. Everything runs smoothly with your setup. Thanks a lot for sharing.
Just for reference, here is my config that produced the error mentioned above.
broker:
  image: apache/kafka:latest
  container_name: broker
  user: root
  ports:
    - '9092:9092'
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://broker:9092,PLAINTEXT://broker:19092'
    KAFKA_PROCESS_ROLES: 'broker,controller'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
    KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
    KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
    KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    CLUSTER_ID: 'd86da882-0c82-11ef-8bb9-a757193d3343'
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
  volumes:
    - kafka_data:/tmp/kraft-combined-logs
  healthcheck:
    test: ["CMD-SHELL", "/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 || exit 1"]
    interval: 2s
    timeout: 12s
    retries: 4
    start_period: 2s
I tried the latest apache/kafka image, and that is probably where it all starts. I will live with ZooKeeper for a while...
After some more digging, this is what I found. Auto-creation of topics refers to creating a topic when a producer publishes to a nonexistent topic. It is generally considered bad practice, since it gives you no control over the number of partitions and replicas (automatically created topics get the broker's default settings, so there is no granularity there). The good practice is to set AUTO_CREATE_TOPICS to 'false' and create the topics declaratively.
I believe a better approach is to let the developer declare the topics in some config; on init, before starting any consumers, the app should check for the existence of the declared topics and create the missing ones.
This can be easily achieved using kafka.admin().listTopics() and kafka.admin().createTopics().
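A minimal sketch of that approach with the kafkajs admin API (the topic name and partition/replica counts below are placeholders):

import { Kafka } from "kafkajs";

// Hypothetical declarative topic config; in practice this would live in a config file.
const declaredTopics = [
  { topic: "dbos-topic", numPartitions: 1, replicationFactor: 1 },
];

// Check the declared topics against the broker and create any that are missing.
async function ensureTopicsExist(kafka: Kafka): Promise<void> {
  const admin = kafka.admin();
  await admin.connect();
  try {
    const existing = await admin.listTopics();
    const missing = declaredTopics.filter((t) => !existing.includes(t.topic));
    if (missing.length > 0) {
      // waitForLeaders blocks until the new partitions have leaders,
      // so a consumer started right after this won't hit the race above.
      await admin.createTopics({ topics: missing, waitForLeaders: true });
    }
  } finally {
    await admin.disconnect();
  }
}

Running something like this on init, before any consumer subscribes, would sidestep the race shown in the logs above.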
After setting AUTO_CREATE_TOPICS to 'false', the app keeps crashing in a loop. This means that if the broker has this setting, a single typo in the name of a topic will crash the app over and over.
That's a good point! We can automatically create all decorated topics during startup if they don't already exist.
If it is going to be implemented this way (reading from the decorators instead of a config), all the topic creation parameters should be available at the decorator level (maybe they are already there and I missed them).
As you have probably figured out already, I have some time to spare these days, so if you wish I could create a PR for this.
I have to say that I would prefer the file-based configuration solution. That way one can have a clear view of the whole configuration in one place, instead of it being scattered across random files. And maybe a linting rule could raise an error when the name in the decorator is not listed in the config (nice to have, but not really important).
On second thought, I want to hold off on this for now. I think that DBOS works excellently as a Kafka consumer, but right now I don't think we should do Kafka cluster management.
Great point there. I was thinking about it too.
I just found out that there is an open PR at kafkajs that, when merged, will fix the issue that started this whole conversation. I tested it locally and it seems to do the job. I wouldn't have bothered you had I found out about it earlier. Sorry.
Nice find!
Actually, more important is tulios/kafkajs#1603. It seems that kafkajs will not be getting any updates soon.
docker-compose.txt — as requested. GitHub does not support .yaml attachments (that's what the .txt extension is for). Rename it to .yaml; docker compose up -d; then, in dbos-transact: npm run build; npx jest kafka.test;
You should see this: KafkaJSProtocolError: This server does not host this topic-partition
Workaround for this particular issue until KafkaJS solves it: https://github.com/dbos-inc/dbos-transact/pull/434
Addressed in https://github.com/dbos-inc/dbos-transact/pull/434 until KafkaJS fixes the bug
After adding the Kafka integration to dbos-docker-boilerplate: when the app starts with newly created volumes (docker compose down -v; docker compose up), it encounters the following error. Debugging showed that the error is thrown at kafka.ts#L88 the first time it tries to subscribe, and the app exits at runtime.ts#L54.

Running docker compose down; docker compose up works fine after the first time. Setting the Kafka param AUTO_CREATE_TOPIC=true did not change anything. What did was adding this hack(?) before initiating the consumer in kafka.ts.

I am no expert at configuring Kafka, so it might just be a misconfiguration on my part.