dbos-inc / dbos-transact

The TypeScript framework for backends that scale
https://docs.dbos.dev
MIT License

App crashes the first time it tries to subscribe to a kafka topic #426

Closed demetris-manikas closed 3 months ago

demetris-manikas commented 3 months ago

After I added the Kafka integration to dbos-docker-boilerplate, the app encounters the following error when it starts with newly created volumes (docker compose down -v; docker compose up):

This server does not host this topic-partition 

dbos-app  |     at createErrorFromCode (/home/node/app/node_modules/kafkajs/src/protocol/error.js:581:10)
...
...

Debugging showed that the error is thrown at kafka.ts#L88 the first time the app tries to subscribe, and the app then exits at runtime.ts#L54.

Running docker compose down; docker compose up works fine after the first time.

Setting the kafka param AUTO_CREATE_TOPIC=true did not change anything.

What did help was adding this hack(?):

await kafka.admin().createTopics({
    topics: [
        {topic: ro.kafkaTopic}
    ]
});

before initiating the consumer in kafka.ts

I am no expert at configuring Kafka, so it might just be a misconfiguration on my part.

kraftp commented 3 months ago

This looks like a Kafka configuration issue. Subscribing to a nonexistent topic is supposed to fail in Kafka by default. However, like you said, you should be able to change that default behavior by enabling topic auto-creation. Can you make sure that the variable is set correctly? Depending on what Kafka Docker image you're using, it may have a different name.

demetris-manikas commented 3 months ago

I'll try some more then and report back. Thanks.

demetris-manikas commented 3 months ago

Well no luck...

dbos-app  | 2024-05-07 20:05:40 [info]: DBOS Server is running at http://localhost:3000 
dbos-app  | 2024-05-07 20:05:40 [info]: DBOS Admin Server is running at http://localhost:3001 
broker    | [2024-05-07 20:05:40,116] INFO Sent auto-creation request for Set(test-topic) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
dbos-app  | {"level":"ERROR","timestamp":"2024-05-07T20:05:40.119Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"broker:9092","clientId":"dbos-app","error":"This server does not host this topic-partition","correlationId":1,"size":101}
dbos-app  | 2024-05-07 20:05:40 [error]: This server does not host this topic-partition 
dbos-app  |     at createErrorFromCode (/home/node/app/node_modules/kafkajs/src/protocol/error.js:581:10)
dbos-app  |     at Object.parse (/home/node/app/node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:55:11)
dbos-app  |     at Connection.send (/home/node/app/node_modules/kafkajs/src/network/connection.js:433:35)
dbos-app  |     at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
dbos-app  |     at async [private:Broker:sendRequest] (/home/node/app/node_modules/kafkajs/src/broker/index.js:904:14)
dbos-app  |     at async Broker.metadata (/home/node/app/node_modules/kafkajs/src/broker/index.js:177:12)
dbos-app  |     at async /home/node/app/node_modules/kafkajs/src/cluster/brokerPool.js:158:25
dbos-app  |     at async /home/node/app/node_modules/kafkajs/src/cluster/index.js:111:14
dbos-app  |     at async Cluster.refreshMetadata (/home/node/app/node_modules/kafkajs/src/cluster/index.js:172:5)
dbos-app  |     at async Cluster.addMultipleTargetTopics (/home/node/app/node_modules/kafkajs/src/cluster/index.js:230:11)
dbos-app  | [nodemon] app crashed
broker    | [2024-05-07 20:05:40,149] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test-topic-0) (kafka.server.ReplicaFetcherManager)
broker    | [2024-05-07 20:05:40,156] INFO [LogLoader partition=test-topic-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
broker    | [2024-05-07 20:05:40,158] INFO Created log for partition test-topic-0 in /tmp/kraft-combined-logs/test-topic-0 with properties {} (kafka.log.LogManager)
broker    | [2024-05-07 20:05:40,159] INFO [Partition test-topic-0 broker=1] No checkpointed highwatermark is found for partition test-topic-0 (kafka.cluster.Partition)
broker    | [2024-05-07 20:05:40,160] INFO [Partition test-topic-0 broker=1] Log loaded for partition test-topic-0 with initial high watermark 0 (kafka.cluster.Partition)

As you can see from the logs, the auto-creation request is made, but asynchronously: the app has already failed by the time the broker creates the topic.
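One way to tolerate this kind of race is to retry the first subscribe for a short while instead of exiting on the first failure. The sketch below is only an illustration of that idea: `retryWithBackoff`, its parameters, and its defaults are hypothetical names, not part of DBOS or kafkajs, and a real version would probably retry only on this specific error rather than on every failure.

```typescript
// Hypothetical helper: retry an async operation a few times, doubling the
// delay between attempts. Names and defaults are assumptions for illustration.
async function retryWithBackoff<T>(
  operation: () => Promise<T>,
  maxAttempts = 5,
  initialDelayMs = 200,
): Promise<T> {
  let delayMs = initialDelayMs;
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (err) {
      // Give up and rethrow once the attempt budget is spent.
      if (attempt >= maxAttempts) throw err;
      // Wait before the next attempt so the broker has time to create the topic.
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      delayMs *= 2;
    }
  }
}
```

With a kafkajs consumer, the call might be wrapped as `retryWithBackoff(() => consumer.subscribe({ topic: "test-topic" }))`, assuming the consumer is already connected.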

Anyway, as a workaround I set restart: unless-stopped on my app container and changed nodemon to nodemon --exitcrash, so at least I can keep it running.

If you could provide me with a working combination of Kafka image and config (like the one you use for testing), I will be able to investigate this further. Otherwise go ahead and close this. :)

Thanks for your time

kraftp commented 3 months ago

Here's the docker-compose.yml that I use for testing. I can't get it to reproduce the problem, which makes me suspect it's a Kafka configuration issue.

version: "3.7"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

Here's a tiny app I use for testing:

import { KafkaConfig, KafkaMessage} from "kafkajs";
import { Workflow, WorkflowContext, Kafka, KafkaConsume } from '@dbos-inc/dbos-sdk';

const kafkaConfig: KafkaConfig = {
    brokers: [`localhost:9092`]
}

@Kafka(kafkaConfig)
export class KafkaExample {

  @KafkaConsume("dbos-topic")
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`)
  }
}

demetris-manikas commented 3 months ago

Thanks. I had an app already. Everything runs smoothly with your setup. Thanks a lot for sharing.

Just for reference here is my config that produced the error mentioned.

  broker:
    image: apache/kafka:latest
    container_name: broker
    user: root
    ports:
      - '9092:9092'
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://broker:9092,PLAINTEXT://broker:19092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'd86da882-0c82-11ef-8bb9-a757193d3343'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka_data:/tmp/kraft-combined-logs
    healthcheck:
      test: ["CMD-SHELL", "/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 || exit 1"]
      interval: 2s
      timeout: 12s
      retries: 4
      start_period: 2s

I tried the latest image (apache/kafka:latest in KRaft mode, without ZooKeeper), and the problem probably starts there. I will live with ZooKeeper for a while...

demetris-manikas commented 3 months ago

After some more digging, this is what I found. Auto-creation of topics refers to creating a topic when a producer publishes to a nonexistent topic. In general it is considered bad practice, since it gives no control over the number of partitions or replicas: automatically created topics get the broker's default settings, so there is no per-topic granularity. The good practice is to set AUTO_CREATE_TOPICS to 'false' and create the topics declaratively.

I believe that a better approach is to let the developer declare the topics in some config and the app on init, before starting any consumers, should check the existence of the declared topics and create the missing ones.

This can be easily achieved using kafka.admin().listTopics() and kafka.admin().createTopics().
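A minimal sketch of that startup check follows. It is written against a small `TopicAdmin` interface so that it runs without a broker. `TopicAdmin`, `TopicSpec`, and `ensureTopics` are hypothetical names introduced here; the kafkajs admin client should satisfy the interface, since its `listTopics()` and `createTopics()` have compatible signatures.

```typescript
// Per-topic settings a developer would declare (hypothetical shape that
// mirrors the basic fields kafkajs accepts in createTopics).
interface TopicSpec {
  topic: string;
  numPartitions?: number;
  replicationFactor?: number;
}

// The two admin calls this sketch relies on.
interface TopicAdmin {
  listTopics(): Promise<string[]>;
  createTopics(options: { topics: TopicSpec[] }): Promise<boolean>;
}

// Hypothetical helper: create whichever declared topics the broker lacks.
// Returns the names of the topics it asked the broker to create.
async function ensureTopics(
  admin: TopicAdmin,
  declared: TopicSpec[],
): Promise<string[]> {
  const existing = new Set(await admin.listTopics());
  const missing = declared.filter((spec) => !existing.has(spec.topic));
  if (missing.length > 0) {
    await admin.createTopics({ topics: missing });
  }
  return missing.map((spec) => spec.topic);
}
```

With kafkajs this would presumably be called as `ensureTopics(admin, [{ topic: "test-topic", numPartitions: 1 }])` after `const admin = kafka.admin()` and `await admin.connect()`, and before any consumer starts.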

demetris-manikas commented 3 months ago

After setting AUTO_CREATE_TOPICS to false, the app keeps crashing in a loop. This means that if the broker has this setting, a single typo in a topic name will crash the app over and over.

kraftp commented 3 months ago

That's a good point! We can automatically create all decorated topics during startup if they don't already exist.

demetris-manikas commented 3 months ago

If it is going to be implemented this way (reading from the decorators instead of a config), all the topic creation parameters should be available at the decorator level (maybe they are there and I missed them).

As you have probably figured out already, I have some time to spare these days, so if you wish I could create a PR for this.

demetris-manikas commented 3 months ago

I have to say that I would prefer the file-based configuration solution. This way one can have a clear view of the whole configuration in one place instead of it being scattered across random files. And maybe a linting rule could raise an error when the name in a decorator is not listed in the config (nice to have but not really important).
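The check behind such a rule could be as small as the sketch below. `findUndeclaredTopics` is a hypothetical name, and the sketch assumes that both lists of topic names (those declared in the config file and those used in decorators) have already been collected by some other means.

```typescript
// Hypothetical check: report topic names that consumers use but that the
// central config does not declare, which would catch a typo in a decorator.
function findUndeclaredTopics(declared: string[], consumed: string[]): string[] {
  const known = new Set(declared);
  // Deduplicate so that each unknown name is reported only once.
  return Array.from(new Set(consumed)).filter((topic) => !known.has(topic));
}
```

For example, `findUndeclaredTopics(["orders", "payments"], ["orders", "paymnets"])` returns `["paymnets"]`, which a lint step or a startup check could turn into an error before any consumer is started.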

kraftp commented 3 months ago

On second thought, I want to hold off on this for now. I think that:

DBOS works excellently as a Kafka consumer, but right now I don't think we should do Kafka cluster management.

demetris-manikas commented 3 months ago

Great point there. I was thinking about it too.

demetris-manikas commented 3 months ago

I just found out that there is an open PR at kafkajs that, when merged, will fix the issue that started this whole conversation. I tested it locally and it seems to do the job. I wouldn't have bothered you had I found out about it earlier. Sorry.

kraftp commented 3 months ago

Nice find!

demetris-manikas commented 3 months ago

Actually, tulios/kafkajs issue #1603 is more important. It seems that kafkajs will not be getting any updates soon.

demetris-manikas commented 3 months ago

docker-compose.txt attached, as requested. GitHub does not support .yaml attachments (that's what the .txt extension is for). Rename it to .yaml, then run docker compose up -d. In dbos-transact, run npm run build and then npx jest kafka.test.

You should see this. KafkaJSProtocolError: This server does not host this topic-partition

kraftp commented 3 months ago

Workaround for this particular issue until KafkaJS solves it: https://github.com/dbos-inc/dbos-transact/pull/434

kraftp commented 3 months ago

Addressed in https://github.com/dbos-inc/dbos-transact/pull/434 until KafkaJS fixes the bug