milvus-io / milvus

A cloud-native vector database, storage for next generation AI applications
https://milvus.io
Apache License 2.0

[Bug]: Kafka topics should be explicitly created with externally configured kafka #17981

Closed mohitreddy1996 closed 1 year ago

mohitreddy1996 commented 2 years ago

Is there an existing issue for this?

Environment

- Milvus version: milvusdb/milvus-dev tag - 2.1.0
- Deployment mode(standalone or cluster): cluster
- SDK version(e.g. pymilvus v2.0.0rc2): go v2
- OS(Ubuntu or CentOS): ubuntu
- CPU/Memory: xx
- GPU: No
- Others: 

External Kafka is enabled. We are using a managed Kafka cluster through Confluent, on a "Basic" or "Standard" cluster [1].


Current Behavior

With external Kafka configured, the topics required for inter-server communication are not created explicitly. Their creation is instead delegated to the Kafka consumers, which are configured with `allow.auto.create.topics`: https://github.com/milvus-io/milvus/blob/master/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go#L121
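Why delegating creation to the consumers breaks on some managed clusters can be sketched as follows. According to the librdkafka documentation, the client property `allow.auto.create.topics` only takes effect if the broker also has `auto.create.topics.enable=true`; the reporter states that this broker setting cannot be changed on Confluent "Basic"/"Standard" clusters. The helper `autoCreateTakesEffect` is hypothetical (not part of Milvus) and simply encodes that rule on plain string maps.

```go
package main

import "fmt"

// autoCreateTakesEffect reports whether a consumer subscribing to a missing
// topic can expect the broker to create it. Hypothetical helper: it encodes
// the librdkafka rule that allow.auto.create.topics on the client only works
// if the broker also sets auto.create.topics.enable=true.
// A missing key is treated as "not enabled" (a conservative assumption).
func autoCreateTakesEffect(consumerCfg, brokerCfg map[string]string) bool {
	return consumerCfg["allow.auto.create.topics"] == "true" &&
		brokerCfg["auto.create.topics.enable"] == "true"
}

func main() {
	// Client side: auto-creation requested, as in the kafka_client.go link.
	consumer := map[string]string{"allow.auto.create.topics": "true"}

	// Broker side: assumed settings for a managed cluster that forbids
	// auto-creation versus a broker that allows it.
	managed := map[string]string{"auto.create.topics.enable": "false"}
	permissive := map[string]string{"auto.create.topics.enable": "true"}

	fmt.Println(autoCreateTakesEffect(consumer, managed))    // false
	fmt.Println(autoCreateTakesEffect(consumer, permissive)) // true
}
```

In the first case the consumer keeps failing with "Unknown topic or partition", which matches the symptom reported below.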

Expected Behavior

On Confluent-managed Kafka, this behavior (automatic topic creation) currently cannot be configured for "Basic" and "Standard" clusters [1]. It also seems that configuring it may not result in the intended behavior; see https://github.com/confluentinc/confluent-kafka-go/issues/615

It would be great if these topics were created explicitly as part of producer creation (an operation that could possibly be made idempotent).
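A minimal sketch of the idempotent creation suggested above, assuming a small admin interface. `TopicAdmin`, `ErrTopicExists`, `ensureTopics` and `memAdmin` are hypothetical names, not Milvus or confluent-kafka-go APIs; with confluent-kafka-go the same idea would presumably map onto the admin client's `CreateTopics` call, treating a "topic already exists" result as success. The in-memory admin exists only to make the example runnable.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTopicExists is a hypothetical sentinel an admin client returns when the
// topic is already present on the broker.
var ErrTopicExists = errors.New("topic already exists")

// TopicAdmin is a hypothetical minimal admin interface.
type TopicAdmin interface {
	CreateTopic(name string, partitions int) error
}

// ensureTopics creates each topic, treating "already exists" as success so
// the call can safely be repeated, e.g. every time a producer is created.
func ensureTopics(admin TopicAdmin, topics []string, partitions int) error {
	for _, t := range topics {
		err := admin.CreateTopic(t, partitions)
		if err != nil && !errors.Is(err, ErrTopicExists) {
			return fmt.Errorf("create topic %q: %w", t, err)
		}
	}
	return nil
}

// memAdmin is an in-memory stand-in for a real broker, used only for the demo.
type memAdmin struct{ topics map[string]int }

func (m *memAdmin) CreateTopic(name string, partitions int) error {
	if _, ok := m.topics[name]; ok {
		return ErrTopicExists
	}
	m.topics[name] = partitions
	return nil
}

func main() {
	admin := &memAdmin{topics: map[string]int{}}
	topics := []string{"by-dev-datacoord-timetick-channel"}

	// Calling twice is safe: the second call sees ErrTopicExists and ignores it.
	fmt.Println(ensureTopics(admin, topics, 1)) // <nil>
	fmt.Println(ensureTopics(admin, topics, 1)) // <nil>
	fmt.Println(len(admin.topics))              // 1
}
```

Treating "already exists" as success also covers the race where two Milvus components try to create the same topic at the same time.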

[1] https://docs.confluent.io/cloud/current/clusters/broker-config.html#ccloud-cluster-and-topic-configuration-settings

Steps To Reproduce

1. Set up Milvus in cluster mode with the following changes:

{
  "image": {
    "all": {
      "repository": "milvusdb/milvus-dev",
      "tag": "2.1.0-latest"
    }
  },
  "externalKafka": {
    "enabled": true,
    "brokerList": "brokerlist",
    "securityProtocol": "SASL_SSL",
    "sasl": {
      "mechanisms": "PLAIN",
      "username": "username",
      "password": "password"
    }
  }
}
2. Deploy Milvus on a Kubernetes cluster.
3. Use the Go SDK to create a collection with the CreateCollection method. This fails with:
    CreateCollection failed: send dd create collection req failed, error = Broker: Unknown topic or partition
4. In the logs of the datacoord service/pod, we see:
    [2022/06/30 23:58:54.152 +00:00] [ERROR] [kafka_consumer.go:143] ["consume msg failed"] [topic=by-dev-datacoord-timetick-channel] [groupID=by-dev-dataCoord] [error="by-dev-datacoord-timetick-channel [0]: topic does not exist (Broker: Unknown topic or partition)"] [stack="github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka.(*Consumer).Chan.func1.1\n\t/go/src/github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go:143"]
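For reference, the `externalKafka` values in step 1 correspond to standard librdkafka client properties. The sketch below assumes a straightforward one-to-one mapping (`brokerList` to `bootstrap.servers`, and so on); `ExternalKafka` and `clientConfig` are hypothetical and do not necessarily reflect how the Helm chart or Milvus wire these values. None of these client properties creates topics; that still depends on the broker.

```go
package main

import "fmt"

// ExternalKafka mirrors the externalKafka block of the values in step 1
// (hypothetical struct, for illustration only).
type ExternalKafka struct {
	BrokerList       string
	SecurityProtocol string
	SASLMechanisms   string
	SASLUsername     string
	SASLPassword     string
}

// clientConfig maps the values onto librdkafka property names.
// Assumption: a one-to-one mapping; SASL keys are only set when a
// mechanism is configured.
func clientConfig(k ExternalKafka) map[string]string {
	cfg := map[string]string{
		"bootstrap.servers": k.BrokerList,
		"security.protocol": k.SecurityProtocol,
	}
	if k.SASLMechanisms != "" {
		cfg["sasl.mechanisms"] = k.SASLMechanisms
		cfg["sasl.username"] = k.SASLUsername
		cfg["sasl.password"] = k.SASLPassword
	}
	return cfg
}

func main() {
	cfg := clientConfig(ExternalKafka{
		BrokerList:       "brokerlist",
		SecurityProtocol: "SASL_SSL",
		SASLMechanisms:   "PLAIN",
		SASLUsername:     "username",
		SASLPassword:     "password",
	})
	fmt.Println(cfg["bootstrap.servers"], cfg["security.protocol"], cfg["sasl.mechanisms"])
	// brokerlist SASL_SSL PLAIN
}
```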

Milvus Log

No response

Anything else?

No response

yanliang567 commented 2 years ago

/assign @jaime0815 /unassign

mohitreddy1996 commented 2 years ago

Hi, any updates here?

This blocks us from productionizing Milvus on our Kubernetes cluster with our existing Kafka setup. Is there anything we could help with here?

yanliang567 commented 2 years ago

@jaime0815 any updates?

xiaofan-luan commented 2 years ago

Is anyone interested in implementing this?

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

BorisPolonsky commented 2 years ago

I installed Milvus v2.1.1 via the Helm chart (v3.1.9) with external Kafka configured, and I'm facing the same issue. When creating a collection with pymilvus, I get the following in the datacoord logs:

[2022/09/02 07:29:16.281 +00:00] [ERROR] [kafka/kafka_consumer.go:143] ["consume msg failed"] [topic=by-dev-datacoord-timetick-channel] [groupID=by-dev-dataCoord] [error="by-dev-datacoord-timetick-channel [0]: topic does not exist (Broker: Unknown topic or partition)"] [stack="github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka.(*Consumer).Chan.func1.1\n\t/go/src/github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go:143"]
[2022/09/02 07:29:17.281 +00:00] [ERROR] [kafka/kafka_consumer.go:143] ["consume msg failed"] [topic=by-dev-datacoord-timetick-channel] [groupID=by-dev-dataCoord] [error="by-dev-datacoord-timetick-channel [0]: topic does not exist (Broker: Unknown topic or partition)"] [stack="github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka.(*Consumer).Chan.func1.1\n\t/go/src/github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go:143"]
[2022/09/02 07:29:18.358 +00:00] [ERROR] [kafka/kafka_consumer.go:143] ["consume msg failed"] [topic=by-dev-datacoord-timetick-channel] [groupID=by-dev-dataCoord] [error="by-dev-datacoord-timetick-channel [0]: topic does not exist (Broker: Unknown topic or partition)"] [stack="github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka.(*Consumer).Chan.func1.1\n\t/go/src/github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go:143"]
[2022/09/02 07:29:19.287 +00:00] [ERROR] [kafka/kafka_consumer.go:143] ["consume msg failed"] [topic=by-dev-datacoord-timetick-channel] [groupID=by-dev-dataCoord] [error="by-dev-datacoord-timetick-channel [0]: topic does not exist (Broker: Unknown topic or partition)"] [stack="github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka.(*Consumer).Chan.func1.1\n\t/go/src/github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go:143"]

Any solutions?

BorisPolonsky commented 2 years ago

Replying to "Anyone interested to implement on it?": could you elaborate on how we can find out which topics are needed, so that we could create them manually in our external Kafka deployment? https://github.com/milvus-io/milvus/discussions/17110
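The only topic name that appears in this thread is `by-dev-datacoord-timetick-channel` (consumer group `by-dev-dataCoord`), which suggests that topic names are built from a cluster prefix (`by-dev`) plus a channel name. The helper below encodes that assumed convention for a user-supplied list of channel names. The full list of channels Milvus needs is not given in this issue, so `topicNames` and its inputs are illustrative only, and the real names should be checked against the Milvus configuration.

```go
package main

import "fmt"

// topicNames joins an assumed cluster prefix and channel names with "-",
// following the pattern seen in the datacoord log
// (by-dev + datacoord-timetick-channel). Hypothetical helper.
func topicNames(prefix string, channels []string) []string {
	names := make([]string, 0, len(channels))
	for _, c := range channels {
		names = append(names, prefix+"-"+c)
	}
	return names
}

func main() {
	// Only this channel name is confirmed by the logs in this issue.
	for _, n := range topicNames("by-dev", []string{"datacoord-timetick-channel"}) {
		fmt.Println(n) // by-dev-datacoord-timetick-channel
	}
}
```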

mausch commented 2 years ago

Having the same issue here. Please reopen this.

mausch commented 2 years ago

I guess this is roughly where one would create the topic? https://github.com/milvus-io/milvus/blob/7406cd3393377c786776e6d39ba2155c51c51605/internal/mq/msgstream/mq_msgstream.go#L106

yanliang567 commented 2 years ago

@xiaofan-luan, any plans for this?

BorisPolonsky commented 2 years ago

Replying to "Having the same issue here. Please reopen this.": I second this. External Kafka is currently unusable, even though it is an already implemented feature in Milvus v2.1.1.

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

BorisPolonsky commented 2 years ago

/reopen

LoveEachDay commented 2 years ago

We have the following requirements for the Kafka broker settings:

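1. Enable automatic topic creation (`auto.create.topics.enable=true`).
2. Set the default number of partitions per topic to 1 (`num.partitions=1`).
3. Increase the message size limits: `message.max.bytes=10485760`, `replica.fetch.max.bytes=10485760` (10 MiB).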

@mohitreddy1996 We have used Confluent Kafka; you need to change the broker config manually.

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

mausch commented 1 year ago

Still relevant, please reopen.

pascalwhoop commented 1 year ago

Agreed, this is relevant.

PetrKodyrenkov commented 1 year ago

Facing the same issue: topics are not getting created, which makes the externalKafka option useless, at least until someone publishes the list of topics Milvus needs in order to start.

xiaofan-luan commented 1 year ago

/assign @smellthemoon

big-thousand commented 1 year ago

Any update?

stale[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

xiaofan-luan commented 1 year ago

/assign @smellthemoon How is this feature going?

xiaofan-luan commented 1 year ago

Keep it open; this is a very important issue.

stale[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

xiaofan-luan commented 1 year ago

Keep it open for now.

freewine commented 1 year ago

I encountered the same issue when using Amazon MSK (AWS managed Kafka) as the external Kafka. Changing the MSK property auto.create.topics.enable from false to true resolves the issue. See https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html

auto.create.topics.enable=true

xiaofan-luan commented 1 year ago

Good to know. We should be able to specify user channels for now. @smellthemoon, is there any documentation for it?

stale[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.