altmannmarcelo closed 1 year ago
@blackbeam I have a Google Docs document that explains the new binlog event format and shows how to decode it manually: https://docs.google.com/document/d/1Ypnsua2W-MQfGoogPXw2u_5y3QW3qdQeXkzTQyKtEPM/edit?usp=sharing
I wrote a small test program to validate the new format:
use mysql_async::prelude::Queryable;
use mysql_common as mysql;
use mysql_common::binlog::consts::EventType;
use mysql_common::binlog::{self, events};

#[tokio::main]
async fn main() -> Result<(), mysql_async::Error> {
    let mut reader = binlog::EventStreamReader::new(binlog::consts::BinlogVersion::Version4);
    // Payload event has checksum disabled. Use different reader to read it.
    let mut reader_no_checksum =
        binlog::EventStreamReader::new(binlog::consts::BinlogVersion::Version4);
    let database_url = mysql_async::Opts::from_url("mysql://root:@localhost:3310/test")?;
    let pool = mysql_async::Pool::new(database_url);
    let mut conn = pool.get_conn().await?;
    conn.query_drop("CREATE TABLE IF NOT EXISTS tb1(ID INT); FLUSH BINARY LOGS;")
        .await?;
    let query = "SHOW MASTER STATUS";
    let pos: mysql::Row = conn
        .query_first(query)
        .await?
        .ok_or_else(|| mysql_async::Error::Driver(mysql_async::DriverError::ConnectionClosed))?;
    let file: String = pos.get(0).expect("Binlog file");
    let offset: u64 = pos.get(1).expect("Binlog offset");
    conn.query_drop("SET binlog_transaction_compression=ON; INSERT INTO tb1 VALUES (1)")
        .await?;
    let cmd = mysql_common::packets::ComRegisterSlave::new(2);
    conn.query_drop("SET @master_binlog_checksum='CRC32'")
        .await?;
    conn.write_command(&cmd).await?;
    conn.read_packet().await?;
    let cmd = mysql_common::packets::ComBinlogDump::new(2)
        .with_filename(file.as_bytes())
        .with_pos(offset.try_into().expect("Impossible pos"));
    conn.write_command(&cmd).await?;
    conn.read_packet().await?;
    let mut keep = true;
    while keep {
        let packet = conn.read_packet().await?;
        assert_eq!(packet.first(), Some(&0));
        let binlog_event = reader.read(&packet[1..])?;
        match binlog_event.header().event_type().map_err(|ev| {
            println!("Unknown event type: {:?}", ev);
            mysql_async::Error::Driver(mysql_async::DriverError::ConnectionClosed)
        })? {
            EventType::TRANSACTION_PAYLOAD_EVENT => {
                let ev: events::TransactionPayloadEvent = binlog_event.read_event()?;
                println!(
                    "Received TRANSACTION_PAYLOAD_EVENT - Compressed Size: {}, Uncompressed Size: {}",
                    ev.payload_size(),
                    ev.uncompressed_size()
                );
                let buff = ev.payload_raw_decompressed();
                let mut read_pos = 0;
                while read_pos < buff.len() {
                    let binlog_ev = reader_no_checksum.read(&buff[read_pos..])?;
                    read_pos += binlog_ev.header().event_size() as usize;
                    println!(
                        "Received decompressed event: {:?}",
                        binlog_ev.header().event_type()
                    );
                }
                keep = false;
            }
            ev => {
                println!("Received other event type: {:?}", ev);
            }
        }
    }
    // Dropped connection will go to the pool
    drop(conn);
    // The Pool must be disconnected explicitly because
    // it's an asynchronous operation.
    pool.disconnect().await?;
    // the async fn returns Result, so
    Ok(())
}
Example output:
$ ./target/debug/binlog
Received other event type: FORMAT_DESCRIPTION_EVENT
Received other event type: ANONYMOUS_GTID_EVENT
Received TRANSACTION_PAYLOAD_EVENT - Compressed Size: 129, Uncompressed Size: 179
Received decompressed event: Ok(QUERY_EVENT)
Received decompressed event: Ok(TABLE_MAP_EVENT)
Received decompressed event: Ok(WRITE_ROWS_EVENT)
Received decompressed event: Ok(XID_EVENT)
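The inner loop in the test program walks the decompressed buffer by advancing `read_pos` by each event's size. As a rough, library-free sketch of the same idea, the function below splits a buffer of back-to-back events using only the 19-byte binlog v4 common header (type code at byte 4, little-endian event length at bytes 9..13). The names `split_events` and `HEADER_LEN` are hypothetical and not part of mysql_common. It assumes the inner events carry no checksum, as noted in the program's comment.

```rust
/// Size of the binlog v4 common event header, in bytes:
/// timestamp (4) + type code (1) + server id (4) + event length (4)
/// + next position (4) + flags (2).
const HEADER_LEN: usize = 19;

/// Splits a buffer of back-to-back binlog events into
/// `(type_code, event_bytes)` pairs, where `event_bytes` includes the header.
///
/// Returns `None` if the buffer is truncated or an event length is smaller
/// than the common header. Hypothetical helper for illustration only.
fn split_events(buf: &[u8]) -> Option<Vec<(u8, &[u8])>> {
    let mut events = Vec::new();
    let mut pos = 0usize;
    while pos < buf.len() {
        // The whole common header must be present.
        let header = buf.get(pos..pos.checked_add(HEADER_LEN)?)?;
        let type_code = header[4];
        // The event length covers the header plus the event body.
        let len = u32::from_le_bytes([header[9], header[10], header[11], header[12]]) as usize;
        if len < HEADER_LEN {
            return None;
        }
        let event = buf.get(pos..pos.checked_add(len)?)?;
        events.push((type_code, event));
        pos += len;
    }
    Some(events)
}
```

With the output above, passing the decompressed payload to such a function should yield four entries, one per inner event. In practice the `EventStreamReader` used in the test program is the better choice, since it also validates and parses the events.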
@blackbeam
Having a CompressionType::NONE might be a bit tricky. From what I remember, ZSTD will write the frame uncompressed if it does not see enough redundancy in the data.
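To illustrate the point about uncompressed output: in the Zstandard frame format (RFC 8878), every block starts with a 3-byte little-endian header that says whether the block is stored raw, run-length encoded, or compressed. The sketch below decodes that header. The type and function names are hypothetical, and locating the first block inside a real frame (skipping the magic number and frame header) is left out.

```rust
/// Block types defined by the Zstandard frame format.
#[derive(Debug, PartialEq, Eq)]
enum BlockType {
    Raw,        // data stored as-is, no compression
    Rle,        // a single byte repeated `block_size` times
    Compressed, // regular compressed block
    Reserved,   // not valid in a conforming frame
}

/// Decoded 3-byte Zstandard block header (hypothetical helper type).
#[derive(Debug, PartialEq, Eq)]
struct BlockHeader {
    last_block: bool,
    block_type: BlockType,
    block_size: u32,
}

/// Decodes a block header: bit 0 is the last-block flag, bits 1-2 are the
/// block type, and bits 3-23 are the block size.
fn parse_block_header(bytes: [u8; 3]) -> BlockHeader {
    let raw = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]);
    let block_type = match (raw >> 1) & 0b11 {
        0 => BlockType::Raw,
        1 => BlockType::Rle,
        2 => BlockType::Compressed,
        _ => BlockType::Reserved,
    };
    BlockHeader {
        last_block: raw & 1 == 1,
        block_type,
        block_size: raw >> 3,
    }
}
```

A payload made of `Raw` blocks is still a valid ZSTD frame, so a reader would presumably still need to run it through the ZSTD decoder rather than treat it as uncompressed.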
I have added a binlog file with compressed events and extended the test to validate that it can decompress the Payload Event and read the expected events from the decompressed buffer.
This patch adds support for Binlog Transaction Compression, which has been available since MySQL 8.0.20. It is disabled by default, but it is widely used in production.
The patch adds a new event type, TransactionPayloadEvent, and the methods needed to parse the binlog event TRANSACTION_PAYLOAD_EVENT (0x28).
The patch also adds a method to decompress the payload if it has been compressed with ZSTD.
More information about the feature can be found here:
https://dev.mysql.com/worklog/task/?id=3549
https://github.com/mysql/mysql-server/commit/1e5ae348c6f760b16a3972d84956358606a30041