Closed xurwxj closed 6 years ago
Not implemented yet. Can you file a feature request?
You can add an interface method in types.go:
// Packets groups the packet-persistence operations (count, store, iterate,
// delete) together with the ConnectionStore hook used to record details of
// an accepted connection.
//
// NOTE(review): the leading []byte argument of the Packet* methods is
// presumably the session/client identifier — confirm against the
// implementations before relying on it.
type Packets interface {
	// Counting: each method reports a count for the given key, or an error.
	PacketCountQoS0([]byte) (int, error)
	PacketCountQoS12([]byte) (int, error)
	PacketCountUnAck([]byte) (int, error)

	// Storing a single packet under the given key.
	PacketStoreQoS0([]byte, *PersistedPacket) error
	PacketStoreQoS12([]byte, *PersistedPacket) error

	// Iterating: the PacketLoader is presumably invoked per stored packet and
	// the interface{} value is presumably passed through to it as context —
	// TODO confirm.
	PacketsForEachQoS0([]byte, interface{}, PacketLoader) error
	PacketsForEachQoS12([]byte, interface{}, PacketLoader) error
	PacketsForEachUnAck([]byte, interface{}, PacketLoader) error

	// Bulk store and delete.
	PacketsStore([]byte, PersistedPackets) error
	//PacketsStoreUnAck([]byte, PersistedPackets) error
	PacketsDelete([]byte) error

	// ConnectionStore records an accepted connection. Manager.OnConnection
	// calls it with, in order: remote address, user name, password and
	// authentication method. It returns nothing, so implementations cannot
	// report a failure to the caller.
	ConnectionStore(string, string, string, string)
}
Then, in clients/sessions.go:
// OnConnection implements transport.Handler interface and handles incoming connection.
//
// It wraps conn in a connection configured from the manager options, drives
// the accept handshake (CONNECT plus any AUTH round-trips), records the
// connection details through the persistence ConnectionStore hook and then
// hands the connection over to newSession.
//
// Return values:
//   - the error from cn.Accept when the handshake cannot be started;
//   - nil when the handshake fails or ends without a CONNACK (the connection
//     is stopped in that case) and nil on success;
//   - errors.New("panic") when a panic is recovered; the panic value is
//     printed to stdout.
func (m *Manager) OnConnection(conn transport.Conn, authMngr *auth.Manager) (err error) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println(r)
			err = errors.New("panic")
		}
	}()

	cn := connection.New(
		connection.OnAuth(m.onAuth),
		connection.NetConn(conn),
		connection.TxQuota(types.DefaultReceiveMax),
		connection.RxQuota(int32(m.Options.ReceiveMax)),
		connection.Metric(m.Systree.Metric().Packets()),
		connection.RetainAvailable(m.Options.RetainAvailable),
		connection.OfflineQoS0(m.Options.OfflineQoS0),
		connection.MaxTxPacketSize(types.DefaultMaxPacketSize),
		connection.MaxRxPacketSize(m.Options.MaxPacketSize),
		connection.MaxRxTopicAlias(m.Options.MaxTopicAlias),
		connection.MaxTxTopicAlias(0),
		connection.KeepAlive(m.Options.ConnectTimeout),
		connection.Persistence(m.persistence),
	)

	// Previously an Accept failure fell through to the code below and
	// dereferenced a nil connParams; report the real error instead.
	// NOTE(review): confirm whether cn must also be stopped on this path.
	ch, e := cn.Accept()
	if e != nil {
		return e
	}

	var connParams *connection.ConnectParams
	var ack *mqttp.ConnAck

	for dl := range ch {
		var resp mqttp.IFace

		switch obj := dl.(type) {
		case *connection.ConnectParams:
			connParams = obj
			resp, e = m.processConnect(cn, connParams, authMngr)
		case connection.AuthParams:
			// NOTE(review): connParams is nil here if AUTH arrives before
			// CONNECT; verify that processAuth tolerates that.
			resp, e = m.processAuth(connParams, obj)
		case error:
			e = obj
		default:
			e = errors.New("unknown")
		}

		if e != nil || resp == nil {
			cn.Stop(e)
			return nil
		}

		if resp.Type() == mqttp.AUTH {
			// Authentication is still in progress: reply and wait for the
			// next handshake message.
			cn.Send(resp)
			continue
		}

		// Any non-AUTH response must be the CONNACK that ends the handshake.
		connAck, ok := resp.(*mqttp.ConnAck)
		if !ok {
			cn.Stop(errors.New("unexpected handshake response"))
			return nil
		}
		ack = connAck

		break
	}

	// The channel may close before CONNECT was seen or before a CONNACK was
	// produced; treat that like any other handshake failure rather than
	// dereferencing nil below.
	if connParams == nil || ack == nil {
		cn.Stop(errors.New("handshake incomplete"))
		return nil
	}

	// NOTE(review): this hands the client password to the persistence layer
	// as plain text; consider whether it needs to be stored at all.
	m.persistence.ConnectionStore(fmt.Sprintf("%s", conn.RemoteAddr()), string(connParams.Username), string(connParams.Password), connParams.AuthMethod)

	m.newSession(cn, connParams, ack, authMngr)

	return nil
}
With the above, you can do whatever you need to.
Why does the remote address need to be persisted?
I need to know where the client connected from.
So it should be in either SYSTREE or the metrics plugin.
Oh, I should take a look at systree. I will not use the metrics plugin, since it seems to depend on Prometheus.
How do I get the client IP on connect, subscribe, or publish?