dashbitco / broadway_kafka

A Broadway connector for Kafka

Manual Partition Assignment #125

Closed: David-Klemenc closed this issue 4 months ago

David-Klemenc commented 1 year ago

Is it possible to do manual partition assignment, as described here: KafkaConsumer.html?

The problem is this: when I connect to Kafka via the CLI (bin/kafka-console-consumer.sh --bootstrap-server my.server:9097 --topic my-topic --consumer.config client-ssl.properties), I get events from both partitions (0 and 1), but when I connect via Broadway I only get events from partition 1:

16:13:32.944 [notice]     :supervisor: {:local, :brod_sup}
    :started: [
  pid: #PID<0.1769.0>,
  id: Test.Integration.KafkaPipeline.Broadway.Producer_0.Client,
  mfargs: {:brod_client, :start_link,
   [
     ["my.server": 9097],
     Test.Integration.KafkaPipeline.Broadway.Producer_0.Client,
     [
       ssl: [
         verify: :verify_peer,
         cacertfile: "certs/ca_cert",
         keyfile: "certs/ca_key",
         password: "******",
         certfile: "certs/cert_file"
       ]
     ]
   ]},
  restart_type: {:permanent, 10},
  shutdown: 5000,
  child_type: :worker
]

16:13:53.940 [info] Group member (spin_events,coor=#PID<0.1771.0>,cb=#PID<0.1768.0>,generation=78):
elected=false

16:13:53.983 [info] Group member (spin_events,coor=#PID<0.1771.0>,cb=#PID<0.1768.0>,generation=78):
assignments received:
  my-topic:
    partition=1 begin_offset=29035

16:13:57.537 [info] client Test.Integration.KafkaPipeline.Broadway.Producer_0.Client connected to my.server:9097

Some additional partition info:

bin/kafka-topics.sh --describe --bootstrap-server my.server:9097 --topic my-topic --command-config client-ssl.properties 
Topic: my-topic TopicId: ************ PartitionCount: 2 ReplicationFactor: 3  Configs: segment.bytes=1073741824
  Topic: my-topic Partition: 0  Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
  Topic: my-topic Partition: 1  Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

josevalim commented 1 year ago

@David-Klemenc we let Kafka assign the partitions, so is there any reason why Kafka would not assign partition 0 to us? We don't support explicit manual partition assignment, but we would likely accept a PR for it if the underlying brod library supports it.

David-Klemenc commented 1 year ago

@josevalim the kafka4beam/brod library supports brod_topic_subscriber: "Subscribe on messages from all or selected topic partitions without using consumer groups." I'll look into making a PR (though I'm a bit conflicted about whether it is needed).

For anyone having the same issue: what happened to me was that a colleague accidentally used the same group_id as me, and Kafka correctly split the topic partitions between us.
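To make the shared-group_id effect concrete, here is a simplified model of a consumer group spreading one topic's partitions over its members round-robin. It is an illustrative sketch only: the module name GroupAssignmentSketch and the member ids are hypothetical, and brod and Kafka use their own assignment strategies and broker-generated member ids, so which member ends up with which partition can differ in practice.

```elixir
defmodule GroupAssignmentSketch do
  @moduledoc """
  Illustrative round-robin model of how a consumer group spreads one topic's
  partitions over its members. Not the assignor actually used by brod or Kafka.
  """

  @doc """
  Assigns `partitions` to `members` round-robin (both sorted first).
  Returns a map of member id => list of assigned partitions.
  """
  def assign(members, partitions) when is_list(members) and members != [] do
    sorted_members = Enum.sort(members)
    member_count = length(sorted_members)

    # Every member starts with no partitions; extra members may stay idle.
    initial = Map.new(sorted_members, fn member -> {member, []} end)

    partitions
    |> Enum.sort()
    |> Enum.with_index()
    |> Enum.reduce(initial, fn {partition, index}, acc ->
      # The i-th partition goes to member number (i mod member_count).
      member = Enum.at(sorted_members, rem(index, member_count))
      Map.update!(acc, member, fn assigned -> assigned ++ [partition] end)
    end)
  end
end

# Alone in the group: this consumer gets both partitions.
IO.inspect(GroupAssignmentSketch.assign(["my-broadway-consumer"], [0, 1]))
# prints %{"my-broadway-consumer" => [0, 1]}

# A colleague joins with the same group_id: the partitions are split.
IO.inspect(GroupAssignmentSketch.assign(["my-broadway-consumer", "colleague-consumer"], [0, 1]))
# prints %{"colleague-consumer" => [0], "my-broadway-consumer" => [1]}
```

With a single member, both partitions go to it, which is what the console consumer saw (when no --group is passed it joins a group of its own). With two members in the same group, each gets one partition, consistent with the "partition=1" assignment in the Broadway log. If a group has more members than partitions, the extra members receive nothing.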

yordis commented 1 year ago

> For anyone having the same issue: what happened to me was that a colleague accidentally used the same group_id as me, and Kafka correctly split the topic partitions between us.

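Oh yeah, fun discovery! Here is a code snippet I keep around when working with Kafka. It builds consumer group IDs that are unique per environment and, in development, per developer: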

defmodule MyApp.Kafka do
  @business_name "mybusiness"
  @app_name "myapp"
  @topic_joiner "."

  @doc """
  Returns the Group ID with the Service Name Prefix.

      iex> MyApp.Kafka.consumer_group_id("my-group-id")
      "mybusiness.myapp.testing.mycomputername.my-group-id"
  """
  def consumer_group_id(group_id) do
    release_env = Application.fetch_env!(:my_app, :release_environment)

    prefix =
      :my_app
      |> Application.get_env(__MODULE__, [])
      |> Keyword.get(:consumer_group_prefix)
      |> consumer_group_prefix(release_env)
      |> Enum.join(@topic_joiner)

    Enum.join([@business_name, @app_name, prefix, group_id], @topic_joiner)
  end

  defp consumer_group_prefix(nil, release_env), do: [release_env]
  defp consumer_group_prefix(prefix, release_env), do: [release_env, prefix]
end

# config/test.exs
import Config

# USER is set on Unix-like systems, USERNAME on Windows.
computer_username = System.get_env("USER") || System.get_env("USERNAME") || "UNKNOWN"

config :my_app, :release_environment, "testing"
config :my_app, MyApp.Kafka, consumer_group_prefix: computer_username

It is not a silver bullet, but it should (hopefully) prevent this situation when you share a Kafka cluster with other developers in a dev environment, or share one cluster between environments, as some teams do.

josevalim commented 4 months ago

Closing this for now then. Thank you!