silviucpp / erlkaf

Erlang kafka driver based on librdkafka
MIT License

Error bad arg calling erlkaf_nif:consumer_new #64

Closed tpitale closed 11 months ago

tpitale commented 11 months ago

Inside of erlkaf_consumer_group init there is a call to erlkaf_nif:consumer_new(GroupId, TopicsNames, RdkClientConfig, RdkTopicConfig).

This appears to return {error, badarg}, which propagates up when I call erlkaf:create_consumer_group.

So I printed the args being passed in to consumer_new:

erlang:display(GroupId),
erlang:display(TopicsNames),
erlang:display(RdkClientConfig),
erlang:display(RdkTopicConfig),

And then from a console (I'm using elixir, so iex) I try calling consumer_new with those args (some info redacted):

:erlkaf_nif.consumer_new(
  'lifecycle-consumer-local',
  [<<"lifecycle_events">>],
  [{<<"security.protocol">>,<<"ssl">>},{<<"bootstrap.servers">>,<<"blah-blah-blah.kafka.amazonaws.com:9094">>}],
  [{<<"auto.offset.reset">>,<<"latest">>}]
)

And that call works, returning {ok, Ref} … no badarg 🙃

Any ideas why it would return a badarg once and then work when I call it myself?

tpitale commented 11 months ago

Oh … and the info I am setting here does work to connect when I use something like kcat locally.

silviucpp commented 11 months ago

Hello,

It's hard for me to follow your report. erlkaf_nif is not designed to be used directly. Please look at the code to see how we process the configs passed via the public API: https://github.com/silviucpp/erlkaf/blob/dcb615993ac582f0a61165038acecc586d656687/src/erlkaf_manager.erl#L106

tpitale commented 11 months ago

I'm not calling it directly. I call the public API and it errors with badarg.

I traced it down to try to figure out why it was saying badarg. But when I pass the args directly to the call as a test it does not error.

tpitale commented 11 months ago

From elixir, I call:

topics = [
  {topic, [callback_module: handler, callback_args: [], dispatch_mode: :one_by_one]}
]

client_id = :client_consumer_id
group_id = group_name
client_config = [bootstrap_servers: brokers(), security_protocol: :ssl]
topic_config = [auto_offset_reset: :latest]

:erlkaf.create_consumer_group(client_id, group_id, topics, client_config, topic_config)

Calling erlkaf:create_consumer_group as per the README is what fails with a badarg.

tpitale commented 11 months ago

Could you please reopen this as it is not resolved …?

silviucpp commented 11 months ago

Please provide complete code. I can't know what brokers() returns. Also read the documentation for each config option to check its expected type, because I'm more than sure the problem is with the values you are passing.

tpitale commented 11 months ago

I'm passing the same information that you can see from the debugging of the call to consumer_new.

Just focus on this: why would calling the public api fail when it works to call the internal function?

silviucpp commented 11 months ago

I really don't understand your report. I told you to check the data types. There is also a test_consumer example inside the test folder:

-define(TOPICS, [
    {<<"benchmark">>, [
        {callback_module, ?MODULE},
        {callback_args, []}, % default [] (you can skip it)
        {dispatch_mode, one_by_one} % default one_by_one (you can skip it)
    ]}
]).

GroupId = <<"erlkaf_consumer">>,

ClientConfig = [
    {bootstrap_servers, <<"172.17.33.123:9092">>},
    {security_protocol, ssl}
],

TopicConf = [
    {auto_offset_reset, latest}
],
ok = erlkaf:create_consumer_group(client_consumer, GroupId, ?TOPICS, ClientConfig, TopicConf).

Looking at your (incomplete) code, I can only see that you use an atom for GroupId instead of a binary, and the same for the topic name. I'm not familiar with Elixir.

Silviu

tpitale commented 11 months ago

For anyone who reads this in the future: Not sure how, but erlang:display(GroupId) was showing an atom as a bitstring. 🤷🏼 So yes, the configuration I was passing for GroupId was not correct, but I couldn't tell. It needs to be a binary.
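
A likely explanation for the confusion, assuming the group name was an atom such as :"lifecycle-consumer-local": erlang:display/1 prints an atom that needs quoting in single quotes, and single-quoted text pasted into iex is read as a charlist, not as an atom. The term tested by hand was therefore a different type from the one the public API received. The sketch below shows the three types side by side and one way to coerce a group ID to a binary before calling erlkaf. The GroupIdCheck module and its to_binary/1 function are hypothetical helpers for illustration and are not part of erlkaf.

```elixir
defmodule GroupIdCheck do
  # Hypothetical helper (not part of erlkaf): coerce an identifier to a binary.
  def to_binary(value) when is_binary(value), do: value
  def to_binary(value) when is_atom(value), do: Atom.to_string(value)
  def to_binary(value) when is_list(value), do: List.to_string(value)
end

# An atom whose name contains hyphens. erlang:display/1 prints such an atom
# wrapped in single quotes, which is Erlang's syntax for a quoted atom.
atom_id = :"lifecycle-consumer-local"

# In Elixir, single-quoted text is a charlist (written here with the
# equivalent ~c sigil), so pasting the displayed value into iex gives this.
charlist_id = ~c"lifecycle-consumer-local"

# What the maintainer's example passes as GroupId: a binary.
binary_id = "lifecycle-consumer-local"

# Print the type checks for each term: {is_atom, is_list, is_binary}.
IO.inspect({is_atom(atom_id), is_list(atom_id), is_binary(atom_id)}, label: "atom")
IO.inspect({is_atom(charlist_id), is_list(charlist_id), is_binary(charlist_id)}, label: "charlist")
IO.inspect({is_atom(binary_id), is_list(binary_id), is_binary(binary_id)}, label: "binary")

# Normalising before the call avoids passing an atom by accident.
group_id = GroupIdCheck.to_binary(atom_id)
IO.inspect(group_id, label: "group_id")
```

With a helper like this, the group_id passed to :erlkaf.create_consumer_group would always be a binary, whichever of the three forms the caller started with. Checking a value with is_binary/1 or IO.inspect/1 in iex is also a more reliable way to see its type than reading erlang:display/1 output from Elixir.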