alibaba / MongoShake

MongoShake is a universal data replication platform based on MongoDB's oplog. Redundant replication and active-active replication are its two most important functions. It is a cluster replication tool that covers migration and synchronization needs and further enables disaster recovery and active-active deployments.

mongoshake reports "get next document fail: i/o timeout" after partially completing a full sync #716

Closed. uilmas closed this issue 2 years ago.

uilmas commented 2 years ago

1. MongoShake version: develop, 2f31ff91b5aa5e548e88dfefcf8021e1eeb08aba, release, go1.11.5, 2020-06-06_06:26:50

2. Source MongoDB version: MongoDB shell version v4.4.1 Build Info: { "version": "4.4.1", "gitVersion": "ad91a93a5a31e175f5cbf8c69561e788bbc55ce1", "openSSLVersion": "OpenSSL 1.1.1 11 Sep 2018", "modules": [], "allocator": "tcmalloc", "environment": { "distmod": "ubuntu1804", "distarch": "x86_64", "target_arch": "x86_64" } }

3. Destination MongoDB version: MongoDB shell version v4.0.28-rc0 git version: af1a9dc12adcfa83cc19571cb3faba26eeddac92 OpenSSL version: OpenSSL 1.0.2g 1 Mar 2016 allocator: tcmalloc modules: none build environment: distmod: ubuntu1604 distarch: x86_64 target_arch: x86_64

4. Last 10 lines of the failing log:

[2022/05/04 07:19:22 UTC] [INFO] [common.(ReplicationMetric).startup.func1:175] [name=BigBoss, stage=full, get=67377838, tps=49345]
[2022/05/04 07:19:27 UTC] [INFO] [common.(ReplicationMetric).startup.func1:175] [name=BigBoss, stage=full, get=67632685, tps=47617]
[2022/05/04 07:19:32 UTC] [INFO] [common.(ReplicationMetric).startup.func1:175] [name=BigBoss, stage=full, get=67886956, tps=54849]
[2022/05/04 07:19:37 UTC] [INFO] [common.(ReplicationMetric).startup.func1:175] [name=BigBoss, stage=full, get=68162284, tps=56383]
[2022/05/04 07:19:42 UTC] [INFO] [common.(ReplicationMetric).startup.func1:175] [name=BigBoss, stage=full, get=68462380, tps=66623]
[2022/05/04 07:19:47 UTC] [INFO] [common.(ReplicationMetric).startup.func1:175] [name=BigBoss, stage=full, get=68798575, tps=69313]
[2022/05/04 07:19:52 UTC] [INFO] [common.(ReplicationMetric).startup.func1:175] [name=BigBoss, stage=full, get=69124014, tps=67328]
[2022/05/04 07:19:54 UTC] [CRIT] [docsyncer.(DBSyncer).collectionSync.func1:419] splitter reader[DocumentReader src[mongodb://sa:***@52.80.53.220:8077] ns[{imapi user}] query[map[]]] get next document failed: read tcp 172.31.18.129:47700->52.80.53.220:8077: i/o timeout

5. Configuration file:

# current configuration version, do not modify.
conf.version = 3

# --------------------------- global configuration ---------------------------

# collector name. the id is used for the pid file name and other output.
id = mongoshake2sg

# high availability option.
# enable master election if set true. only one mongoshake can become master
# and do sync, the others will wait, and at most one of them becomes master once
# the previous master dies. the master information is stored in the mongoshake db
# in the source database by default.
# this option is useless when there is only one mongoshake running; enable it when
# primary/standby mongoshake instances pull from the same source.
master_quorum = false

# http api interface. users can use this api to monitor mongoshake, e.g.,
# curl 127.0.0.1:9100.
# we also provide a restful tool named "mongoshake-stat" that prints ack, lsn,
# checkpoint and qps information based on this api.
# usage: ./mongoshake-stat --port=9100
# restful monitoring ports for the full and incremental stages; see the wiki for details.
full_sync.http_port = 9101
incr_sync.http_port = 9100

# profiling via net/http/pprof, used to inspect internal go stacks.
system_profile_port = 9200

# global log level: debug, info, warning, error. lower-level messages are filtered.
log.level = info

# log directory. log and pid files are stored in this directory.
# if not set, the default is "./logs/".
log.dir =

# log file name.
log.file = collector.log

# log flush enable. if set false, logs are buffered and may not be printed on exit; if
# set true, every log line is flushed immediately, which hurts performance badly.
# set true when debugging.
log.flush = false

# sync mode: all/full/incr. default is incr.
# all means full synchronization + incremental synchronization.
# full means full synchronization only.
# incr means incremental synchronization only.
sync_mode = all

# connect source mongodb; set username and password if authentication is enabled.
# please note: the password shouldn't contain '@'.
# split by comma(,) for multiple instances in one replica set. E.g., mongodb://username1:password1@primaryA,secondaryB,secondaryC
# split by semicolon(;) if sharding is enabled. E.g., mongodb://username1:password1@primaryA,secondaryB,secondaryC;mongodb://username2:password2@primaryX,secondaryY,secondaryZ
# "username:password@" can be omitted when authentication is disabled.
mongo_urls = mongodb://sa:f9b806614!@52.80.53.220:8077

# please fill in the source config server url if the source mongodb is sharding.
mongo_cs_url =

# please give one mongos address if change streams are used to fetch data in the incremental stage.
mongo_s_url =

# tunnel pipeline type. supported: rpc, file, kafka, mock, direct.
tunnel = direct

# tunnel target resource url
# for rpc: the remote receiver socket address.
# for tcp: the remote receiver socket address.
# for file: the file path, for instance "data".
# for kafka: the topic and broker addresses split by comma, for
#   instance topic@brokers1,brokers2; the default topic is "mongoshake".
# for mock: this is useless.
# for direct: the target mongodb address, in the same format as mongo_urls; if
#   the target is sharding, this should be the mongos address.
# direct mode writes into MongoDB directly; the other modes are for analytics or
# long-distance transfer and need a receiver to parse the data, see the FAQ.
tunnel.address = mongodb://sa:f9b806614!@localhost:8077

# the message format in the tunnel, used only for the kafka and file tunnel types.
# "raw": batched raw data format (default); good performance, but it carries control
#   information and must be parsed by the receiver.
# "json": one oplog per message as json, easy for users to read directly.
# "bson": one oplog per message as binary bson.
tunnel.message = raw

# connect mode:
# primary: fetch data from the primary.
# secondaryPreferred: fetch data from a secondary if available, otherwise from the primary. (default)
# standalone: fetch data from the given single node, no matter primary, secondary or hidden;
#   only supported when the tunnel type is direct.
mongo_connect_mode = secondaryPreferred

# filter db or collection namespaces. at most one of these two parameters can be given.
# if filter.namespace.black is not empty, the given namespaces are
# filtered while all other namespaces pass.
# if filter.namespace.white is not empty, the given namespaces
# pass while all others are filtered.
# all namespaces pass if no condition is given.
# db and collection are joined by a dot(.).
# different namespaces are split by a semicolon(;).
# e.g., filterDbName1.filterCollectionName1;filterDbName2
# regular expressions are not supported.
filter.namespace.black =
filter.namespace.white =
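For instance, following the format above, a white list passing one whole database plus a single collection from another (hypothetical names) would look like:

filter.namespace.white = orders;billing.invoices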

# some databases like "admin", "local", "mongoshake", "config", "system.views" are
# filtered by default; users can enable these databases for special needs.
# different databases are split by a semicolon(;).
# e.g., admin;mongoshake.
# pay attention: collections like "admin.xxx" are not supported, except "system.views".
# normally this parameter should not be set.
filter.pass.special.db =

# if disabled, only the oplog commands needed for syncing are transferred, i.e. oplog.op in "i","d","u".
# if enabled, DDL (such as create index and drop database) and mongodb 4.0 transactions
# are transferred as well. enabling DDL is not yet supported when the source is sharding;
# when the target is sharding, applyOps commands (including transactions) are not supported.
filter.ddl_enable = false

# checkpoint info, used for resuming from a break point.
# checkpoint.storage.url marks the checkpoint store database, e.g., mongodb://127.0.0.1:20070.
# if not set, the checkpoint is written into the source mongodb when the source is a
# replica set (db=mongoshake); when the source is sharding, it is written into the
# config server (db=admin).
checkpoint.storage.url =

# checkpoint db name.
checkpoint.storage.db = mongoshake

# checkpoint collection name. if several mongoshake instances pull the same source,
# change this collection name to avoid conflicts.
checkpoint.storage.collection = ckpt_sjpmaster

# real checkpoint: the oplog fetch position.
# pay attention: this is UTC time, 8 hours behind CST. this variable is only used
# when no checkpoint exists; if a checkpoint already exists (at the storage location
# above), this parameter is ignored. to force fetching from this position, delete the
# existing checkpoint first, see the FAQ.
# if no checkpoint exists and this value is 1970-01-01T00:00:00Z, all oplogs currently
# on the source are fetched; if the value is later than that, mongoshake first checks
# whether the oldest oplog on the source is newer than the given time and exits with
# an error if so.
checkpoint.start_position = 1970-01-01T00:00:00Z
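As a concrete illustration of "delete the existing checkpoint first": with the storage settings above, the checkpoint lives in the mongoshake.ckpt_sjpmaster collection on the source. A minimal mongo shell sketch, assuming the default storage location (adjust db/collection to your config):

use mongoshake                 // checkpoint.storage.db
db.ckpt_sjpmaster.find()       // inspect the saved position first
db.ckpt_sjpmaster.drop()       // remove it so checkpoint.start_position takes effect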

# transform source db or collection namespaces to destination namespaces.
# format: fromDbName1.fromCollectionName1:toDbName1.toCollectionName1;fromDbName2:toDbName2
# e.g., a.b on the source becomes c.d on the target. enable with caution: it costs performance.
transform.namespace =

# --------------------------- full sync configuration ---------------------------

# the number of collections fetched concurrently,
# e.g., 6 means shake pulls at most 6 collections at the same time.
full_sync.reader.collection_parallel = 6

# the number of document writer threads per collection,
# e.g., 8 means 8 threads write one collection concurrently.
full_sync.reader.write_document_parallel = 4

# the number of documents in one batch insert,
# e.g., 128 means a thread aggregates 128 documents before writing.
full_sync.reader.document_batch_size = 64

# whether to drop a same-named collection on the destination before full synchronization;
# true drops it first, false keeps it.
full_sync.collection_exist_drop = true

# index creation after the data sync finishes in the full sync stage:
# none creates no indexes, foreground creates foreground indexes,
# background creates background indexes.
full_sync.create_index = background

# convert insert to update when a duplicate key is found,
# i.e. when the _id already exists on the destination.
full_sync.executor.insert_on_dup_update = false

# filter orphan documents when the source is sharding.
full_sync.executor.filter.orphan_document = false

# enable majority write in full sync.
# performance degrades if enabled.
full_sync.executor.majority_enable = false

# --------------------------- incremental sync configuration ---------------------------

# fetch method:
# oplog: fetch the oplog from the source mongodb (default).
# change_stream: use change streams to receive change events from the source mongodb, supported for MongoDB >= 4.0.
incr_sync.mongo_fetch_method = oplog

# global id, used in active-active replication to prevent replication loops.
# this parameter is not supported in the current open-source version; it is only
# used for Alibaba Cloud MongoDB. to enable gid for syncing between Alibaba Cloud
# instances, contact Alibaba Cloud after-sales support. for sharding, separate
# multiple gids with semicolons(;).
incr_sync.oplog.gids =

# distribute data to different workers by hash key to run in parallel.
# [auto] decided by whether the collections have unique indexes:
#   use collection if a unique index exists, otherwise use id.
# [id] shard by ObjectId; oplogs are handled in sequence per unique _id.
# [collection] shard by ns; oplogs are handled in sequence per unique ns.
# if there are no unique indexes, id gives much higher sync performance; otherwise choose collection.
incr_sync.shard_key = collection

# number of concurrent oplog transmit workers.
# if the source is sharding, the worker number must equal the number of shards.
# the worker count can be raised if the machine is powerful enough.
incr_sync.worker = 8

# batched oplogs carry a block-level checksum computed with the crc32 algorithm,
# and a compressor can compress the oplog entry content.
# supported compressors: gzip, zlib, deflate.
# do not enable this option when the tunnel type is "direct"; for other tunnels,
# compression reduces network bandwidth usage.
incr_sync.worker.oplog_compressor = none

# memory queue configuration; please see the FAQ document for details.
# do not modify these variables if the current performance and resource usage
# meet your needs.
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256

# --- direct tunnel only begin ---
# if the tunnel type is direct, all the variables below should be set.

# convert an update to an insert when the target document is missing (_id or unique index).
incr_sync.executor.upsert = false

# convert an insert to an update when a duplicate key (_id or unique index) is found on the target.
incr_sync.executor.insert_on_dup_update = false

# where to record documents that hit write conflicts:
# db: write duplicated logs to the mongoshake_conflict db.
# sdk: write duplicated logs to the sdk.
incr_sync.conflict_write_to = none

# enable majority write in incremental sync.
# performance degrades if enabled.
incr_sync.executor.majority_enable = false

# --- direct tunnel only end ---

zhangst commented 2 years ago

This is a network read timeout; check the load on the source DB and the network conditions along the link. Also, we recommend using version 2.6.X or later; prebuilt packages are available for download on GitHub.

uilmas commented 2 years ago

Thank you very much for the quick reply. I downloaded the latest release, mongo-shake-v2.6.6, with the same configuration as the old version, but it aborts with an error as soon as it runs. I don't know the cause; please point me in the right direction, thanks. The error is:

[2022/05/05 03:01:01 UTC] [INFO] start running with mode[all], fullBeginTs[7094081917565272066[1651719659, 2]]
[2022/05/05 03:01:01 UTC] [INFO] run serialize document oplog
[2022/05/05 03:01:01 UTC] [INFO] source is replica or mongos, no need to fetching chunk map
[2022/05/05 03:01:31 UTC] [CRIT] run replication failed: start document replication failed: ping to mongodb://sa:***@18.163.82.224:8077 failed: server selection error: server selection timeout, current topology: { Type: ReplicaSetNoPrimary, Servers: [{ Addr: 172.18.0.2:27017, Type: Unknown, State: Connected, Average RTT: 0, Last error: connection() : dial tcp 172.18.0.2:27017: connect: no route to host }, ] }
[2022/05/05 03:01:31 UTC] [WARN]
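The topology dump is telling: the ping targeted 18.163.82.224:8077, but the driver then dialed the replica set member under its advertised address 172.18.0.2:27017, which is unreachable from the shake host. A quick mongo shell sketch (addresses taken from the log above, password elided) to see what member addresses the replica set advertises to drivers:

// connect to the reachable node, then list the advertised member hosts
mongo "mongodb://sa:<password>@18.163.82.224:8077/admin"
> rs.conf().members.map(function (m) { return m.host })   // configured (advertised) addresses
> rs.status().members.map(function (m) { return m.name }) // runtime view of the same hosts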

zhangst commented 2 years ago

It looks like 172.18.0.2:27017 is unreachable. Try connecting to it with the mongo shell.
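For example, from the machine running MongoShake (a sketch; substitute the real password):

mongo "mongodb://sa:<password>@172.18.0.2:27017/admin" --eval "db.runCommand({ ping: 1 })"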

uilmas commented 2 years ago

Thank you. Connecting with the mongo shell works, and connecting with the mongoshake conf.version = 3 release also works; only the new conf.version = 10 release reports this error. Is it related to "source is replica or mongos, no need to fetching chunk map"? Does MongoDB need the balancer disabled? Without making any other changes, the conf.version = 3 release connects fine.

zhangst commented 2 years ago

Can you also connect to 18.163.82.224:8077 successfully? Does your replica set have no primary node? The topology says ReplicaSetNoPrimary; if so, try mongo_connect_mode = standalone.

uilmas commented 2 years ago

Hello, I tried mongo_connect_mode = standalone and got the same error. I also set up a fresh environment on an internal network and got the same error again. The version information, error log, and configuration are below; please advise, thanks!

1. MongoShake version: mongo-shake-v2.6.6

2. Source MongoDB version: MongoDB shell version v4.4.3 Build Info: { "version": "4.4.3", "gitVersion": "913d6b62acfbb344dde1b116f4161360acd8fd13", "openSSLVersion": "OpenSSL 1.1.1 11 Sep 2018", "modules": [], "allocator": "tcmalloc", "environment": { "distmod": "ubuntu1804", "distarch": "x86_64", "target_arch": "x86_64" } }

3. Destination MongoDB version: MongoDB shell version v4.4.3 Build Info: { "version": "4.4.3", "gitVersion": "913d6b62acfbb344dde1b116f4161360acd8fd13", "openSSLVersion": "OpenSSL 1.1.1 11 Sep 2018", "modules": [], "allocator": "tcmalloc", "environment": { "distmod": "ubuntu1804", "distarch": "x86_64", "target_arch": "x86_64" } }

4. Error log:

[2022/05/07 08:58:17 UTC] [INFO] log init succ. log.dir[] log.name[collector.log] log.level[info]
[2022/05/07 08:58:17 UTC] [INFO] MongoDB Version Source[4.4.3] Target[4.4.3]
[2022/05/07 08:58:17 UTC] [WARN]


[MongoShake startup ASCII banner: "MongoShake, Here we go !! - Alibaba Cloud"]

if you have any problem, please visit https://github.com/alibaba/MongoShake/wiki/FAQ

[2022/05/07 08:58:17 UTC] [INFO] New session to mongodb://sa:@172.31.1.76:8077 successfully
[2022/05/07 08:58:17 UTC] [INFO] Close session with mongodb://sa:@172.31.1.76:8077
[2022/05/07 08:58:17 UTC] [INFO] New session to mongodb://sa:@172.31.1.76:8077 successfully
[2022/05/07 08:58:17 UTC] [INFO] Close session with mongodb://sa:@172.31.1.76:8077
[2022/05/07 08:58:17 UTC] [INFO] Collector startup. shard_by[collection] gids[[]]
[2022/05/07 08:58:17 UTC] [INFO] Collector configuration {"ConfVersion":10,"Id":"mongoshake","MasterQuorum":false,"FullSyncHTTPListenPort":9101,"IncrSyncHTTPListenPort":9100,"SystemProfilePort":9200,"LogLevel":"info","LogDirectory":"","LogFileName":"collector.log","LogFlush":false,"SyncMode":"all","MongoUrls":["mongodb://sa:@172.31.1.76:8077"],"MongoCsUrl":"","MongoSUrl":"","MongoSslRootCaFile":"","MongoSslClientCaFile":"","MongoConnectMode":"secondaryPreferred","Tunnel":"direct","TunnelAddress":["mongodb://sa:@localhost:8077"],"TunnelMessage":"raw","TunnelKafkaPartitionNumber":1,"TunnelJsonFormat":"","TunnelMongoSslRootCaFile":"","FilterNamespaceBlack":[],"FilterNamespaceWhite":[],"FilterPassSpecialDb":[],"FilterDDLEnable":false,"FilterOplogGids":false,"CheckpointStorageUrl":"mongodb://sa:@172.31.1.76:8077","CheckpointStorageDb":"mongoshake","CheckpointStorageCollection":"ckpt_nxtonx","CheckpointStorageUrlMongoSslRootCaFile":"","CheckpointStartPosition":1,"TransformNamespace":[],"SpecialSourceDBFlag":"","FullSyncReaderCollectionParallel":6,"FullSyncReaderWriteDocumentParallel":8,"FullSyncReaderDocumentBatchSize":128,"FullSyncReaderParallelThread":1,"FullSyncReaderParallelIndex":"_id","FullSyncCollectionDrop":true,"FullSyncCreateIndex":"background","FullSyncReaderOplogStoreDisk":false,"FullSyncReaderOplogStoreDiskMaxSize":256000,"FullSyncExecutorInsertOnDupUpdate":false,"FullSyncExecutorFilterOrphanDocument":false,"FullSyncExecutorMajorityEnable":false,"IncrSyncMongoFetchMethod":"oplog","IncrSyncChangeStreamWatchFullDocument":false,"IncrSyncOplogGIDS":[],"IncrSyncShardKey":"collection","IncrSyncShardByObjectIdWhiteList":[],"IncrSyncWorker":8,"IncrSyncTunnelWriteThread":8,"IncrSyncTargetDelay":0,"IncrSyncWorkerBatchQueueSize":64,"IncrSyncAdaptiveBatchingMaxSize":1024,"IncrSyncFetcherBufferCapacity":256,"IncrSyncExecutorUpsert":false,"IncrSyncExecutorInsertOnDupUpdate":false,"IncrSyncConflictWriteTo":"none","IncrSyncExecutorMajorityEnable":false,"CheckpointStorage":"database","CheckpointInterval":5000,"FullSyncExecutorDebug":false,"IncrSyncDBRef":false,"IncrSyncExecutor":1,"IncrSyncExecutorDebug":false,"IncrSyncReaderDebug":"","IncrSyncCollisionEnable":false,"IncrSyncReaderBufferTime":1,"IncrSyncWorkerOplogCompressor":"none","IncrSyncTunnelKafkaDebug":"","Version":"$","SourceDBVersion":"4.4.3","TargetDBVersion":"4.4.3","IncrSyncTunnel":"","IncrSyncTunnelAddress":null,"IncrSyncTunnelMessage":"","HTTPListenPort":0,"SystemProfile":0}
[2022/05/07 08:58:17 UTC] [INFO] New session to mongodb://sa:@172.31.1.76:8077 successfully
[2022/05/07 08:58:17 UTC] [INFO] Close session with mongodb://sa:@172.31.1.76:8077
[2022/05/07 08:58:17 UTC] [INFO] New session to mongodb://sa:@172.31.1.76:8077 successfully
[2022/05/07 08:58:17 UTC] [INFO] Close session with mongodb://sa:@172.31.1.76:8077
[2022/05/07 08:58:17 UTC] [INFO] all node timestamp map: map[BigBoss:{7094912783283650561 7094916133358141441}]
[2022/05/07 08:58:17 UTC] [INFO] New session to mongodb://sa:@172.31.1.76:8077 successfully
[2022/05/07 08:58:17 UTC] [INFO] BigBoss Regenerate checkpoint but won't persist. content: {"name":"BigBoss","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/05/07 08:58:17 UTC] [INFO] BigBoss checkpoint using mongod/replica_set: {"name":"BigBoss","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}, ckptRemote set? [false]
[2022/05/07 08:58:17 UTC] [INFO] BigBoss syncModeAll[true] ts.Oldest[7094912783283650561], confTsMongoTs[4294967296]
[2022/05/07 08:58:17 UTC] [INFO] start running with mode[all], fullBeginTs[7094916133358141441[1651913890, 1]]
[2022/05/07 08:58:17 UTC] [INFO] run serialize document oplog
[2022/05/07 08:58:17 UTC] [INFO] source is replica or mongos, no need to fetching chunk map
[2022/05/07 08:58:47 UTC] [CRIT] run replication failed: start document replication failed: ping to mongodb://sa:***@172.31.1.76:8077 failed: server selection error: server selection timeout, current topology: { Type: ReplicaSetNoPrimary, Servers: [{ Addr: 172.24.0.2:27017, Type: RSPrimary, State: Connected, Average RTT: 443292 }, ] }
[2022/05/07 08:58:47 UTC] [WARN]

[MongoShake shutdown ASCII art: "Oh we finish ?"]

5. Configuration file:

# if you have any problem, please check the FAQ document and the wiki first:
# https://github.com/alibaba/MongoShake/wiki/FAQ
# for the detailed explanation of each parameter, please visit xxxx

# current configuration version, do not modify.
conf.version = 10

# --------------------------- global configuration ---------------------------

# collector name. the id is used for the pid file name and other output.
id = mongoshake

# high availability option.
# enable master election if set true. only one mongoshake can become master
# and do sync, the others will wait, and at most one of them becomes master once
# the previous master dies. the master information is stored in the mongoshake db
# in the source database by default.
# this option is useless when there is only one mongoshake running; enable it when
# primary/standby mongoshake instances pull from the same source.
master_quorum = false

# http api interface. users can use this api to monitor mongoshake, e.g.,
# curl 127.0.0.1:9100.
# we also provide a restful tool named "mongoshake-stat" that prints ack, lsn,
# checkpoint and qps information based on this api.
# usage: ./mongoshake-stat --port=9100
# restful monitoring ports for the full and incremental stages; see the wiki for details.
full_sync.http_port = 9101
incr_sync.http_port = 9100

# profiling via net/http/pprof, used to inspect internal go stacks.
system_profile_port = 9200

# global log level: debug, info, warning, error. lower-level messages are filtered.
log.level = info

# log directory. log and pid files are stored in this directory.
# if not set, the default is "./logs/".
log.dir =

# log file name.
log.file = collector.log

# log flush enable. if set false, logs are buffered and may not be printed on exit; if
# set true, every log line is flushed immediately, which hurts performance badly.
# set true when debugging.
log.flush = false

# sync mode: all/full/incr. default is incr.
# all means full synchronization + incremental synchronization.
# full means full synchronization only.
# incr means incremental synchronization only.
sync_mode = all

# connect source mongodb; set username and password if authentication is enabled.
# please note: the password shouldn't contain '@'.
# split by comma(,) for multiple instances in one replica set. E.g., mongodb://username1:password1@primaryA,secondaryB,secondaryC
# split by semicolon(;) if sharding is enabled. E.g., mongodb://username1:password1@primaryA,secondaryB,secondaryC;mongodb://username2:password2@primaryX,secondaryY,secondaryZ
# "username:password@" can be omitted when authentication is disabled.
mongo_urls = mongodb://sa:f9b806614!@172.31.1.76:8077

# please fill in the source config server url if the source mongodb is sharding.
mongo_cs_url =

# please give at least one mongos address if the source is sharding and change streams
# are used to fetch data; multiple mongos addresses are split by comma(,).
mongo_s_url =

# enable source ssl.
mongo_ssl_root_ca_file =

# tunnel pipeline type. supported: rpc, file, kafka, mock, direct.
tunnel = direct

# tunnel target resource url
# for rpc: the remote receiver socket address.
# for tcp: the remote receiver socket address.
# for file: the file path, for instance "data".
# for kafka: the topic and broker addresses split by comma, for
#   instance topic@brokers1,brokers2; the default topic is "mongoshake".
# for mock: this is useless.
# for direct: the target mongodb address, in the same format as mongo_urls; if
#   the target is sharding, this should be the mongos address.
# direct mode writes into MongoDB directly; the other modes are for analytics or
# long-distance transfer and need a receiver to parse the data, see the FAQ.
tunnel.address = mongodb://sa:f9b806614!@localhost:8077

# the message format in the tunnel, used only for the kafka and file tunnel types.
# "raw": batched raw data format (default); good performance, but it carries control
#   information and must be parsed by the receiver.
# "json": one oplog per message as json, easy for users to read directly.
# "bson": one oplog per message as binary bson.
tunnel.message = raw

# how many kafka partitions will be written, using the hash function from "incr_sync.shard_key";
# at most "incr_sync.worker" partitions. default is 1.
tunnel.kafka.partition_number = 1

# tunnel json format; it only takes effect when tunnel.message = json
# and tunnel == kafka. set canonical_extended_json to use the "Canonical
# Extended JSON Format", see #559.
tunnel.json.format =

# set if tunnel == direct or kafka and ssl is enabled.
tunnel.mongo_ssl_root_ca_file =

# connect mode:
# primary: fetch data from the primary.
# secondaryPreferred: fetch data from a secondary if available, otherwise from the primary. (default)
# standalone: fetch data from the given single node, no matter primary, secondary or hidden;
#   only supported when the tunnel type is direct.
mongo_connect_mode = secondaryPreferred

# filter db or collection namespaces. at most one of these two parameters can be given.
# if filter.namespace.black is not empty, the given namespaces are
# filtered while all other namespaces pass.
# if filter.namespace.white is not empty, the given namespaces
# pass while all others are filtered.
# all namespaces pass if no condition is given.
# db and collection are joined by a dot(.).
# different namespaces are split by a semicolon(;).
# e.g., filterDbName1.filterCollectionName1;filterDbName2
# regular expressions are not supported.
filter.namespace.black =
filter.namespace.white =

# some databases like "admin", "local", "mongoshake", "config", "system.views" are
# filtered by default; users can enable these databases for special needs.
# different databases are split by a semicolon(;).
# e.g., admin;mongoshake.
# pay attention: collections like "admin.xxx" are not supported, except "system.views".
# normally this parameter should not be set.
filter.pass.special.db =

# if disabled, only the oplog commands needed for syncing are transferred, i.e. oplog.op in "i","d","u".
# if enabled, DDL (such as create index and drop database) and mongodb 4.0 transactions
# are transferred as well. enabling DDL is not yet supported when the source is sharding;
# when the target is sharding, applyOps commands (including transactions) are not supported.
filter.ddl_enable = false

# filter oplog gids if enabled.
# if the source MongoDB uses gids but the target MongoDB does not support them (which
# would break the sync), enable this to strip the gid field. enable with caution:
# it costs shake a lot of performance.
filter.oplog.gids = false

# checkpoint info, used for resuming from a break point.
# checkpoint.storage.url marks the checkpoint store database, e.g., mongodb://127.0.0.1:20070.
# if not set, the checkpoint is written into the source mongodb (db=mongoshake) for both
# replica sets and sharded clusters; since v2.4 this no longer needs to point at the
# source config server.
checkpoint.storage.url =

# checkpoint db name.
checkpoint.storage.db = mongoshake

# checkpoint collection name. if several mongoshake instances pull the same source,
# change this collection name to avoid conflicts.
checkpoint.storage.collection = ckpt_nxtonx

# set if ssl is enabled.
checkpoint.storage.url.mongo_ssl_root_ca_file =

# real checkpoint: the oplog fetch position.
# pay attention: this is UTC time, 8 hours behind CST. this variable is only used
# when no checkpoint exists; if a checkpoint already exists (at the storage location
# above), this parameter is ignored. to force fetching from this position, delete the
# existing checkpoint first, see the FAQ.
# if no checkpoint exists and this value is 1970-01-01T00:00:00Z, all oplogs currently
# on the source are fetched; if the value is later than that, mongoshake first checks
# whether the oldest oplog on the source is newer than the given time and exits with
# an error if so.
checkpoint.start_position = 1970-01-01T00:00:00Z

# transform source db or collection namespaces to destination namespaces.
# format: fromDbName1.fromCollectionName1:toDbName1.toCollectionName1;fromDbName2:toDbName2
# e.g., a.b on the source becomes c.d on the target. enable with caution: it costs performance.
transform.namespace =

# --------------------------- full sync configuration ---------------------------

# the number of collections fetched concurrently,
# e.g., 6 means shake pulls at most 6 collections at the same time.
full_sync.reader.collection_parallel = 6

# the number of document writer threads per collection,
# e.g., 8 means 8 threads write one collection concurrently.
full_sync.reader.write_document_parallel = 8

# the number of documents in one batch insert,
# e.g., 128 means a thread aggregates 128 documents before writing.
full_sync.reader.document_batch_size = 128

# max number of fetching threads per collection. default is 1 (single-threaded fetch);
# requires the splitVector privilege.
# note: for a single collection, the values of the chosen index must all be of the
# same type; do not enable this option if the types differ!
full_sync.reader.parallel_thread = 1

# the index scanned for parallel fetching if full_sync.reader.parallel_thread is set;
# the index may only have 1 field and its values must be of the same type.
# for a replica set, _id is recommended; for a sharded cluster, the shard key is recommended.
full_sync.reader.parallel_index = _id

# whether to drop a same-named collection on the destination before full synchronization;
# true drops it first, false keeps it.
full_sync.collection_exist_drop = true

# create index option.
# none: do not create indexes.
# foreground: create foreground indexes when the data sync finishes in the full sync stage.
# background: create background indexes when starting.
full_sync.create_index = background

# convert insert to update when a duplicate key is found,
# i.e. when the _id already exists on the destination.
full_sync.executor.insert_on_dup_update = false

# filter orphan documents when the source is sharding.
full_sync.executor.filter.orphan_document = false

# enable majority write in full sync.
# performance degrades if enabled.
full_sync.executor.majority_enable = false

# --------------------------- incremental sync configuration ---------------------------

# fetch method:
# oplog: fetch the oplog from the source mongodb (default).
# change_stream: use change streams to receive change events from the source mongodb, supported for MongoDB >= 4.0.
# we recommend change_stream if possible.
incr_sync.mongo_fetch_method = oplog

# for updates: set false to receive only the updated fields,
# set true to receive the full document content.
# this only takes effect when incr_sync.mongo_fetch_method = change_stream,
# and it reduces performance somewhat.
incr_sync.change_stream.watch_full_document = false

# global id, used in active-active replication to prevent replication loops.
# this parameter is not supported in the current open-source version; it is only
# used for Alibaba Cloud MongoDB. to enable gid for syncing between Alibaba Cloud
# instances, contact Alibaba Cloud after-sales support. for sharding, separate
# multiple gids with semicolons(;).
incr_sync.oplog.gids =

# distribute data to different workers by hash key to run in parallel.
# [auto] decided by whether the collections have unique indexes:
#   use collection if a unique index exists, otherwise use id.
# [id] shard by ObjectId; oplogs are handled in sequence per unique _id.
# [collection] shard by ns; oplogs are handled in sequence per unique ns.
# if there are no unique indexes, id gives much higher sync performance; otherwise choose collection.
incr_sync.shard_key = collection

# if shard_key is collection and users want to improve performance for collections
# without a unique index, those collections can be hashed by _id instead.
# users must make sure such a collection never gets a unique index; if one is
# detected, shake crashes immediately.
# e.g., db1.collection1;db2.collection2. specifying only a db is not supported.
incr_sync.shard_by_object_id_whitelist =

# number of concurrent oplog transmit workers (writing to the target DB).
# if the source is sharding, the worker number must equal the number of shards.
# the worker count can be raised if the machine is powerful enough.
incr_sync.worker = 8

# how many writing threads will be used in one worker.
# for non-direct tunnels such as kafka, this is the number of serialization threads;
# it must be a multiple of "incr_sync.worker" and defaults to the value of "incr_sync.worker".
incr_sync.tunnel.write_thread = 8

# sync delay for the target, like the mongodb secondary slaveDelay parameter. unit: seconds.
# e.g., keep the target 20 minutes behind the source. 0 disables the delay.
incr_sync.target_delay = 0

# memory queue configuration; please see the FAQ document for details.
# do not modify these variables if the current performance and resource usage
# meet your needs.
# batch_queue_size: the queue length per worker thread; workers take tasks from this queue.
# batching_max_size: the maximum number of documents dispatched to a worker at one time.
# buffer_capacity: the minimum number of documents in one PendingQueue buffer before serialization.
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256

# --- direct tunnel only begin ---
# if the tunnel type is direct, all the variables below should be set.

# convert an update to an insert when the target document is missing (_id or unique index).
incr_sync.executor.upsert = false

# convert an insert to an update when a duplicate key (_id or unique index) is found on the target.
incr_sync.executor.insert_on_dup_update = false

# where to record documents that hit write conflicts. options: db, none.
# db: conflicts are written to the mongoshake_conflict db on the target.
incr_sync.conflict_write_to = none

# enable majority write in incremental sync.
# performance degrades if enabled.
incr_sync.executor.majority_enable = false

# --- direct tunnel only end ---

# special field identifying the source type; empty by default. for an Alibaba Cloud
# MongoDB serverless cluster, set aliyun_serverless.
special.source.db.flag =

zhangst commented 2 years ago

Your configuration file was not changed correctly; it is still on secondary: mongo_connect_mode = secondaryPreferred. If you are testing with a replica set, you must make sure the cluster has a primary node.

uilmas commented 2 years ago

Understood, the connect mode. But no matter which mode I set mongo_connect_mode to (primary, secondaryPreferred, standalone), I still get the same error. Connecting with the mongoshake conf.version = 3 release works fine, though. Does the new mongoshake version need some specific setting?

zhangst commented 2 years ago

Please download the latest 2.6.6 package (some extra logging was added to the newer package) and run it, then compress the conf and log files into one archive and upload it.

uilmas commented 2 years ago

OK, I downloaded the latest 2.6.6 and ran it; it still reports the same error as before. To help you analyze the problem, the archive contains 4 files: 2 from the old version and 2 from the new version. On the same machine and environment, the old version runs successfully and the new version fails. Please assist.

zhangst commented 2 years ago

The difference is probably caused by the underlying libraries. The old version only used the mgo library; the new version uses both mgo and the official go-driver and is gradually migrating to the latter. The two libraries have different rules for passwords: the go-driver does not allow special characters such as !, %, or @ in the password. Change the password or create a new account and try again. I received your files and have already deleted them.
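If changing the account is inconvenient, the standard workaround for a URI-based driver is to percent-encode reserved characters in the password instead of writing them literally (whether MongoShake's config parsing accepts the encoded form is an assumption worth testing). A quick mongo shell / JavaScript sketch using the password from this thread:

// percent-encode every non-alphanumeric character, so '!' becomes %21
> "f9b806614!".replace(/[^A-Za-z0-9]/g, function (c) {
      return "%" + c.charCodeAt(0).toString(16).toUpperCase();
  })
"f9b806614%21"

// then, hypothetically:
// mongo_urls = mongodb://sa:f9b806614%21@172.31.1.76:8077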

uilmas commented 2 years ago

Hello, I created a new account (ops) whose password contains no special characters and which has the same privileges as sa, and I still get the same error. Will this be fixed later?

[2022/05/10 07:48:02 UTC] [INFO] New session to mongodb://ops:***@69.34.13.75:8077 successfully
[2022/05/10 07:48:02 UTC] [INFO] BigBoss Regenerate checkpoint but won't persist. content: {"name":"BigBoss","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/05/10 07:48:02 UTC] [INFO] BigBoss checkpoint using mongod/replica_set: {"name":"BigBoss","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}, ckptRemote set? [false]
[2022/05/10 07:48:02 UTC] [INFO] BigBoss syncModeAll[true] ts.Oldest[7094912783283650561], confTsMongoTs[4294967296]
[2022/05/10 07:48:02 UTC] [INFO] start running with mode[all], fullBeginTs[7096011281299144705[1652168874, 1]]
[2022/05/10 07:48:02 UTC] [INFO] run serialize document oplog
[2022/05/10 07:48:02 UTC] [INFO] source is replica or mongos, no need to fetching chunk map

zhangst commented 2 years ago

The log you pasted contains no error lines.

uilmas commented 2 years ago

collector (3).log

Please take a look; it is the same error as before, thanks.

zhangst commented 2 years ago

Does the replica set at 69.234.213.75:8077 have a primary node? Please try a replica set reachable over the internal network, or deploy shake on the internal network behind that IP.

uilmas commented 2 years ago

collector (4).log

This replica set node is the primary. No matter which mode I set mongo_connect_mode to (primary, secondaryPreferred, standalone), with the source IP changed to the internal address, the network fully open, and the new simple username and password, I still get the same error. The log is above.

uilmas commented 2 years ago

Or could you assist me remotely and take a look? Even I think this is too bizarre.

zhangst commented 2 years ago

Why are there two addresses here: 172.31.47.145:8077 and 172.24.0.2:27017? First check whether this is a problem with the mongo configuration.

uilmas commented 2 years ago

Hello, all of our MongoDB instances run in Docker. 172.31.47.145 is the internal network address, 172.24.0.2 is the Docker MongoDB container address, and 8077 is the Docker port mapped to MongoDB's 27017. The several deployments we sync successfully with the old version all use the same configuration and Docker setup.
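That advertised container address is the likely culprit: the go-driver discovers the replica set members from the replica set configuration and then dials 172.24.0.2:27017, which only resolves inside the Docker network, while the old mgo-based path happened not to. A hedged way out, assuming a single-member replica set and that 172.31.47.145:8077 is the externally reachable mapping, is to re-register the member under that address (alternatively, run shake inside the same Docker network):

// in the mongo shell, connected to the primary
> cfg = rs.conf()
> cfg.members[0].host = "172.31.47.145:8077"   // host:port reachable from the shake host
> rs.reconfig(cfg)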

zhangst commented 2 years ago

When you tested with the mongo shell, which version did you use? Did you test with a 4.4 mongo shell?

uilmas commented 2 years ago

The source uses version 4.0.28 and the destination uses 4.4.3, run directly inside the MongoDB containers via docker exec:

mongoDB shell version v4.0.28-rc0 connecting to: mongodb://127.0.0.1:27017/admin?gssapiServiceName=mongodb

MongoDB shell version v4.4.3

zhangst commented 2 years ago

Can you try connecting from the machine where shake runs?

uilmas commented 2 years ago

Hello, I tested it: on the shake machine I can connect to the source MongoDB with the mongo shell using the ops username and password. Or could you assist remotely? Shall I email you the details?

zhangst commented 2 years ago

Email: shuntong.zhang AT alibaba-inc.com

uilmas commented 2 years ago

I have emailed you the details, thank you.