Open leiwang008 opened 3 months ago
Get the log callback and store it in a local field so that it can be used by rd_kafka. We should probably also use a string constant instead of a hard-coded string :-).
https://github.com/morganstanley/modern-cpp-kafka/blob/a146d10bcf166f55299c7a55728abaaea52cb0e5/include/kafka/KafkaClient.h#L383
// Log Callback
if (properties.contains(Config::LOG_CB))
{
    setLogCallback(properties.get<LogCallback>(Config::LOG_CB));
    rd_kafka_conf_set_log_cb(rk_conf.get(), KafkaClient::logCallback);
}

// Statistics Callback
if (properties.contains(Config::STATS_CB))
{
    setStatsCallback(properties.get<StatsCallback>(Config::STATS_CB));
    rd_kafka_conf_set_stats_cb(rk_conf.get(), KafkaClient::statsCallback);
}

// Error Callback
if (properties.contains(Config::ERROR_CB))
{
    setErrorCallback(properties.get<ErrorCallback>(Config::ERROR_CB));
    rd_kafka_conf_set_error_cb(rk_conf.get(), KafkaClient::errorCallback);
}

// OAUTHBEARER Token Refresh Callback
if (properties.contains(Config::OAUTHBEARER_TOKEN_REFRESH_CB))
{
    setOauthbearerTokenRefreshCallback(properties.get<OauthbearerTokenRefreshCallback>(Config::OAUTHBEARER_TOKEN_REFRESH_CB));
    rd_kafka_conf_set_oauthbearer_token_refresh_cb(rk_conf.get(), KafkaClient::oauthbearerTokenRefreshCallback);
}

// Interceptor
if (properties.contains(Config::INTERCEPTORS))
{
    setInterceptors(properties.get<Interceptors>(Config::INTERCEPTORS));
    const Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(), "on_new", KafkaClient::configInterceptorOnNew, nullptr) };
    KAFKA_THROW_IF_WITH_ERROR(result);
}
Get the log callback and store it in a local field so that it can be used by rd_kafka. We should probably also use a string constant instead of a hard-coded string :-).
https://github.com/morganstanley/modern-cpp-kafka/blob/a146d10bcf166f55299c7a55728abaaea52cb0e5/include/kafka/KafkaClient.h#L383