akoutmos / prom_ex

An Elixir Prometheus metrics collection library built on top of Telemetry with accompanying Grafana dashboards
MIT License

PromEx.Plugins.Broadway assigns "acknowledger" name to message metrics tag rather than module name #114

Closed: dsschneidermann closed this issue 2 years ago

dsschneidermann commented 2 years ago

From https://github.com/akoutmos/prom_ex/blob/3557e828cf3905adaf6719003e68b65294b62288/lib/prom_ex/plugins/broadway.ex#L248

This populates the name tag with the module name of my producer module, e.g. App.Producers.TestProducer in my code.

This conflicts with the dashboard's expectation that message metrics are reported under the name of the Broadway module. In my example code the dashboard filters on the value App.Broadway, so it shows no data.

I don't know whether the intent here is something other than matching the metrics to the processing module, but the Broadway module name does not seem to be readily available to the message metrics. Does this need a PR to Broadway to include additional information?

The same issue affects the exception metrics: https://github.com/akoutmos/prom_ex/blob/3557e828cf3905adaf6719003e68b65294b62288/lib/prom_ex/plugins/broadway.ex#L347

Edit: I see that the batch tag extraction gets the topology_name. Is there a way to get the same value passed to the message metrics handler?
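The mismatch can be sketched with plain maps. This is a hypothetical illustration, not PromEx's code: TagSketch and its functions are made up, a plain map stands in for %Broadway.Message{}, it assumes the message event metadata carries :topology_name (as the batch tag extraction mentioned above relies on), and it renders module names with inspect/1, which may differ from how PromEx normalizes them.

```elixir
defmodule TagSketch do
  # Hypothetical helpers contrasting two sources for the `name` tag of
  # Broadway message metrics. A plain map stands in for %Broadway.Message{}
  # so the sketch runs without dependencies.

  # Reads the module out of the message's acknowledger. This yields whatever
  # module the producer put there, e.g. App.Producers.TestProducer.
  def name_from_acknowledger(%{message: %{acknowledger: {ack_module, _ref, _data}}}) do
    %{name: inspect(ack_module)}
  end

  # Reads the pipeline name from the telemetry metadata (assumed to carry
  # :topology_name), which is what the dashboard filters on.
  def name_from_topology(%{topology_name: topology_name}) do
    %{name: inspect(topology_name)}
  end
end

# Assumed shape of the metadata for one processed message.
metadata = %{
  topology_name: App.Broadway,
  message: %{data: "payload", acknowledger: {App.Producers.TestProducer, :ack_ref, nil}}
}

TagSketch.name_from_acknowledger(metadata)
# => %{name: "App.Producers.TestProducer"}

TagSketch.name_from_topology(metadata)
# => %{name: "App.Broadway"}
```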

dsschneidermann commented 2 years ago

My mix.exs dependencies:

      {:broadway, "~> 1.0"},
      {:prom_ex, "~> 1.6.0"},

akoutmos commented 2 years ago

Hey Dennis!

I'll take a look at this either tonight or tomorrow and get back to you. I am almost positive that the example app in the repo is working correctly, so I am curious what is going on in your case.

dsschneidermann commented 2 years ago

Thanks, I'll check the example app in the repo and report back (after this weekend).

dsschneidermann commented 2 years ago

Hi again @akoutmos, you're right: the example applications have the answer. What makes it work is explicitly setting the acknowledger in the pipeline, e.g. lines https://github.com/akoutmos/prom_ex/blob/d4346eb0ca87ffba5276ffaf0723b8f7c4f5b4fd/example_applications/web_app/lib/web_app/temp_processor.ex#L14 and https://github.com/akoutmos/prom_ex/blob/d4346eb0ca87ffba5276ffaf0723b8f7c4f5b4fd/example_applications/web_app/lib/web_app/temp_processor.ex#L63

This should probably be documented, or the example should be pointed to as the way to implement it. I'll add the fix to my own code, confirm it works, and open a PR adding a line to the Broadway plugin documentation. Would you prefer the docs to just point to the example application, or to list the steps that are needed?

dsschneidermann commented 2 years ago

@akoutmos Coming back to my code and the prom_ex integration, I've found a potential trap: this transform addition skips all acks back to the original producer.

I'm not sure how this was missed, but the impact on the standard Broadway producers is huge. For example, it would result in messages never being dequeued from RabbitMQ (from my reading of the source code, apply_ack_func would never be called). I haven't tested this with any actual producers, but you must have some experience here. Maybe I'm way off.

Working through it, I wrote the passthrough ack below. It relies on Broadway's ack semantics (messages are grouped by acknowledger module and ack_ref) to route acks correctly back to the original producer(s). It should be a permanent fix, but it bloats the code somewhat and needs comments to explain why:

# In the Broadway pipeline module:
@behaviour Broadway.Acknowledger
alias Broadway.Message

def transform(%Message{data: data, acknowledger: {ack_module, ack_ref, ack_data}}, _opts) do
  # Updating the acknowledger module is needed for PromEx tracking, but we preserve
  # the original and ensure we ack our messages to it in the `ack` function.
  %Broadway.Message{
    data: data,
    acknowledger: {__MODULE__, {ack_module, ack_ref}, ack_data}
  }
end

@impl Broadway.Acknowledger
def ack({ack_module, ack_ref}, successful, failed) do
  restore_original = fn msgs ->
    Enum.map(msgs, fn %Broadway.Message{acknowledger: {_, _, ack_data}} = x ->
      put_in(x.acknowledger, {ack_module, ack_ref, ack_data})
    end)
  end

  # Pass messages on to the producer acknowledger. As we are grouping by the
  # ack_module and ack_ref, we can simply send successes and failures.
  ack_module.ack(ack_ref, restore_original.(successful), restore_original.(failed))
end
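A simplified, dependency-free model may help show why overwriting the acknowledger drops acks, and why the passthrough above fixes it. It assumes, as the comments in the code above state, that Broadway groups messages by {ack_module, ack_ref} and calls ack_module.ack/3 once per group. AckDispatchSketch is a simplified model, not Broadway's source, and Msg, ProducerAcker, OverwritingAcker and PassthroughAcker are hypothetical stand-ins, not Broadway or PromEx modules.

```elixir
# Hypothetical stand-in for %Broadway.Message{} (only the fields used here).
defmodule Msg do
  defstruct data: nil, metadata: %{}, acknowledger: nil
end

defmodule AckDispatchSketch do
  # Simplified model of the final ack step: group messages by
  # {ack_module, ack_ref} and call ack_module.ack/3 once per group.
  def ack_messages(successful, failed) do
    tagged = Enum.map(successful, &{:ok, &1}) ++ Enum.map(failed, &{:error, &1})

    tagged
    |> Enum.group_by(fn {_status, %Msg{acknowledger: {mod, ref, _}}} -> {mod, ref} end)
    |> Enum.each(fn {{mod, ref}, group} ->
      successes = for {:ok, msg} <- group, do: msg
      failures = for {:error, msg} <- group, do: msg
      mod.ack(ref, successes, failures)
    end)
  end
end

# Hypothetical producer acknowledger: reports what it was asked to ack.
defmodule ProducerAcker do
  def ack(ref, successful, failed) do
    send(self(), {:producer_ack, ref, Enum.map(successful, & &1.data), Enum.map(failed, & &1.data)})
    :ok
  end
end

# Hypothetical acknowledger that replaces the producer's and never forwards.
defmodule OverwritingAcker do
  def ack(_ref, _successful, _failed), do: :ok
end

# Passthrough in the spirit of the transform/2 and ack/3 above.
defmodule PassthroughAcker do
  def wrap(%Msg{acknowledger: {mod, ref, data}} = msg),
    do: %Msg{msg | acknowledger: {__MODULE__, {mod, ref}, data}}

  def ack({mod, ref}, successful, failed) do
    restore = fn msgs ->
      Enum.map(msgs, fn %Msg{acknowledger: {_, _, data}} = m ->
        %Msg{m | acknowledger: {mod, ref, data}}
      end)
    end

    mod.ack(ref, restore.(successful), restore.(failed))
  end
end

msgs = for i <- 1..3, do: %Msg{data: i, acknowledger: {ProducerAcker, :queue_ref, nil}}

# Overwritten acknowledger: ProducerAcker.ack/3 is never called.
msgs
|> Enum.map(fn m -> %Msg{m | acknowledger: {OverwritingAcker, :pipeline, nil}} end)
|> AckDispatchSketch.ack_messages([])

# Wrapped acknowledger: the acks reach ProducerAcker with the original ref.
msgs
|> Enum.map(&PassthroughAcker.wrap/1)
|> AckDispatchSketch.ack_messages([])
# the process now has {:producer_ack, :queue_ref, [1, 2, 3], []} in its mailbox
```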

I'd like your input on how come this hasn't been a blocker for the standard Broadway producers, and on what we should do, if anything. Potentially this code could be dropped into a PromExAcknowledger, making it a bit easier to employ and document usage for.

I can put in some time to do what is needed.

Thanks!

maxmarcon commented 2 years ago

Thank you for this post! I ran into exactly the same problem while using the Amazon SQS producer.

The change suggested in the docs can't work for most real-world producers, because it overwrites the original acknowledger. I'm gonna try your solution right now.

I'd like your input on how come this hasn't been a blocker for the standard Broadway producers

I would venture to guess that not many folks are using PromEx with Broadway yet, and that's the reason 😄

maxmarcon commented 2 years ago

It works! Thanks again.

Potentially this code could be dropped into a PromExAcknowledger, making it a bit easier to employ and document usage for.

👍 from my side

marcospgsilva commented 2 years ago

Hi @dsschneidermann! I'm getting an exception when using PromEx.Plugins.Broadway with BroadwayKafka and adding the transform/2 function.

The exception is:

[error] GenServer :"condition_sync_producer_consumer.Broadway.Producer_0" terminating
** (KeyError) key :topic not found in: %{}
    (broadway_kafka 0.3.0) lib/broadway_kafka/producer.ex:557: anonymous fn/2 in BroadwayKafka.Producer.build_allocator_spec_and_consumer_entry/5
    (broadway 1.0.3) lib/broadway/topology.ex:228: anonymous fn/3 in Broadway.Topology.build_producers_specs/2
    (gen_stage 1.1.2) lib/gen_stage/dispatchers/partition_dispatcher.ex:258: GenStage.PartitionDispatcher.split_events/4
    (gen_stage 1.1.2) lib/gen_stage/dispatchers/partition_dispatcher.ex:243: GenStage.PartitionDispatcher.dispatch/3
    (gen_stage 1.1.2) lib/gen_stage.ex:2322: GenStage.dispatch_events/3
    (gen_stage 1.1.2) lib/gen_stage.ex:2140: GenStage.handle_noreply_callback/2
    (stdlib 3.17) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.17) gen_server.erl:771: :gen_server.handle_msg/6
Last message: {:poll, {1302, "statuses", 0}}
State: %{consumers: [{#PID<0.3407.0>, #Reference<0.3463651493.1223163905.98119>}, {#PID<0.3406.0>, #Reference<0.3463651493.1223163905.98112>}], module: BroadwayKafka.Producer, module_state: %{acks: %{{1302, "statuses", 0} => {[], 22476728, []}}, allocator_names: {0, [:"Elixir.condition_sync_producer_consumer.Allocator_processor_default"], [:"Elixir.condition_sync_producer_consumer.Allocator_batcher_consumer_default"]}, buffer: {[], []}, client: BroadwayKafka.BrodClient, client_id: :"Elixir.condition_sync_producer_consumer.Broadway.Producer_0.Client", config: %{client_config: [connect_timeout: 60000, ssl: true, sasl: {:scram_sha_512, _, _}], fetch_config: %{}, group_config: [offset_commit_policy: :commit_to_kafka_v2, rejoin_delay_seconds: 5], group_id: "condition-sync", hosts: [{_, _}, {_, _}], offset_commit_on_ack: true, offset_reset_policy: :earliest, receive_interval: 2000, reconnect_timeout: 1000, topics: ["statuses"]}, demand: 20, group_coordinator: #PID<0.3400.0>, receive_interval: 2000, receive_timer: #Reference<0.3463651493.1223163905.99911>, reconnect_timeout: 1000, revoke_caller: nil, shutting_down?: false}, rate_limiting: nil, transformer: {MyApp.Workers.ConditionSyncProducerConsumer, :transform, []}}

Apparently BroadwayKafka.Producer needs some message.metadata keys, such as :topic. But when the transform/2 function runs, it creates a new %Broadway.Message{} struct without copying the metadata from the original message, so message.metadata is an empty map (%{}).

So I changed it to:

def transform(%Message{data: data, metadata: metadata, acknowledger: {ack_module, ack_ref, ack_data}}, _opts) do
  # Updating the acknowledger module is needed for PromEx tracking, but we preserve
  # the original and ensure we ack our messages to it in the `ack` function.
  %Broadway.Message{
    data: data,
    metadata: metadata,
    acknowledger: {__MODULE__, {ack_module, ack_ref}, ack_data}
  }
end

and the exception was fixed.

Please let me know if I've made a mistake.

maxmarcon commented 2 years ago

@marcospgsilva there is a simpler, more general way to solve this and similar issues that may arise: instead of selecting the fields of the Message struct to copy into the new message, copy the entire original message and update only the acknowledger field. This is what I do:

def transform(%Message{acknowledger: {ack_module, ack_ref, ack_data}} = message, []) do
  %Message{message | acknowledger: {__MODULE__, {ack_module, ack_ref}, ack_data}}
end

And you're good to go. Whatever fields the producer might expect to be in the message, they'll be there.
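The difference between rebuilding the struct and updating it can be shown without Broadway or Kafka. In this sketch Msg, TransformSketch and SomeProducerAcker are hypothetical: Msg is a stand-in for %Broadway.Message{} (the real struct has more fields), and the :topic key mirrors the one from the KeyError above.

```elixir
# Hypothetical stand-in for %Broadway.Message{} (the real struct has more fields).
defmodule Msg do
  defstruct data: nil, metadata: %{}, acknowledger: nil
end

defmodule TransformSketch do
  # Builds a fresh struct: any field not listed falls back to its default,
  # so producer-specific metadata (e.g. :topic for BroadwayKafka) is lost.
  def rebuild(%Msg{data: data, acknowledger: {mod, ref, ack_data}}) do
    %Msg{data: data, acknowledger: {__MODULE__, {mod, ref}, ack_data}}
  end

  # Struct update syntax: only :acknowledger changes, every other field is kept.
  def update(%Msg{acknowledger: {mod, ref, ack_data}} = message) do
    %Msg{message | acknowledger: {__MODULE__, {mod, ref}, ack_data}}
  end
end

original = %Msg{
  data: "event",
  metadata: %{topic: "statuses", partition: 0},
  acknowledger: {SomeProducerAcker, :ack_ref, nil}
}

TransformSketch.rebuild(original).metadata
# => %{}

TransformSketch.update(original).metadata.topic
# => "statuses"
```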

marcospgsilva commented 2 years ago

@maxmarcon Thank you! I'll try your solution.

dsschneidermann commented 2 years ago

@akoutmos Could you reopen this so that a PR can be submitted?

akoutmos commented 2 years ago

I have reopened the issue. PRs that address this are welcome. I think the best place to start would be to add RabbitMQ and Kafka to the example app in the PromEx repo, so that there is something that tests this end to end. It looks like the GenStage-backed Broadway modules that I put together do not reproduce the issues that people are encountering with RabbitMQ and Kafka.

maxmarcon commented 2 years ago

Thanks. I could work on a PR when I find a bit of time. Or, @dsschneidermann, would you like to take a stab at this first?

Just for completeness: this is not limited to RabbitMQ or Kafka. Amazon SQS is also affected. In general, any producer that sets its own acknowledger in the message will have this problem.

dsschneidermann commented 2 years ago

@maxmarcon Please do, I've had to put my Elixir project on the back burner for now.

maxmarcon commented 2 years ago

I took a look at the code, and I believe this has not been an issue since commit d76956385c5622096c88b33545c3998f7cc16de8. Before that commit, the pipeline's name was extracted from an overwritten acknowledger; since then, it is extracted from the telemetry event's metadata, which is the simple and obvious way to do it. Overwriting the acknowledger just to make the pipeline's name available was an unnecessary hack. The problem is that the documentation wasn't updated after the change, and it still says one should overwrite the acknowledger.

I'll verify the above and then create a PR to update the docs.