alibaba / MongoShake

MongoShake is a universal data replication platform based on MongoDB's oplog. Redundant replication and active-active replication are its two most important functions. It is a cluster replication tool built on the MongoDB oplog that covers migration and synchronization needs and further enables disaster recovery and active-active deployments.

Only the most recently created db is synced #176

Closed. 7937 closed this issue 5 years ago.

7937 commented 5 years ago

Neither filter.namespace.black nor filter.namespace.white is set, but only the most recently created db was synced. I am migrating from MongoDB 4.0 to MongoDB 3.6.

vinllen commented 5 years ago

Is the sync mode all? Could the checkpoint position be set incorrectly, so that only one db has changes after the position being fetched? Please paste the whole configuration file.

7937 commented 5 years ago

The sync mode is all. Here is the configuration file:

# this is the configuration of mongo-shake.
# if this is your first time using mongo-shake, you can only configure the source mongodb
# address(mongo_urls) and the target mongodb address(tunnel.address).

# ----------------------splitter----------------------

# connect source mongodb, set username and password if authority is enabled.
# split by comma(,) if multiple instances are used in one replica-set, e.g.,
# mongodb://username:password@127.0.0.1:20040,127.0.0.1:20041
# split by semicolon(;) if sharding is enabled, e.g.,
# mongodb://username:password@127.0.0.1:20040,127.0.0.1:20041;10.1.1.1:20050,10.1.1.1:20051
mongo_urls = mongodb://127.0.0.1:27001
# connect mode:
# primary: fetch data from the primary.
# secondaryPreferred: fetch data from a secondary if available, otherwise the primary. (default)
# standalone: fetch data from the given single node, no matter primary, secondary or hidden.
mongo_connect_mode = standalone

# collector name
collector.id = mongoshake

# sync mode: all/document/oplog. default is oplog.
# all means full synchronization + incremental synchronization.
# document means full synchronization.
# oplog means incremental synchronization.
sync_mode = all

# save checkpoint interval(ms) if necessary. the checkpoint will be checked and stored 3 minutes after starting.
checkpoint.interval = 5000

# http api interface. Users can use this api to monitor mongoshake.
# We also provide a restful tool named "mongoshake-stat" to print ack, lsn, checkpoint and qps information based on this api.
# usage: ./mongoshake-stat --port=9100
http_profile = 9100
# profiling on net/http/profile
system_profile = 9200

# global log level: debug, info, warning, error. lower level messages will be filtered.
log_level = info
# log file name. uses the logs/ prefix folder path.
log_file = collector.log
# log buffered or unbuffered. If set to true, logs may not be printed on exit. If set to false, performance will decrease dramatically.
log_buffer = true

# filter db or collection namespace. at most one of these two parameters can be given.
# if filter.namespace.black is not empty, the given namespaces are filtered out while all others pass.
# if filter.namespace.white is not empty, the given namespaces pass while all others are filtered out.
# all namespaces pass if neither condition is given.
# db and collection are joined by a dot(.); different namespaces are split by a semicolon(;).
# filter: filterDbName1.filterCollectionName1;filterDbName2
filter.namespace.black =
filter.namespace.white =

# this parameter is not supported in the current open-source version.
# oplog namespace and global id. oplogs in the mongo cluster that have a different global id will be discarded.
# queries without gid (i.e., fetches all oplogs) if no oplog.gid is set.
oplog.gids =

# [auto] decided by whether a unique index exists in the collections: use collection if a unique index is set, otherwise id.
# [id] shard by ObjectId. handle oplogs in sequence by unique _id.
# [collection] shard by ns. handle oplogs in sequence by unique ns.
shard_key = collection

# syncer send time interval, unit is second. time interval for flushing the syncer reader buffer.
syncer.reader.buffer_time = 1

# oplog transmit worker concurrency.
# if the source is sharding, the worker number must equal the number of shards.
worker = 8
# memory queue configuration, please see the FAQ document for more details.
# do not modify these variables if the performance and resource usage meet your needs.
worker.batch_queue_size = 64
adaptive.batching_max_size = 16384
fetcher.buffer_capacity = 256

# batched oplogs have a block-level checksum value using the crc32 algorithm, plus a compressor for compressing oplog entry content.
# supported compressors are: gzip, zlib, deflate.
# Do not enable this option when the tunnel type is "direct".
worker.oplog_compressor = none

# tunnel pipeline type. now we support rpc, file, kafka, mock, direct.
tunnel = direct
# tunnel target resource url
# for rpc: the remote receiver socket address.
# for tcp: the remote receiver socket address.
# for file: the file path, for instance "data".
# for kafka: the topic and broker addresses split by comma, for instance topic@brokers1,brokers2; default topic is "mongoshake".
# for mock: useless.
# for direct: the target mongodb address, with the same format as mongo_urls.
tunnel.address = mongodb://172.16.16.136:27017

# collector context storage, mainly for storing the checkpoint.
# types include: database, api.
# for api storage, the address is an http url.
# for database storage, the address is a collection name while the db name is "mongoshake" by default.
context.storage = database
# context.storage.url is only used to mark the checkpoint store database.
# If the source mongodb type is sharding, the address should be the config server when MongoShake's version >= 1.5; otherwise, it is the replicaSet address.
# When the source mongodb type is replicaSet, the checkpoint is written into the source mongodb by default if context.storage.url is not set; otherwise, the checkpoint is written into this mongodb. E.g., mongodb://127.0.0.1:20070
context.storage.url =
# checkpoint collection's name.
context.address = ckpt_default
# real checkpoint: the oplog fetching position.
# pay attention: this is UTC time, which is 8 hours later than CST time.
# this variable is only used when no checkpoint exists.
context.start_position = 2000-01-01T00:00:01Z

# high availability option. enable master election if set to true.
# only one mongoshake can become master and do the sync; the others wait, and at most one of them becomes master once the previous master dies.
# The master information is stored in the mongoshake db in the source database by default.
# This option is useless when only one mongoshake is running.
master_quorum = false

# transform a source db or collection namespace into a destination db or collection namespace.
# at most one of these two parameters can be given.
# transform: fromDbName1.fromCollectionName1:toDbName1.toCollectionName1;fromDbName2:toDbName2
transform.namespace =
# if dbref is used in document rename, this needs to be set to true, but it will decrease replication performance.
dbref = false

# ----------------------splitter----------------------
# if the tunnel type is direct, all the variables below should be set.
# only transfer oplog commands for syncing, i.e., oplog.op in "i", "d", "u".
# DDL will be transferred if this is disabled, e.g., create index, drop database, transactions in mongodb 4.0.
replayer.dml_only = false

# executors in a single worker. do not modify it.
replayer.executor = 1

# oplog changes to Insert when Update finds a non-existent document (_id or unique-index).
replayer.executor.upsert = false
# oplog changes to Update when Insert finds a duplicated key (_id or unique-index).
replayer.executor.insert_on_dup_update = false
# db: write duplicated logs to mongoshake_conflict.
# sdk: write duplicated logs to the sdk.
replayer.conflict_write_to = none
# replayer durable mode.
# if set to false, drop oplogs and take no action (only for debugging environments); otherwise write to the ${mongo_url} instance.
replayer.durable = true

# ----------------------splitter----------------------
# full synchronization configuration

# the number of concurrent collections.
replayer.collection_parallel = 6

# the number of concurrent document writers within one collection.
replayer.document_parallel = 8

# number of documents per batch insert in one document writer.
replayer.document_batch_size = 256
# drop the collection with the same name in the dest mongodb during full synchronization.
replayer.collection_drop = false

vinllen commented 5 years ago

Check whether a checkpoint already exists on the source side. If it exists, only incremental synchronization will run instead of a full sync. The checkpoint is stored in mongoshake/ckpt_default.
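For reference, a minimal sketch of that check, assuming the gopkg.in/mgo.v2 driver; the source address and the mongoshake/ckpt_default namespace are taken from the configuration above, and the program itself is illustrative rather than part of MongoShake:

package main

import (
	"fmt"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	// Connect to the source MongoDB (mongo_urls in the configuration above).
	session, err := mgo.Dial("mongodb://127.0.0.1:27001")
	if err != nil {
		panic(err)
	}
	defer session.Close()

	// The checkpoint lives in db "mongoshake", collection "ckpt_default"
	// (context.storage = database, context.address = ckpt_default).
	var ckpt bson.M
	err = session.DB("mongoshake").C("ckpt_default").Find(nil).One(&ckpt)
	switch err {
	case nil:
		fmt.Println("existing checkpoint found, sync_mode = all will resume incrementally from it:", ckpt)
	case mgo.ErrNotFound:
		fmt.Println("no checkpoint found, a full synchronization will run first")
	default:
		panic(err)
	}
}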

7937 commented 5 years ago

[CRIT] (mongoshake/executor.(Executor).execute:97) Replayer-0, executor-0, oplog for namespace[cos.cos] op[c] failed. error type[mgo.QueryError] error[{ "Invalid field specified for createIndexes command: v" }], logs number[1], firstLog: &{6709778955567628290 c cos.cos [{createIndexes cos} {v 2} {key [{age 1}]} {name age_1}] map[] map[] false map[] 0 0}

The incremental sync reports this error. Does incremental synchronization not support index operations?

vinllen commented 5 years ago

This is a bug; I will fix it.

vinllen commented 5 years ago

Problem notes:

Previously, DDL operations were replayed with runCommand, which was problematic. Using applyOps has issues as well: MongoDB 3.6 and above places requirements on the uuid, and in a data synchronization scenario the corresponding uuid does not match, so the operation cannot succeed. Currently, for op=c, a createIndexes operation cannot be passed to applyOps directly, because the missing uuid makes the write fail; it has to be parsed and adapted. The remaining commands can be executed directly through applyOps as long as they carry no uuid (the ui field): drop, dropDatabase, renameCollection, deleteIndex, deleteIndexes, dropIndex, dropIndexes, convertToCapped, emptycapped, and so on. If a uuid is present it must match; in the synchronization scenario the command appears to succeed on the target but does not actually take effect, so the uuid is stripped off (the uuid itself identifies a specific database and collection). applyOps takes a global write lock, but since DDL operations are infrequent in a synchronization workload, the impact is small. applyOps also introduces some write amplification, which is essentially never a problem unless a single user oplog entry is already close to 16MB and the few extra bytes added by applyOps push it over 16MB.
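To illustrate the approach described above (strip the ui field, then replay the command through applyOps), here is a minimal sketch, again assuming the gopkg.in/mgo.v2 driver; the oplog document and the applyCommandOplog helper are hypothetical examples, not MongoShake's actual code:

package main

import (
	"fmt"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// applyCommandOplog strips the "ui" (collection uuid) field from a DDL oplog
// entry and replays it on the target through the applyOps command.
func applyCommandOplog(target *mgo.Session, oplog bson.M) error {
	// The source collection's uuid will not match any collection on the target, so drop it.
	delete(oplog, "ui")

	var result bson.M
	return target.DB("admin").Run(bson.D{
		{Name: "applyOps", Value: []bson.M{oplog}},
	}, &result)
}

func main() {
	// Target address taken from tunnel.address in the configuration above.
	target, err := mgo.Dial("mongodb://172.16.16.136:27017")
	if err != nil {
		panic(err)
	}
	defer target.Close()

	// Hypothetical "drop collection" command oplog entry fetched from the source.
	oplog := bson.M{
		"op": "c",
		"ns": "cos.$cmd",
		"ui": bson.Binary{Kind: 4, Data: make([]byte, 16)},
		"o":  bson.M{"drop": "cos"},
	}
	if err := applyCommandOplog(target, oplog); err != nil {
		fmt.Println("applyOps failed:", err)
	}
}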

Summary:

Adapt createIndexes and write it out as a proper command, following the format in the official command documentation. For the other DDL commands, strip the uuid field (if present) and then apply them through applyOps.
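To make the createIndexes adaptation concrete, a minimal sketch follows, again assuming gopkg.in/mgo.v2; adaptCreateIndexes is a hypothetical helper rather than MongoShake's actual code, and the input document is the o field from the failing oplog reported above:

package main

import (
	"fmt"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// adaptCreateIndexes reshapes the "o" field of a createIndexes oplog entry, e.g.
// {createIndexes: "cos", v: 2, key: {age: 1}, name: "age_1"}, into the documented
// command form {createIndexes: "cos", indexes: [{key: {age: 1}, name: "age_1", v: 2}]}.
func adaptCreateIndexes(o bson.M) bson.D {
	coll, _ := o["createIndexes"].(string)

	// Every field except the command name itself belongs to the single index spec.
	index := bson.M{}
	for k, v := range o {
		if k != "createIndexes" {
			index[k] = v
		}
	}
	return bson.D{
		{Name: "createIndexes", Value: coll},
		{Name: "indexes", Value: []bson.M{index}},
	}
}

func main() {
	// Target address taken from tunnel.address in the configuration above.
	target, err := mgo.Dial("mongodb://172.16.16.136:27017")
	if err != nil {
		panic(err)
	}
	defer target.Close()

	// The "o" field of the oplog that failed for namespace cos.cos.
	o := bson.M{"createIndexes": "cos", "v": 2, "key": bson.M{"age": 1}, "name": "age_1"}

	var result bson.M
	if err := target.DB("cos").Run(adaptCreateIndexes(o), &result); err != nil {
		fmt.Println("createIndexes failed:", err)
	}
}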

vinllen commented 5 years ago

This problem has been fixed in v2.0.2; please download the latest binary and run it.