Closed chaotianque closed 4 months ago
You can open a ticket with PolarDB. It should provide a MySQL-compatible binlog.
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.29.1</version>
</dependency>
I use this library to subscribe to the binlog, and the columns it reports are correct.
UpdateRowsEventData: UpdateRowsEventData{tableId=11071, includedColumnsBeforeUpdate={0, 1, 2, 3}, includedColumns={0, 1, 2, 3}, rows=[ {before=[21, 2, 2211, null], after=[21, 2, 22111, null]} ]}
Can you use the first example code (under the Replication section) in the README to dump every binlog event? Then we can see what happened.
package main

import (
	"encoding/json"
	"flag"
	"fmt"

	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
	"github.com/siddontang/go-log/log"
)

var id string

func init() { flag.StringVar(&id, "id", "0", "task ID") }

func main() {
	flag.Parse()
	run()
}

// ObjToJson marshals param to JSON and ignores any marshalling error.
func ObjToJson(param interface{}) []byte {
	data, _ := json.Marshal(param)
	return data
}

// MyEventHandler overrides the canal.DummyEventHandler callbacks needed for debugging.
type MyEventHandler struct{ canal.DummyEventHandler }

func (s MyEventHandler) OnRotate(header *replication.EventHeader, e *replication.RotateEvent) error {
	return nil
}

func (s MyEventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, event *replication.QueryEvent) error {
	fmt.Printf("OnDDL:%s \n", event.Query)
	return nil
}

func (s MyEventHandler) OnXID(header *replication.EventHeader, nextPos mysql.Position) error {
	return nil
}

// OnRow prints the table name and the first row image of each rows event.
func (h MyEventHandler) OnRow(e *canal.RowsEvent) error {
	fmt.Printf("--------e.Table.Name :%v \n", e.Table.Name)
	if len(e.Rows) > 0 {
		fmt.Printf("--------e.Rows[0] :%v \n", e.Rows[0])
	}
	return nil
}

func (s MyEventHandler) OnTableChanged(header *replication.EventHeader, schema string, table string) error {
	//err := _transferService.updateRule(schema, table)
	fmt.Println(table)
	return nil
}

func (s MyEventHandler) OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error {
	return nil
}

func (s MyEventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error {
	return nil
}

func (s MyEventHandler) OnRowsQueryEvent(e *replication.RowsQueryEvent) error { return nil }

func (h *MyEventHandler) String() string { return "MyEventHandler" }

func run() {
	cfg := canal.NewDefaultConfig()
	switch id {
	case "0":
		cfg.Addr = "127.0.0.1:3306"
		cfg.User = "root"
		cfg.Password = "123456"
		cfg.Dump.TableDB = "test"
		cfg.Dump.Tables = []string{"test1"}
	default:
	}
	// An empty ExecutionPath skips the initial mysqldump step.
	cfg.Dump.ExecutionPath = ""
	cfg.Dump.DiscardErr = false
	c, err := canal.NewCanal(cfg)
	if err != nil {
		log.Fatal(err)
	}
	// Register a handler to handle RowsEvent
	c.SetEventHandler(&MyEventHandler{})
	masterPos, err := c.GetMasterPos()
	if err != nil {
		log.Fatal("failed to get master position: ", err)
	}
	// Start canal
	switch id {
	case "0":
		err = c.RunFrom(masterPos)
	default:
		err = c.RunFrom(mysql.Position{Name: "mysql-bin.000037", Pos: 4})
	}
	if err != nil {
		log.Fatal(err)
	}
}
I don't understand what you mean. Can you elaborate?
OK, let's try another way to debug. There is an example program at cmd/go-mysqlbinlog/main.go; please read its code to see how to configure it. It dumps every binlog event. Please post the content of the problematic binlog events in this issue.
mysqlbinlog
=== GTIDEvent ===
Date: 2024-05-24 10:06:04
Log position: 3062625
Event size: 79
Commit flag: 0
GTID_NEXT: 09b7be66-eb3e-11ee-9d04-08c0eb4423c6:21953936
LAST_COMMITTED: 2249
SEQUENCE_NUMBER: 2250
Immediate commmit timestamp: 1716516364230509 (2024-05-24T10:06:04.230509+08:00)
Orignal commmit timestamp: 1716516364230509 (2024-05-24T10:06:04.230509+08:00)
Transaction length: 319
Immediate server version: 80018
Orignal server version: 80018

=== QueryEvent ===
Date: 2024-05-24 10:06:04
Log position: 3062708
Event size: 83
Slave proxy ID: 539960576
Execution time: 0
Error code: 0
Schema: dcs
Query: BEGIN

=== TableMapEvent ===
Date: 2024-05-24 10:06:04
Log position: 3062768
Event size: 60
TableID: 11071
TableID size: 6
Flags: 1
Schema: dcs
Table: test1
Column count: 5
Column type:
00000000 08 01 0f 11 0a |.....|
NULL bitmap:
00000000 18 |.|
Signedness bitmap:
00000000 00 |.|
Default charset: [45]
Column charset: []
Set str value: []
Enum str value: []
Column name: []
Geometry type: []
Primary key: []
Primary key prefix: []
Enum/set default charset: []
Enum/set column charset: []
Invisible Column bitmap:
UnsignedMap: map[int]bool{0:false, 1:false}
CollationMap: map[int]uint64{2:0x2d}
EnumSetCollationMap: map[int]uint64(nil)
EnumStrValueMap: map[int][]string(nil)
SetStrValueMap: map[int][]string(nil)
GeometryTypeMap: map[int]uint64(nil)
VisibilityMap: map[int]bool(nil)
Columns:
<n/a> type=8 unsigned=no null=no
<n/a> type=1 unsigned=no null=no
<n/a> type=15 collation=45 null=no
<n/a> type=17 null=yes
<n/a> type=14 null=yes
0:21
1:12
2:"22111"
3:
=== XIDEvent ===
Date: 2024-05-24 10:06:04
Log position: 3062865
Event size: 31
XID: 576191625
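To make the `Column type` bytes of the TableMapEvent above easier to read, here is a minimal sketch that maps them to MySQL protocol type names. The helper `decodeColumnTypes` and the lookup table are hypothetical and cover only the codes seen in this dump; they are not part of go-mysql.

```go
package main

import "fmt"

// mysqlTypeNames is a partial table of MySQL column type codes,
// limited to the codes that appear in the dump above.
var mysqlTypeNames = map[byte]string{
	0x01: "TINY",
	0x08: "LONGLONG",
	0x0a: "DATE",
	0x0f: "VARCHAR",
	0x11: "TIMESTAMP2",
}

// decodeColumnTypes is a hypothetical helper that turns the raw
// "Column type" bytes of a TableMapEvent into readable names.
// Codes missing from the partial table are reported as UNLISTED.
func decodeColumnTypes(raw []byte) []string {
	names := make([]string, len(raw))
	for i, b := range raw {
		name, ok := mysqlTypeNames[b]
		if !ok {
			name = fmt.Sprintf("UNLISTED(0x%02x)", b)
		}
		names[i] = name
	}
	return names
}

func main() {
	// Bytes copied from the TableMapEvent dump: 08 01 0f 11 0a.
	raw := []byte{0x08, 0x01, 0x0f, 0x11, 0x0a}
	for i, name := range decodeColumnTypes(raw) {
		fmt.Printf("column %d: %s\n", i, name)
	}
}
```

The first four bytes correspond to a bigint, a tinyint, a varchar and a timestamp column. The fifth byte decodes to DATE, which suggests the extra column holds a date value rather than being random corruption.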
The binlog content is indeed wrong. For example, the TableMapEvent reports five columns, while the row data above has four:
=== TableMapEvent ===
Log position: 3062768
...
Column count: 5
You can open a ticket with PolarDB.
Then why is the result correct when I subscribe with this library?
UpdateRowsEventData: UpdateRowsEventData{tableId=11071, includedColumnsBeforeUpdate={0, 1, 2, 3}, includedColumns={0, 1, 2, 3}, rows=[ {before=[21, 2, 2211, null], after=[21, 2, 22111, null]} ]}
You should ask the maintainers of com.zendesk mysql-binlog-connector-java.
CREATE TABLE test1 (
  id bigint(20) NOT NULL COMMENT 'Primary key ID',
  bill_type tinyint(2) NOT NULL DEFAULT '1' COMMENT '',
  collect_no varchar(32) NOT NULL DEFAULT '' COMMENT '',
  create_time timestamp NULL DEFAULT NULL COMMENT '',
  PRIMARY KEY (id) USING BTREE,
  KEY f_idx_bgt ((cast(create_time as date)))
) ENGINE=InnoDB

KEY f_idx_bgt ((cast(create_time as date)))

The table is defined with 4 columns, but this functional index is also counted as a column, so the binlog reports 5 columns. MySQL 8.0 implements a functional key part as a hidden virtual generated column, which would explain the extra entry.
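A consumer that maps binlog rows onto schema column names could surface this kind of mismatch early instead of mislabelling values. The sketch below is only an illustration under that assumption: `checkColumnCount` is a hypothetical helper, not a go-mysql API, and the counts come from the four declared columns and the five columns reported by the TableMapEvent in this issue.

```go
package main

import "fmt"

// checkColumnCount is a hypothetical guard: it compares the number of
// columns declared in the schema with the column count carried by a
// TableMapEvent and returns an error when they differ.
func checkColumnCount(schemaCols []string, eventColumnCount int) error {
	if eventColumnCount == len(schemaCols) {
		return nil
	}
	return fmt.Errorf("table map event has %d columns, schema declares %d %v",
		eventColumnCount, len(schemaCols), schemaCols)
}

func main() {
	// Declared columns of test1, taken from the CREATE TABLE statement.
	schemaCols := []string{"id", "bill_type", "collect_no", "create_time"}

	// Column count reported by the TableMapEvent in the dump.
	if err := checkColumnCount(schemaCols, 5); err != nil {
		fmt.Println("mismatch:", err)
	}
}
```

Whether the extra column should be dropped, ignored, or reported upstream depends on the consumer; the check only makes the discrepancy visible.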