kafkaex / kafka_ex

Kafka client library for Elixir
MIT License

FIX: Partition assignment when topic doesn't exist #457

Closed Zarathustra2 closed 7 months ago

Zarathustra2 commented 2 years ago

Fixes a crash when producing a message with a key to a topic that does not exist. Previously this caused the GenServer to terminate:

mfa=:gen_server.error_info/7 [error] GenServer :greeks_worker_4 terminating
** (ArithmeticError) bad argument in arithmetic expression
    :erlang.rem(2111275913, 0)
    (kafka_ex 0.12.1) lib/workers/greeks/greek_server.ex:418: KafkaEx.DefaultPartitioner.assign_partition_with_key/3
    (kafka_ex 0.12.1) lib/kafka_ex/server.ex:400: KafkaEx.Server0P10AndLater.kafka_server_produce/2
    (stdlib 3.17) gen_server.erl:721: :gen_server.try_handle_call/4
    (stdlib 3.17) gen_server.erl:750: :gen_server.handle_msg/6
    (stdlib 3.17) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
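
The `:erlang.rem(2111275913, 0)` frame suggests that the key's hash was reduced modulo a partition count of zero, which is presumably what the metadata for a missing topic yields. A minimal sketch of that failure mode follows. The `PartitionSketch` module is hypothetical, and `:erlang.phash2/1` is only a stand-in hash; neither is kafka_ex's actual code.

```elixir
defmodule PartitionSketch do
  # Hypothetical stand-in for a key-based partitioner; not kafka_ex's code.
  # :erlang.phash2/1 replaces whatever hash the real partitioner uses.
  def assign(key, partition_count) do
    hash = :erlang.phash2(key)
    # With partition_count == 0 this raises ArithmeticError, as in the trace.
    rem(hash, partition_count)
  end
end

outcome =
  try do
    PartitionSketch.assign("some-key", 0)
  rescue
    ArithmeticError -> :arithmetic_error
  end

IO.inspect(outcome)
```

Inside a GenServer callback nothing rescues the exception, so the process terminates instead of returning an error to the caller.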

Zarathustra2 commented 2 years ago

Actually, I think this is a bad idea. A future message with the same key could go to a different partition if the auto-created topic has more than one partition. I am not sure what the best approach would be in that case.
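
The concern can be illustrated with the hash value from the stack trace in the PR description. This is a sketch under assumptions: `KeyStability` is a hypothetical helper, partitioning is taken to be plain hash-modulo, and the fallback of partition 0 for a topic without partitions is assumed for illustration only.

```elixir
defmodule KeyStability do
  # Hypothetical helper, not part of kafka_ex: plain hash-modulo partitioning.
  def partition_for(hash, partition_count) when partition_count > 0 do
    rem(hash, partition_count)
  end
end

# Hash value taken from the stack trace in the PR description.
hash = 2_111_275_913

# Assumed fallback while the topic has no partitions yet.
fallback_partition = 0

# Partition the same hash maps to once the topic exists with 3 partitions.
later_partition = KeyStability.partition_for(hash, 3)

IO.inspect({fallback_partition, later_partition})
# prints {0, 2}
```

Under these assumptions the first message for the key lands in partition 0 and later ones in partition 2, so per-key ordering across the two is no longer guaranteed.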

Argonus commented 2 years ago

@Zarathustra2 We've had a similar issue. Instead of assigning a consumer to nonexistent data, we added health checks that verify the consumer is connected to all the topics and partitions it should be. If it's not, we restart it a few times and kill the runtime after some time.
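
A rough sketch of what such a health check might look like. Everything here is an assumption made for illustration: the `AssignmentCheck` module, the `{topic, partition}` tuple shape, the `"greeks"` topic name, and the restart limit are hypothetical and do not describe Argonus's actual setup or any kafka_ex API.

```elixir
defmodule AssignmentCheck do
  # Hypothetical health check; names and data shapes are assumptions.
  # Both arguments are lists of {topic, partition} tuples.
  def missing(expected, connected) do
    expected
    |> MapSet.new()
    |> MapSet.difference(MapSet.new(connected))
    |> MapSet.to_list()
    |> Enum.sort()
  end

  # Decide what to do after `attempts` consecutive failed checks.
  def action([], _attempts, _max_restarts), do: :ok

  def action(_missing, attempts, max_restarts) when attempts < max_restarts,
    do: :restart_consumer

  def action(_missing, _attempts, _max_restarts), do: :stop_runtime
end

expected = [{"greeks", 0}, {"greeks", 1}]
connected = [{"greeks", 0}]

missing = AssignmentCheck.missing(expected, connected)
IO.inspect(missing)
IO.inspect(AssignmentCheck.action(missing, 1, 3))
```

Keeping the comparison a pure function makes it easy to test; how the connected partitions are gathered and how the restart is carried out depend on the application's supervision tree.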

dantswain commented 2 years ago

Hi @Argonus and @Zarathustra2!

I think maybe the best thing here is to raise an error if this condition is encountered? Attempting to publish to a topic that doesn't exist is kind of undefined behavior to begin with, so maybe we just give the user a more helpful error message so that they can decide how to resolve it for their use case?
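
One way this suggestion could be sketched is a guard that raises a descriptive exception instead of letting `rem/2` fail. `GuardedPartitioner` and `NoPartitionsError` are hypothetical names, and `:erlang.phash2/1` is a stand-in hash; this is not the change that was made in kafka_ex.

```elixir
defmodule GuardedPartitioner do
  defmodule NoPartitionsError do
    # Hypothetical exception carrying the topic that had no partitions.
    defexception [:topic]

    @impl true
    def message(%{topic: topic}) do
      "cannot assign a partition for topic #{inspect(topic)}: " <>
        "no partitions found (does the topic exist?)"
    end
  end

  # Zero partitions: fail with a message the caller can act on.
  def assign(topic, _key, 0), do: raise(NoPartitionsError, topic: topic)

  # Normal case: stand-in hash reduced modulo the partition count.
  def assign(_topic, key, partition_count) when partition_count > 0 do
    rem(:erlang.phash2(key), partition_count)
  end
end

message =
  try do
    GuardedPartitioner.assign("missing_topic", "some-key", 0)
  rescue
    e in GuardedPartitioner.NoPartitionsError -> Exception.message(e)
  end

IO.puts(message)
```

The caller still has to decide whether to create the topic, retry, or drop the message, but the error now names the topic rather than surfacing as an `ArithmeticError`.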