Closed iamysx closed 2 months ago
当kafka的server.properties中的auto.create.topics.enable设置为false时,通过以下代码创建topic,创建时并未报错,但在发送kafka通知时报“Failed to produce message:Local: Unknown topic”错误,帮忙看一下是否需要一些其它设置才可以,谢谢
void kafka_set_config(RdKafka::Conf *&conf, std::string &errstr, std::string &brokers, const char *autocommit = "false") { if (conf->set("metadata.broker.list", brokers, errstr) != RdKafka::Conf::CONF_OK) { g_kafka_failed = true; } if (conf->set("security.protocol", "sasl_plaintext", errstr) != RdKafka::Conf::CONF_OK) { g_kafka_failed = true; } if (conf->set("sasl.mechanisms", "PLAIN", errstr) != RdKafka::Conf::CONF_OK) { g_kafka_failed = true; } if (conf->set("sasl.username", g_config.kafka_username, errstr) != RdKafka::Conf::CONF_OK) { g_kafka_failed = true; } if (conf->set("sasl.password", g_config.kafka_password, errstr) != RdKafka::Conf::CONF_OK) { g_kafka_failed = true; } if (conf->set("enable.auto.commit", autocommit, errstr) != RdKafka::Conf::CONF_OK) { g_kafka_failed = true; } } void kafka_produce() { std::string brokers = "1.1.1.1:9092"; std::string errstr; std::string topic_str; std::string topic = "xxx_topic"; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); if (conf->set("group.id", "xxx_group", errstr) != RdKafka::Conf::CONF_OK) { g_kafka_failed = true; } kafka_set_config(conf, errstr, brokers, "true"); /* * Create produce using accumulated global configuration. 
*/ RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); if (!producer) { log_info("Failed to create Kafka producer: %s \n", errstr.c_str()); g_kafka_failed = true; } // 创建Topic配置 RdKafka::Conf *topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); topic_conf->set("message.timeout.ms", "5000", errstr); if (nullptr == topic_conf) { log_erro("Failed to create topic_conf,\n"); return; } RdKafka::Topic *topic_ret = RdKafka::Topic::create(producer, topic, topic_conf, errstr); log_info("create topic %s \n", topic.c_str()); if (nullptr == topic_ret) { log_erro("Failed to create topic: %s", errstr.c_str()); } /* * produce messages */ while (g_kafka_failed == false) { int partition = RdKafka::Topic::PARTITION_UA; log_debug("producer : topic %s ,payload %s \n", msg.topic.c_str(), msg.payload.c_str()); RdKafka::ErrorCode resp = producer->produce(msg.topic, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast<char *>(msg.payload.c_str()), msg.payload.size(), NULL, 0, 0, NULL, NULL); if (resp != RdKafka::ERR_NO_ERROR) { log_erro("Failed to produce message:%s ", RdKafka::err2str(resp).c_str()); } } } return; }
你这个不是Rdkafka吗?
用workflow的kafka client,可能也是一样的问题,创建操作不是立即完成。
当kafka的server.properties中的auto.create.topics.enable设置为false时,通过以下代码创建topic,创建时并未报错,但在发送kafka通知时报“Failed to produce message:Local: Unknown topic”错误,帮忙看一下是否需要一些其它设置才可以,谢谢