noctarius / timescaledb-event-streamer

timescaledb-event-streamer is a command line program to create a stream of CDC (Change Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.
Apache License 2.0

The app doesn't send the second change event message to NATS #122

Open falkolab opened 1 year ago

falkolab commented 1 year ago

I encountered a problem. The first change was successfully sent to the subscription topic, and I can read it, but the subsequent changes are getting stuck in the event-streamer.

I am debugging it and can see that the task is added via rh.taskManager.EnqueueTask, but the task never executes. The log of the first change:

[2023/11/12T21:42:15.931] [INFO] [ReplicationHandler] Starting replication handler loop  
[2023/11/12T21:42:15.931] [TRACE] [ReplicationHandler] Primary Keepalive Message => ServerWALEnd:0/2E39E49 ServerTime:2023-11-12 21:42:16.121476 +0300 MSK ReplyRequested:false  
[2023/11/12T21:42:15.934] [DEBUG] [ReplicationHandler] EVENT: {messageType:Begin finalLSN:0/2E3A120 commitTime:2023-11-12 21:39:18.311364 +0300 MSK xid:1119}  
[2023/11/12T21:42:15.934] [DEBUG] [ReplicationHandler] EVENT: {messageType:Relation relationId:39819 namespace:public relationName:test replicaIdentity:100 columnNum:2 columns:[{name:id flags:1 dataType:23 typeModifier:-1} {name:text flags:0 dataType:1043 typeModifier:54}]}  
[2023/11/12T21:42:23.420] [DEBUG] [ReplicationHandler] EVENT: {messageType:Update oldValues:map[] newValues:map[id:1 text:12]}  
[2023/11/12T21:42:37.442] [DEBUG] [ReplicationHandler] EVENT: {messageType:Commit flags:0 commitLSN:0/2E3A120 transactionEndLSN:0/2E3A150 commitTime:2023-11-12 21:39:18.311364 +0300 MSK}  
[2023/11/12T21:42:37.442] [INFO] [FileStateStorage] Auto storing FileStateStorage at /tmp/db_event_streamer/statestorage.dat  
[2023/11/12T21:42:37.443] [INFO] [FileStateStorage] Storing FileStateStorage at /tmp/db_event_streamer/statestorage.dat  
[2023/11/12T21:42:37.443] [DEBUG] [ReplicationHandler] EVENT: {messageType:Begin finalLSN:0/2E3A518 commitTime:2023-11-12 21:40:31.015822 +0300 MSK xid:1120}  
[2023/11/12T21:42:39.681] [DEBUG] [ReplicationHandler] EVENT: {messageType:Update oldValues:map[] newValues:map[id:1 text:13]}  
[2023/11/12T21:42:41.883] [TRACE] [EventEmitter] Publishing event: map[payload:map[after:map[id:1 text:12] op:u source:map[connector:timescaledb-event-streamer db:fonda lsn:0/2E39E80 name:fonda schema:public snapshot:false table:test ts_ms:1699814536123 txId:0xc000888bc0 version:0.11.1-dev] ts_ms:1699814557443] schema:map[fields:[map[field:source fields:[map[field:version type:string] map[field:connector type:string] map[field:name type:string] map[field:table type:string] map[field:lsn type:int64] map[field:xmin type:int64] map[field:ts_ms type:string] map[default:false field:snapshot type:boolean] map[field:schema type:string] map[field:txId type:int64]] name:io.debezium.connector.postgresql.Source type:struct] map[field:op type:string] map[field:tsdb_op type:string] map[field:ts_ms type:int64] map[field:before fields:[map[default:nextval('test_id_seq'::regclass) field:id index:0 type:int32] map[field:text index:1 length:50 optional:true type:string]] name:timescaledb.public.test.Value type:struct] map[field:after fields:[map[default:nextval('test_id_seq'::regclass) field:id index:0 type:int32] map[field:text index:1 length:50 optional:true type:string]] name:timescaledb.public.test.Value type:struct]] name:timescaledb.public.test.Envelope type:struct]]  
[2023/11/12T21:42:41.883] [DEBUG] [ReplicationHandler] EVENT: {messageType:Commit flags:0 commitLSN:0/2E3A518 transactionEndLSN:0/2E3A548 commitTime:2023-11-12 21:40:31.015822 +0300 MSK}  
[2023/11/12T21:42:41.883] [DEBUG] [ReplicationHandler] EVENT: {messageType:Begin finalLSN:0/2E3A5D0 commitTime:2023-11-12 21:40:49.082131 +0300 MSK xid:1121}  
[2023/11/12T21:42:43.027] [DEBUG] [ReplicationHandler] EVENT: {messageType:Update oldValues:map[] newValues:map[id:1 text:14]}  
[2023/11/12T21:42:44.023] [DEBUG] [ReplicationHandler] EVENT: {messageType:Commit flags:0 commitLSN:0/2E3A5D0 transactionEndLSN:0/2E3A600 commitTime:2023-11-12 21:40:49.082131 +0300 MSK}  
[2023/11/12T21:42:44.024] [TRACE] [ReplicationHandler] Primary Keepalive Message => ServerWALEnd:0/2E3A638 ServerTime:2023-11-12 21:42:16.151676 +0300 MSK ReplyRequested:false  

The second:

[2023/11/12T21:43:33.005] [TRACE] [ReplicationHandler] Primary Keepalive Message => ServerWALEnd:0/2E3A638 ServerTime:2023-11-12 21:43:33.196201 +0300 MSK ReplyRequested:false  
[2023/11/12T21:43:33.006] [DEBUG] [ReplicationHandler] EVENT: {messageType:Begin finalLSN:0/2E3A688 commitTime:2023-11-12 21:43:33.186734 +0300 MSK xid:1122}  
[2023/11/12T21:43:35.117] [DEBUG] [ReplicationHandler] EVENT: {messageType:Update oldValues:map[] newValues:map[id:1 text:15]}  
[2023/11/12T21:43:37.086] [INFO] [FileStateStorage] Auto storing FileStateStorage at /tmp/db_event_streamer/statestorage.dat  
[2023/11/12T21:43:37.086] [INFO] [FileStateStorage] Storing FileStateStorage at /tmp/db_event_streamer/statestorage.dat  
[2023/11/12T21:43:37.086] [DEBUG] [ReplicationHandler] EVENT: {messageType:Commit flags:0 commitLSN:0/2E3A688 transactionEndLSN:0/2E3A6B8 commitTime:2023-11-12 21:43:33.186734 +0300 MSK}  
[2023/11/12T21:43:37.086] [TRACE] [ReplicationHandler] Primary Keepalive Message => ServerWALEnd:0/2E3A6B8 ServerTime:2023-11-12 21:43:33.19649 +0300 MSK ReplyRequested:false  
noctarius commented 1 year ago

What kind of sink do you use? Is the previous event potentially still stuck in publishing due to retrying?

edit: overall I see 4 events, but only one was published. I guess there should be text=12, 13, 14, 15, shouldn't there?

falkolab commented 1 year ago

I suppose I was trying to listen to it as core pub/sub, but in fact it's JetStream, so I need to have consumers configured. The publisher doesn't receive an ack from the stream server. This is my assumption; I'm still investigating it.

falkolab commented 1 year ago

> edit: overall I see 4 events, but only one was published. I guess there should be text=12, 13, 14, 15, shouldn't there?

Yes, every time I restart the app, it sends the first "12" message and gets stuck at the publishing stage. I'm using the OpenFaaS CE installation, which doesn't use JetStream. I initially thought that you used subject-based messaging. So I believe we can close the issue, as I need to rethink my plans.

noctarius commented 1 year ago

You can totally implement your own sink; however, the core pub/sub system doesn't have ordering guarantees, which is why I don't use it.

You can use Redpanda or just run a NATS Docker container.
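
For example, a minimal way to get a local JetStream-enabled server (assuming the official nats image and the default client port):

docker run -p 4222:4222 nats:latest -js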

falkolab commented 1 year ago

The idea was to link PostgreSQL CDC to OpenFaaS functions over nats-connect (it translates NATS messages into function calls), but you use JetStream and nats-connect uses NATS core pub/sub. Yes, I see what you mean.

Please note that you use "CDC (Chance Data Capture)" in the project description and README. I suppose it was a typo; the correct term is CDC (Change Data Capture).

noctarius commented 1 year ago

Oh, good catch! Copy and paste for the win. Fixed it! Thanks again 💪

falkolab commented 1 year ago

I have done some investigation, and as a newbie to NATS and JetStream, I assumed that the stream would be created when publishing a message. However, that is not true. One has to create the JetStream stream with a wildcard subject first, for example: nats str add test_timescaledb_public --subjects "timescaledb.public.*" --max-msgs 1000
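
For reference, here is a minimal standalone sketch of doing the same thing programmatically with the nats.go client (the stream name and subject are just my example from above, not anything the streamer does itself):

package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    // Connect to a local NATS server that has JetStream enabled.
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Create a stream that captures every subject the streamer publishes to.
    _, err = js.AddStream(&nats.StreamConfig{
        Name:     "test_timescaledb_public",
        Subjects: []string{"timescaledb.public.*"},
        MaxMsgs:  1000,
    })
    if err != nil {
        log.Fatal(err)
    }
}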

I would suggest an improvement:

Add a configurable deadline when publishing events, so that a publish to a non-existent stream doesn't get stuck:

// TODO: make the timeout a config parameter
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// nats.Context(ctx) bounds the wait for the PubAck, so the publish
// returns an error instead of blocking indefinitely when no stream
// matches the subject.
_, err = n.jetStreamContext.PublishMsg(
    &nats.Msg{
        Subject: topicName,
        Header:  header,
        Data:    envelopeData,
    },
    nats.Context(ctx),
)

What is weird is that the client returned the error "headers not supported by this server" when the stream did not exist.
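
A related thought, just a sketch and not the sink's current code: the sink could check at startup whether the target stream exists and fail fast with a clear error instead of hanging on publish. With nats.go that could look roughly like this (ensureStreamExists is a hypothetical helper name):

import (
    "errors"
    "fmt"

    "github.com/nats-io/nats.go"
)

// ensureStreamExists fails fast if no stream with the given name exists,
// instead of letting a later publish hang waiting for an ack.
func ensureStreamExists(js nats.JetStreamContext, streamName string) error {
    _, err := js.StreamInfo(streamName)
    if errors.Is(err, nats.ErrStreamNotFound) {
        return fmt.Errorf("jetstream stream %q does not exist, create it first: %w", streamName, err)
    }
    return err
}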

falkolab commented 1 year ago

I'll create a PR.