Closed lucasjinreal closed 2 years ago
通過訂閱可以看到,那個錯誤密碼的客戶端成功的發過來了消息,鑑權失敗
@jinfagang 试试这样:
switch client.Version() {
case packets.Version5:
return codes.NewError(codes.BadUserNameOrPassword)
case packets.Version311, packets.Version31:
return codes.NewError(codes.V3BadUsernameorPassword)
default:
return codes.NewError(codes.UnsupportedProtocolVersion)
}
当时写这个例子的时候还没有支持 packets.Version31,逻辑漏了。这个example有点旧了,如需要定制化,参考插件的方式来做吧。
@DrmagicE 感謝及時回復,現在不會連接了。
但還有個問題:這裏拋出的錯誤,是不是可以把ErrorString補全,不然只從一個ErrorCode不好定位。
不是我不想用Plugin,是不會啊,感覺這個example倒是挺簡單的,我可以直接把gmqtt當成庫來用,然後Inject我想要的Hook。
找了一大圈,就感覺這個庫還比較靠譜,其他的都是坑爹玩意兒。。。希望滷煮可以一直維護下去
@DrmagicE 對了,大佬,我想用token來鑑權,怎麼寫代碼。。opts.SetToken("alice") ??沒有這個方法。。。
@jinfagang MQTT里面没有token的概念,这个需要使用者自己封装。ErrorString..等有空我找时间补一下。。
@DrmagicE 那是我是不是可以 把这个setPassword设置为token,然后鉴权的时候手动处理
@jinfagang 完全可以
@DrmagicE 用gmqtt已经开发到某个程度,发现一个问题:
我的需求是现在要对这段代码进行hack:
var onMsgArrived server.OnMsgArrived = func(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
换句话说,我想对消息到达事件进行拦截,做一个拦截重写,然后在这里面对消息进行修改(比如,如果发过来一个base64编码的图片,我就像先把图片保存下来,然后用保存后的url替换掉base64再发回去),那么问题来了。
要怎么再这里重写发送的逻辑啊?
我现在写了一个方法去覆盖,但是最后那一步逻辑不知道怎么写了:
func GlobalInterceptors(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
// version := client.Version()
topic := req.Message.Topic
clientId := client.ClientOptions().ClientID
payload := string(req.Message.Payload)
if strings.HasPrefix(clientId, "supereme_") {
// do nothing
return nil
}
topic = strings.ToLower(topic)
if strings.HasPrefix(topic, "messages/") || strings.HasPrefix(topic, "events/") {
// do
log.Infof("payload: %s", payload)
} else if strings.HasPrefix(topic, "filemessages/") {
// save file
room := strings.Split(topic, "/")[1]
userAddr := db.GetUserIdBySessionId(clientId)
chatMsg := utils.SaveAndGetMessageObject(payload, room, userAddr)
// String newp = JsonParser.toJson(msg);
// publishPacket.setPayload(ByteBuffer.wrap(newp.getBytes(StandardCharsets.UTF_8)));
// publishPacket.setTopic("messages/" + room);
} else if strings.HasPrefix(topic, "personalevents/") || strings.HasPrefix(topic, "invitations/") {
}
return nil
}
注释掉的那几句是参考的JAVA代码,我看其他的mqtt包都有这个重写的方式,gmqtt支持吗?
@jinfagang 改掉 MsgArrivedRequest.Message就可以了
// OnMsgArrived will be called when receive a Publish packets.It provides the ability to modify the message before topic match process.
// The return error is for V5 client to provide additional information for diagnostics and will be ignored if the version of used client is V3.
// If the returned error type is *codes.Error, the code, reason string and user property will be set into the ack packet(puback for qos1, and pubrel for qos2);
// otherwise, the code,reason string will be set to 0x80 and error.Error().
type OnMsgArrived func(ctx context.Context, client Client, req *MsgArrivedRequest) error
// MsgArrivedRequest is the input param for OnMsgArrived hook.
type MsgArrivedRequest struct {
// Publish is the origin MQTT PUBLISH packet, it is immutable. DO NOT EDIT.
Publish *packets.Publish
// Message is the message that is going to be passed to topic match process.
// The caller can modify it.
Message *gmqtt.Message
// IterationOptions provides the the ability to change the options of topic matching process.
// In most of cases, you don't need to modify it.
// The default value is:
// subscription.IterationOptions{
// Type: subscription.TypeAll,
// MatchType: subscription.MatchFilter,
// TopicName: msg.Topic,
// }
// The user of this field is the federation plugin.
// It will change the Type from subscription.TypeAll to subscription.subscription.TypeAll ^ subscription.TypeShared
// that will prevent publishing the shared message to local client.
IterationOptions subscription.IterationOptions
}
@DrmagicE 要怎么吧一个 struct编码成 gmqtt.Message
啊
@jinfagang 你只要改Payload的话,应该这样就可以了。
func GlobalInterceptors(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
// version := client.Version()
topic := req.Message.Topic
clientId := client.ClientOptions().ClientID
payload := string(req.Message.Payload)
if strings.HasPrefix(clientId, "supereme_") {
// do nothing
return nil
}
topic = strings.ToLower(topic)
if strings.HasPrefix(topic, "messages/") || strings.HasPrefix(topic, "events/") {
// do
log.Infof("payload: %s", payload)
} else if strings.HasPrefix(topic, "filemessages/") {
// save file
room := strings.Split(topic, "/")[1]
userAddr := db.GetUserIdBySessionId(clientId)
chatMsg := utils.SaveAndGetMessageObject(payload, room, userAddr)
// String newp = JsonParser.toJson(msg);
// publishPacket.setPayload(ByteBuffer.wrap(newp.getBytes(StandardCharsets.UTF_8)));
// publishPacket.setTopic("messages/" + room);
} else if strings.HasPrefix(topic, "personalevents/") || strings.HasPrefix(topic, "invitations/") {
}
req.Message.Payload = 你要的值
return nil
}
@DrmagicE 大佬,感觉是得把struct encode成 byte[]对吗? golang里面我要把一个struct encode成这个可以接受的byte[]应该怎么转
@jinfagang 这个超出gmqtt的范畴了,你可以尝试用json.Marshal。
@DrmagicE 我用编码之后可以了,大佬,最后一个问题啊,我发现我在 进行鉴权的时候,还有一个需求就是以服务器的身份去给一个topic发消息,这样的话我就需要在后台里面起一个Client去链接我自己,这个有办法可以做到吗?
参考JAVA的代码:
public class ChatClient {
public static void connectAndPublish(String payload, String topic){
connectAndPublish(payload, topic, false);
}
public static void connectAndPublish(String payload, String topic, boolean retain){
try {
final Mqtt3BlockingClient client = Mqtt3Client.builder()
.identifier("supreme_" + UUID.randomUUID().toString())
.serverHost("localhost")
.buildBlocking();
client.connect();
client.publishWith().topic(topic).retain(retain).qos(MqttQos.AT_LEAST_ONCE).payload(payload.getBytes()).send();
client.disconnect();
}catch (Exception e){
e.printStackTrace();
}
}
}
就是起一个client,然后非常简单的publish一个payload到一个指定的topic上
@jinfagang 你需要用到 Publisher: https://github.com/DrmagicE/gmqtt/blob/9bee1682dd2f54199de72736c45d85e3aa21adba/server/server.go#L90 这个用插件做会比较好,admin插件有例子: https://github.com/DrmagicE/gmqtt/blob/9bee1682dd2f54199de72736c45d85e3aa21adba/plugin/admin/publish.go#L21 或者试试看 example/service/main.go 里面的: https://github.com/DrmagicE/gmqtt/blob/9bee1682dd2f54199de72736c45d85e3aa21adba/examples/service/main.go#L59
by the way.最好还是用插件..
@DrmagicE 这个Hooks跟插件有啥本质的区别吗?插件看起来我不太懂怎么去生成,看不懂,hook看起来还比较好懂。
@DrmagicE 我主要是需要再 onBaseAuth这个hook里面发布,我是直接实例化一个 pub:= srv.Publisher() 吗?
@DrmagicE 插件的好处是啥呢,一大堆文件看起来很恐怖啊,我不需要太复杂的路基,就简单的消息拦截一下就可以了。
其他的我想尽量保持精简,我看了下例子:
mp := server.NewMockPublisher(ctrl)
pub := &publisher{
a: &Admin{
publisher: mp,
},
}
msg := &gmqtt.Message{
QoS: 1,
Retained: true,
Topic: "topic",
Payload: []byte("abc"),
ContentType: "ct",
CorrelationData: []byte("co"),
MessageExpiry: 1,
PayloadFormat: 1,
ResponseTopic: "resp",
UserProperties: []packets.UserProperty{
{
K: []byte("K"),
V: []byte("V"),
},
},
}
mp.EXPECT().Publish(msg)
_, err := pub.Publish(context.Background(), &PublishRequest{
TopicName: msg.Topic,
Payload: string(msg.Payload),
Qos: uint32(msg.QoS),
Retained: msg.Retained,
ContentType: msg.ContentType,
CorrelationData: string(msg.CorrelationData),
MessageExpiry: msg.MessageExpiry,
PayloadFormat: uint32(msg.PayloadFormat),
ResponseTopic: msg.ResponseTopic,
UserProperties: []*UserProperties{
{
K: []byte("K"),
V: []byte("V"),
},
},
})
a.Nil(err)
这两个有啥区别啊?
mp.EXPECT().Publish(msg)
_, err := pub.Publish(context.Background(), &PublishRequest{ 我应该用哪个、 MockPublisher吗?
@DrmagicE 理论上来讲,对于用户来说,这里面的所有PublishRequest的信息我应该都不需要关心,我只需要关注payload和topic即可,这样的话,对于用户来说,就只需要初始化一个Publisher,然后把payload和topic填上,就可以发不出去了,这就可以在server上做任何事情,然后主动发给客户端了。大佬您觉得呢? 比如用户login成功之后,我要发过去一个系统消息,把用户的信息同步过去,就很方便了。
@DrmagicE I made a global variable : and init in listen start. But when call it in another function, the message can not be sent out.
var globalPub server.Publisher
s := server.New(
server.WithTCPListener(ln),
server.WithHook(hooks),
server.WithLogger(l),
server.WithConfig(config.DefaultConfig()),
)
globalPub = s.Publisher()
Call it like:
func ChatClientConnectAndPublish(payload []byte, topic string) {
// I need using this ChatClient send payload
// log.Infof("------> publish msg: %s", topic)
globalPub.Publish(&gmqtt.Message{
Topic: topic,
Payload: payload,
QoS: packets.Qos2,
})
}
I found client can not receive this message at all, even I set QoS to 2.Why? How should I make it puslish msg success?
Did your client subscribe to a related topic before publishing the message? @jinfagang
@DrmagicE I debugged a long time, And I try using paho client directly as client send msgs out, it actually sent msg now, (server can log this message out). But question is, from logging I notice that, the subcsribe happend after publish:
Is that published before subscribe so that client not able to receive that message anymore?
If so, what if I want sent out a message to client right after it login? Since login is happened in Authorize step, it seems happend befor subscribe any channels (if client doens't authorize, it would not able subscribe anything right? )
Then that's conflict, what should I do?
PS: I found no matter I set time.Sleep before publish, the subscrib always happened after publish, why?
@jinfagang It depends on your client implementation. For instance, there is a DefaultPublishHandler
for paho client to handle messages without subscribing. My advice is to check out if your client has such kinds of features.
@DrmagicE It's not about client, the question is about order of Subsribe and Publish on server. I tried paho in go, and mqtt_client in flutter, both of them can not receive msg.
The reason is simple: subscribe happend after publish, how can I solve this order on server side? How to control the order subscribe and auth in gmqtt?
theoritically, subscribe should happend as fast as authorize isn't it? But now, it always after aothorize, even I spend time to init a paho client to sent out a message. It still doesn't subscribe
BTW, the handler you pointed out is just about PRINT messgae out, it's a function to solve your subsribed msgs.
this is my test client:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
func main() {
opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1884").SetClientID("fuckitsme_subber")
opts.SetKeepAlive(2 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.SetUsername("jintian")
opts.SetPassword("119588")
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if token := c.Subscribe("archivesmyid/fuckitsme_subber", 2, f); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
keepAlive := make(chan os.Signal)
signal.Notify(keepAlive, os.Interrupt, syscall.SIGTERM)
<-keepAlive
}
Interesting: On JAVA server, seems authorized publish also before subscribe, but client can receieve msg:
God damn it, I know why.....
不知道是不是我打開方式不對,我嘗試運行例子,並沒有按照我預料的那樣運行,主要是鑑權部分。
我用這個代碼來進行鑑權:
講道理,如果密碼不對,應該直接把這個鏈接掐掉,但是現在,我用錯誤的密碼會跑出一個錯誤,但是這個錯誤並不會把鏈接掐掉,而是可以繼續發消息???!!! 這就很神奇了。
這是我的Client, paho的:
這裏我設置的用戶名和密碼是錯誤的,但是竟然能夠通過鑑權,這是log:
可以看到,他跑出了一個錯誤,但是緊接着,又登錄成功了。
我他媽就跪了啊,這還怎麼鑑權??