alibaba / MongoShake

MongoShake is a universal data replication platform based on MongoDB's oplog; redundant replication and active-active replication are its two most important functions. It is a cluster replication tool built on the MongoDB oplog that covers migration and synchronization needs and further enables disaster recovery and active-active deployments.
GNU General Public License v3.0

Cross-border sync frequently falls behind #264

Closed admh closed 4 years ago

admh commented 4 years ago

Problem: when syncing from Alibaba Cloud India over the public internet to Alibaba Cloud Shenzhen, the sync frequently falls behind, as shown in the screenshot below.

Environment: the source is Alibaba Cloud MongoDB 4.0 in India (cloud MongoDB, 2 cores / 4 GB); MongoShake runs on an ECS host in India with 2 cores / 16 GB on CentOS 7.6. The target in Alibaba Cloud Shenzhen is a self-hosted MongoDB 3.6 in a Docker container on an ECS host with 16 cores / 48 GB (load on this host has always been normal).

Shenzhen MongoDB configuration file:

storage:
  dbPath: /data/db
  journal:
    enabled: true
  engine: wiredTiger
  wiredTiger:
    collectionConfig:
      blockCompressor: snappy
    engineConfig:
      cacheSizeGB: 28

systemLog:
  destination: file
  logAppend: true
  path: /data/db/mongod.log

net:
  port: 27017
  bindIp: 0.0.0.0

processManagement:
  timeZoneInfo: /usr/share/zoneinfo

replication:
  oplogSizeMB: 8000
  replSetName: xedset

MongoShake version: 1.4.5

India-side MongoShake configuration file:

# connect source mongodb, set username and password if enable authority.
# split by comma(,) if use multiple instance in one replica-set.
# split by semicolon(;) if sharding enable.
#mongo_urls = mongodb://username:password@127.0.0.1:20040,127.0.0.1:20041
mongo_urls = mongodb://1.1.1.1:3717,1.1.1.2/admin?replicaSet=test&readPreference=secondary

# collector name
collector.id = mongoshake

# save checkpoint interval if necessary. 
# the checkpoint is checked and stored starting 3 minutes after startup.
checkpoint.interval = 5000

# http api interface. Users can use this api to monitor mongoshake.
# We also provide a restful tool named "mongoshake-stat" to 
# print ack, lsn, checkpoint and qps information based on this api.
# usage: `./mongoshake-stat --port=9100`
http_profile = 9102
# profiling on net/http/profile
system_profile = 9202

# global log level. lower-level messages
# will be filtered out
log_level = info
# log file name. use logs/ prefix folder path
log_file = collector.log
# log buffer or unbuffer. If set to true, some logs may not be printed on exit. If
# set to false, performance will drop significantly
log_buffer = true

# filter db or collection namespace. at most one of these two parameters can be given.
# if filter.namespace.black is not empty, the given namespaces will be
# filtered out while all other namespaces pass.
# if filter.namespace.white is not empty, the given namespaces will be
# passed while all others are filtered out.
# all namespaces pass if no condition is given.
# db and collection are joined by a dot(.).
# different namespaces are split by a semicolon(;).
filter.namespace.black = loan.log;log
#filter.namespace.white =

# this parameter is not supported in the current open-source version.
# oplog namespace and global id. oplogs in the
# mongo cluster that have a different global id will
# be discarded. Queries run without a gid (i.e. all
# oplogs are fetched) if no oplog.gid is set
oplog.gids =   

# [auto]        decided by whether the collection has a unique index:
#               use `collection` when a unique index exists, otherwise `id`
# [id]          shard by ObjectId. handle oplogs in sequence by unique _id
# [collection]  shard by ns. handle oplogs in sequence by unique ns
shard_key = auto

# syncer send time interval, unit is second.
# time interval of flushing the syncer reader buffer.
syncer.reader.buffer_time = 3

# number of concurrent oplog transmit workers
worker = 16
# memory queue configuration, plz visit FAQ document to see more details.
# do not modify these variables if the performance and resource usage can
# meet your needs.
#worker.batch_queue_size = 64
worker.batch_queue_size = 128
#adaptive.batching_max_size = 16384
adaptive.batching_max_size = 32768
#fetcher.buffer_capacity = 256
fetcher.buffer_capacity = 256

# batched oplogs carry a block-level checksum computed with the
# crc32 algorithm, and a compressor is used to compress the content
# of each oplog entry.
# supported compressors are: gzip,zlib,deflate
# Do not enable this option when tunnel type is "direct"
worker.oplog_compressor = none

# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
tunnel = direct
# tunnel target resource url
# for rpc. this is remote receiver socket address
# for tcp. this is remote receiver socket address
# for file. this is the file path, for instance "data"
# for kafka. this is the topic and brokers address which split by comma, for
# instance: topic@brokers1,brokers2, default topic is "mongoshake"
# for mock. this is useless
# for direct. this is target mongodb address
tunnel.address = mongodb://2.2.2.2:27017

# collector context storage, mainly used to store the checkpoint.
# type can be: database, api
# for api storage, the address is an http url
# for database storage, the address is a collection name while the db name is "mongoshake" by default.
# if the source database is a replica set, the context is stored in the source mongodb.
# if the source database is sharded, context.storage.url must be given to point to the config server address.
# context.storage.url is only used with sharding. When the source mongodb type is replicaSet,
# the checkpoint is written into the source mongodb by default.
context.storage = database
#context.storage.url = mongodb://127.0.0.1:20070
context.address = ckpt_default_shenzhen_12_07
# pay attention: this is UTC time, which is 8 hours behind CST time. this
# variable is only used when no checkpoint exists.
context.start_position = 2019-12-06T15:00:01Z

# high availability option.
# enable master election if set to true. only one mongoshake can become master
# and do the sync; the others wait, and at most one of them becomes master once
# the previous master dies. The master information is stored in the `mongoshake` db of the source
# database by default.
# This option is useless when only one mongoshake is running.
master_quorum = false

# ----------------------splitter----------------------
# if tunnel type is direct, all the variables below should be set

# only transfer DML oplog commands for syncing, i.e. entries
# whose oplog.op is "i","d","u". applyOps is also included
replayer.dml_only = true

# executors in single worker
replayer.executor = 16

# an Update oplog is changed into an Insert when the target document is not found (_id or unique-index)
replayer.executor.upsert = true
# an Insert oplog is changed into an Update when a duplicate key is found (_id or unique-index)
replayer.executor.insert_on_dup_update = true
# db. write duplicated logs to mongoshake_conflict
# sdk. write duplicated logs to sdk.
replayer.conflict_write_to = none

# replayer durable mode. if set to false, oplogs are dropped and no
# action is taken (only for debugging environments); otherwise they are
# written to the ${mongo_url} instance
replayer.durable = true
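
For reference: lag can be watched through the restful port configured above (http_profile = 9102) with the bundled tool `./mongoshake-stat --port=9102`, which, per the comments in the config, prints ack, lsn, checkpoint and qps. Since the source is a replica set, the checkpoint should live in the `mongoshake` db of the source instance, in the collection named by context.address (ckpt_default_shenzhen_12_07 here).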

Screenshot of the sync falling behind: (image)

vinllen commented 4 years ago

The sync is not actually falling behind; it has already hit an error, which went unnoticed.

{ts 6767355749539512321} {op u} {g } {ns cash_loan.marketUser} {o [{$set [{employmentName BL PHOTOSTSTE} {monthlySalary 15000} {occupation other} {officeAddress shop no 09 opp govt see. sec. school} {officeAddressDetail [{state Haryana} {city Sirsa} {detail shop no 09 opp govt see. sec. school}]} {officePINCode 125055} {updatedAt 2019-12-06 15:56:35.081 +0000 UTC} {workingSince 2016-08-12 15:54:32.408 +0000 UTC}]} {$set [{employmentName BL PHOTOSTSTE} {monthlySalary 15000} {occupation other} {officeAddress shop no 09 opp govt see. sec. school} {officeAddressDetail [{state B} {city A} {detail xxxx}]} {officePINCode 125015} {updatedAt 2019-12-06 15:56:35.081 +0000 UTC} {workingSince 2016-08-12 15:54:32.408 +0000 UTC}]}]} {o2 map[_id:ObjectIdHex("5dea78e5fc4a0200112f1f5a")]} {uk map[]} {lsid map[id:{4 [196 17 34 254 189 104 70 75 188 245 207 58 100 243 119 101]} uid:[xxxx]]} {fromMigrate false}

Related to #258. Please provide the original oplog entry; it looks like this is caused by two $set operators.
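
A note on locating that entry: the ts MongoShake logs appears to be the raw 64-bit MongoDB timestamp, with the seconds in the high 32 bits and the ordinal in the low 32 bits, so it can be decoded and then looked up in local.oplog.rs on the source (by that Timestamp or by o2._id). A minimal sketch of the decoding:

package main

import "fmt"

func main() {
	// MongoShake logs the oplog ts as a raw 64-bit MongoDB timestamp:
	// high 32 bits = seconds since the epoch, low 32 bits = ordinal.
	const ts int64 = 6767355749539512321 // value taken from the log above
	secs := uint32(ts >> 32)
	ord := uint32(ts & 0xFFFFFFFF)
	fmt.Printf("Timestamp(%d, %d)\n", secs, ord) // prints Timestamp(1575647795, 1)
}

This decodes to Timestamp(1575647795, 1), matching the raw entry posted next.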

vinllen commented 4 years ago
{ 
    "ts" : Timestamp(1575647795, 1), 
    "t" : NumberLong(2), 
    "h" : NumberLong(-5331062152699164081), 
    "v" : 2.0, 
    "op" : "u", 
    "ns" : "cash_loan.marketUser", 
    "ui" : UUID("fd1e33c1-dba9-4a9f-82e7-4720ee8e0abb"), 
    "o2" : {
        "_id" : ObjectId("5dea78e5fc4a0200112f1f5a")
    }, 
    "wall" : ISODate("2019-12-06T15:56:35.079+0000"), 
    "lsid" : {
        "id" : UUID("c41122fe-bd68-464b-bcf5-cf3a64f37765"), 
        "uid" : BinData(0, "V0VQACSguONcJscaTvkXYFE6iXAf2Urk.. 8 more bytes")
    }, 
    "txnNumber" : NumberLong(153), 
    "stmtId" : 0.0, 
    "prevOpTime" : {
        "ts" : Timestamp(0, 0), 
        "t" : NumberLong(-1)
    }, 
    "o" : {
        "$v" : 1.0, 
        "$set" : {
            "employmentName" : "BL PHOTOSTSTE", 
            "monthlySalary" : 15000.0, 
            "occupation" : "other", 
            "officeAddress" : "shop no 09 opp govt see. sec. school", 
            "officeAddressDetail" : {
                "state" : "", 
                "city" : "A", 
                "detail" : "xxxx"
            }, 
            "officePINCode" : "125015", 
            "updatedAt" : ISODate("2019-12-06T15:56:35.081+0000"), 
            "workingSince" : ISODate("2016-08-12T15:54:32.408+0000")
        }
    }
}

Parsed breakdown of the oplog entry that MongoShake wrote:

{ts 6767355749539512321} 
{op u} 
{g } 
{ns cash_loan.marketUser} 
{o
    [
        {$set[
            {employmentName BL PHOTOSTSTE} 
            {monthlySalary 15000} 
            {occupation other} 
            {officeAddress shop no 09 opp govt see. sec. school} 
            {officeAddressDetail
                [
                    {state Haryana} 
                    {city Sirsa} 
                    {detail shop no 09 opp govt see. sec. school}
                ]} 
            {officePINCode 125055} 
            {updatedAt 2019-12-06 15:56:35.081 +0000 UTC} 
            {workingSince 2016-08-12 15:54:32.408 +0000 UTC}
            ]
        } 
        {$set [
            {employmentName BL PHOTOSTSTE} 
            {monthlySalary 15000} 
            {occupation other} 
            {officeAddress shop no 09 opp govt see. sec. school} 
            {officeAddressDetail
                [
                    {state Haryana} 
                    {city Sirsa} 
                    {detail shop no 09 opp govt see. sec. school}
                ]} 
            {officePINCode 125055} 
            {updatedAt 2019-12-06 15:56:35.081 +0000 UTC} 
            {workingSince 2016-08-12 15:54:32.408 +0000 UTC}
            ]
        }
    ]
} 
{o2 map[_id:ObjectIdHex("5dea78e5fc4a0200112f1f5a")]} 
{uk map[]} 
{lsid map[id:{4 [196 17 34 254 189 104 70 75 188 245 207 58 100 243 119 101]} 
uid:[87 69 80 0 36 160 184 227 92 38 199 26 78 249 23 96 81 58 137 112 31 217 74 228 231 186 78 41 65 5 10 221]]} 
{fromMigrate false}

At the moment this looks like it is caused by MongoShake writing two $set operators.

Full error log:

[2019/12/09 10:56:53 CST] [CRIT] [executor.(*Executor).execute:97] Replayer-4, executor-4, oplog for namespace[cash_loan.agendaJobs] op[u] failed. error type[*errors.errorString] error[doUpdate run upsert/update[true] failed[Updating the path 'lockedAt' would create a conflict at 'lockedAt']], logs number[1], firstLog: [{ts 6767355878388531415} {op u} {g } {ns cash_loan.agendaJobs} {o [{$set [{lockedAt 2019-12-06 15:57:05.973 +0000 UTC}]} {$set [{lockedAt 2019-12-06 15:57:05.973 +0000 UTC}]}]} {o2 map[_id:ObjectIdHex("5d8477d3b3b6a021dc4450ab")]} {uk map[]} {lsid <nil>} {fromMigrate false}]
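
To make the failure mode concrete: the server rejects an update document that carries two $set operators targeting the same path, which is exactly the "Updating the path 'lockedAt' would create a conflict at 'lockedAt'" message above. Below is a minimal sketch that reproduces it with the official mongo-go-driver against a hypothetical throwaway collection (MongoShake itself may use a different driver internally, so this only demonstrates the server-side behavior, not MongoShake's code path):

package main

import (
	"context"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// placeholder address for a local test instance
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://127.0.0.1:27017"))
	if err != nil {
		panic(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("test").Collection("dup_set_repro") // hypothetical throwaway collection

	// bson.D preserves duplicate keys, so this update document really carries
	// two $set operators touching the same path, like the replayed oplog above.
	now := time.Now()
	update := bson.D{
		{Key: "$set", Value: bson.D{{Key: "lockedAt", Value: now}}},
		{Key: "$set", Value: bson.D{{Key: "lockedAt", Value: now}}},
	}

	_, err = coll.UpdateOne(ctx, bson.D{{Key: "_id", Value: 1}}, update,
		options.Update().SetUpsert(true))

	// expected write error:
	// "Updating the path 'lockedAt' would create a conflict at 'lockedAt'"
	fmt.Println(err)
}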
Jonnter commented 4 years ago

May I ask whether this problem has been resolved? I'm running into the same issue.

vinllen commented 4 years ago

This will be fixed in version 2.4.4; see #345.