nickelser / pg_kinesis

Replicate Postgres commits using logical replication to AWS Kinesis.

Race condition when record is > 1mb and error is thrown #6

Open diranged opened 6 years ago

diranged commented 6 years ago

First off, this tool is great, and I really want to see it succeed. With that said, we're testing it out and finding that after it runs for a while, it deadlocks in our environment. This is the second time we've seen this in 24 hours.

April 27th 2018, 09:58:28.879 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:58:28Z inserts=0 (0.0/s) updates=1741 (174.1/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1736 (173.6/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:58:38.879 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:58:38Z inserts=0 (0.0/s) updates=1769 (176.9/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1762 (176.2/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:58:45.302 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: /go/src/github.com/nickelser/pg_kinesis/pg_kinesis.go : 138 - 2 records failed during Kinesis PutRecords; retrying in 100ms
April 27th 2018, 09:58:45.402 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: /go/src/github.com/nickelser/pg_kinesis/pg_kinesis.go : 138 - 1 records failed during Kinesis PutRecords; retrying in 100ms
April 27th 2018, 09:58:45.405 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: /go/src/github.com/nickelser/pg_kinesis/pg_kinesis.go : 138 - 2 records failed during Kinesis PutRecords; retrying in 199.067526ms
April 27th 2018, 09:58:48.879 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:58:48Z inserts=0 (0.0/s) updates=1758 (175.8/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1759 (175.9/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:58:58.879 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:58:58Z inserts=0 (0.0/s) updates=1771 (177.1/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1770 (177.0/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:59:08.879 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:59:08Z inserts=0 (0.0/s) updates=1759 (175.9/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1762 (176.2/s, 6ms/record, 10.0s total) backlog=9997 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:59:18.879 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:59:18Z inserts=0 (0.0/s) updates=1784 (178.4/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1782 (178.2/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:59:28.879 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:59:28Z inserts=5 (0.5/s) updates=1718 (171.8/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1728 (172.8/s, 6ms/record, 10.1s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:59:38.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:59:38Z inserts=0 (0.0/s) updates=1738 (173.8/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1739 (173.9/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:59:48.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:59:48Z inserts=1 (0.1/s) updates=1678 (167.8/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1684 (168.4/s, 6ms/record, 10.3s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 09:59:58.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T16:59:58Z inserts=0 (0.0/s) updates=1738 (173.8/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1734 (173.4/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 10:00:08.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:00:08Z inserts=0 (0.0/s) updates=1659 (165.9/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1658 (165.8/s, 6ms/record, 10.0s total) backlog=10000 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 10:00:18.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:00:18Z inserts=1 (0.1/s) updates=877 (87.7/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1683 (168.3/s, 6ms/record, 10.0s total) backlog=8382 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 10:00:28.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:00:28Z inserts=0 (0.0/s) updates=0 (0.0/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1776 (177.6/s, 6ms/record, 10.0s total) backlog=4827 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 10:00:38.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:00:38Z inserts=0 (0.0/s) updates=0 (0.0/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=1800 (180.0/s, 6ms/record, 10.0s total) backlog=1227 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 10:00:48.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:00:48Z inserts=0 (0.0/s) updates=0 (0.0/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=616 (61.6/s, 6ms/record, 3.7s total) backlog=0 lsn=3C12/E2436F98 lag=7.0 GB
April 27th 2018, 10:00:58.880 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:00:58Z inserts=0 (0.0/s) updates=0 (0.0/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=0 (0.0/s, 0ms/record, 0.0s total) backlog=0 lsn=3C12/E2436F98 lag=7.1 GB
April 27th 2018, 10:01:08.881 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:01:08Z inserts=0 (0.0/s) updates=0 (0.0/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=0 (0.0/s, 0ms/record, 0.0s total) backlog=0 lsn=3C12/E2436F98 lag=7.1 GB
April 27th 2018, 10:01:18.881 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:01:18Z inserts=0 (0.0/s) updates=0 (0.0/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=0 (0.0/s, 0ms/record, 0.0s total) backlog=0 lsn=3C12/E2436F98 lag=7.1 GB
April 27th 2018, 10:01:28.881 | fadc017/ecs-postgres-kinesis-bridge-13-postgres-kinesis-bridge-f6afa2f3af8fa2fd2e00/3b0204efc55b[7951]: 2018-04-27T17:01:28Z inserts=0 (0.0/s) updates=0 (0.0/s) deletes=0 (0.0/s) skipped=0 (0.0/s) putrecords=0 (0.0/s, 0ms/record, 0.0s total) backlog=0 lsn=3C12/E2436F98 lag=7.1 GB

At this point, it's been locked like this for ~10 hours, so it's clearly dead. It keeps outputting the status line, but nothing else. Any suggestions on how to troubleshoot?

diranged commented 6 years ago

I've found where the bug lies, and I have a partial fix. If handleReplicationMsg() returns an error, the replicationLoop() goroutine exits immediately. Because the replicationMessages channel is unbuffered (size 0), the main connectReplicateLoop() routine then blocks forever when it tries to send the next message into the channel, since nothing is left to receive it.
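
For anyone following along, here is a minimal, self-contained sketch of that blocking behavior and of the select-based way around it. It is not pg_kinesis code: consume and produce are hypothetical stand-ins for replicationLoop() and connectReplicateLoop(), and a negative integer stands in for a message that makes the handler fail.

```go
package main

import (
	"errors"
	"fmt"
)

// consume is a hypothetical stand-in for replicationLoop: it stops at the
// first bad message and reports the error on finished before returning.
func consume(msgs <-chan int, finished chan<- error) {
	for m := range msgs {
		if m < 0 {
			finished <- errors.New("bad message")
			return
		}
	}
}

// produce is a hypothetical stand-in for connectReplicateLoop. A plain
// "msgs <- m" would block forever once the consumer has returned, because
// msgs is unbuffered. Selecting on both channels lets the producer notice
// the consumer's error instead.
func produce(inputs []int) (int, error) {
	msgs := make(chan int)          // unbuffered, like replicationMessages
	finished := make(chan error, 1) // buffered so the consumer can report and exit
	defer close(msgs)               // lets the consumer exit when inputs run out
	go consume(msgs, finished)

	sent := 0
	for _, m := range inputs {
		select {
		case msgs <- m:
			sent++
		case err := <-finished:
			return sent, err
		}
	}
	return sent, nil
}

func main() {
	sent, err := produce([]int{1, 2, -1, 3, 4})
	fmt.Printf("sent=%d err=%v\n", sent, err)
}
```

With the input above, the producer hands over three messages and then returns the consumer's error rather than hanging on the fourth send.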

The failure process looks like this ...

diff --git a/pg_kinesis.go b/pg_kinesis.go
index ebbfdf1..f982340 100644
--- a/pg_kinesis.go
+++ b/pg_kinesis.go
@@ -458,6 +458,8 @@ func replicationLoop(replicationMessages chan *pgx.ReplicationMessage, replicati

                        if err != nil {
                                replicationFinished <- err // already wrapped
+                               logf("error received, dumped into replicationFinished: %s", err)
+                               logf("replicationFinished size: %d", len(replicationFinished))
                                return
                        }
                }
@@ -619,6 +621,7 @@ func fetchLag(slot *string, conn *pgx.Conn) error {

 func pkeyLagLoop(slot *string, nonReplConn *pgx.Conn, replicationFinished chan error) {
        for !done.IsSet() {
+               logf("channel length: %d", len(replicationFinished))
                err := fetchPKs(nonReplConn)
                if err != nil {
                        replicationFinished <- err
@@ -694,28 +697,34 @@ func connectReplicateLoop(slot *string, sourceConfig pgx.ConnConfig, stream *str
                        return errors.Wrap(err, "waiting for replication message failed")
                }

-               // check if the replicating goroutine died
-               select {
-               case replErr = <-replicationFinished:
-               default:
-               }
-
-               if replErr != nil {
-                       logf("stopping replication due to replication goroutine failure")
-                       return replErr // already wrapped
-               }

                if message != nil {
                        if message.WalMessage != nil {
                                // this is not exactly the server time
                                // but we are taking over this field as PG does not send it down
                                message.WalMessage.ServerTime = uint64(now.UnixNano())
-                               replicationMessages <- message
+
+                               // check if the replicating goroutine died
+                               select {
+                               case replicationMessages <- message:
+                               case replErr = <-replicationFinished:
+                               }
+
+                               logf("done")
                        } else if message.ServerHeartbeat != nil {
                                keepaliveRequested = message.ServerHeartbeat.ReplyRequested == 1
                        }
                }

+
+               if replErr != nil {
+                       logf("stopping replication due to replication goroutine failure")
+                       return replErr // already wrapped
+               }
+
                err = sendKeepalive(conn, keepaliveRequested)
                if err != nil {
                        return errors.Wrap(err, "unable to send keepalive")

The problem is that even with this patch, things aren't great. As soon as connectReplicateLoop() dies off, the main() loop sleeps, then restarts it (and there is a debate on whether or not that should happen). When it restarts, connectReplicateLoop() creates all new output channels, including new ones to send more data to Kinesis. At this point, you end up with two different pkeyLagLoop() routines running!

At this point, everything is hosed. There are multiple functions running that try to write to common global variables (pks.Store... and the stats struct). Also, for some reason that I have not yet figured out, you can only run through connectReplicateLoop() once before it deadlocks when trying to add a message to its new replicationMessages channel.

It seems like at this point, the app just needs to die?

diranged commented 6 years ago

Just a follow-up: if you set done.SetTo(true) on failure, then at least the code exits cleanly:

diff --git a/pg_kinesis.go b/pg_kinesis.go
index ebbfdf1..4c8d32e 100644
--- a/pg_kinesis.go
+++ b/pg_kinesis.go
@@ -694,28 +694,31 @@ func connectReplicateLoop(slot *string, sourceConfig pgx.ConnConfig, stream *str
                        return errors.Wrap(err, "waiting for replication message failed")
                }

-               // check if the replicating goroutine died
-               select {
-               case replErr = <-replicationFinished:
-               default:
-               }
-
-               if replErr != nil {
-                       logf("stopping replication due to replication goroutine failure")
-                       return replErr // already wrapped
-               }

                if message != nil {
                        if message.WalMessage != nil {
                                // this is not exactly the server time
                                // but we are taking over this field as PG does not send it down
                                message.WalMessage.ServerTime = uint64(now.UnixNano())
-                               replicationMessages <- message
+
+                               // check if the replicating goroutine died
+                               select {
+                               case replicationMessages <- message:
+                               case replErr = <-replicationFinished:
+                               }
+
                        } else if message.ServerHeartbeat != nil {
                                keepaliveRequested = message.ServerHeartbeat.ReplyRequested == 1
                        }
                }

+
+               if replErr != nil {
+                       logf("stopping replication due to replication goroutine failure while processing: %s", message)
+                       done.SetTo(true)
+                       return replErr // already wrapped
+               }
+
                err = sendKeepalive(conn, keepaliveRequested)
                if err != nil {
                        return errors.Wrap(err, "unable to send keepalive")

2018-05-02T20:36:33Z reading target DB configuration from shell environment
/root/pg_kinesis/src/github.com/diranged/pg_kinesis/pg_kinesis.go : 859 - unable to create slot pg_kinesis: ERROR: replication slot "pg_kinesis" already exists (SQLSTATE 42710)
2018-05-02T20:36:33Z replication slot pg_kinesis already exists, continuing
2018-05-02T20:36:33Z replication starting from LSN 0/0
2018-05-02T20:36:34Z stopping replication due to replication goroutine failure while processing: &{Wal: 3C13/EC317180 Time: 1816-08-21 04:48:58.988617872 +0000 UTC Lag: 0 <nil>}
/root/pg_kinesis/src/github.com/diranged/pg_kinesis/pg_kinesis.go : 887 - replication messages must be less than 1MB in size

taybin commented 5 years ago

Any updates on this?

diranged commented 5 years ago

@taybin, check out https://github.com/Nextdoor/pg-bifrost. We ended up writing our own tool for this, and we're now running it in production at scale as our primary Postgres streaming method.

taybin commented 5 years ago

@diranged Oh cool. I mostly wanted to modify this to add support for writing to RabbitMQ, but I'll take a look at pg-bifrost for that as well. It looks pretty well modularized, so maybe I can get a clean PR of that together.

diranged commented 5 years ago

Open an issue on our project and we can talk through it. We have designed it to be pluggable (we’re considering a direct-to-S3 transport).