local skynet = require "skynet"
require "skynet.manager"
local redis = require "skynet.db.redis"
local json = require "json"
local snax = require "skynet.snax"
local CMD = {}
local FUNC = {}
local DATA = {
mqconn = nil, --redis
config = nil, -- redis conf
dbconn = nil,
chnlisteners = {},
listeners = {}
}
local CMD = {}
local tunpack = table.unpack
local tinsert = table.insert
local function disconnect(db)
if db then
local ok, ret = skynet.pcall(db.disconnect, db)
if not ok then
skynet.error("redis disconnect failed", ret)
end
end
end
function FUNC.subscribe(...)
local chns = {...}
for _, c in pairs(chns) do
DATA.chnlisteners[c] = c
end
local db = DATA.mqconn
local ok, ret = pcall(db.subscribe, db, ...)
if not ok then
DATA.mqconn = nil
disconnect(db)
return nil
end
return ret
end
function FUNC.unsubscribe(...)
local chns = {...}
for _, c in pairs(chns) do
DATA.chnlisteners[c] = nil
end
local db = DATA.mqconn
local ok, ret = pcall(db.unsubscribe, db, ...)
if not ok then
DATA.mqconn = nil
disconnect(db)
return nil
end
return ret
end
function FUNC.psubscribe(channel, recevier)
end
function FUNC.publish(channel, v)
local db = DATA.dbconn
return db:publish(channel, v)
end
function CMD.connect()
CMD.connmq()
CMD.conndb()
end
function CMD.connmq()
local db = DATA.mqconn
DATA.mqconn = nil
disconnect(db)
local ok, db = skynet.pcall(redis.watch, DATA.config)
if ok then
local chs = {}
for k, v in pairs(DATA.chnlisteners) do
table.insert(chs, k)
end
DATA.mqconn = db
skynet.info("redis conn ok", db)
skynet.info("mq subscribecount", #chs)
skynet.info("mq subscribe", table.unpack(chs))
local ifok, ret = skynet.pcall(db.subscribe, db, table.unpack(chs))
if not ifok then
DATA.mqconn = nil
skynet.error("redis subscribe failed[api error]", ret)
disconnect(db)
end
else
skynet.error("redis conn failed[watch error]", ok, db)
end
end
function CMD.conndb()
local db = DATA.dbconn
DATA.dbconn = nil
disconnect(db)
local ok, dbpulish = skynet.pcall(redis.connect, DATA.config2)
if ok then
DATA.dbconn = dbpulish
skynet.info("redis conn ok", dbpulish)
else
skynet.error("redis conn failed[connect error]", db)
end
end
function CMD.command(cmd, ...)
local f = FUNC[cmd]
if f == nil then
skynet.error("redis cmd not support", cmd, ...)
return nil
end
skynet.info("redismq##########################", cmd, ...)
local ret, result = skynet.pcall(f, ...)
if ret == false then
skynet.error("redis cmd failed ", result)
return nil
end
return ret, result
end
function CMD.waitMessage(q)
--while true do
local mq = DATA.mqconn
if mq then
local ok, msg, c = skynet.pcall(mq.message, mq)
if not ok then
skynet.error("subscribed failed", msg, c)
DATA.mqconn = nil
disconnect(mq)
end
return ok, msg, c
end
return false
end
function CMD.keepalive(q)
if DATA.dbconn then
local db = DATA.dbconn
local ok, ret = skynet.pcall(db.get, db, "foo")
if not ok then
skynet.info("ping failed reconnect it", ret)
DATA.dbconn = nil
disconnect(db)
CMD.conndb()
end
else
CMD.conndb()
end
if DATA.mqconn then
local db = DATA.mqconn
local ok, ret = skynet.pcall(db.subscribe, db, "foo")
if not ok then
skynet.info("ping failed reconnect it", ret)
DATA.mqconn = nil
disconnect(db)
CMD.connmq()
end
else
CMD.connmq()
end
end
function CMD.setconfig(config, config2)
DATA.config = config
DATA.config2 = config2
如题,这个现象多个环境上可用重新。 下面是我的代码。
外部定时调用keepalive 和waitMessage。 看了比较久,确实看不出来问题了。请帮忙看看。
local skynet = require "skynet" require "skynet.manager" local redis = require "skynet.db.redis" local json = require "json" local snax = require "skynet.snax"
local CMD = {} local FUNC = {} local DATA = { mqconn = nil, --redis config = nil, -- redis conf dbconn = nil, chnlisteners = {}, listeners = {} }
local CMD = {}
local tunpack = table.unpack local tinsert = table.insert
local function disconnect(db) if db then local ok, ret = skynet.pcall(db.disconnect, db)
end
function FUNC.subscribe(...) local chns = {...}
end
function FUNC.unsubscribe(...) local chns = {...} for _, c in pairs(chns) do DATA.chnlisteners[c] = nil end local db = DATA.mqconn local ok, ret = pcall(db.unsubscribe, db, ...)
end
function FUNC.psubscribe(channel, recevier) end
function FUNC.publish(channel, v) local db = DATA.dbconn return db:publish(channel, v) end
function CMD.connect() CMD.connmq() CMD.conndb() end
function CMD.connmq() local db = DATA.mqconn
end
function CMD.conndb() local db = DATA.dbconn
end
function CMD.command(cmd, ...) local f = FUNC[cmd] if f == nil then skynet.error("redis cmd not support", cmd, ...) return nil end
end
function CMD.waitMessage(q) --while true do local mq = DATA.mqconn if mq then local ok, msg, c = skynet.pcall(mq.message, mq)
end
function CMD.keepalive(q) if DATA.dbconn then local db = DATA.dbconn local ok, ret = skynet.pcall(db.get, db, "foo") if not ok then skynet.info("ping failed reconnect it", ret) DATA.dbconn = nil disconnect(db) CMD.conndb() end else CMD.conndb() end
end
function CMD.setconfig(config, config2) DATA.config = config DATA.config2 = config2
end
return CMD