usedatabrew / blink

OpenSource data platform to build event-driven systems. It's like Deebezium for golang :)
https://docs.databrew.tech/open-source/prerequisites
MIT License
26 stars 1 forks source link

Flush last snapshot data #4

Closed le-vlad closed 11 months ago

le-vlad commented 11 months ago

In case if amount of data in the snapshot is lower then batch size - we have to flush the snapshot before starting logical replication

le-vlad commented 11 months ago

Should be fixed in #3

le-vlad commented 11 months ago

Fixed with time.AfterFunc()

if len(s.messagesBuffer) >= s.snapshotMaxBufferSize {
            err := s.writeSnapshotBatch()
            if err != nil {
                return err
            }

            s.messagesBuffer = []message.Message{}
            return nil
        } else if s.snapshotTicker == nil {
            // Start a timer if not already running
            s.snapshotTicker = time.AfterFunc(time.Second, func() {
                s.mutex.Lock()
                defer s.mutex.Unlock()
                err := s.writeSnapshotBatch()
                if err != nil {
                    panic("Failed to write snapshot batch")
                }
                s.messagesBuffer = []message.Message{}
                s.snapshotTicker.Stop()
                s.snapshotTicker = nil
            })
            return nil
        } else {
            return nil
        }