Aiven-Open / jdbc-connector-for-apache-kafka

Aiven's JDBC Sink and Source Connectors for Apache Kafka®
Apache License 2.0

Support metric to monitor import lag #179

Open cl-a-us opened 2 years ago

cl-a-us commented 2 years ago

We need a metric to measure the import lag while processing messages from a JDBC source. We were wondering whether it is possible to add further metrics to the source connector that can be exposed through the Kafka Connect Prometheus metrics. Our goal: especially during the first (initial) import of database rows, we want to monitor how many rows still need to be imported and have not been processed yet.

There is a metric that reports the number of polled rows (e.g. source-record-poll-total). It represents the number of rows the connector has already processed/read from the database. However, one cannot determine how many unpolled rows are left in the database and still need to be read.

I am new to Kafka Connect plugin development. First of all: is it possible to add such a metric to this Kafka Connect plugin? Second, if we implemented that metric, would you merge the pull request into your code base?

mstein11 commented 2 years ago

This would be important to us too. Is there anybody here who can answer a few questions regarding how this can be achieved? After that, we would be open to implementing it ourselves and merging it back here.

@jlprat, @ftisiot

Ugbot commented 2 years ago

Hiya! Thanks for commenting, we're always looking for insight into usage patterns and extra value we can add.

However, there's only one answer in SW engineering: "it depends". So here are some thoughts & questions.

The key questions here are about expected usage and what you're looking to run this against.

Effectively you'd be running a count in addition to the query that fetches up to the MAX row per poll, then submitting this metric as well as storing it. Assuming it's just for the initial spool-up, this could simply be a count run on startup of the task; you'd then more or less see the lag run down to zero over time, which would be relatively easy to manage, if a touch awkward.
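
To make the startup-count idea concrete, here is a minimal sketch of what such a one-off count could look like in plain JDBC; the class and its wiring are hypothetical illustrations, not part of the connector:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class InitialRowCounter {
    // Runs a one-off SELECT COUNT(*) when the task starts. On large tables
    // this is itself a full scan, which is the cost trade-off discussed above.
    static long countRows(final Connection connection, final String table) throws SQLException {
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
            rs.next();
            return rs.getLong(1);
        }
    }
}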

BUT, if you are ingesting fast enough that you expect the connector to fall behind a lot, it's likely you'd be better served by something that reads directly from the write-ahead log rather than querying. The problem is that it is rather hard to quantify the cost of running this, as the count could scan a LOT of rows/tables depending on the size of the query. Also, since the tables can change at any time, a lot of assumptions would have to be built in, making this rather specific to a mostly append-only table/view.

Questions: Is this a continuous metric or a startup metric? Is this a streaming task or something more batch/ETL based? How complex is your query?

cl-a-us commented 2 years ago

We plan to use such a metric for an initial spool-up. Once all rows are imported, other processes in our application landscape can start processing. In other words, other processes must wait for the initial spool-up to finish. Our main goal is to recognize when the spool-up is finished.

Technically, it would be a count on the database. If I understand you correctly, you suggest implementing the count query in jdbcSourceTask.start(…) or jdbcSourceTask.poll, depending on whether the count must be updated or not. How would you store the metric? Is it possible within a Kafka connector to define custom metrics that are forwarded to Kafka Connect and exposed to external systems via Prometheus? Or would you suggest another way to provide the metric to external systems?

To answer your questions:

Ugbot commented 2 years ago

Short version: we could more or less do either. Adding it to the metrics would be easiest, but it's then on you to store/edit it. Personally, I'd store it and issue an update per task/poll, allowing you to see the updates.

cl-a-us commented 2 years ago

Hey @Ugbot,

When you talk about metrics, do you mean a metric that is published via a JMX MBean? Or is there another way to provide metrics? I'm not sure I got what you mean by 'metrics'. Does your second proposal refer to a general metrics topic provided by Kafka Connect? If I understand you correctly, we would prefer the first option. We already use a jmxPrometheusExporter to extract the values and store them in our Prometheus; storing a further value would just be another config line for the exporter.

Ugbot commented 2 years ago

Hiya! Sorry, that last post seems to have gotten truncated a bit, so some of the context was lost. Firstly, I seem to be using log vs. metric interchangeably; what is the ideal format for you? From your comments it would seem to be JMX.

Storing this data is a bit of an issue. You could put it into your DB, but from the description of the task it does not actually need to be stored: the table is not going to change, so having to do the count again on task startup should not be a huge hit. I'd literally keep it in memory and just have it published with the task poll logging.

I'll need to dig into the JMX stuff a bit to see if I can confirm this will work as expected....
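
In case it helps with the digging: publishing an in-memory value over JMX only needs the platform MBean server. A minimal sketch, with all class names and the ObjectName chosen purely for illustration:

import java.lang.management.ManagementFactory;
import javax.management.ObjectName;

// Standard MBean convention: an interface named <Impl>MBean defines the attributes.
interface ImportLagMBean {
    long getRemainingRows();
}

public class ImportLag implements ImportLagMBean {
    private volatile long remainingRows;

    @Override
    public long getRemainingRows() {
        return remainingRows;
    }

    // Refresh the in-memory value, e.g. once per poll.
    public void update(final long value) {
        remainingRows = value;
    }

    // Register once in the task's start(); unregister again in stop().
    public void register(final String taskId) throws Exception {
        ManagementFactory.getPlatformMBeanServer().registerMBean(
                this, new ObjectName("io.aiven.connect.jdbc:type=importLag,task=" + taskId));
    }
}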

cl-a-us commented 2 years ago

Hi @Ugbot,

I did some programming and provided a solution in a pull request! As a first step, my solution simply executes an initial Select count(*) ... on startup with the given query or tables. It then exposes that metric via a JMX endpoint; all other Kafka Connect metrics are provided via JMX export as well. I just needed to add another jmxPrometheusExporter rule to expose the new metric via Prometheus:

- pattern: io.aiven.connect.jdbc.initialImportCount<task="(.*)", topic="(.*)">
  name: aiven_startup_count
  labels:
    topic: "$2"
    task: "$1"
  help: "Kafka JMX metric initialImportCount"
  type: COUNTER
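
For reference, the bean registration that such a rule targets could look roughly like this; the exact ObjectName shape is my assumption based on the pattern above, not a confirmed detail of the pull request:

import java.util.Hashtable;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class MetricNames {
    // Hypothetical sketch: build an ObjectName whose domain and key properties
    // line up with the exporter pattern above; "task" and "topic" become the
    // Prometheus labels.
    static ObjectName initialImportCount(final String task, final String topic)
            throws MalformedObjectNameException {
        final Hashtable<String, String> keys = new Hashtable<>();
        keys.put("task", task);
        keys.put("topic", topic);
        return new ObjectName("io.aiven.connect.jdbc.initialImportCount", keys);
    }
}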

Is there anything I can do to speed up the merge process?

cl-a-us commented 2 years ago

Hey @Ugbot,

I’d like to kindly ask whether there is any feedback on the code review. It would be great to see some progress.