➜ observer go run main.go
observer 1 got data at 2021-07-31 08:45:32.99711044 +0800 CST m=+0.000034798
observer 2 got data at 2021-07-31 08:45:32.997190807 +0800 CST m=+0.000115153
&{} &{}
internal/service/pubsub/subscribe.go
// Subscribe registers sub for the subscription described by ev.
//
// When sub is a service.Conn, the connection is first asked through
// CanSubscribe whether it may subscribe to ev.Ssid / ev.Channel; if it refuses,
// false is returned before anything is registered. Otherwise sub is added to
// the trie, the notifier is informed, and true is returned.
func (s *Service) Subscribe(sub message.Subscriber, ev *event.Subscription) bool {
	conn, isConn := sub.(service.Conn)
	if isConn && !conn.CanSubscribe(ev.Ssid, ev.Channel) {
		return false
	}

	// Register in the trie first, then announce the new subscription.
	s.trie.Subscribe(ev.Ssid, sub)
	s.notifier.NotifySubscribe(sub, ev)
	return true
}
注意这里的参数 sub message.Subscriber 是一个 interface。
internal/message/sub.go
// Subscriber is a value associated with a subscription.
type Subscriber interface {
	// ID returns the unique identifier of the subscriber.
	ID() string

	// Type returns the kind of subscriber.
	Type() SubscriberType

	// Send hands a message to the subscriber and reports any error.
	Send(*Message) error
}
internal/service/pubsub/publish.go
// Publish sends m to every subscriber that the trie returns for m.Ssid() and
// filter, and returns the number of outgoing bytes written. Only subscribers
// whose Type is message.SubscriberDirect contribute m.Size() to that total.
func (s *Service) Publish(m *message.Message, filter func(message.Subscriber) bool) (n int64) {
	msgSize := m.Size()
	targets := s.trie.Lookup(m.Ssid(), filter)
	for _, target := range targets {
		// The error returned by Send is discarded here.
		target.Send(m)
		if target.Type() != message.SubscriberDirect {
			continue
		}
		n += msgSize
	}
	return n
}
internal/broker/conn.go
// Conn represents an incoming connection.
type Conn struct {
	sync.Mutex

	// tracked records whether the connection was already tracked or not.
	tracked uint32
	// socket is the transport used to read and write messages.
	socket net.Conn
	// luid is the locally unique id of the connection.
	luid security.ID
	// guid is the globally unique id of the connection.
	guid string
	// service is the service for this connection.
	service *Service
	// subs holds the subscriptions for this connection.
	subs *message.Counters
	// measurer is the measurer to use for monitoring.
	measurer stats.Measurer
	// limit is the read rate limiter.
	limit *rate.Limiter
	// keys is the key generation provider.
	keys *keygen.Service
	// connect is the associated connection event.
	connect *event.Connection
	// username is the username provided by the client during MQTT connect.
	username string
	// links is the map of all pre-authorized links.
	links map[string]string
}
// ID returns the unique identifier of the subscriber, which for a connection
// is its globally unique id.
func (c *Conn) ID() string {
	return c.guid
}
// Type returns the type of the subscriber; a connection is always reported as
// message.SubscriberDirect.
func (c *Conn) Type() message.SubscriberType {
	return message.SubscriberDirect
}
// Send forwards the message to the underlying client by encoding it as an MQTT
// publish packet (QOS 0) onto the connection's socket. The elapsed time is
// reported under "send.pub" when the call returns. The encoder's error, if
// any, is returned.
func (c *Conn) Send(m *message.Message) (err error) {
	startedAt := time.Now()
	defer c.MeasureElapsed("send.pub", startedAt)

	pub := mqtt.Publish{
		Header:  mqtt.Header{QOS: 0},
		Topic:   m.Channel, // The channel for this message.
		Payload: m.Payload, // The payload for this message.
	}

	_, err = pub.EncodeTo(c.socket)
	return err
}
已经离线的peer(集群中的其他emitter)
internal/service/cluster/peer.go
// deadPeer represents a peer which is no longer online.
type deadPeer struct {
	// name is the peer name; its string form is what ID returns.
	name mesh.PeerName
}
// ID returns the unique identifier of the subscriber, which is the string form
// of the peer name.
func (p *deadPeer) ID() string {
	return p.name.String()
}
// Type returns the type of the subscriber; a dead peer is always reported as
// message.SubscriberOffline.
func (p *deadPeer) Type() message.SubscriberType {
	return message.SubscriberOffline
}
// Send satisfies the subscriber contract for an offline peer: the message is
// not forwarded anywhere and nil is always returned.
func (p *deadPeer) Send(m *message.Message) error {
	return nil
}
正常的peer(集群中的其他emitter)
internal/service/cluster/peer.go
// Peer represents a remote peer.
type Peer struct {
	sync.Mutex

	// sender is the gossip interface to use for sending.
	sender mesh.Gossip
	// name is the peer name for communicating.
	name mesh.PeerName
	// frame is the current message frame.
	frame message.Frame
	// subs holds the SSIDs of active subscriptions for this peer.
	subs *message.Counters
	// activity is the time of last activity of the peer.
	activity int64
	// cancel is the cancellation function.
	cancel context.CancelFunc
}
// ID returns the unique identifier of the subscriber, which is the string form
// of the peer name.
func (p *Peer) ID() string {
	return p.name.String()
}
// Type returns the type of the subscriber; a live peer is always reported as
// message.SubscriberRemote.
func (p *Peer) Type() message.SubscriberType {
	return message.SubscriberRemote
}
// Send forwards the message to the remote server.
func (p Peer) Send(m message.Message) error {
p.Lock()
defer p.Unlock()
// Make sure we don't send to a dead peer
if p.IsActive() {
p.frame = append(p.frame, *m)
}
return nil
模式都是来源于生活,在我们日常生活中观察者模式也是随处可见,最常见的一个例子就是上图中的模式,你订阅了一份杂志,当每次有新的杂志出来之后报社都会将最新的杂志寄给你,这样你不会漏掉每一期新的杂志,这就是最简单的观察者模式。
接下来就是抽象地表达这个接口了。整个设计的原则就是抽象和具体分离,所以在书上我们经常会看到 interface xxx、Concrete xxx 的字样,这里 interface 就是抽象,concrete 就是对 interface 的具体实现。
下面我们就用golang来实现这个结构:
在业务上可以这样使用:
输出:
这样我们再新加一个 concrete Observer 就比较简单了,observer1、observer2 以及 sub 类都不用动,只需要增加一个新的实现即可,完全符合开闭原则。
observer in real world
在消息队列中经常使用这种模式:subscribe 的时候将 subscriber 放到一个内部的结构体中,当有新的消息到达时,与订阅的 topic 进行匹配检查,如果匹配就通知对应的 subscriber。
我们以 emitter(一个简单的 mqtt server 实现)为例。以下是 subscribe 的实现:
注意这里的参数
sub message.Subscriber
是一个 interface。以下是有新的消息到达时的逻辑:在 s.trie 中查找 subscriber,然后调用上面接口中的 Send 方法。
这里就是一个 observer 模式的具体应用,我们可以添加不同类型的 Subscriber,目前在 emitter 中有以下几种 subscriber。
// Peer represents a remote peer. type Peer struct { sync.Mutex sender mesh.Gossip // The gossip interface to use for sending. name mesh.PeerName // The peer name for communicating. frame message.Frame // The current message frame. subs *message.Counters // The SSIDs of active subscriptions for this peer. activity int64 // The time of last activity of the peer. cancel context.CancelFunc // The cancellation function. }
// ID returns the unique identifier of the subsriber. func (p *Peer) ID() string { return p.name.String() }
// Type returns the type of the subscriber. func (p *Peer) Type() message.SubscriberType { return message.SubscriberRemote }
// Send forwards the message to the remote server. func (p Peer) Send(m message.Message) error { p.Lock() defer p.Unlock()
}