npgsql / npgsql

Npgsql is the .NET data provider for PostgreSQL.
http://www.npgsql.org
PostgreSQL License

LogicalReplicationConnection PgOutputReplicationMessage does not contain the Data property #5871

Open rodrigonunes100 opened 1 month ago

rodrigonunes100 commented 1 month ago

Is there any reason why PgOutputReplicationMessage (used with LogicalReplicationConnection) does not contain a Data property? With LogicalReplicationConnection, TestDecodingData contains a Data property, and with PhysicalReplicationConnection, XLogDataMessage also contains a Data property.

using Npgsql.Replication;
using Npgsql.Replication.Internal;
using Npgsql.Replication.PgOutput;
using Npgsql.Replication.TestDecoding;

await using var conn = new LogicalReplicationConnection("<connection_string>");
await conn.Open();

var pg = new PgOutputReplicationOptions("blog_pub",
    PgOutputProtocolVersion.V1, true, PgOutputStreamingMode.On, true);
var slot = new PgOutputReplicationSlot("blog_slot");
var cancellationTokenSource = new CancellationTokenSource();
await foreach (var message in conn.StartReplication(slot, pg, cancellationTokenSource.Token))
{
    // Compile error: PgOutputReplicationMessage has no Data property
    var stream = message.Data;
    Console.WriteLine($"Received message type: {message.GetType().Name}");
    conn.SetReplicationStatus(message.WalEnd);
}

Will a future version expose the message.Data stream on PgOutputReplicationMessage for LogicalReplicationConnection? With PhysicalReplicationConnection and XLogDataMessage, I can do WAL-file-style replication. With LogicalReplicationConnection and PgOutputReplicationMessage, I don't know whether the available data is enough to do WAL-file-style replication. A good idea for PostgreSQL would be to accept this stream data back into the primary, as an ordered stream.

roji commented 1 month ago

Please fix the code formatting and apply syntax highlighting to your comment; it's impossible to read your code.

rodrigonunes100 commented 1 month ago

I edited the original comment.

roji commented 1 month ago

@rodrigonunes100 please learn about code fencing in markdown. I edited your comment to make it more readable.

/cc @Brar

Brar commented 1 month ago

You may want to have a look at the classes in the Npgsql.Replication.PgOutput.Messages namespace. They derive from PgOutputReplicationMessage and contain the "data" you're probably looking for in a processable way.

To expand the code sample you've provided:

using Npgsql.Replication;
using Npgsql.Replication.PgOutput;
using Npgsql.Replication.PgOutput.Messages;

await using var conn = new LogicalReplicationConnection("<connection_string>");
await conn.Open();

var pg = new PgOutputReplicationOptions("blog_pub", 
    protocolVersion: PgOutputProtocolVersion.V1,
    binary: true,
    streamingMode: PgOutputStreamingMode.On,
    messages: true);
var slot = new PgOutputReplicationSlot("blog_slot");
var cancellationTokenSource = new CancellationTokenSource();
await foreach (var message in conn.StartReplication(slot, pg, cancellationTokenSource.Token))
{
    Console.WriteLine($"Received message type: {message.GetType().Name}");
    switch (message)
    {
        case InsertMessage insertMessage:
        {
            await foreach (var field in insertMessage.NewRow)
                switch (field.GetPostgresType().Name)
                {
                    case "text":
                    {
                        var value = field.Get<string>();
                        Console.WriteLine($"Insert message contains text value: {value}");
                        break;
                    }
                }
            break;
        }
    }
    conn.SetReplicationStatus(message.WalEnd);
}

NinoFloris commented 2 weeks ago

@rodrigonunes100 were you able to get things working?