risingwavelabs / risingwave

bug: kafka consumer lag size metrics are not dropped after related actors are dropped #16064

Open MrCroxx opened 7 months ago

MrCroxx commented 7 months ago

Describe the bug

Some source-related metrics (e.g. source_latest_message_id, source_kafka_high_watermark, and possibly more) are not dropped after the related actors are dropped. Use LabelGuardedMetrics instead to drop the metrics automatically.
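
For context, the pattern behind such a guard is RAII: the guard owns one labeled time series and deregisters it on drop. Below is a minimal sketch of that pattern against the prometheus crate's IntGaugeVec; it is not the actual LabelGuardedMetrics API, and all names are illustrative.

use prometheus::{IntGauge, IntGaugeVec};

/// Sketch of a label-guarded gauge: dropping the guard removes the
/// labeled time series, so stale values stop being scraped.
struct LabelGuardedIntGauge {
    vec: IntGaugeVec,
    labels: Vec<String>,
    inner: IntGauge,
}

impl LabelGuardedIntGauge {
    fn new(vec: IntGaugeVec, labels: &[&str]) -> Self {
        let inner = vec.with_label_values(labels);
        let labels = labels.iter().map(|s| s.to_string()).collect();
        Self { vec, labels, inner }
    }

    fn set(&self, v: i64) {
        self.inner.set(v);
    }
}

impl Drop for LabelGuardedIntGauge {
    fn drop(&mut self) {
        let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
        // Deregister the label set; ignore the error if it is already gone.
        let _ = self.vec.remove_label_values(&labels);
    }
}

An actor would hold such guards alongside its other state, so the time series disappear together with the actor.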

Error message/log

No response

To Reproduce

No response

Expected behavior

Source-related metrics should be dropped after the related actors are dropped.

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

MrCroxx commented 7 months ago

The difficulty is that certain labels have the same lifetime as their owner, while others are determined dynamically by incoming data and have no clear lifetime relationship to any owner. 🤔

For example, for KafkaSplitReader, the first two labels can be decided when the reader is created:

impl KafkaSplitReader {
    fn report_latest_message_id(&self, split_id: &str, offset: i64) {
        self.source_ctx
            .metrics
            .latest_message_id
            .with_label_values(&[
                // source name is not available here
                &self.source_ctx.source_id.to_string(),
                &self.source_ctx.actor_id.to_string(),
                split_id,
            ])
            .set(offset);
    }
}

But the last label, the split id, is decided by the received messages:

#[for_await]
'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
    let msgs: Vec<_> = msgs
        .into_iter()
        .collect::<std::result::Result<_, KafkaError>>()?;

    // ...

    for (partition, offset) in split_msg_offsets {
        let split_id = partition.to_string();
        self.report_latest_message_id(&split_id, offset);
    }

cc @tabVersion @wenym1

MrCroxx commented 7 months ago

@wangrunji0408 may be facing the same problem.

MrCroxx commented 7 months ago

Should we introduce a new metrics guard that holds a label map internally?

wenym1 commented 7 months ago

> Should we introduce a new metrics guard that holds a label map internally?

For simplicity, we can maintain a map in KafkaSplitReader to cache the labeled metrics. We can try to get the created labeled metric from the map, and if it is not found, create a new one for the new split id.
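
A minimal sketch of that caching approach, assuming the prometheus crate; the struct and field names are illustrative, not the real KafkaSplitReader definition. Its Drop impl also realizes the "guard that holds a label map internally" idea by removing every cached label set when the reader goes away.

use std::collections::HashMap;

use prometheus::{IntGauge, IntGaugeVec};

/// Sketch: one labeled gauge per split id, created lazily on first use.
struct SplitMetricsCache {
    latest_message_id: IntGaugeVec,
    source_id: String,
    actor_id: String,
    per_split: HashMap<String, IntGauge>,
}

impl SplitMetricsCache {
    fn report_latest_message_id(&mut self, split_id: &str, offset: i64) {
        let (vec, source_id, actor_id) =
            (&self.latest_message_id, &self.source_id, &self.actor_id);
        let gauge = self
            .per_split
            .entry(split_id.to_string())
            .or_insert_with(|| {
                // First message from this split: create the labeled gauge.
                vec.with_label_values(&[source_id.as_str(), actor_id.as_str(), split_id])
            });
        gauge.set(offset);
    }
}

impl Drop for SplitMetricsCache {
    fn drop(&mut self) {
        // Dropping the reader removes every label set it created, so no
        // stale series outlives the actor.
        for split_id in self.per_split.keys() {
            let _ = self.latest_message_id.remove_label_values(&[
                self.source_id.as_str(),
                self.actor_id.as_str(),
                split_id.as_str(),
            ]);
        }
    }
}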

github-actions[bot] commented 5 months ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.