GridProtectionAlliance / openPDC

Open Source Phasor Data Concentrator
MIT License

How to use the openPDC Kafka adapter to write data to a specific partition of a Kafka topic? #160

Open snail526 opened 1 year ago

snail526 commented 1 year ago

Hi, community,

I am having the following problem with the Kafka adapter that comes with openPDC. I have created a Kafka topic with 2 partitions and am using openPDC's KafkaAdapter to write data to it. Because the topic has 2 partitions and the data appears to be distributed across them in turn, I would like to know how to send a given set of data to a specific partition. Do I need to change some configuration parameters in the adapter? [image]

I look forward to your reply, thanks!

StephenCWills commented 1 year ago

You can specify Partitions=2 in the connection string of the adapter. It looks like the adapter uses the ID field of the input measurements to determine which partition they should go into, and it's fairly arbitrary. Measurements with even IDs would go into partition 0 and measurements with odd IDs would go into partition 1.

https://github.com/GridProtectionAlliance/gsf/blob/42929a71f25b663bcffc12625739695ef14153fb/Source/Libraries/Adapters/Kafka/TimeSeriesProducer.cs#L495
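The even/odd behavior described above is what a plain modulo mapping would produce. The following is a minimal Python sketch of that idea, not the adapter's actual code (the adapter is written in C#). The function name `partition_for` is hypothetical, and the modulo rule itself is an assumption inferred from the description, so check the linked line before relying on it.

```python
def partition_for(point_id: int, partitions: int) -> int:
    """Map a measurement ID to a partition index.

    Assumption: the partition is the ID modulo the partition count,
    which matches "even IDs -> 0, odd IDs -> 1" when partitions == 2.
    """
    if partitions < 1:
        raise ValueError("partitions must be at least 1")
    return point_id % partitions


# Show where a few example IDs would land with Partitions=2.
for point_id in (1, 2, 3, 4):
    print(f"ID {point_id} -> partition {partition_for(point_id, 2)}")
```

Under this assumption, the partition a measurement lands in depends only on its ID and the configured partition count, which is why it cannot be chosen freely through configuration alone.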

snail526 commented 1 year ago

Thank you for your reply.

I have a follow-up question about the Kafka adapter. This is line 495 of TimeSeriesProducer.cs: [image]

To work out the value of PartitionId, I first need to know the value of measurement.Key.ID. Taking the test device SHELBY as an example, how do I look up or change measurement.Key.ID for a measurement in openPDC? [image]

Once I know measurement.Key.ID, I can set the number of partitions to match the number of measurement groups (under the same Kafka topic) and route the data from each group into the partition I want.

So, I think the question is how to get the measurement.Key.ID parameter.

These are my questions and I look forward to your professional response, thank you!

StephenCWills commented 1 year ago

The ID column in your screenshot is what is referred to as the measurement key in the code. So measurement.Key.ID for the selected frequency in your screenshot would be 2, and measurement.Key.ID for the selected frequency delta measurement would be 4.

The (SQLite) definition for the Measurement table in the database looks like this.

https://github.com/GridProtectionAlliance/openPDC/blob/ad18a86795f3af833e1eed4caa87576ffd2d4239/Source/Data/SQLite/openPDC.sql#L303-L326

In this definition, the PointID field is the one that is referred to as measurement.Key.ID in the code. So you would need to update that field in order to change the partition manually.

The field is an autoincrementing integer, sometimes used as a primary key, and it's referenced by foreign keys in other tables, so you may have some difficulty changing it. You might be better off making a change to the Kafka Producer adapter that would enable you to explicitly specify which partition each measurement goes to.
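One way such a change could look is an explicit ID-to-partition table that takes precedence over the default mapping. The Python sketch below only illustrates the logic; a real change would be made in C# inside the adapter. The names `choose_partition` and `overrides` are hypothetical, and the modulo fallback is an assumption based on the even/odd behavior described earlier in this thread.

```python
from typing import Mapping


def choose_partition(point_id: int, partitions: int,
                     overrides: Mapping[int, int]) -> int:
    """Pick a partition for a measurement ID.

    An explicit entry in `overrides` wins; otherwise fall back to
    ID modulo partition count (assumed default behavior).
    """
    partition = overrides.get(point_id, point_id % partitions)
    if not 0 <= partition < partitions:
        raise ValueError(
            f"partition {partition} is out of range for {partitions} partitions"
        )
    return partition


# IDs 2 and 4 are both even, so the fallback would send both to partition 0.
# The override table moves ID 4 to partition 1 without touching PointID.
overrides = {4: 1}
for point_id in (2, 4):
    print(f"ID {point_id} -> partition {choose_partition(point_id, 2, overrides)}")
```

Keeping the mapping in the adapter's own settings, rather than in PointID, avoids having to edit an autoincrementing key that other tables reference.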

snail526 commented 1 year ago

Thank you.