Closed dirkmc closed 3 years ago
It seems that the root cause of the issue is that when the messagequeue sends a message, the message may contain several blocks, but only one "Sent" event is emitted. https://github.com/ipfs/go-graphsync/blob/11d30c607e3f9062b1a3755dba34f6abcb9eaf85/messagequeue/messagequeue.go#L202-L207
When this event bubbles up to the responsemanager subscriber, it only includes the block data for the first block (not all blocks in the message) https://github.com/ipfs/go-graphsync/blob/master/responsemanager/subscriber.go#L42
Hey, so this exposes a flaw in the mappable subscriber system, and provides a pretext for refactoring the interface.
@acruikshank I am recommending a fix to the way MappableSubscriber currently works, as well as some interface changes that will make things clearer:
These are my recommended new interfaces -- please don't take these as set in stone, and please feel free to improve the naming. (taken from: https://github.com/ipfs/go-graphsync/blob/master/notifications/types.go)
// Topic is a topic that events appear on
type Topic interface{}

// Event is a publishable event
type Event interface{}

// TopicData is data added to every message broadcast on a topic
type TopicData interface{}

// Subscriber is a subscriber that can receive events
type Subscriber interface {
	OnNext(Topic, Event, ...TopicData)
	OnClose(Topic, ...TopicData)
}

// TopicDataSubscriber is a subscriber that can inject data into subscriber callbacks when events happen for a given topic
type TopicDataSubscriber interface {
	Subscriber
	AddTopicData(topic Topic, data TopicData)
}

// Subscribable is a stream that can be subscribed to
type Subscribable interface {
	Subscribe(topic Topic, sub Subscriber) bool
	Unsubscribe(sub Subscriber) bool
}

// Publisher is a publisher of events that can be subscribed to
type Publisher interface {
	Close(Topic)
	Publish(Topic, Event)
	Shutdown()
	Startup()
	Subscribable
}

// EventTransform is a function that transforms one kind of event into another
type EventTransform func(Event) Event

// Notifee is a topic data subscriber plus a set of data you want to add to any topics subscribed to
// (passed to SubscribeWithData to inject data when events for a given topic are emitted)
type Notifee struct {
	Data       TopicData
	Subscriber TopicDataSubscriber
}

// SubscribeWithData subscribes the notifee's subscriber to the given topic, and adds the notifee's
// custom data to the list of data injected into callbacks when events occur on that topic
func SubscribeWithData(p Subscribable, topic Topic, notifee Notifee) {
	notifee.Subscriber.AddTopicData(topic, notifee.Data)
	p.Subscribe(topic, notifee.Subscriber)
}
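To make the intent of these interfaces concrete, here is a minimal sketch of how they might be used. The interface definitions are copied from the proposal (Publisher and EventTransform are left out); `collector`, `fakePublisher`, `demo`, and the topic/data strings are hypothetical names made up for illustration and are not part of go-graphsync. The sketch assumes that the publisher ignores a repeated subscription by the same subscriber, so that two blocks registered on one message topic lead to a single callback carrying both pieces of data.

```go
package main

import "fmt"

// Interface definitions copied from the proposal above.
type Topic interface{}
type Event interface{}
type TopicData interface{}

type Subscriber interface {
	OnNext(Topic, Event, ...TopicData)
	OnClose(Topic, ...TopicData)
}

type TopicDataSubscriber interface {
	Subscriber
	AddTopicData(topic Topic, data TopicData)
}

type Subscribable interface {
	Subscribe(topic Topic, sub Subscriber) bool
	Unsubscribe(sub Subscriber) bool
}

type Notifee struct {
	Data       TopicData
	Subscriber TopicDataSubscriber
}

func SubscribeWithData(p Subscribable, topic Topic, notifee Notifee) {
	notifee.Subscriber.AddTopicData(topic, notifee.Data)
	p.Subscribe(topic, notifee.Subscriber)
}

// collector is a hypothetical TopicDataSubscriber. It remembers every piece of
// data registered for a topic and records one line per event, including all of
// that topic's data. It is not safe for concurrent use.
type collector struct {
	data  map[Topic][]TopicData
	lines []string
}

func (c *collector) AddTopicData(topic Topic, data TopicData) {
	c.data[topic] = append(c.data[topic], data)
}

// OnNext ignores data passed by the publisher and uses its own per-topic list.
func (c *collector) OnNext(topic Topic, ev Event, _ ...TopicData) {
	c.lines = append(c.lines, fmt.Sprintf("next %v %v %v", topic, ev, c.data[topic]))
}

// OnClose forgets the data registered for the closed topic.
func (c *collector) OnClose(topic Topic, _ ...TopicData) {
	delete(c.data, topic)
}

// fakePublisher is a hypothetical, minimal Subscribable with a Publish method.
type fakePublisher struct{ subs map[Topic][]Subscriber }

func (p *fakePublisher) Subscribe(topic Topic, sub Subscriber) bool {
	for _, s := range p.subs[topic] {
		if s == sub {
			return false // already subscribed: deliver each event only once
		}
	}
	p.subs[topic] = append(p.subs[topic], sub)
	return true
}

// Unsubscribe is a stub; removal is not needed for this sketch.
func (p *fakePublisher) Unsubscribe(Subscriber) bool { return false }

func (p *fakePublisher) Publish(topic Topic, ev Event) {
	for _, s := range p.subs[topic] {
		s.OnNext(topic, ev)
	}
}

// demo registers two blocks on one message topic and publishes one event.
func demo() []string {
	col := &collector{data: map[Topic][]TopicData{}}
	pub := &fakePublisher{subs: map[Topic][]Subscriber{}}
	SubscribeWithData(pub, "msg-1", Notifee{Data: "block-A", Subscriber: col})
	SubscribeWithData(pub, "msg-1", Notifee{Data: "block-B", Subscriber: col})
	pub.Publish("msg-1", "sent")
	return col.lines
}

func main() {
	for _, line := range demo() {
		fmt.Println(line) // prints "next msg-1 sent [block-A block-B]"
	}
}
```

The point of the sketch is that the per-topic data travels as a list, so a single "sent" event for a message can still account for every block in that message.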
--
I also recommend:
--
All of this will necessitate changes to all of the subscribers. We will need to dispatch multiple BlockSent notifications in the subscriber for the responsemanager, which will ultimately be the solution to the root issue.
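As a rough illustration of that last point, the sketch below contrasts notifying a block-sent listener for only the first block of a message with notifying it once per block. Everything here is hypothetical: `blockData`, `notifyFirstOnly`, `notifyAll`, `sentBytes`, and the block sizes are invented for the example and do not reflect the actual responsemanager code.

```go
package main

import "fmt"

// blockData is a hypothetical stand-in for the per-block metadata attached
// to an outgoing message.
type blockData struct {
	Link string
	Size uint64
}

// notifyFirstOnly mimics the reported behaviour: a message carrying several
// blocks produces a notification for the first block only.
func notifyFirstOnly(blocks []blockData, listener func(blockData)) {
	if len(blocks) > 0 {
		listener(blocks[0])
	}
}

// notifyAll dispatches one notification per block in the message.
func notifyAll(blocks []blockData, listener func(blockData)) {
	for _, b := range blocks {
		listener(b)
	}
}

// sentBytes totals the sizes that a block-sent listener would observe under
// the given notification strategy.
func sentBytes(notify func([]blockData, func(blockData)), blocks []blockData) uint64 {
	var total uint64
	notify(blocks, func(b blockData) { total += b.Size })
	return total
}

func main() {
	// Made-up sizes, chosen only to make the difference visible.
	blocks := []blockData{{"block-A", 100}, {"block-B", 200}, {"block-C", 300}}
	fmt.Println(sentBytes(notifyFirstOnly, blocks)) // prints 100
	fmt.Println(sentBytes(notifyAll, blocks))       // prints 600
}
```

With the first strategy, a listener that sums block sizes under-reports whenever a message holds more than one block, which matches the shape of the symptom described in this issue.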
I added TestDataSentEvents as an integration test in go-data-transfer to watch for events emitted. It seems that gsBlockSentHook is not called every time that a block is sent (whereas gsIncomingBlockHook is called every time a block is received). gsBlockSentHook is registered with graphsync using RegisterBlockSentListener.
The full output from the test is below. Note the output for the last two lines:
On the provider graphsync emitted "receive" events for 20439 bytes.
On the client graphsync emitted "queue" events for 20439 bytes, but only emitted "sent" events for 14295 bytes.
Full output: