redis / rueidis

A fast Golang Redis client that supports Client Side Caching, Auto Pipelining, Generics OM, RedisJSON, RedisBloom, RediSearch, etc.
Apache License 2.0
2.42k stars · 152 forks

Is it possible to use Redis pipeline to efficiently write 10 million Redis JSON-formatted data in a high-throughput manner even in a Redis cluster environment? #181

Closed totorofly closed 1 year ago

totorofly commented 1 year ago

Hi, is it possible to use a Redis pipeline to efficiently write 10 million Redis JSON-formatted records in a high-throughput manner, even in a Redis cluster environment? How should the specific code be written?

rueian commented 1 year ago

Hi @dwzkit,

The easiest way to do that is to use multiple goroutines, each of which keeps inserting JSON-formatted records. For example:

func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
    // the below client.Do() operations will be issued from
    // multiple goroutines and thus will be pipelined automatically.
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            client.Do(context.Background(), client.B().Set().Key("k").Value("formatted").Build()).ToString()
        }
    })
}

totorofly commented 1 year ago

Sorry, I didn't quite understand the code example you provided. I have 10 million phone numbers, each with a structure similar to the following JSON format:

// loop over 10 million phone numbers
for i := 0; i < 10000000; i++ {
    // create the JSON-formatted data for the current phone number
    data := fmt.Sprintf(`{
        "phone": "%d",
        "diffNumCount": 65,
        "status": "1",
        "owner": "",
        "ttlInSecond": 0,
        "preOrderTime": 0,
        "providerCode": "qhyd",
        "touchCode": "no",
        "province": "qh",
        "city": "xn",
        "saleProvinceRange": "all",
        "saleCityRange": "all",
        "vendor": "1",
        "rule_common": {
            "prefixNum": "13X",
            "prefix3Bit": "135",
            "include4": "yes",
            "hitMassRule": "yes",
            "hitMassRuleId": "anyBrithYear",
            "fiveNum": "no"
        },
        "rule_phone": {
            "last4_mid4": "19753874",
            "last4_mid3": "9753874,197X3874",
            "last4_mid2": "753874",
            "last4": "3874",
            "last3_mid4": "1975X874",
            "last3_mid3": "197X874,975X874",
            "last3_mid2": "75X874",
            "last3": "874"
        },
        "rule_idCard": {
            "last4_mmdd": "3874",
            "last4_yyyy": "3874",
            "last4_idCardLast4": "3874",
            "mid4_mmdd": "3874",
            "mid4_yyyy": "3874",
            "mid4_idCardLast4": "3874",
            "last2_yy": "74"
        },
        "rule_car": {
            "any5": "53874",
            "any4": "X3874,5X874,53X74,538X4,5387X",
            "any3": "XX874,X3X74,X38X4,X387X,5XX74,5X8X4,5X87X,53XX4,53X7X,538XX",
            "tail5": "53874",
            "tail4": "3874",
            "tail3": "874",
            "continuous5": "75387,97538,19753,51975,35197,13519",
            "continuous4": "5387,7538,9753,1975,5197,3519,1351",
            "continuous3": "387,538,753,975,197,519,351,135"
        },
        "rule_mass": {
            "anyBrithYear": "yes"
        },
        "rule_continuity": {
            "continuity2": "13,35,51,19,97,75,53,38,87,74",
            "continuity3": "135,351,519,197,975,753,538,387,874",
            "continuity4": "1351,3519,5197,1975,9753,7538,5387,3874",
            "continuity5": "13519,35197,51975,19753,97538,75387,53874",
            "continuity6": "135197,351975,519753,197538,975387,753874",
            "continuity7": "1351975,3519753,5197538,1975387,9753874",
            "continuity8": "13519753,35197538,51975387,19753874",
            "continuity9": "135197538,351975387,519753874",
            "continuity10": "1351975387,3519753874",
            "rear2": "74",
            "rear3": "874",
            "rear4": "3874",
            "rear5": "53874",
            "rear6": "753874",
            "rear7": "9753874",
            "rear8": "19753874",
            "rear9": "519753874",
            "rear10": "3519753874"
        }
      }`, i)
}

If I use the auto-pipelining technique you mentioned, how should I write the code to write the data to a Redis cluster? Is this the correct way to write it?

expireInSecond := 180
for i, phone := range data {
  i, phone := i, phone // capture loop variables (needed before Go 1.22)
  go func() {
    client.Do(ctx, client.B().JsonSet().Key(strconv.Itoa(i)).Path("$").Value(phone).Build()).Error()
  }()
}
rueian commented 1 year ago

Yes, your code is correct and it looks good to me.

The small difference between the benchmark code I provided and yours is that the former reuses goroutines instead of continually creating and destroying them.

totorofly commented 1 year ago

Yes, your code is correct and it looks good to me.

The small difference between the benchmark code I provided and yours is that the former reuses goroutines instead of continually creating and destroying them.

I adjusted the code to be:

