actiontech / dtle

Distributed Data Transfer Service for MySQL
https://actiontech.github.io/dtle-docs-cn
Mozilla Public License 2.0

kafka incr replication job: thread is always null #532

Open asiroliu opened 4 years ago

asiroliu commented 4 years ago

## Description

kafka incr replication job: thread is always null

## Steps to reproduce the issue

1. create db and table

    create database action_db_1;
    use action_db_1
    create table aggregation_source(
      id int unsigned AUTO_INCREMENT,
      name VARCHAR(100) NOT NULL,
      PRIMARY KEY(id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2. create kafka job

    {
      "Name": "kafka_aggregation_source_one",
      "Datacenters": ["dc1"],
      "Tasks": [
        {
          "Type": "Src",
          "Driver": "MySQL",
          "NodeId": "da1c766d-b945-9ad4-a434-458d637f4420",
          "Config": {
            "ConnectionConfig": {
              "Host": "172.100.9.2",
              "Port": 3306,
              "User": "test_src",
              "Password": "test_src"
            },
            "ReplicateDoDb": [
              {
                "TableSchema": "action_db_1",
                "Tables": [
                  { "TableName": "aggregation_source" }
                ]
              }
            ]
          }
        },
        {
          "Type": "Dest",
          "Driver": "Kafka",
          "NodeId": "8db2080b-2753-9ce6-a264-45d3383ef35a",
          "Config": {
            "Topic": "dtle",
            "Brokers": ["172.100.9.21:9092"],
            "Converter": "json"
          }
        }
      ]
    }

3. insert some data

    use action_db_1
    insert into aggregation_source (name) values("aggregation1");
    insert into aggregation_source (name) values("aggregation2");
    insert into aggregation_source (name) values("aggregation3");
    insert into aggregation_source (name) values("aggregation4");
    insert into aggregation_source (name) values("aggregation5");

4. get message from kafka

    /kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.100.9.21:9092 --from-beginning --property print.key=true --topic dtle.action_db_1.aggregation_source


## Describe the results you received

    {
      "payload": {
        "after": { "id": 11, "name": "aggregation1" },
        "before": null,
        "op": "c",
        "source": {
          "db": "action_db_1",
          "file": "mysql-bin.000002",
          "gtid": "189ee19a-0aa3-11eb-b235-0242ac110007:55",
          "name": "dtle",
          "pos": 15787,
          "query": null,
          "row": 0,
          "server_id": 1,
          "snapshot": false,
          "table": "aggregation_source",
          "thread": null,
          "ts_sec": 1602749348,
          "version": "0.0.1"
        },
        "ts_ms": 1602749348200
      },
      ...
    }


## Describe the results you expected
The `thread` field in the payload's `source` block should not be null for incremental (non-snapshot) events.

thread : ID of the MySQL thread that created the event (non-snapshot only)
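
The expectation above can be checked mechanically against consumed messages. The following is a minimal sketch, not part of dtle: the function names are hypothetical, the sample is trimmed from the message reported above (the elided `...` keys are left out), and it assumes you pass only the message value, without the key that `print.key=true` prints in front of it.

```python
import json

# Sample value trimmed from the message reported in this issue.
# The elided top-level keys ("...") are intentionally left out.
SAMPLE_MESSAGE = """
{"payload": {"after": {"id": 11, "name": "aggregation1"},
             "before": null,
             "op": "c",
             "source": {"db": "action_db_1",
                        "table": "aggregation_source",
                        "snapshot": false,
                        "thread": null},
             "ts_ms": 1602749348200}}
"""


def _source_block(message_value: str) -> dict:
    """Return payload.source as a dict, or {} if it is missing or null."""
    envelope = json.loads(message_value)
    return (envelope.get("payload") or {}).get("source") or {}


def source_thread(message_value: str):
    """Return payload.source.thread, or None if it is absent or null."""
    return _source_block(message_value).get("thread")


def violates_expectation(message_value: str) -> bool:
    """True when a non-snapshot event carries no thread ID."""
    source = _source_block(message_value)
    return source.get("snapshot") is False and source.get("thread") is None


if __name__ == "__main__":
    print(violates_expectation(SAMPLE_MESSAGE))
```

Snapshot events are deliberately not flagged, since the field description limits `thread` to non-snapshot events.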


## Output of `./dtle version`:

NOMAD_VERSION=<0.11.1> DTLE_VERSION=<9.9.9.9-master-077b064>



## Additional information

(e.g. issue happens only occasionally)

## Additional details (log, config, job config etc):
<!--
1. default file path 
   - config: /etc/dtle/dtle.conf
   - log: /var/log/dtle.log
   - job: your local file, or get from dtle
     - curl -XGET "http://127.0.0.1:8190/v1/job/<job_uuid>"
2. mask sensitive information (e.g. IP, username and password)
3. paste short files or the key part
4. provide large files as attachments (compress if necessary)
-->
asiroliu commented 2 years ago

The thread information can be obtained from the binlog.
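
As an illustration of where that information lives: in its text output, `mysqlbinlog` prints `thread_id=N` on the header line of each Query event, including the `BEGIN` that opens a row-based transaction. The sketch below only extracts those IDs from already-decoded text. It is not how dtle reads the binlog, the function name is hypothetical, and the sample line uses made-up values.

```python
import re
from typing import List

# Hypothetical header line in the style mysqlbinlog prints for a Query event.
# The timestamp, end_log_pos, checksum and thread_id values are made up.
SAMPLE_BINLOG_TEXT = (
    "#201015  8:09:08 server id 1  end_log_pos 15500 CRC32 0x00000000 \t"
    "Query\tthread_id=7\texec_time=0\terror_code=0\n"
    "BEGIN\n"
)

# Matches the "Query   thread_id=N" part of a Query event header line.
THREAD_ID_RE = re.compile(r"\bQuery\s+thread_id=(\d+)")


def thread_ids(binlog_text: str) -> List[int]:
    """Return the thread IDs of all Query events found in mysqlbinlog text output."""
    return [int(match.group(1)) for match in THREAD_ID_RE.finditer(binlog_text)]


if __name__ == "__main__":
    print(thread_ids(SAMPLE_BINLOG_TEXT))
```

In a row-based transaction the row events themselves do not repeat the ID, so a consumer would take it from the Query event that precedes them.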
