tair-opensource / RedisShake

RedisShake is a Redis data processing and migration tool.
https://tair-opensource.github.io/RedisShake/
MIT License
3.86k stars 699 forks source link

在关闭目的端回复后,存在切换db时卡住问题 #836

Closed EquentR closed 4 months ago

EquentR commented 4 months ago

问题描述(Issue Description)

在关闭目的端回复后,如果同步的数据跨了多个db,则会在切换db时卡住,日志正常打印,但是ops都变成0了

环境信息(Environment)

其他信息(Additional Information)

出现问题代码:在切换数据库的时候向未初始化的chWaitReply推送了切换db的命令

func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
    rw := new(redisStandaloneWriter)
    rw.address = opts.Address
    rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
    rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
        // 此处如果关闭回复了,就没有初始化chWaitReply,导致后面的问题
    if opts.OffReply {
        log.Infof("turn off the reply of write")
        rw.offReply = true
        rw.client.Send("CLIENT", "REPLY", "OFF")
    } else {
        rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
        rw.chWg.Add(1)
        go rw.processReply()
    }
    return rw
}

//......省略

func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
    log.Debugf("[%s] switch db to [%d]", w.stat.Name, newDbId)
    w.client.Send("select", strconv.Itoa(newDbId))
    w.DbId = newDbId
        // 在这里推送了切换db的回复通道命令
    w.chWaitReply <- &entry.Entry{
        Argv:    []string{"select", strconv.Itoa(newDbId)},
        CmdName: "select",
    }
}

解决方案:加上if判断

func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
    log.Debugf("[%s] switch db to [%d]", w.stat.Name, newDbId)
    w.client.Send("select", strconv.Itoa(newDbId))
    w.DbId = newDbId
    if !w.offReply {
        w.chWaitReply <- &entry.Entry{
            Argv:    []string{"select", strconv.Itoa(newDbId)},
            CmdName: "select",
        }
    }
}

这样实测能解决问题,如果有潜在风险请知会一声

suxb201 commented 4 months ago

应该没问题,欢迎提交 PR 👏

EquentR commented 4 months ago

应该没问题,欢迎提交 PR 👏

好,已提