jackc / pglogrepl

PostgreSQL logical replication library for Go.
MIT License
336 stars 63 forks source link

getting xid of each insert, update and delete statements #66

Open mehdipourfar opened 3 months ago

mehdipourfar commented 3 months ago

I want to create a change data capture service which listens to postgresql and stream changes to kafka. As I read in this article, lsn of different transaction commands, during concurrent writes, are not necessarily in order (but commit messages are ordered). So I wanted to group messages by xid and write them to kafka in batches. However, when I inspect xid of insert, update and delete statements, they are always zero. On solution that came to my mind was to set inStream=true in BeginMessage phase and set it to false in CommitMessage phase. I don't know if it is a valid action and I don't see capturing this events in this project example. This action is done during StreamStartMessageV2 and StreamStopMessageV2 which my code (and also the example code) does not produce it.