redis / rueidis

A fast Golang Redis client that supports Client Side Caching, Auto Pipelining, Generics OM, RedisJSON, RedisBloom, RediSearch, etc.
Apache License 2.0

Combining FT.SEARCH and JSON.SET in a Lua Script for Atomic Operations in Concurrent Goroutines #187

Closed totorofly closed 1 year ago

totorofly commented 1 year ago

When running FT.SEARCH concurrently from multiple goroutines, I want each goroutine to update preOrderTime in every matching key with JSON.SET as soon as FT.SEARCH returns it.

For example, it would set preOrderTime to the current time + 30 seconds. Other goroutines running FT.SEARCH would then skip keys that have already been claimed this way, because their query time is less than the stored preOrderTime.

However, since I cannot combine FT.SEARCH and JSON.SET into one atomic operation, several concurrent FT.SEARCH calls can return the same key under high concurrency. So I'm wondering whether it's possible to use your library and a Lua script to combine FT.SEARCH, JSON.SET, and the final key retrieval into one atomic operation.

Because Redis executes commands on a single thread, the script would run atomically and prevent multiple goroutines from reading the same key in FT.SEARCH.
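To make the intended behaviour concrete, here is a minimal in-memory sketch of the search-then-claim pattern. It does not talk to Redis: the `store` type, its mutex, and the `phone:N` key names are hypothetical stand-ins, with the mutex playing the role that single-threaded script execution would play on the server.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// record stands in for one JSON document; preOrderTime is a Unix timestamp in seconds.
type record struct {
	key          string
	preOrderTime int64
}

// store is a hypothetical in-memory stand-in for Redis. Holding mu for the
// whole claim mimics Redis running one script at a time.
type store struct {
	mu      sync.Mutex
	records []*record
}

// newStore creates n unclaimed records (preOrderTime is zero).
func newStore(n int) *store {
	s := &store{}
	for i := 0; i < n; i++ {
		s.records = append(s.records, &record{key: fmt.Sprintf("phone:%d", i)})
	}
	return s
}

// claim finds up to limit records whose preOrderTime is before now and moves
// their preOrderTime to now+lease in the same critical section, so later
// callers no longer match them.
func (s *store) claim(now time.Time, lease time.Duration, limit int) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	var keys []string
	for _, r := range s.records {
		if len(keys) == limit {
			break
		}
		if r.preOrderTime < now.Unix() {
			r.preOrderTime = now.Add(lease).Unix()
			keys = append(keys, r.key)
		}
	}
	return keys
}

// runWorkers starts n goroutines that each try to claim one record and
// returns how many times each key was handed out.
func runWorkers(s *store, n int) map[string]int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	seen := make(map[string]int)
	now := time.Now()
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, k := range s.claim(now, 30*time.Second, 1) {
				mu.Lock()
				seen[k]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return seen
}

func main() {
	seen := runWorkers(newStore(5), 20)
	fmt.Println(len(seen), "keys claimed")
}
```

If the search and the update were two separate steps without the lock, two goroutines could both see the same record as free before either one updated it, which is the race described above.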

Is this approach feasible? If I can use a Lua script, how do I use your library to call the Lua script? Could you please provide some example code?

rueian commented 1 year ago

Here is an example of how to use a Lua script: https://github.com/rueian/rueidis#lua-script

totorofly commented 1 year ago

I wrote two functions, fetchAndUpdatePhoneWithOccupiedTime and fetchUpdateAndDecodePhoneWithOccupiedTime. They execute the Lua script correctly and return the expected results on a standalone Redis instance. However, when I switch the environment to Redis Cluster (a cluster with the RediSearch and ReJSON modules properly installed), an error occurs:

Error fetching and decoding ordered numbers: Blocking module command called from Lua script script: ae9e7723d25e3679d21830f4d1197cbea42484e3, on @user_script:13.

func fetchUpdateAndDecodePhoneWithOccupiedTime(ctx context.Context, options FetchOptions) ([]good_num.PhoneRedisObjWithKey, error) {
    resp, err := fetchAndUpdatePhoneWithOccupiedTime(ctx, options)
    if err != nil {
        return nil, err
    }
    return decodePhoneRedisObjects(resp)
}

func fetchAndUpdatePhoneWithOccupiedTime(ctx context.Context, options FetchOptions) (searchResult []rueidis.RedisMessage, err error) {
    jsonKey := "preOrderTime"
    luaScript := ""
    if options.OrderBy != nil {
        if strings.ToLower(options.OrderBy.Order) == "asc" {
            luaScript = `
-- KEYS: [phoneIndex]
-- ARGV: [searchQuery, offset, limit, timeToAdd, jsonKey, orderBy]

local phoneIndex = KEYS[1]
local searchQuery = ARGV[1]
local offset = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local timeToAdd = tonumber(ARGV[4])
local jsonKey = ARGV[5]
local OrderBy = ARGV[6]

-- Use FT.SEARCH to find the matching documents
local searchResult = redis.call('FT.SEARCH', phoneIndex, searchQuery, 'SORTBY', OrderBy, 'ASC', 'LIMIT', offset, limit)

-- Get the current time and add timeToAdd seconds
local currentTimePlusTimeToAdd = redis.call('TIME')[1] + timeToAdd

-- searchResult[1] is the number of results; from index 2 on, the reply alternates key and field list
-- Skip the first element (the result count) and step by 2 from index 2 to visit each key
for i = 2, #searchResult, 2 do
    local key = searchResult[i]
    -- Use JSON.SET to update the jsonKey field of this key
    redis.call("JSON.SET", key, "$."..jsonKey, currentTimePlusTimeToAdd)
end

return searchResult
`
            //builder := global.ASIR_RedisDB.B().FtSearch().Index(options.Index).Query(options.Query).Sortby(options.OrderBy.Field).Asc().Limit().OffsetNum(options.Offset, options.Limit)
        } else {
            luaScript = `
-- KEYS: [phoneIndex]
-- ARGV: [searchQuery, offset, limit, timeToAdd, jsonKey, orderBy]

local phoneIndex = KEYS[1]
local searchQuery = ARGV[1]
local offset = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local timeToAdd = tonumber(ARGV[4])
local jsonKey = ARGV[5]
local OrderBy = ARGV[6]

-- Use FT.SEARCH to find the matching documents
local searchResult = redis.call('FT.SEARCH', phoneIndex, searchQuery, 'SORTBY', OrderBy, 'DESC', 'LIMIT', offset, limit)

-- Get the current time and add timeToAdd seconds
local currentTimePlusTimeToAdd = redis.call('TIME')[1] + timeToAdd

-- searchResult[1] is the number of results; from index 2 on, the reply alternates key and field list
-- Skip the first element (the result count) and step by 2 from index 2 to visit each key
for i = 2, #searchResult, 2 do
    local key = searchResult[i]
    -- Use JSON.SET to update the jsonKey field of this key
    redis.call("JSON.SET", key, "$."..jsonKey, currentTimePlusTimeToAdd)
end

return searchResult
`
            //builder := global.ASIR_RedisDB.B().FtSearch().Index(options.Index).Query(options.Query).Sortby(options.OrderBy.Field).Desc().Limit().OffsetNum(options.Offset, options.Limit)
        }
    } else {
        luaScript = `
-- KEYS: [phoneIndex]
-- ARGV: [searchQuery, offset, limit, timeToAdd, jsonKey]

local phoneIndex = KEYS[1]
local searchQuery = ARGV[1]
local offset = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local timeToAdd = tonumber(ARGV[4])
local jsonKey = ARGV[5]

-- Use FT.SEARCH to find the matching documents
local searchResult = redis.call('FT.SEARCH', phoneIndex, searchQuery, 'LIMIT', offset, limit)

-- Get the current time and add timeToAdd seconds
local currentTimePlusTimeToAdd = redis.call('TIME')[1] + timeToAdd

-- searchResult[1] is the number of results; from index 2 on, the reply alternates key and field list
-- Skip the first element (the result count) and step by 2 from index 2 to visit each key
for i = 2, #searchResult, 2 do
    local key = searchResult[i]
    -- Use JSON.SET to update the jsonKey field of this key
    redis.call("JSON.SET", key, "$."..jsonKey, currentTimePlusTimeToAdd)
end

return searchResult
`
        //builder := global.ASIR_RedisDB.B().FtSearch().Index(options.Index).Query(options.Query).Limit().OffsetNum(options.Offset, options.Limit)
    }
    script := rueidis.NewLuaScript(luaScript)
    if options.OrderBy != nil {
        searchResult, err = script.Exec(ctx, global.ASIR_RedisDB, []string{options.Index}, []string{options.Query, fmt.Sprint(options.Offset), fmt.Sprint(options.Limit), fmt.Sprint(options.OccupiedTime), jsonKey, fmt.Sprint(options.OrderBy.Field)}).ToArray()
    } else {
        searchResult, err = script.Exec(ctx, global.ASIR_RedisDB, []string{options.Index}, []string{options.Query, fmt.Sprint(options.Offset), fmt.Sprint(options.Limit), fmt.Sprint(options.OccupiedTime), jsonKey}).ToArray()
    }
    if err != nil {
        fmt.Println("Error executing script:", err)
    }

    return searchResult, err
}
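The three Lua scripts above differ only in the SORTBY clause, so they could probably be folded into one script with optional sort arguments. The following is only a sketch under assumptions: the `FetchOptions` and `OrderBy` definitions are reconstructed from how the fields are used above (the real field types are unknown), `claimScript` and `scriptArgs` are hypothetical names, and the Lua has not been run against a server. It does not address the Redis Cluster error.

```go
package main

import (
	"fmt"
	"strings"
)

// OrderBy and FetchOptions are hypothetical reconstructions of the types used
// above; the int64 field types are an assumption.
type OrderBy struct {
	Field string
	Order string
}

type FetchOptions struct {
	Index        string
	Query        string
	Offset       int64
	Limit        int64
	OccupiedTime int64
	OrderBy      *OrderBy
}

// claimScript covers the sorted and unsorted cases in one script.
// KEYS: [phoneIndex]
// ARGV: [searchQuery, offset, limit, timeToAdd, jsonKey, sortField?, sortDirection?]
const claimScript = `
local cmd = {'FT.SEARCH', KEYS[1], ARGV[1]}
if ARGV[6] then
    cmd[#cmd + 1] = 'SORTBY'
    cmd[#cmd + 1] = ARGV[6]
    cmd[#cmd + 1] = ARGV[7]
end
cmd[#cmd + 1] = 'LIMIT'
cmd[#cmd + 1] = ARGV[2]
cmd[#cmd + 1] = ARGV[3]
local searchResult = redis.call(unpack(cmd))

local occupiedUntil = redis.call('TIME')[1] + tonumber(ARGV[4])
for i = 2, #searchResult, 2 do
    redis.call('JSON.SET', searchResult[i], '$.' .. ARGV[5], occupiedUntil)
end
return searchResult
`

// scriptArgs builds ARGV for claimScript. Any Order other than "asc"
// (case-insensitive) sorts descending, matching the branches above.
func scriptArgs(o FetchOptions, jsonKey string) []string {
	args := []string{
		o.Query,
		fmt.Sprint(o.Offset),
		fmt.Sprint(o.Limit),
		fmt.Sprint(o.OccupiedTime),
		jsonKey,
	}
	if o.OrderBy != nil {
		dir := "DESC"
		if strings.EqualFold(o.OrderBy.Order, "asc") {
			dir = "ASC"
		}
		args = append(args, o.OrderBy.Field, dir)
	}
	return args
}

func main() {
	// "phoneIndex" and "price" are placeholder values for illustration.
	opts := FetchOptions{
		Index:        "phoneIndex",
		Query:        "*",
		Limit:        10,
		OccupiedTime: 30,
		OrderBy:      &OrderBy{Field: "price", Order: "asc"},
	}
	fmt.Println(scriptArgs(opts, "preOrderTime"))
}
```

With this shape, the script would be created once with rueidis.NewLuaScript(claimScript) and executed with script.Exec as in the function above, passing []string{options.Index} as the keys and scriptArgs(options, jsonKey) as the arguments.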

The cause doesn't seem to be that Redis Cluster lacks support for Lua scripts, or for running commands like FT.SEARCH within them, but I cannot pinpoint the exact reason for the issue. Did I do something wrong or overlook any information?

rueian commented 1 year ago

I found a related thread about this: https://github.com/RediSearch/RediSearch/issues/249. I hope it helps.

rueian commented 1 year ago

Hi @dwzkit, does the above thread solve your problem?

totorofly commented 1 year ago

Thank you very much for your support. After some research, my conclusion is that the Lua script calls JSON.SET, a blocking command, on keys that live on multiple nodes. Redis Cluster does not allow a single Lua script to perform cross-node operations (i.e., to touch keys in different hash slots). This is because requests from other clients are blocked while the Lua script executes. If the script involved keys on multiple nodes, it could degrade the performance of the distributed system or even cause a deadlock.
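To illustrate the hash-slot constraint mentioned above, here is a small standalone sketch of how Redis Cluster maps a key to one of 16384 slots: CRC16 of the key modulo 16384, hashing only the `{...}` hash tag when a non-empty one is present. The function names and the `phone:N` key names are hypothetical. Whether hash tags fit this particular data model is an assumption this thread does not settle.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 implements CRC-16/XMODEM (polynomial 0x1021, initial value 0), the
// checksum the Redis Cluster specification uses for key hashing.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// keySlot returns the cluster hash slot (0..16383) for key. If the key has a
// non-empty hash tag between the first '{' and the next '}', only the tag
// content is hashed.
func keySlot(key string) int {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return int(crc16([]byte(key)) % 16384)
}

func main() {
	// Keys without a shared hash tag are hashed independently and may land in
	// different slots; keys sharing the tag {phone} always share one slot.
	for _, k := range []string{"phone:1", "phone:2", "{phone}:1", "{phone}:2"} {
		fmt.Printf("%-10s -> slot %d\n", k, keySlot(k))
	}
}
```

Keys that share a hash tag always land in the same slot and therefore on the same node, which is the usual way to keep a group of keys reachable from one script.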