Closed json2008 closed 1 year ago
@fahchen
自己在 review
时,发现一些不妥的地方,尝试着在进行优化改动。
空时,请帮看一下,是否合适。
相应的lsn
的具体处理,在后续的提交中给出。
在psql
客户端执行:
begin;
insert into my_users (id, name) values (1, 'a1'), (2, 'a2');
insert into my_users (id, name) values (3, 'a3'), (4, 'a4');
commit;
所产生的消息,如下:
{:message,
%PostgrexWal.Messages.Begin{
transaction_id: 16147,
commit_timestamp: ~U[2023-02-09 03:16:50.910480Z],
final_lsn: "0/ED5C580"
}, "0/ED5C2A0", "0/ED5C2A0"}
{:message,
%PostgrexWal.Messages.Relation{
columns: [
%PostgrexWal.Messages.Relation.Column{
type_modifier: 4294967295,
type_oid: 23,
column_name: "id",
flags: [key: true]
},
%PostgrexWal.Messages.Relation.Column{
type_modifier: 4294967295,
type_oid: 1043,
column_name: "name",
flags: [key: true]
}
],
number_of_columns: 2,
flags: [setting: :all_columns],
relation_name: "my_users",
namespace: "public",
relation_oid: 39308,
transaction_id: nil
}, "0/0", "0/0"}
{:message,
%PostgrexWal.Messages.Insert{
tuple_data: [text: "1", text: "a1"],
relation_oid: 39308,
transaction_id: nil
}, "0/ED5C2A0", "0/ED5C2A0"}
{:message,
%PostgrexWal.Messages.Insert{
tuple_data: [text: "2", text: "a2"],
relation_oid: 39308,
transaction_id: nil
}, "0/ED5C488", "0/ED5C488"}
{:message,
%PostgrexWal.Messages.Insert{
tuple_data: [text: "3", text: "a3"],
relation_oid: 39308,
transaction_id: nil
}, "0/ED5C4C8", "0/ED5C4C8"}
{:message,
%PostgrexWal.Messages.Insert{
tuple_data: [text: "4", text: "a4"],
relation_oid: 39308,
transaction_id: nil
}, "0/ED5C508", "0/ED5C508"}
{:message,
%PostgrexWal.Messages.Commit{
commit_timestamp: ~U[2023-02-09 03:16:50.910480Z],
end_lsn: "0/ED5C5B0",
lsn: "0/ED5C580",
flags: []
}, "0/ED5C5B0", "0/ED5C5B0"}
消息格式为:{:message, Message.t(),lsn_a, lsn_b}
lsn_a, lsn_b, 是我补充塞进去,用于调试的。(解码的Message.t()
中并不存在)含义:
lsn_a
: The starting point of the WAL data in this message.
lsn_b
: The current end of WAL on the server.
在所有消息中:
lsn_a === lsn_b
Begin.final_lsn === Commit.lsn
在`Commit`消息中,lsn_a === lsn_b == `Commit.end_lsn`
Commit.end_lsn: 最大
其它消息,如:insert, relation
等,在Message.t()
中,并无lsn
。
对上述的各种lsn
做ack
测试, 仅在对end_lsn
做ack
后,方才有效。(其它的ack仍然导致重复的消息)
pg日志如下:
2023-02-09 11:52:53.073 CST [5361] LOG: unexpected EOF on standby connection
2023-02-09 11:52:53.073 CST [5361] STATEMENT: START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '2', publication_names 'my_pub')
2023-02-09 11:52:58.032 CST [5451] LOG: starting logical decoding for slot "my_slot"
2023-02-09 11:52:58.032 CST [5451] DETAIL: Streaming transactions committing after 0/1, reading WAL from 0/ED60D80.
2023-02-09 11:52:58.032 CST [5451] STATEMENT: START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '2', publication_names 'my_pub')
2023-02-09 11:52:58.032 CST [5451] LOG: logical decoding found consistent point at 0/ED60D80
2023-02-09 11:52:58.032 CST [5451] DETAIL: There are no running transactions.
2023-02-09 11:52:58.032 CST [5451] STATEMENT: START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '2', publication_names 'my_pub')
2023-02-09 11:53:36.666 CST [5451] LOG: unexpected EOF on standby connection
2023-02-09 11:53:36.666 CST [5451] STATEMENT: START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '2', publication_names 'my_pub')
2023-02-09 11:53:41.801 CST [5508] LOG: starting logical decoding for slot "my_slot"
2023-02-09 11:53:41.801 CST [5508] DETAIL: Streaming transactions committing after 0/ED61241, reading WAL from 0/ED61240.
2023-02-09 11:53:41.801 CST [5508] STATEMENT: START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '2', publication_names 'my_pub')
2023-02-09 11:53:41.801 CST [5508] LOG: logical decoding found consistent point at 0/ED61240
2023-02-09 11:53:41.801 CST [5508] DETAIL: There are no running transactions.
2023-02-09 11:53:41.801 CST [5508] STATEMENT: START_REPLICATION SLOT my_slot LOGICAL 0/0 (proto_version '2', publication_names 'my_pub')
文档上,关于lsn
的定义:
The pg_lsn data type can be used to store LSN (Log Sequence Number) data which is a pointer to a location in the WAL. This type is a representation of XLogRecPtr and an internal system type of PostgreSQL.
Internally, an LSN is a 64-bit integer, representing a byte position in the write-ahead log stream. It is printed as two hexadecimal numbers of up to 8 digits each, separated by a slash; for example, 16/B374D848.
The pg_lsn type supports the standard comparison operators, like = and >. Two LSNs can be subtracted using the - operator; the result is the number of bytes separating those write-ahead log locations. Also the number of bytes can be added into and subtracted from LSN using the +(pg_lsn,numeric) and -(pg_lsn,numeric) operators, respectively.
Note that the calculated LSN should be in the range of pg_lsn type, i.e., between 0/0 and FFFFFFFF/FFFFFFFF
但在实际解码,以及代码中,有多种与lsn
有关的概念及命名:
如:transaction_lsn, commit_lsn, message_lsn, starting_point_lsn, lsn, end_lsn, final_lsn
造成如此的原因,是有些命名是直接从pg
源代码以及pgoutput-plugin
中,直接引用变量名或是函数名导致的。
仅4类消息中,存在lsn
官网文档中,对Message.t()
中出现的lsn
作出的解释:(Int64 (XLogRecPtr)
)
1
Begin.final_lsn
: The final LSN of the transaction. ( 等于:Commit.lsn
)
2
Commit.lsn
: The LSN of the commit.
Commit.end_lsn
: The end LSN of the transaction.
3
Message.lsn
: The LSN of the logical decoding message.
4
StreamCommit.lsn
: The LSN of the commit.
StreamCommit.end_lsn
: The end LSN of the transaction.
官网中,对上述几种lsn
的解释非常含糊。不同的代码,对名称的使用并不完全一致。
当前代码中,在broadway-pipeline
的processor
中,自动处理ack
。与之有关的代码片段如下:
def handle_info({:message, message}, %{current_size: s} = state) do
acker =
if is_map_key(message, :end_lsn),
do: {__MODULE__, {:pg_source, state.pg_source}, :ack_data},
else: Broadway.NoopAcknowledger.init()
event = %Broadway.Message{
data: message,
acknowledger: acker
}
%{state | queue: :queue.in(event, state.queue), current_size: s + 1}
|> dispatch_events()
end
@behaviour Broadway.Acknowledger
@impl true
def ack({:pg_source, pg_source}, successful_messages, _failed_messages) do
max_lsn =
for %{data: %{end_lsn: lsn}} <- successful_messages, reduce: 0 do
acc ->
{:ok, lsn} = Postgrex.ReplicationConnection.decode_lsn(lsn)
if lsn > acc, do: lsn, else: acc
end
max_lsn > 0 && PgSource.ack(pg_source, max_lsn)
end
仅Commit, StreamCommit
存在end_lsn
, 所以,似乎应以transcation
为单位,消费到transaction
结束时,方能ack
。
代码已经据此调整。
@fahchen
对LSN
做出了一引起补充解释。见上面的post
。空时,请review
下,看是否合适?
@fahchen
对
LSN
做出了一引起补充解释。见上面的post
。空时,请review
下,看是否合适?
没问题哈 我想到一个问题,如果一个比较大的 transaction 是不是会输出很多 messages,对于 consumer 就得等所有 messages 都来了才能消费,否则得处理回退或者幂等,这两个都有点不好处理
没问题哈 我想到一个问题,如果一个比较大的 transaction 是不是会输出很多 messages,对于 consumer 就得等所有 messages 都来了才能消费,否则得处理回退或者幂等,这两个都有点不好处理
我考虑过这个问题,并进行过测试,一个事务中,有多个insert
消息,我测试把中间的ack
掉?不成功。
后面看看,能否有更好的解决办法。
@doc
is appended to explain the meaning of lsn and application scenarios of several messages, so that later people can read and understand.PgProducer.