morganstanley / modern-cpp-kafka

A C++ API for Kafka clients (i.e. KafkaProducer, KafkaConsumer, AdminClient)
Apache License 2.0

oAuth callback issue #231

Open samkhopkar opened 2 months ago

samkhopkar commented 2 months ago

I have a simple, bare-bones oAuth callback registered as follows:

  _props.put<const KAFKA_API::clients::OauthbearerTokenRefreshCallback>(
      KAFKA_API::clients::Config::OAUTHBEARER_TOKEN_REFRESH_CB,
      [](const std::string& oauthbearerConfig)
      {
          KAFKA_API::clients::SaslOauthbearerToken token_info;
          return token_info;
      });

Yet when the callback is invoked, an exception is thrown. I added some debug lines to KafkaClient.h to log the details:

oauthbearerTokenRefreshCallback called
calling onOauthbearerTokenRefresh()
oauthbearerTokenRefreshCallback failed [basic_string::_M_construct null not valid]

What am I doing wrong in registering this simple oAuth callback?

Here is the code from KafkaClient.h (library header), with my debug lines added, where the callback is invoked:

inline void
KafkaClient::oauthbearerTokenRefreshCallback(rd_kafka_t* rk, const char* oauthbearerConfig, void* /* opaque */)
{
    std::cerr << "oauthbearerTokenRefreshCallback called" << std::endl;

    SaslOauthbearerToken oauthbearerToken;

    try
    {
        std::cerr << "calling onOauthbearerTokenRefresh()" << std::endl;
        oauthbearerToken = kafkaClient(rk).onOauthbearerTokenRefresh(oauthbearerConfig);
        std::cerr << "oauthbearerTokenRefreshCallback succeeded" << std::endl;
    }
    catch (const std::exception& e)
    {
        rd_kafka_oauthbearer_set_token_failure(rk, e.what());
        std::cerr << "oauthbearerTokenRefreshCallback failed [" << e.what() << "]" << std::endl;
    }

    LogBuffer<LOG_BUFFER_SIZE> errInfo;

    std::vector<const char*> extensions;
    extensions.reserve(oauthbearerToken.extensions.size() * 2);
    for (const auto& kv: oauthbearerToken.extensions)
    {
        extensions.push_back(kv.first.c_str());
        extensions.push_back(kv.second.c_str());
    }

    if (rd_kafka_oauthbearer_set_token(rk,
                                       oauthbearerToken.value.c_str(),
                                       oauthbearerToken.mdLifetime.count(),
                                       oauthbearerToken.mdPrincipalName.c_str(),
                                       extensions.data(), extensions.size(),
                                       errInfo.str(), errInfo.capacity()) != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        rd_kafka_oauthbearer_set_token_failure(rk, errInfo.c_str());
        //std::cerr << "oauthbearerTokenRefreshCallback set token failed [" << errInfo.str() << "]" << std::endl;
    }
}
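
For readers who hit the same message: `basic_string::_M_construct null not valid` is what libstdc++ reports when a `std::string` is constructed from a null `const char*`. In the excerpt above, the only raw pointer handed to something that expects a `std::string` is `oauthbearerConfig`, so one possibility (an assumption, the thread does not confirm it) is that librdkafka passed a null config pointer. A minimal sketch of a guard, where `toStringOrEmpty` is a hypothetical helper and not part of modern-cpp-kafka or librdkafka:

```cpp
#include <string>

// Hypothetical helper (not part of modern-cpp-kafka or librdkafka):
// converts a possibly-null C string into a std::string without ever
// passing a null pointer to the std::string constructor.
inline std::string toStringOrEmpty(const char* s)
{
    return s != nullptr ? std::string(s) : std::string();
}
```

Under that assumption, the call site would pass `toStringOrEmpty(oauthbearerConfig)` rather than the raw pointer, so the user callback always receives a valid (possibly empty) string.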

rockwood-openai commented 4 weeks ago

You need to set the values of the token in the callback
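
As a rough illustration of what "setting the values" could look like, here is a self-contained sketch. `TokenSketch` is a stand-in that mirrors the field names visible in the header excerpt (`value`, `mdLifetime`, `mdPrincipalName`, `extensions`); the field types, the function name `refreshToken`, and all values are assumptions made for the example. The lifetime is written as an absolute expiry time because librdkafka documents the `md_lifetime_ms` argument of `rd_kafka_oauthbearer_set_token` as milliseconds since the epoch.

```cpp
#include <chrono>
#include <map>
#include <string>

// Stand-in for the library's SaslOauthbearerToken, for illustration only.
// Field names come from the header excerpt in this issue; the types are assumptions.
struct TokenSketch
{
    std::string                        value;           // the token string itself
    std::chrono::milliseconds          mdLifetime{};    // expiry, as time since the Unix epoch
    std::string                        mdPrincipalName; // principal the token was issued to
    std::map<std::string, std::string> extensions;      // optional SASL extensions
};

// Hypothetical callback body: every value below is a placeholder.
// A real callback would obtain these from the identity provider.
inline TokenSketch refreshToken(const std::string& /* oauthbearerConfig */)
{
    using namespace std::chrono;

    TokenSketch token;
    token.value           = "placeholder-token";
    token.mdPrincipalName = "placeholder-principal";
    // Expire five minutes from now, expressed as milliseconds since the epoch.
    token.mdLifetime      = duration_cast<milliseconds>(system_clock::now().time_since_epoch()) + minutes(5);
    return token;
}
```

In the real client, the same assignments would go inside the lambda registered under `OAUTHBEARER_TOKEN_REFRESH_CB`, filling a `KAFKA_API::clients::SaslOauthbearerToken` instead of the stand-in.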

samkhopkar commented 4 weeks ago

Thanks for getting back. We got it working fine.
