alibaba / MongoShake

MongoShake is a universal data replication platform based on MongoDB's oplog. Redundant replication and active-active replication are the two most important functions. A cluster replication tool based on the MongoDB oplog that covers migration and synchronization needs and further enables disaster recovery and active-active deployments.
GNU General Public License v3.0

Incremental sync log reports *mgo.BulkError errors #405

Closed admh closed 4 years ago

admh commented 4 years ago

Environment: MongoShake v2.4.8 (binary release). The source is an Alibaba Cloud MongoDB 4.0 replica set in the India region; the target is a self-hosted MongoDB 4.2 replica set on a Shenzhen ECS instance. Traffic goes through an Nginx layer-4 proxy on a Singapore node to improve network quality. The log now shows several CRIT-level errors:

[18:30:49 CST 2020/08/05] [CRIT] (mongoshake/executor.(*Executor).execute:112) Replayer-1, executor-1, oplog for namespace[questionnaire.question] op[i] failed. error type[*mgo.BulkError] error[index[0], msg[EOF], dup[false]], logs number[22], firstLog: {"ts":6856830582718988294,"op":"i","g":"","ns":"questionnaire.question","o":[{"Name":"_id","Value":"5f285b161a528d00110e0c40"},{"Name":"answer","Value":["4"]},{"Name":"content","Value":"How many family members?"},{"Name":"namespace","Value":"paisaPay"},{"Name":"questionnaireTemplateId","Value":"5ee321227d9a6800116fe112"},{"Name":"submitDate","Value":"2020-08-03T18:44:38.929Z"},{"Name":"title","Value":"Questionnaire LoanBus"},{"Name":"topicId","Value":"5ee321227d9a6800116fe107"},{"Name":"type","Value":"number"},{"Name":"userId","Value":"5f2859ca043ff80018970c21"},{"Name":"__v","Value":0},{"Name":"createdAt","Value":"2020-08-03T18:44:38.931Z"},{"Name":"updatedAt","Value":"2020-08-03T18:44:38.931Z"}],"o2":null,"uk":null,"lsid":{"id":{"Kind":4,"Data":"e1pqN7diTnaUFFk6IseHFQ=="},"uid":"rXqrRR3grYpdeSjNmDMw4qGlRBZaK69RN8lrhO4APDE="},"fromMigrate":false}

[18:51:00 CST 2020/08/05] [CRIT] (mongoshake/executor.(*Executor).execute:112) Replayer-1, executor-1, oplog for namespace[cash_loan_collection.collectionInfo] op[u] failed. error type[*mgo.BulkError] error[index[0], msg[EOF], dup[false]], logs number[225], firstLog: {"ts":6856842965109702678,"op":"u","g":"","ns":"cash_loan_collection.collectionInfo","o":[{"Name":"$set","Value":[{"Name":"status","Value":"pending"},{"Name":"updatedAt","Value":"2020-08-03T19:32:41.074Z"}]}],"o2":{"_id":"5f15e50fc758d60011f442a7"},"uk":null,"lsid":{"id":{"Kind":4,"Data":"ACowGwl4QcORzcQmMyP8ew=="},"uid":"ecf5T7U5V30pAbcOwuFRwpzoRc2H0qQ6mIRJJRiWJX0="},"fromMigrate":false}

Querying this record in the target database shows that the status field was not updated, so it is inconsistent with the source.
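
For reference, one quick way to confirm such a mismatch is to read the document from both sides by _id and compare the field in question. Below is a minimal pymongo sketch, not a MongoShake tool: both connection strings are placeholders, and the namespace and _id are taken from the failed update oplog above.

# Ad-hoc consistency check: compare one document between source and target by _id.
# Both URIs are placeholders; the namespace and _id come from the CRIT log above.
from bson import ObjectId
from pymongo import MongoClient

source = MongoClient("mongodb://user:pass@source-host:3717/admin")  # placeholder
target = MongoClient("mongodb://user:pass@target-host:3717/admin")  # placeholder

oid = ObjectId("5f15e50fc758d60011f442a7")
src_doc = source["cash_loan_collection"]["collectionInfo"].find_one({"_id": oid})
dst_doc = target["cash_loan_collection"]["collectionInfo"].find_one({"_id": oid})

print("source status:", src_doc and src_doc.get("status"))
print("target status:", dst_doc and dst_doc.get("status"))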

Configuration file:

# for the detailed explanation, please visit xxxx
# if you run into problems, please check the FAQ document and the wiki first.
# for a detailed explanation of each parameter, see: xxx

# current configuration version, do not modify this value.
conf.version = 5

# --------------------------- global configuration ---------------------------
# collector name
# the id is also used for the pid file and similar output.
id = mongoshake

# high availability option.
# enable master election if set to true. only one mongoshake instance can become
# master and do the sync; the others wait, and at most one of them becomes master
# once the previous master dies. The master information is stored in the `mongoshake`
# db in the source database by default.
# This option is useless when only one mongoshake instance is running.
# enable this if a primary/standby pair of mongoshake instances pulls from the same source.
master_quorum = false

# http api interface. Users can use this api to monitor mongoshake.
# `curl 127.0.0.1:9100`.
# We also provide a restful tool named "mongoshake-stat" to
# print ack, lsn, checkpoint and qps information based on this api.
# usage: `./mongoshake-stat --port=9100`
# restful monitoring ports for full and incremental sync; internal metrics can be
# viewed with curl. See the wiki for details.
full_sync.http_port = 9101
incr_sync.http_port = 9408
# profiling via net/http/pprof
# profiling port, used to inspect internal Go stacks.
system_profile_port = 9308

# global log level: debug, info, warning, error. lower level messages will be filtered out.
log.level = info
# log directory. the log and pid files will be stored in this directory.
# if not set, the default is "./logs/" under the current path.
log.dir =
# log file name.
log.file = collector.log
# log flush enable. If set to false, logs are buffered and may not all be printed on
# exit. If set to true, every log line is flushed immediately, which degrades
# performance considerably; true is recommended while debugging.
log.flush = false

# sync mode: all/full/incr. default is incr.
# all means full synchronization + incremental synchronization.
# full means full synchronization only.
# incr means incremental synchronization only.
sync_mode = incr

# connect source mongodb, set username and password if authentication is enabled.
# Please note: the password shouldn't contain '@'. If authentication is disabled,
# the "username:password@" part can be omitted.
# split nodes of the same replica set by comma(,).
# E.g., mongodb://username1:password1@primaryA,secondaryB,secondaryC
# split sharding instances by semicolon(;).
# E.g., mongodb://username1:password1@primaryA,secondaryB,secondaryC;mongodb://username2:password2@primaryX,secondaryY,secondaryZ
mongo_urls = mongodb://mongoshake:11111111111111@dds-1111111111.mongodb.ap-south-1.rds.aliyuncs.com:3717,dds-111111111111.mongodb.ap-south-1.rds.aliyuncs.com:3717/admin?replicaSet=mgset-111111111&readPreference=secondary
# please fill the source config server url if source mongodb is sharding.
mongo_cs_url =
# please give at least one mongos address if source is sharding.
# if the source is fetched via change stream, at least one mongos address must be
# configured here as well; multiple mongos addresses are split by comma(,).
mongo_s_url =

# tunnel pipeline type. now we support rpc, file, kafka, mock, direct.
tunnel = direct
# tunnel target resource url
# for rpc: the remote receiver socket address
# for tcp: the remote receiver socket address
# for file: the file path, for instance "data"
# for kafka: the topic and broker addresses split by comma, for
# instance: topic@brokers1,brokers2; the default topic is "mongoshake"
# for mock: this is useless
# for direct: the target mongodb address, in the same format as `mongo_urls`. If
# the target is sharding, this should be the mongos address.
# direct mode writes straight into MongoDB; the other modes are for analytics or
# long-distance transfer scenarios. Note that non-direct modes require a receiver
# to parse the data, see the FAQ document. The address format matches `mongo_urls`.
tunnel.address = mongodb://1.1.1.1:3717
# the message format in the tunnel, only used for the kafka and file tunnel types.
# "raw": batched raw data format (default). It performs well, but carries some control
#        information, so it must be parsed by the receiver.
# "json": one oplog per message in JSON, convenient to read directly.
# "bson": one oplog per message in binary BSON.
tunnel.message = raw

# connect mode:
# primary: fetch data from the primary.
# secondaryPreferred: fetch data from a secondary if available, otherwise the primary. (default)
# standalone: fetch data from the given single node, whether primary, secondary or
#             hidden. This is only supported when the tunnel type is direct.
mongo_connect_mode = secondaryPreferred

# filter db or collection namespace. at most one of these two parameters can be given.
# if filter.namespace.black is not empty, the given namespaces are filtered out while
# all others pass.
# if filter.namespace.white is not empty, the given namespaces pass while all others
# are filtered out.
# all namespaces pass if neither condition is given. regular expressions are not supported.
# db and collection are joined by a dot(.).
# different namespaces are split by a semicolon(;).
# e.g., filterDbName1.filterCollectionName1;filterDbName2
filter.namespace.black = cash_loan.log;log
# some databases like "admin", "local", "mongoshake", "config", "system.views" are
# filtered by default; users can enable these databases for special needs.
# different databases are split by a semicolon(;), e.g., admin;mongoshake.
# pay attention: collections such as "admin.xxx" are not supported, except "system.views".
# normally it is not recommended to set this parameter.
filter.pass.special.db =
# if disabled, only plain data oplogs (op "i", "d", "u") are transferred for syncing.
# if enabled, DDL such as create index and drop database, as well as MongoDB 4.0
# transactions, will also be transferred. enabling is not yet supported when the
# source is sharding; if the target is sharding, applyOps commands (including
# transactions) are not yet supported.
filter.ddl_enable = false

# checkpoint info, used for resuming from a break point.
# checkpoint.storage.url marks the database the checkpoint is stored in,
# e.g., mongodb://127.0.0.1:20070
# if not set, the checkpoint is written into the source mongodb (db=mongoshake)
# for both replica sets and sharded clusters.
checkpoint.storage.url = mongodb://root:1111111@1.1.1.1:3717/admin
# name of the db the checkpoint is stored in.
checkpoint.storage.db = mongoshake
# name of the collection the checkpoint is stored in. if several mongoshake instances
# pull from the same source, change this collection name to avoid conflicts.
checkpoint.storage.collection = ckpt_mongo_shake
# real checkpoint: the oplog position to start fetching from.
# pay attention: this is UTC time, which is 8 hours behind CST. this variable is only
# used when no checkpoint exists (at the storage location above); to force fetching
# from this position, delete the existing checkpoint first, see the FAQ.
# if no checkpoint exists and this value is 1970-01-01T00:00:00Z, all oplogs currently
# on the source are fetched.
# if no checkpoint exists and this value is not 1970-01-01T00:00:00Z, shake first
# checks whether the oldest oplog on the source is newer than the given time and,
# if so, exits with an error.
checkpoint.start_position = 2020-08-03T18:00:00Z

# transform a source db or collection namespace into a destination db or collection namespace.
# at most one of these two parameters can be given.
# transform: fromDbName1.fromCollectionName1:toDbName1.toCollectionName1;fromDbName2:toDbName2
# e.g., a.b is written as c.d after sync. enable with caution, it costs some performance.
transform.namespace =

# --------------------------- full sync configuration ---------------------------
# the number of collections fetched concurrently, e.g. 6 means shake fetches at most
# 6 collections at the same time.
full_sync.reader.collection_parallel = 6
# the number of document writer threads per collection, e.g. 8 means 8 threads write
# each collection concurrently.
full_sync.reader.write_document_parallel = 8
# batch size of a single insert to the target, e.g. 128 means one thread aggregates
# 128 documents and writes them at once.
full_sync.reader.document_batch_size = 128
# maximum number of documents read by a single reader thread, used to mitigate skew on
# a single large collection. the default 0 means a single-threaded read; a non-zero
# value must be >= 10000. E.g., for a collection with 50000 documents, setting 10000
# splits the read into 5 threads (1-32 concurrent threads are recommended).
full_sync.reader.read_document_count = 0

# whether to drop a collection with the same name in the target mongodb before full
# synchronization; true drops it first, false does not.
full_sync.collection_exist_drop = true

# whether to create indexes after the full-sync data copy finishes: none creates no
# indexes, foreground creates foreground indexes, background creates background indexes.
full_sync.create_index = none

# convert an insert into an update when a duplicate key is found, i.e. the _id
# already exists in the target database.
full_sync.executor.insert_on_dup_update = false
# whether to filter orphan documents when the source is sharding.
full_sync.executor.filter.orphan_document = false
# enable majority write on the target during full sync.
# performance will degrade if enabled.
full_sync.executor.majority_enable = false

# --------------------------- incremental sync configuration ---------------------------
# fetch method:
# oplog: fetch the oplog from the source mongodb (default)
# change_stream: use change streams to receive change events from the source mongodb,
#                supported for MongoDB >= 4.0
incr_sync.mongo_fetch_method = oplog

# global id, used in active-active replication to prevent circular replication.
# this parameter is not supported in the current open-source version and currently
# only works with Alibaba Cloud MongoDB; to enable gid between Alibaba Cloud instances,
# contact Alibaba Cloud support. for sharding, separate multiple gids with semicolons(;).
incr_sync.oplog.gids =

# distribute data to different workers by hash key to run in parallel.
# [auto]        decided by whether the collections have unique indexes:
#               use `collection` if a unique index exists, otherwise use `id`.
# [id]          shard by ObjectId. oplogs with the same _id are handled in sequence.
# [collection]  shard by ns. oplogs with the same ns are handled in sequence.
# if there are no unique indexes, `id` is recommended for much higher sync throughput;
# otherwise choose `collection`.
incr_sync.shard_key = collection

# number of concurrent oplog transmit workers.
# if the source is sharding, the worker number must equal the number of shards.
# increase the worker count if the machine has enough capacity.
incr_sync.worker = 8
# batched oplogs carry a block-level checksum computed with the crc32 algorithm, and a
# compressor can compress the content of each oplog entry to reduce network bandwidth
# in non-direct modes.
# supported compressors are: gzip, zlib, deflate.
# Do not enable this option when the tunnel type is "direct".
incr_sync.worker.oplog_compressor = none

# set the sync delay of the target, e.g. lag 20 minutes behind the source, similar to
# MongoDB's secondary slaveDelay parameter. unit: seconds; 0 disables the delay.
incr_sync.target_delay = 0

# memory queue configuration, please see the FAQ document for more details.
# do not modify these variables if the current performance and resource usage
# meet your needs.
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256

# --- direct tunnel only begin ---
# if the tunnel type is direct, all the variables below should be set.

# convert an update into an insert when the update target (_id or unique index) does
# not exist in the target database.
incr_sync.executor.upsert = true
# convert an insert into an update when a duplicated key (_id or unique index) is
# found in the target database.
incr_sync.executor.insert_on_dup_update = true
# where to record conflicting documents when a write conflicts:
# db:  write the conflicting logs to the mongoshake_conflict db.
# sdk: write the conflicting logs to the sdk.
incr_sync.conflict_write_to = none

# enable majority write on the target during incremental sync.
# performance will degrade if enabled.
incr_sync.executor.majority_enable = false

# --- direct tunnel only end ---

# set to false to carry only the updated fields after a document update; set to true
# to carry the full document content.
# only takes effect when mongo_fetch_method = change_stream, and it reduces performance.
incr_sync.change_stream.watch_full_document = false
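
As a side note on the checkpoint settings above: the current resume position can be inspected by reading the configured checkpoint collection directly. Here is a minimal pymongo sketch; the connection string is a placeholder, and the checkpoint document layout is whatever MongoShake stores, so it is printed raw rather than assuming a schema.

# Read the checkpoint documents from the db/collection configured via
# checkpoint.storage.db and checkpoint.storage.collection above.
from pymongo import MongoClient

CKPT_URI = "mongodb://root:password@checkpoint-host:3717/admin"  # placeholder

client = MongoClient(CKPT_URI)
for doc in client["mongoshake"]["ckpt_mongo_shake"].find():
    print(doc)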

Also, replay is quite slow. When it first started, lsn_ckpt.time was 2020-08-04 02:00:00; after the program had run for 2-3 hours, lsn_ckpt.time had only advanced to 2020-08-04 04:00:00.

Log output:

[2020/08/05 19:40:51 CST] [INFO] Duplicated document found. reinsert or update to [cash_loan] [creditHistoryLoanRecord]
[2020/08/05 19:40:51 CST] [INFO] Duplicated document found. reinsert or update to [cash_loan] [modelProcess]
[2020/08/05 19:40:51 CST] [INFO] Duplicated document found. reinsert or update to [cash_loan] [creditHistoryRepaymentHistory]
[2020/08/05 19:40:52 CST] [INFO] Duplicated document found. reinsert or update to [cash_loan] [modelProcess]
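
On the slow lsn_ckpt.time progress above: besides the mongoshake-stat tool mentioned in the config, the incremental REST port can be polled to watch how the checkpoint advances over time. A minimal sketch, assuming the incr_sync.http_port = 9408 value from the config above and simply dumping whatever the endpoint returns (no particular JSON field names are assumed):

# Poll MongoShake's incremental monitoring port (see `curl 127.0.0.1:9100` in the
# config comments) once a minute and print the raw response with a timestamp.
import time
import urllib.request

URL = "http://127.0.0.1:9408"  # incr_sync.http_port from the config above

while True:
    try:
        with urllib.request.urlopen(URL, timeout=5) as resp:
            print(time.strftime("%H:%M:%S"), resp.read().decode("utf-8"))
    except OSError as exc:
        print("poll failed:", exc)
    time.sleep(60)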
vinllen commented 4 years ago

msg[EOF] looks like the target end dropped the connection; please check the network status, it is likely related to your network conditions. You are also syncing across a very long distance, which keeps sync performance low. I suggest running shake in the same data center as the target, which should improve performance somewhat.

admh commented 4 years ago

msg[EOF] looks like the target end dropped the connection; please check the network status, it is likely related to your network conditions. You are also syncing across a very long distance, which keeps sync performance low. I suggest running shake in the same data center as the target, which should improve performance somewhat.

From the source side's perspective that just switches shake from pushing to pulling; the network is still the fundamental problem.

admh commented 4 years ago

My idea for a remediation is to write a script that reads the _id values printed in the error log, looks up the records in the source database, and upserts them into the target database.
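
A minimal sketch of that repair idea, assuming pymongo, placeholder connection strings, and the CRIT log format shown above. Note that each CRIT line only prints the firstLog of a failed batch (e.g. logs number[22]), so this can only patch the documents that actually appear in the log:

# Hypothetical repair script: scan collector.log for CRIT lines, extract the
# namespace and the _id from the printed oplog, fetch the document from the
# source, and upsert it into the target. The regexes match the two log shapes
# shown above (insert oplogs and update oplogs with an ObjectId _id).
import re
from bson import ObjectId
from pymongo import MongoClient

SOURCE_URI = "mongodb://user:pass@source-host:3717/admin"  # placeholder
TARGET_URI = "mongodb://user:pass@target-host:3717/admin"  # placeholder
LOG_PATH = "./logs/collector.log"

ns_re = re.compile(r'namespace\[([^\]]+)\]')
id_re = re.compile(r'"_id"(?:,"Value")?:"([0-9a-f]{24})"')

source = MongoClient(SOURCE_URI)
target = MongoClient(TARGET_URI)

with open(LOG_PATH) as log:
    for line in log:
        if "[CRIT]" not in line:
            continue
        ns_match, id_match = ns_re.search(line), id_re.search(line)
        if not ns_match or not id_match:
            continue
        db_name, coll_name = ns_match.group(1).split(".", 1)
        doc = source[db_name][coll_name].find_one({"_id": ObjectId(id_match.group(1))})
        if doc is not None:
            target[db_name][coll_name].replace_one({"_id": doc["_id"]}, doc, upsert=True)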

vinllen commented 4 years ago

Cross-border setups do run into this problem. Generally it is better to place shake on the target side, because writing is slower than reading, so being close to the target reduces the write overhead. If this is only a one-time migration, your remediation approach works in theory, but it is fairly complex, and for some keys the _id may exist in the target while the contents differ.

admh commented 4 years ago

If shake runs on the target side and the network to the source is bad, will it re-pull? If it is placed on the target side, shake can only connect through the public MongoDB address Alibaba Cloud provides, and the direct Shenzhen-to-India network is still quite poor. With the current push-style setup I can at least put a proxy in the middle to reduce some latency.

vinllen commented 4 years ago

Global sync on the cloud used to separate the collector and the receiver; when open-sourcing we merged the two for ease of deployment. A receiver is still provided, but it only implements the interface to the collector and has no write functionality, so you could build on top of it, though that requires some engineering effort.

admh commented 4 years ago

OK. I'd like to package shake into a Docker image; is there anything I should pay attention to?

vinllen commented 4 years ago

All the considerations are in the wiki; there is nothing Docker-specific written up, but it is basically the same.