bersler / OpenLogReplicator

Open Source Oracle database CDC
https://www.bersler.com
GNU General Public License v3.0
Add additional information with primary key data to Kafka messages #165

Open ageMLex2021 opened 1 month ago

ageMLex2021 commented 1 month ago

Good day!

Is your feature request related to a problem? Please describe.
How do I fill the Kafka key field for the OLR Kafka destination? Why do I need it? When you send a message to Kafka, it is sent to a partition. If the key field is not filled, Kafka cannot send messages with the same primary key to the same partition. If messages with the same primary key end up in different partitions, I cannot read the Kafka topic with parallel consumers without the risk of applying updates out of order.

Describe the solution you'd like
I need an option to send a Kafka key field made of table name + PK values with any separator. In other words, I need a parameter for RD_KAFKA_V_KEY(key, key_len) in the produce call:

rd_kafka_producev(
  rk,
  RD_KAFKA_V_TOPIC(topic_name.c_str()),
  RD_KAFKA_V_PARTITION(partition),
  RD_KAFKA_V_MSGFLAGS(msgflags),
  RD_KAFKA_V_VALUE(payload, len),
  RD_KAFKA_V_KEY(key, key_len),
  RD_KAFKA_V_TIMESTAMP(timestamp),
  RD_KAFKA_V_OPAQUE(msg_opaque),
  RD_KAFKA_V_END);

Describe alternatives you've considered
Yes, I know about OLR + Debezium, but in my case Debezium cannot work properly, because my source DB produces bad messages (it fills null into non-nullable fields) that make Debezium crash. Yes, I know I can republish the data to a new topic with the key field filled, but the data flow is huge, so why can't we do it right the first time?

bersler commented 1 month ago

Hi @ageMLex2021. Is this a feature request or a bug? I really can't tell from the description. Please describe in detail what you see and what you expect to see. Examples are welcome.

ageMLex2021 commented 1 month ago

Hi @bersler! I think this is a feature request. OpenLogReplicator has Kafka as a destination, and it works well. But OLR populates only the value of the Kafka message and does not populate the key.

That is not a problem with a small data flow, because you can consume from Kafka with a single consumer. With a huge flow, for example 50,000,000 messages per 24 hours, where you want to read everything before the next portion of data is produced to the topic, you need many parallel consumers. Running 48 consumers is not a problem in itself, but without the Kafka key field filled, Kafka puts rows with the same primary key values into different partitions. For example, two consumers read two rows with the same PK, a few milliseconds apart, from different partitions. The first consumer must write its data to the DB first, but the processes run in parallel, so the first consumer may write its row after the second one and overwrite the latest update.

The only thing needed to fix this is to fill the key field of every Kafka message with tablename;all PK values. The OLR config already has this information about the PK: take the message value, read the table name and the values of all PK fields from it, and put the concatenated string into the Kafka key. With this feature, the Kafka destination will put messages with the same PK values into a single partition, only one consumer will consume them from that partition, and other consumers will read data from other partitions. Huge data volumes will then be consumed quickly.
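The partitioning argument above can be illustrated with a small Python sketch. The `pick_partition` function is a hypothetical, simplified stand-in for a hash partitioner (real Kafka clients use their own hash functions); the point is only that a deterministic hash of the key sends equal keys to the same partition.

```python
import zlib

NUM_PARTITIONS = 48  # matches the 48 consumers mentioned above


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Simplified hash partitioner: NOT the exact algorithm of any Kafka client."""
    return zlib.crc32(key) % num_partitions


# Three change events; the first and third touch the same row (same table + PK).
events = [
    ("ANLACC;765;13397400", "insert"),
    ("ANLACC;765;13397401", "insert"),
    ("ANLACC;765;13397400", "update"),
]

assignments = [
    (key, op, pick_partition(key.encode("utf-8"), NUM_PARTITIONS))
    for key, op in events
]

for key, op, partition in assignments:
    print(f"{op:6s} {key} -> partition {partition}")
```

Because both events for `ANLACC;765;13397400` land in one partition, a single consumer sees them in production order, which is what the parallel-consumer setup needs.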

ageMLex2021 commented 1 month ago

IMG_20240802_231248 Example of a filled key. The order of PK names and values is important; it must be the same as in the config.

ageMLex2021 commented 1 month ago

SAVE_20240802_232658 Messages go to the same partition when the key field contains the same PK values.

bersler commented 1 month ago

Hi @ageMLex2021, what I have understood so far is that you are not happy with how the code works today. If this is a feature request, you need to define the expected result. Please describe what additional parameters you would like to have, how they should behave, etc. From the screenshot it is not clear what to change in the code or how to verify whether the result is correct.

ageMLex2021 commented 1 month ago

Hi @bersler! I will prepare everything you need:

  1. The place in the C++ code where a new parameter needs to be added, and which one.
  2. The place in the OLR config where a parameter is needed, and which one.
  3. Python code as an example of how to generate the new parameter and its value.
  4. Oracle schema + OLR config + Python code for generating test data.
  5. Python code for testing the result of the enhancement (reading from Kafka and comparing different rows with the same PK).

What else do we need for successful development?

bersler commented 1 month ago

When describing a functional change, please don't write about the implementation (code in C++). Just write how you think the program should work, as a text description. No code. So far you have complained that it does not work as expected. So what is the expected result?

ageMLex2021 commented 1 month ago

Hi @bersler!

> So what is the expected result?

I expect OpenLogReplicator to put into Kafka not only the message value but also its key. I expect the key to be a string with a fixed order, kind and format of values taken from the current message: table name + ; + value of first PK column + ; + value of second PK column + ; + . . . + ; + value of last PK column. That is enough for Kafka to put messages with the same key string into the same partition. That is enough to make me happy and to consume from Kafka in parallel without mismatches.

bersler commented 1 month ago

I really don't understand what you expect. If you mark the primary key columns with supplemental logging, they are also present in the message. You should already have the primary key in the Kafka message, as part of the payload. Maybe an example would help here: you have this SQL and this output, but you expect that output instead.

ageMLex2021 commented 1 month ago

Example:

OpenLogReplicator config excerpt:

{
  "log-level": 4,
  "source": [
    {
      "alias": "A4_2",
      "filter": {
        "table": [
          { "owner": "COLVIR", "table": "ANLACC", "key": "DEP_ID, ID" },
          ...

Message value excerpt:

{
  "c_idx": 8,
  "c_scn": 23733032827,
  "db": "CBSTEST",
  "payload": [
    {
      "after": { "CHA_ID": 2265, "CODE": "CR_EXP_IN_00000000", "DEP_ID": 765, "ID": 13397400, "LONGNAME": null, "NAME": null },
      "op": "c",
      "rid": "AAAkclAAPAAIC1IAEb",
      "schema": { "columns": [ ... ], "obj": 69903, "owner": "COLVIR", "table": "ANLACC" }
    }
  ],
  "scn": 23733023482,
  "tm": 1722041037000000000,
  "xid": "0x001c.045.0000a1e1"
}

Key value: ANLACC;765;13397400

Alternative key values, if it is more convenient for development to build the key from JSON serialization:

{"table_name":"ANLACC", "DEP_ID" : 765, "ID" : 13397400}
{"table_name":"ANLACC","pk":"DEP_ID,ID"};765;13397400
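As a rough illustration of the first key format above (table name followed by the PK values), here is a minimal Python sketch. The helper names `build_key` and `keys_for_message` are hypothetical, and falling back to a "before" image when "after" is absent is an assumption about the message layout, not something confirmed in this thread.

```python
import json


def build_key(table, row, key_columns, separator=";"):
    """Concatenate the table name and the key column values, in config order."""
    return separator.join([table] + [str(row[col]) for col in key_columns])


def keys_for_message(message, key_spec, separator=";"):
    """Build one key per payload entry; key_spec is the config string, e.g. 'DEP_ID, ID'."""
    key_columns = [col.strip() for col in key_spec.split(",")]
    keys = []
    for entry in message["payload"]:
        # Assumption: use the "after" image, else the "before" image (e.g. deletes).
        row = entry.get("after") or entry.get("before")
        keys.append(build_key(entry["schema"]["table"], row, key_columns, separator))
    return keys


# Trimmed copy of the message value excerpt from this comment.
raw = """
{"c_idx": 8, "c_scn": 23733032827, "db": "CBSTEST",
 "payload": [{"after": {"CHA_ID": 2265, "DEP_ID": 765, "ID": 13397400},
              "op": "c",
              "schema": {"owner": "COLVIR", "table": "ANLACC"}}]}
"""

keys = keys_for_message(json.loads(raw), "DEP_ID, ID")
print(keys)  # ['ANLACC;765;13397400']
```

The column order comes from the config string rather than from the message, so the key stays stable even if the JSON field order changes.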

ageMLex2021 commented 1 month ago

You are talking only about the Kafka value, where the message is stored. But a Kafka message has more fields: headers, key and value.

bersler commented 1 month ago

What about when there is no defined primary key for the table?

ageMLex2021 commented 1 month ago

If the primary key is not defined, there is no need to fill the Kafka key; leave it empty or set it to null.

bersler commented 1 month ago

Other replication tools would use a unique index, if present, or all columns in place of a missing primary key. This would make the message very long. I prefer to have just one message, because the output does not have to be Kafka; it can also be another output, like a network stream. The format could be JSON but also protobuf. I don't want to get tied to one message streaming system and its limitations.

ageMLex2021 commented 1 month ago

You can make key filling optional with a config switch. Kafka is a simple way to work with a huge stream, and you can make this possible with just one more option in your product. If somebody does not need a long key, they just switch the option off. What do you think about this approach?

ageMLex2021 commented 1 month ago

I understand that space for all parameters is reserved and refactoring will not be easy. But all I want is to see this in your code in the end:

rd_kafka_producev(
  rk,
  RD_KAFKA_V_TOPIC(topic_name.c_str()),
  RD_KAFKA_V_PARTITION(partition),
  RD_KAFKA_V_MSGFLAGS(msgflags),
  RD_KAFKA_V_VALUE(payload, len),
  RD_KAFKA_V_KEY(key, key_len),
  RD_KAFKA_V_TIMESTAMP(timestamp),
  RD_KAFKA_V_OPAQUE(msg_opaque),
  RD_KAFKA_V_END);

Please)

ageMLex2021 commented 1 month ago

I have an idea about the long message) Can you pass the key string ANLACC;765;13397400 through a CRC32 function and put the result into the optional key field? This approach fixes the long key issue. Two messages with the same CRC32 key field will be put into the same partition by Kafka.
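A minimal Python sketch of this CRC32 idea, using the standard `zlib.crc32`. The `short_key` helper and the 8-digit hex encoding are illustrative assumptions, not a proposed OLR format.

```python
import zlib


def short_key(full_key: str) -> str:
    """Reduce an arbitrarily long key string to a fixed 8-character hex CRC32."""
    return format(zlib.crc32(full_key.encode("utf-8")), "08x")


full_key = "ANLACC;765;13397400"
compact = short_key(full_key)

# The compact key always has the same length, however many PK columns exist.
print(len(full_key), "->", len(compact))
```

Two different primary keys can collide on the same CRC32 value. That only means both rows share a partition; messages for any single row still carry identical keys, so their relative order is preserved.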

bersler commented 1 month ago

I think this feature is interesting. The next step would be to write a good description of it, so that we can find more companies who would be interested in using it and actually sponsoring the development work. I think that the priority is low. You have a workaround, and OLR works without this feature.

ageMLex2021 commented 1 month ago

This is the description of the proposal, generated by ChatGPT-4. Feature_Request_Add_PK_Data_to_Kafka_Messages.pdf

ageMLex2021 commented 1 month ago

How much money would you like to raise for developing this feature? I want to ask among those I know.

ageMLex2021 commented 1 month ago

Yes, I'll work with OpenLogReplicator over the network for now. It's a decent workaround, but I'll need to take care of delivery and parallelization myself.

bersler commented 1 month ago

Hi @ageMLex2021, can you contact me directly to discuss some details? I will prepare a detailed analysis of how much work is required for this feature.

ageMLex2021 commented 1 month ago

I have sent you an email.

bersler commented 3 weeks ago

Hi @ageMLex2021,

I have written a summary of the task you have described. I am currently on vacation, so I could not think about this task earlier.

Currently OLR can produce messages in various formats. The only supported format is JSON, but there is also an unsupported protobuf format. Other formats could be used as well.

For the purpose of message confirmation, every message contains c_scn/c_idx information, which the client should extract from the message and provide to OLR during confirmation to indicate the last message it has processed.

This feature request is to provide additional metadata along with the message, which would be used for traffic routing by the Kafka receiver.
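As a hedged illustration of the confirmation data mentioned above, the Python sketch below extracts c_scn/c_idx from a JSON message and keeps track of the last processed position. How that position is actually sent back to OLR is not described in this thread, so that step is left out; `position_of` is a hypothetical helper name.

```python
import json


def position_of(raw_message: str):
    """Return the (c_scn, c_idx) pair carried by one JSON message."""
    msg = json.loads(raw_message)
    return (msg["c_scn"], msg["c_idx"])


# Trimmed copy of the message value shown earlier in this thread.
raw = '{"c_idx": 8, "c_scn": 23733032827, "db": "CBSTEST", "payload": []}'

last_processed = None
for raw_message in [raw]:
    # ... process the payload here ...
    last_processed = position_of(raw_message)

print(last_processed)  # (23733032827, 8)
```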

Perhaps if there are other producers in the future, this could be used for them as well.

For the network stream producer (WriterStream) this would be ignored, since there is just one pipe for messages and this form of communication does not allow sending metadata along with the message.

The same would apply to the file producer (WriterFile).

Configuration:

Various information can be added as a key to the message. This request is about writing information from the primary key, but this is not the only possible solution.

The configuration would need to define a format which could be easily extended in the future.

By default, no key is added to messages.

'format': {
  ...
}

The user could define a global setting for all messages by defining the key parameter:

'format': {
  'key': {
    'separator': ';',
    'table': [{...}]
  }
}

The table list would contain definitions for creating the key values based on various criteria. OLR would go through the list, and if the table matches one of the elements, certain rules would be used for selecting the data for the Kafka key field.

For example:

'format': {
  'key': {
    'table': [
      {'owner': 'OWNER1', 'table': '.*', 'columns': '[pk]'}
    ]
  }
}

This would mean that for all tables owned by OWNER1 whose name matches the regex .* (all would match), a key would be added based on the value defined in the columns field.

The 'columns' field could have other values like:

The 'separator' field would contain a string which would be used as a separator between values. This could be a multi-character string, but could also be empty. The default separator would be ';' (semicolon).

There could be multiple rules, for example:

'format': {
  'key': {
    'table': [
      {'owner': 'XXX', 'table': 'YY', 'key': 'a, b'},
      {'owner': '.*', 'table': '.*', 'key': '[pk]'},
      {'owner': '.*', 'table': '.*', 'key': '[ui]'}
    ]
  }
}

First, we have an exception: for table XXX.YY we use the column list [a, b] as the values for the key. For all other tables we add the primary key as the key (if it exists). If the table does not have a primary key (the [pk] rule does not match) but has a unique index, the unique index is used instead. If the table is not XXX.YY and has neither a primary key nor a unique index, no key is added.
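The rule evaluation described above could be sketched in Python as follows. This is only an illustration of the proposed behavior: `resolve_key_columns` is a hypothetical name, the rules use the 'key' field as in the example above, and matching the owner and table regexes against the whole name (`re.fullmatch`) is an assumption.

```python
import re


def resolve_key_columns(rules, owner, table, primary_key, unique_index):
    """Return the key columns for owner.table, or None when no rule applies.

    primary_key / unique_index are column lists (empty when the table has none).
    """
    for rule in rules:
        if not (re.fullmatch(rule["owner"], owner) and re.fullmatch(rule["table"], table)):
            continue
        spec = rule["key"]
        if spec == "[pk]":
            if primary_key:
                return list(primary_key)
        elif spec == "[ui]":
            if unique_index:
                return list(unique_index)
        else:
            # Explicit, comma-separated column list.
            return [col.strip() for col in spec.split(",")]
        # The rule matched the table but had nothing to offer: try the next rule.
    return None


rules = [
    {"owner": "XXX", "table": "YY", "key": "a, b"},
    {"owner": ".*", "table": ".*", "key": "[pk]"},
    {"owner": ".*", "table": ".*", "key": "[ui]"},
]

print(resolve_key_columns(rules, "XXX", "YY", ["ID"], []))      # ['a', 'b']
print(resolve_key_columns(rules, "COLVIR", "T1", ["ID"], []))   # ['ID']
print(resolve_key_columns(rules, "COLVIR", "T2", [], ["CODE"])) # ['CODE']
print(resolve_key_columns(rules, "COLVIR", "T3", [], []))       # None
```

Falling through to the next rule when a table has no primary key is what lets the `[ui]` rule act as a fallback; a `None` result corresponds to "no key is added".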

The values of the defined columns, in string format (it would need to be verified whether binary or UTF-8 values are accepted by Kafka), would be concatenated using the value of the separator configuration parameter and added to the Kafka message using the RD_KAFKA_VTYPE_KEY librdkafka message parameter during the send operation.

If a table does not have the defined columns, an error is printed and replication stops.

Please confirm whether this is what you have described. This is what I meant when I asked for a detailed description of what this feature request defines.