silviucpp / erlkaf

Erlang Kafka driver based on librdkafka
MIT License

Get metadata from Kafka broker with dirty IO bound nif #19

Closed: v-kat closed this 3 years ago

v-kat commented 4 years ago
erlkaf:start(),
erlkaf:create_producer(client_producer, [{bootstrap_servers, "localhost:9092"}]),
erlkaf:get_metadata(client_producer).

I've been using this in an Elixir project; it seems to work and there don't appear to be any leaks. I didn't find any async call for getting metadata, so I reused the producer: using a consumer would have been a chicken-and-egg problem, because a consumer is tied to topics, and I was using the metadata call plus a regex to discover those topics. I used a dirty IO-bound NIF because, unlike the rest of the project, this call isn't asynchronous with a message sent back to the call site. LMK if you want me to update the README or anything else.
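The topic-discovery use case above (metadata call plus a regex) could look roughly like the sketch below. It is hypothetical and not part of erlkaf: the module name metadata_topics and the function matching/2 are illustrative, and it assumes the metadata map has the shape printed by erlkaf:get_metadata/1 in the shell session in this thread.

```erlang
-module(metadata_topics).
-export([matching/2]).

%% Hypothetical helper, not part of erlkaf: return the names of all topics
%% whose name matches Regex. Assumes the metadata shape seen in this thread:
%%   #{brokers => [...], topics => [#{name => binary(), partitions => [...]}]}
-spec matching(map(), iodata()) -> [binary()].
matching(#{topics := Topics}, Regex) ->
    %% Compile the pattern once, then test every topic name against it.
    {ok, MP} = re:compile(Regex),
    %% The map pattern in the generator skips entries without a name key;
    %% {capture, none} makes re:run/3 return just match | nomatch.
    [Name || #{name := Name} <- Topics,
             re:run(Name, MP, [{capture, none}]) =:= match].
```

Usage, assuming get_metadata/1 returns {ok, Map} as in the shell session: {ok, Meta} = erlkaf:get_metadata(client_producer), metadata_topics:matching(Meta, "^test-").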

silviucpp commented 4 years ago

@IRog

  1. Stats are returned as proplists, but this metadata is returned as a map. That seems inconsistent.
  2. What's the minimum Erlang VM version required by this? I think dirty IO and the map API were added recently.

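If the metadata were to follow the proplist convention used for stats, one option is to convert the map recursively before returning it. This is a hypothetical helper (metadata_proplist and to_proplist/1 are illustrative names, not erlkaf API), assuming only maps and lists need converting:

```erlang
-module(metadata_proplist).
-export([to_proplist/1]).

%% Hypothetical helper: recursively turn maps (including maps nested inside
%% lists) into proplists sorted by key. Other terms are returned unchanged.
to_proplist(Map) when is_map(Map) ->
    [{K, to_proplist(V)} || {K, V} <- lists:sort(maps:to_list(Map))];
to_proplist(List) when is_list(List) ->
    [to_proplist(E) || E <- List];
to_proplist(Other) ->
    Other.
```

Sorting by key keeps the output deterministic, since maps:to_list/1 does not guarantee any particular order for larger maps.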
v-kat commented 4 years ago

The APIs were added as experimental in OTP 17 and then fully in OTP 18.
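To confirm at runtime that a node can run a dirty IO-bound NIF, a small check can read the OTP release and the dirty I/O scheduler count through erlang:system_info/1. The module otp_check and its functions are hypothetical; the threshold to compare release/0 against is the one discussed in this thread.

```erlang
-module(otp_check).
-export([release/0, supports_dirty_io/0]).

%% Numeric OTP release of the running VM, e.g. 26. From OTP 17 on,
%% otp_release is a plain number string; older "R16B03"-style values
%% would make list_to_integer/1 crash.
release() ->
    list_to_integer(erlang:system_info(otp_release)).

%% True if the VM runs at least one dirty I/O scheduler, which a dirty
%% IO-bound NIF needs to be scheduled on. Guarded in case an older build
%% rejects the argument.
supports_dirty_io() ->
    try erlang:system_info(dirty_io_schedulers) > 0
    catch error:badarg -> false
    end.
```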

silviucpp commented 3 years ago

@v-kat, any feedback on this? If not, I will reject the pull request.

v-kat commented 3 years ago

This has been called a large number of times, repeatedly, in production on nodes with months of uptime, so I believe it doesn't introduce any leaks either. I understand the minimum required version would be higher. Feel free to do what you want.

silviucpp commented 3 years ago

@v-kat, I asked you above about line 116:

Why res? This seems to return an atom all the time. You probably want Res, but I think you should return {ok, Res}.

get_metadata(ClientId) ->
    case erlkaf_cache_client:get(ClientId) of
        {ok, ClientRef, _ClientPid} ->
            case erlkaf_nif:get_metadata(ClientRef) of
                {ok, res} ->
                    res;
                Error ->
                    Error
            end;
        undefined ->
            {error, ?ERR_UNDEFINED_CLIENT};
        Error ->
            Error
    end.

100% of the time it goes to the Error -> Error branch.

Silviu

v-kat commented 3 years ago

@silviucpp You're absolutely correct that it should be capitalized. The Elixir -> Erlang switch must have gotten me.

It still mostly works for the wrong reasons 😅.
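Why it still works: in an Erlang pattern a lowercase identifier such as res is an atom literal, not a variable, so the first clause only matches the exact term {ok, res}. A real metadata result, {ok, #{...}}, falls through to Error -> Error, which returns the tuple unchanged. The self-contained sketch below mirrors only the inner case expression of get_metadata/1 from the PR; the module name match_demo is hypothetical and the NIF call is left out.

```erlang
-module(match_demo).
-export([buggy/1, fixed/1]).

%% Mirrors the inner case from the PR: `res` is an atom, so only the
%% literal term {ok, res} matches the first clause.
buggy(NifResult) ->
    case NifResult of
        {ok, res} -> res;
        Error -> Error        %% every real metadata result lands here
    end.

%% Form suggested in the review: bind the variable Res and return
%% {ok, Res}, so success and error results keep distinct shapes.
fixed(NifResult) ->
    case NifResult of
        {ok, Res} -> {ok, Res};
        Error -> Error
    end.
```

In fixed/1 both clauses return their input unchanged, so the case could be dropped and the NIF result returned directly; the explicit {ok, Res} clause mainly documents the expected success shape.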

Eshell V11.1.1  (abort with ^G)
1> erlkaf:start(),
1> erlkaf:create_producer(client_producer, [{bootstrap_servers, "localhost:9092"}]),
1> erlkaf:get_metadata(client_producer).
Loading library: "/redacted/erlkaf/_build/default/lib/erlkaf/priv/erlkaf_nif" 
23:53:16.487 [warning] rdkafka#producer-1 CONFWARN [thrd:app]: Configuration property enable.auto.commit is a consumer property and will be ignored by this producer instance
23:53:16.488 [warning] rdkafka#producer-1 CONFWARN [thrd:app]: Configuration property enable.auto.offset.store is a consumer property and will be ignored by this producer instance
23:53:16.488 [warning] rdkafka#producer-1 CONFWARN [thrd:app]: Configuration property enable.partition.eof is a consumer property and will be ignored by this producer instance
23:53:16.487 [info] persistent queue path: "/redacted/erlkaf/_build/default/lib/erlkaf/priv/client_producer"
{ok,#{brokers =>
          [#{host => <<"123.456.7.890">>,id => 0,port => 9092}],
      topics => 
          [#{name => <<"test-topic">>,
             partitions =>
                 [#{id => 0,isrs => [0],leader => 0,replicas => [0]}]},
           #{name => <<"__consumer_offsets">>,
             partitions =>
                 [#{id => 0,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 1,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 2,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 3,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 4,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 5,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 6,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 7,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 8,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 9,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 10,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 11,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 12,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 13,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 14,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 15,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 16,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 17,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 18,isrs => [0],leader => 0,replicas => [0]},
                  #{id => 19,isrs => [0],leader => 0,...},
                  #{id => 20,isrs => [0],...},
                  #{id => 21,...},
                  #{...}|...]}]}}

I'll try to open a PR with the fix tomorrow, but I no longer have direct write access to the original fork. Worst case, I can close this PR and reopen it from a personal fork with the fix, if you want this addition.

silviucpp commented 3 years ago

@v-kat, feel free to close it and open a new one with this fix. I'm happy to merge it!

Silviu