expireInSecond := 180
for i, phone := range data {
  i, phone := i, phone // capture loop variables (needed before Go 1.22)
  eg.Go(func() error {
    key := strconv.Itoa(i)
    err := client.Do(ctx, client.B().JsonSet().Key(key).Path("$").Value(phone).Build()).Error()
    if err != nil {
      panic(err.Error())
    }
    // set the TTL for the stored record
    if expireInSecond != 0 {
      err = client.Do(ctx, client.B().Expire().Key(key).Seconds(int64(expireInSecond)).Build()).Error()
      if err != nil {
        fmt.Println(err.Error())
      }
    }
    return err
  })
}
eg.Wait()

The speed of writing to Redis has indeed become much faster: I was able to write about 150,000 records in only about 127 seconds, whereas previously it took more than ten minutes to write them one by one.

So I wrote the above Redis writing method as an API for other programs to call. However, when multiple programs call this API concurrently, it seems that separate and independent pipelines are established for different calls, resulting in multiple pipelines working simultaneously. This leads to a significant increase in CPU usage, up to over 95% (on an 8-core CPU with 32GB of memory).

What is the best way for me to optimize this situation?

rueian commented 1 year ago

The new code confuses me. Did you put eg.Wait() inside the loop? I believe it should be outside the loop, at the very least to utilize the auto-pipelining technique.

Besides that, you should use DoMulti() to send JSON.SET and EXPIRE commands at once.

it seems that separate and independent pipelines are established for different calls, resulting in multiple pipelines working simultaneously.

No, there won’t be independent pipelines established for different calls. They share the same pipelines to each Redis node.

This leads to a significant increase in CPU usage, up to over 95%

As mentioned previously, continually recreating goroutines is the easiest way to do things like this. However, it causes some CPU overhead. You can try reusing goroutines to remove that overhead.

Another thing is that handling remote calls from external programs is very expensive. You should take some pprof snapshots for further improvement.

totorofly commented 1 year ago

The new code confuses me. Did you put eg.Wait() inside the loop? I believe it should be outside the loop, at the very least to utilize the auto-pipelining technique.

Besides that, you should use DoMulti() to send JSON.SET and EXPIRE commands at once.

it seems that separate and independent pipelines are established for different calls, resulting in multiple pipelines working simultaneously.

No, there won’t be independent pipelines established for different calls. They share the same pipelines to each Redis node.

This leads to a significant increase in CPU usage, up to over 95%

As mentioned previously, continually recreating goroutines is the easiest way to do things like this. However, it causes some CPU overhead. You can try reusing goroutines to remove that overhead.

Another thing is that handling remote calls from external programs is very expensive. You should take some pprof snapshots for further improvement.

eg.Wait()

Sorry, it was a typo. In my actual program, eg.Wait() is placed outside of the loop.

Based on the optimization ideas you provided, I will try to optimize it again.

totorofly commented 1 year ago

The new code confuses me. Did you put eg.Wait() inside the loop? I believe it should be outside the loop, at the very least to utilize the auto-pipelining technique.

Besides that, you should use DoMulti() to send JSON.SET and EXPIRE commands at once.

for _, resp := range client.DoMulti(ctx,
  client.B().JsonSet().Key(key).Path("$").Value(string(jsons)).Build(),
  client.B().Expire().Key(key).Seconds(int64(expireInSecond)).Build(),
) {
  if err = resp.Error(); err != nil {
    panic(err)
  }
}

I changed the code as above, and the program runs properly. However, I am not sure whether what I wrote is correct. Is this right?

totorofly commented 1 year ago

Another thing is that handling remote calls from external programs is very expensive. You should take some pprof snapshots for further improvement.

I don't have a very good understanding of how the Redis pipeline works. When you mentioned that "handling remote calls from external programs is very expensive," my understanding is that in a high-concurrency API request scenario, even if goroutines are reused (via a goroutine pool, with requests queuing when no goroutine is available), it will still put a high workload on the Redis pipeline?

If my understanding is correct, would it be better to add a message queue such as RocketMQ in front of the Redis writes: cache all API request data in the message queue first, and then gradually drain it into Redis at a rate the Redis server can sustain?

rueian commented 1 year ago

The new code confuses me. Did you put eg.Wait() inside the loop? I believe it should be outside the loop, at the very least to utilize the auto-pipelining technique. Besides that, you should use DoMulti() to send JSON.SET and EXPIRE commands at once.

for _, resp := range client.DoMulti(ctx,
  client.B().JsonSet().Key(key).Path("$").Value(string(jsons)).Build(),
  client.B().Expire().Key(key).Seconds(int64(expireInSecond)).Build(),
) {
  if err = resp.Error(); err != nil {
    panic(err)
  }
}

I changed the code as above, and the program runs properly. However, I am not sure whether what I wrote is correct. Is this right?

Yes, it is correct.

When you mentioned "handling remote calls from external programs is very expensive,"

I mean that receiving and decoding those remote requests is very expensive. They may have already taken a lot of your CPU.

If my understanding is correct, would it be better to add message queuing such as RocketMQ before writing to Redis

Your understanding is correct. However, your message queue needs to be faster than Redis, or it won't be an improvement.

totorofly commented 1 year ago

Thank you for your answer. I will try to optimize and upgrade in the next two weeks. I will close the issue for now, and if I have any new questions, I will reopen it. Thank you.