quix-labs / flash

Go library for managing real-time PostgreSQL changes.
MIT License

How do we set up wal_logical driver? #1

Closed vamshiaruru closed 2 weeks ago

vamshiaruru commented 2 weeks ago

Hey guys, I came across this really cool library on Twitter, great job! How do I get it working with the wal_logical driver? Do we have to configure the database ourselves and set up logical replication, the replication slot, etc. first and then pass them to the library, or does the library create them? The trigger driver seems to work without any further configuration, but wal_logical doesn't, hence the question.

Thanks in advance!

alancolant commented 2 weeks ago

The specific documentation for each driver is currently being written and will be available in drivers/wal_logical/README.MD.

To use this driver, you just need to pass the driver parameter manually like this:

driver := wal_logical.NewDriver(&wal_logical.DriverConfig{
    // UseStreaming: true,
})

clientConfig := &types.ClientConfig{
    DatabaseCnx:     "postgresql://devuser:devpass@localhost:5432/devdb",
    Logger:          &logger, // Set your custom zerolog.Logger here
    ShutdownTimeout: time.Second * 2,
    Driver:          driver,
}

For now, you can check examples/development/development.go.

You just need to configure the PostgreSQL database with wal_level=logical; an example can be found in the docker-compose.yaml file.

The configuration of the publications and the replication slot will be done automatically, no need to declare them beforehand.
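As a rough way to confirm the wal_level=logical requirement before starting the client, the server setting can be read with `SHOW wal_level`. The sketch below is not part of flash: `checkWalLevel` and `validateWalLevel` are hypothetical helper names, and the `*sql.DB` is assumed to have been opened elsewhere with whichever PostgreSQL driver you already use.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// validateWalLevel (hypothetical helper) checks the value returned by
// `SHOW wal_level`. Logical decoding requires the value "logical".
func validateWalLevel(level string) error {
	level = strings.TrimSpace(level)
	if level != "logical" {
		return fmt.Errorf("wal_level is %q, want \"logical\" (set wal_level=logical and restart PostgreSQL)", level)
	}
	return nil
}

// checkWalLevel (hypothetical helper) reads the setting from a live server.
// The caller is assumed to have opened db with a PostgreSQL driver.
func checkWalLevel(ctx context.Context, db *sql.DB) error {
	var level string
	if err := db.QueryRowContext(ctx, "SHOW wal_level").Scan(&level); err != nil {
		return fmt.Errorf("reading wal_level: %w", err)
	}
	return validateWalLevel(level)
}

func main() {
	// Demonstrates the validation step only; no database is contacted here.
	fmt.Println(validateWalLevel("replica"))
	fmt.Println(validateWalLevel("logical"))
}
```

Changing wal_level only takes effect after a server restart, so a failing check here usually means the configuration was edited but PostgreSQL was not restarted.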

I'll leave this issue open until the specific driver documentation is written.

vamshiaruru commented 2 weeks ago

Understood, thanks!

vamshiaruru commented 2 weeks ago

I have tried following the example, and am facing the following issue:

panic: ERROR: client sent proto_version=4 but we only support protocol 3 or lower (SQLSTATE 0A000)

I imagine it's because I'm on Postgres 15, which only supports replication protocol version 3 or lower :(

alancolant commented 2 weeks ago

Okay, noted. The driver actually only needs protocol version 2, so I've downgraded the requested version. It should work now. You will probably need to manually delete the publication that was already created before your next start:

DROP PUBLICATION IF EXISTS "flash_publication-init";
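For context on the error above: the pgoutput plugin accepts proto_version 1 from PostgreSQL 10, 2 from PostgreSQL 14, 3 from PostgreSQL 15, and 4 from PostgreSQL 16. The sketch below shows one way a client could cap the requested version from `server_version_num`; it is an illustration only, not how flash implements it, and both function names are hypothetical.

```go
package main

import "fmt"

// maxPgoutputProtoVersion (hypothetical helper) returns the highest pgoutput
// proto_version accepted by a server, given its server_version_num
// (for example 150004 for PostgreSQL 15.4). It returns 0 when the server
// predates logical replication with pgoutput (before PostgreSQL 10).
func maxPgoutputProtoVersion(serverVersionNum int) int {
	major := serverVersionNum / 10000
	switch {
	case major >= 16:
		return 4
	case major == 15:
		return 3
	case major == 14:
		return 2
	case major >= 10:
		return 1
	default:
		return 0
	}
}

// negotiateProtoVersion (hypothetical helper) caps the version the client
// would like to use at what the server supports.
func negotiateProtoVersion(serverVersionNum, wanted int) (int, error) {
	highest := maxPgoutputProtoVersion(serverVersionNum)
	if highest == 0 {
		return 0, fmt.Errorf("server_version_num %d does not support pgoutput", serverVersionNum)
	}
	if wanted > highest {
		return highest, nil
	}
	return wanted, nil
}

func main() {
	// A client asking for version 4 against PostgreSQL 15.4 is capped at 3.
	v, err := negotiateProtoVersion(150004, 4)
	fmt.Println(v, err)
}
```

Requesting version 2, as the fix above does, is accepted by PostgreSQL 14 and later.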

alancolant commented 2 weeks ago

@vamshiaruru I have added documentation on how to configure the drivers.

Please see drivers/wal_logical/README.md or drivers/trigger/README.md.

Your feedback on the clarity of the documentation would be greatly appreciated.

vamshiaruru commented 2 weeks ago

Hi @alancolant, thanks for the documentation, that's very helpful. However, when I try it out with the wal_logical driver, the typed event doesn't seem to carry any data. For example, in an insert event, typedEvent.New is just an empty map. Here is my code:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"

    "github.com/quix-labs/flash/pkg/client"
    "github.com/quix-labs/flash/pkg/drivers/wal_logical"
    "github.com/quix-labs/flash/pkg/listeners"
    "github.com/quix-labs/flash/pkg/types"
)

func main() {

    postsListenerConfig := &types.ListenerConfig{
        Table:              "public.entity_definition",
        MaxParallelProcess: 1, // In most cases 1 is ideal, because synchronization between goroutines introduces some delay
    }
    postsListener, _ := listeners.NewListener(postsListenerConfig)
    stopAll, err := postsListener.On(types.OperationAll, func(event types.Event) {
        switch typedEvent := event.(type) {
        case *types.InsertEvent:
            fmt.Printf("insert - new: %+v\n", typedEvent.New)
        case *types.UpdateEvent:
            fmt.Printf("update - old: %+v - new: %+v\n", typedEvent.Old, typedEvent.New)
        case *types.DeleteEvent:
            fmt.Printf("delete - old: %+v \n", typedEvent.Old)
        case *types.TruncateEvent:
            fmt.Printf("truncate \n")
        }
    })
    if err != nil {
        panic(err)
    }

    defer func() {
        err := stopAll()
        if err != nil {
            panic(err)
        }
    }()

    driver := wal_logical.NewDriver(&wal_logical.DriverConfig{
        //UseStreaming: true,
    })

    // Create client
    clientConfig := &types.ClientConfig{
        DatabaseCnx:     "postgresql://username:password@localhost:5432/atlantis",
        ShutdownTimeout: time.Second * 2,
        Driver:          driver,
    }
    flashClient, _ := client.NewClient(clientConfig)
    flashClient.Attach(postsListener)

    // Start listening
    go func() {
        err := flashClient.Start()
        if err != nil {
            panic(err)
        }
    }() // Error Handling

    defer func() {
        err := flashClient.Close()
        if err != nil {
            panic(err)
        }
    }()

    // Wait for interrupt signal (Ctrl+C)
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    <-interrupt

    fmt.Println("Program terminated.")
}

I am also not receiving any update events, although I do get events for insert and delete.

alancolant commented 2 weeks ago

@vamshiaruru See #2. It's fixed, but the usage has changed in favor of a single package: no more client + listeners + types.

See the Upgrade Guide.