Open decanus opened 5 years ago
@decanus I need to better understand how it would work.
Currently, we have two flows: (1) incoming messages and (2) outgoing messages.
In the case of (1), we just unmarshal the protobuf payload into the mvds Payload struct. In the case of (2), we call Node.AppendMessage() first, and later the mvds Node calls Node.Transport.Send().
How does Subscribe() fit into this picture?
@adambabik, this function returns a channel for receiving messages. Right now we unmarshal the payload in the transport, then pick out all the messages before MVDS receives the payload. This function simply pushes all the messages to the subscriber after MVDS has received them.
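A minimal sketch of the idea described above, assuming a stripped-down node. The `Node`, `Message`, and `onMessage` names here are hypothetical stand-ins for illustration and are not the actual mvds types; only the shape of `Subscribe()` (a call that returns a channel) comes from the discussion.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for an mvds message.
type Message struct {
	Body []byte
}

// Node is a hypothetical, stripped-down node; it is not the real mvds Node.
type Node struct {
	mu          sync.Mutex
	subscribers []chan Message
}

// Subscribe returns a buffered channel on which handled messages are delivered.
func (n *Node) Subscribe() <-chan Message {
	ch := make(chan Message, 16)
	n.mu.Lock()
	n.subscribers = append(n.subscribers, ch)
	n.mu.Unlock()
	return ch
}

// onMessage stands in for the point at which the node has finished handling
// a received message. It pushes the message to every subscriber.
// Note: the send blocks if a subscriber's buffer is full.
func (n *Node) onMessage(m Message) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, ch := range n.subscribers {
		ch <- m
	}
}

func main() {
	n := &Node{}
	sub := n.Subscribe()
	n.onMessage(Message{Body: []byte("hello")})
	fmt.Println(string((<-sub).Body))
}
```

The point of the sketch is only the ordering: the subscriber sees a message after the node has processed it, rather than the transport plucking it out beforehand.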
@decanus and how should MVDS receive that message? The problem I see is that the current flow is synchronous. We receive a message from Whisper, we push it through all layers (transport, encryption, data sync, application), done. The trigger is the consumer calling a Retrieve* method on Messenger.
If we want to use the subscription, I believe that the message would still be received from Whisper, go through transport, encryption and data sync layers but after that the flow would be interrupted and switched to async mode. At some point, the subscriber would receive the MVDS messages from the channel.
It would work fine if we had a stream-based interface here, but we don't for now. So this needs to wait, I am afraid.
@adambabik, yeah, it would flow through everything, as it does now, but we would no longer get the messages in a hacky way by taking them out before passing the payload on. We can discuss changing it to a stream API later too.
@decanus ok, so I believe we are talking about this fragment:
func (d *DataSync) Handle(sender *ecdsa.PublicKey, payload []byte) [][]byte {
	// redacted
	if err != nil || !datasyncMessage.IsValid() {
		// redacted
	} else {
		logger.Debug("handling datasync message")
		// datasync message
		for _, message := range datasyncMessage.Messages {
			payloads = append(payloads, message.Body)
		}
		if d.sendingEnabled {
			// it calls d.DataSyncNodeTransport.AddPacket
			d.add(sender, datasyncMessage)
		}
	}
	return payloads
}
where we pluck messages from the payload before it is passed to mvds.
With Subscribe, it would look like this:
func (d *DataSync) Handle(sender *ecdsa.PublicKey, payload []byte) [][]byte {
	// redacted
	if err != nil || !datasyncMessage.IsValid() {
		// redacted
	} else {
		logger.Debug("handling datasync message")
		if d.sendingEnabled {
			subscription := d.Subscribe()
			// it calls d.DataSyncNodeTransport.AddPacket
			d.add(sender, datasyncMessage)
		allMessagesLoop:
			for {
				// It won't work because there are many goroutines and this can exit too early.
				select {
				case m := <-subscription:
					payloads = append(payloads, m)
				default:
					d.Unsubscribe()
					break allMessagesLoop
				}
			}
		} else {
			// Handles the case when the node does not send mvds messages
			// but is capable of handling received mvds messages.
			for _, message := range datasyncMessage.Messages {
				payloads = append(payloads, message.Body)
			}
		}
	}
	return payloads
}
As you can see, it is neither pretty nor workable in the current flow. I agree that mvds should return messages, but with the current approach we need a different interface that could work like this:
func (d *DataSync) Handle(sender *ecdsa.PublicKey, payload []byte) [][]byte {
	// redacted
	if err != nil || !datasyncMessage.IsValid() {
		// redacted
	} else {
		logger.Debug("handling datasync message")
		if d.sendingEnabled {
			messages := d.DataSyncNodeTransport.ProcessPayload(sender, datasyncMessage)
			payloads = append(payloads, messages...)
		} else {
			// Handles the case when the node does not send mvds messages
			// but is capable of handling received mvds messages.
			for _, message := range datasyncMessage.Messages {
				payloads = append(payloads, message.Body)
			}
		}
	}
	return payloads
}
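For illustration only, a synchronous `ProcessPayload` along the lines proposed above might be sketched as follows. Everything here is an assumption: `syncProcessor`, the simplified `Message` and `Payload` types, and the string sender are hypothetical, and what mvds would actually do with the payload internally is not specified in this thread. The sketch simply stores each message and returns the bodies to the caller in the same call.

```go
package main

import "fmt"

// Message and Payload are simplified, hypothetical stand-ins for the mvds types.
type Message struct {
	Body []byte
}

type Payload struct {
	Messages []*Message
}

// syncProcessor is a hypothetical type used only to illustrate the interface.
type syncProcessor struct {
	stored [][]byte // stands in for whatever bookkeeping mvds performs
}

// ProcessPayload records every message and returns the message bodies
// synchronously, so the caller does not need a subscription or a goroutine.
func (p *syncProcessor) ProcessPayload(sender string, payload Payload) [][]byte {
	bodies := make([][]byte, 0, len(payload.Messages))
	for _, m := range payload.Messages {
		if m == nil {
			continue // skip missing entries rather than panic
		}
		p.stored = append(p.stored, m.Body)
		bodies = append(bodies, m.Body)
	}
	return bodies
}

func main() {
	p := &syncProcessor{}
	out := p.ProcessPayload("peer-1", Payload{
		Messages: []*Message{{Body: []byte("one")}, {Body: []byte("two")}},
	})
	for _, b := range out {
		fmt.Println(string(b))
	}
}
```

The trade-off is that the caller keeps the existing synchronous flow, at the cost of mvds exposing a second, non-channel entry point.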
@adambabik I don't think it would be in the handle message, as that gets called every time a packet is received. The call to the subscription should happen elsewhere, in some goroutine.
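A rough sketch of that suggestion, assuming the subscription is a plain channel of message bodies. The `collect` helper and its callback are hypothetical names introduced for illustration; the idea is only that a single long-lived goroutine drains the subscription, instead of Handle subscribing on every packet.

```go
package main

import (
	"fmt"
	"sync"
)

// collect drains sub in its own goroutine and passes each body to handle.
// It returns a function that blocks until sub has been closed and drained.
func collect(sub <-chan []byte, handle func([]byte)) (wait func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for body := range sub {
			handle(body)
		}
	}()
	return wg.Wait
}

func main() {
	// In the real code this channel would come from Subscribe();
	// here it is created directly so the example is self-contained.
	sub := make(chan []byte)

	var got []string
	wait := collect(sub, func(b []byte) { got = append(got, string(b)) })

	sub <- []byte("first")
	sub <- []byte("second")
	close(sub)
	wait() // safe to read got only after the goroutine has finished

	fmt.Println(got)
}
```

With this shape the receive path becomes asynchronous, which is the mismatch with the current Retrieve*-driven flow raised earlier in the thread.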
Update the MVDS usage to use the new subscription feature as discussed with @cammellos.
How this is done is documented in the usage section.