The Situation

I copied code from Palomar to subscribe to the Firehose. That includes persisting the cursor of the last processed message to a database (also periodically). Note: the same pattern is also used by, e.g., Rainbow when saving the cursor to disk.

I'm receiving a message with sequence number `N` and persist `N` to the database. On the next startup, I read `N` from the database and pass it as `cursor=N` to `com.atproto.sync.subscribeRepos`. I noticed that the first message I receive has sequence number `N`, not `N+1`, which means I process that message twice.

This was an easy fix on my end (and I have a PR to fix `search/firehose.go`), but I was curious whether other components were affected, so I dug a bit deeper.

I didn't find good documentation for `com.atproto.sync.subscribeRepos`, but I did find this: https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/sync/subscribeRepos.json#L13 ...which states "The last known event seq number to backfill from." My gut feeling tells me that if I already know the sequence number, I don't want it included in the response, i.e., I want everything `> cursor`.
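Independently of which semantics the server uses, a consumer can avoid reprocessing by skipping anything at or below the last sequence number it has already handled. The Go sketch below is hypothetical: `Event` and `Consumer` are illustrative types, not indigo's, and it assumes sequence numbers arrive in increasing order.

```go
package main

import "fmt"

// Event is a hypothetical, simplified stand-in for a firehose message;
// only the sequence number matters for this sketch.
type Event struct {
	Seq int64
}

// Consumer tracks the last sequence number it has fully processed.
// In a real consumer, LastProcessed would be loaded from and saved to a database.
type Consumer struct {
	LastProcessed int64
	Processed     []int64
}

// Handle processes ev unless it was already handled before a restart.
// The guard keeps resumption correct whether the server replays
// seq >= cursor (inclusive) or seq > cursor (exclusive).
func (c *Consumer) Handle(ev Event) {
	if ev.Seq <= c.LastProcessed {
		return // already processed before the restart: skip the duplicate
	}
	c.Processed = append(c.Processed, ev.Seq)
	c.LastProcessed = ev.Seq // this is the value to persist as the next cursor
}

func main() {
	// Simulate a restart after processing seq 42, against a server that
	// replays inclusively (the first event it sends equals the cursor).
	c := &Consumer{LastProcessed: 42}
	for _, ev := range []Event{{Seq: 42}, {Seq: 43}, {Seq: 44}} {
		c.Handle(ev)
	}
	fmt.Println(c.Processed) // [43 44]
}
```

Persisting `N+1` instead of `N` would also avoid the duplicate against a `>= cursor` server, but would silently drop event `N+1` against a `> cursor` server, which is why a guard on the consumer side is the more robust choice while the semantics are unclear.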
Checking the Implementation

The Firehose runs `EventPersistence.Playback(ctx, cursor, ...)` with the user-provided cursor: https://github.com/bluesky-social/indigo/blob/main/events/events.go#L360

Checking all implementations of that interface, I get:

- `DbPersistence` correctly queries for `seq > ?`: https://github.com/bluesky-social/indigo/blob/main/events/dbpersist.go#L379
- `DiskPersistence`... not 100% sure, but I think this correctly seeks to an event with `evt.Seq > since` here: https://github.com/bluesky-social/indigo/blob/main/events/diskpersist.go#L245-L257
- `PebblePersist` queries for `LowerBound: [8]byte{since}`: https://github.com/bluesky-social/indigo/blob/main/events/pebblepersist.go#L123
  The docs on that say:
  > // LowerBound specifies the smallest key (inclusive) that the iterator will
  > // return during iteration. If the iterator is seeked or iterated past this
  > // boundary the iterator will return Valid()==false. Setting LowerBound
  > // effectively truncates the key space visible to the iterator.

  The bound is inclusive, so the first element of the iterator is the event at `since` itself, i.e., this returns all events with `evt.Seq >= since`.
- `EventRingBuffer` (I think) correctly skips while `evt.Seq <= since`: https://github.com/bluesky-social/indigo/blob/main/splitter/ringbuf.go#L116-L118
- `MemPersister` is probably correct, if that `TODO` is telling the truth: https://github.com/bluesky-social/indigo/blob/main/events/persist.go#L76-L80
- `YoloPersister` does not support playback.

EDIT: What confuses me is that I thought the current Firehose used `DbPersistence` or maybe `DiskPersistence`, but it doesn't behave that way :thinking: .
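To make the off-by-one between the implementations listed above concrete, here is a small self-contained Go sketch (not code from indigo) of a scan with an inclusive lower bound over sorted keys. It assumes sequence numbers are stored as 8-byte big-endian keys so that byte order matches numeric order; that encoding is an assumption, not necessarily what `pebblepersist.go` does. `seqKey` and `scanFrom` are hypothetical helpers. Using `since` as the bound replays the event at `since`; using `since+1` gives the `seq > since` behaviour of `DbPersistence`.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

// seqKey encodes a sequence number as an 8-byte big-endian key so that
// bytewise key order matches numeric order (assumed encoding).
func seqKey(seq int64) []byte {
	k := make([]byte, 8)
	binary.BigEndian.PutUint64(k, uint64(seq))
	return k
}

// scanFrom returns the sequence numbers of all keys >= lower, mimicking an
// iterator whose lower bound is inclusive. keys must be sorted bytewise.
func scanFrom(keys [][]byte, lower []byte) []int64 {
	i := sort.Search(len(keys), func(i int) bool {
		return bytes.Compare(keys[i], lower) >= 0
	})
	var out []int64
	for _, k := range keys[i:] {
		out = append(out, int64(binary.BigEndian.Uint64(k)))
	}
	return out
}

func main() {
	var keys [][]byte
	for seq := int64(1); seq <= 5; seq++ {
		keys = append(keys, seqKey(seq)) // appended in order, so already sorted
	}
	since := int64(3)
	fmt.Println(scanFrom(keys, seqKey(since)))   // [3 4 5]: inclusive, replays since
	fmt.Println(scanFrom(keys, seqKey(since+1))) // [4 5]: same as seq > since
}
```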
Solutions

1. It would be nice to have documentation for `com.atproto.sync.subscribeRepos` that states whether it should return everything `> since` or `>= since`.
2. The `EventPersistence` interface should have documentation specifying how `since` should be handled.
3. The implementations should all behave the same way.

I can submit PRs for 2 and 3, probably. Let me know :)

EDIT2: Seeing that there's a test for `PebblePersist` and `DiskPersistence`, apparently they behave the same and I'm confused somewhere. I'll keep looking...

EDIT3: Well, after thinking about this for a while, I can't put my finger on exactly where, but I believe something is not right. The Firehose (and the `EventPersistence` interface, based on the existing tests) returns events with `evt.SeqNo >= since`, i.e., including `since`. A bunch of the consumers, including `search/firehose.go` and `splitter`, use the last processed sequence number, or the highest sequence number in PebbleDB, as `since`. I think they process one event twice. If you let me know how it should behave, I can send PRs to fix it.

EDIT4: Just to add, the streaming endpoint of Labelers returns events with `seqNo > since`.
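Whichever semantics is chosen, the goal that all implementations behave the same way could be enforced by one shared contract check that every implementation runs. The Go sketch below uses a hypothetical, reduced `Persister` interface (not indigo's real `EventPersistence` signature) and a toy in-memory implementation. It assumes the exclusive `seq > since` contract, which is what `DbPersistence` and the Labeler stream appear to use; the `inclusive` flag mimics an implementation that also replays `since`, and the check catches it.

```go
package main

import (
	"context"
	"fmt"
)

// Persister is a hypothetical, reduced version of the EventPersistence
// interface, keeping only what is needed to state the playback contract.
type Persister interface {
	Persist(ctx context.Context, seq int64) error
	// Playback calls cb for every persisted event with seq > since, in
	// ascending order. (Exclusive semantics is an assumption here.)
	Playback(ctx context.Context, since int64, cb func(seq int64) error) error
}

// toyPersister is an in-memory implementation used to exercise the check.
type toyPersister struct {
	seqs      []int64
	inclusive bool // true mimics an implementation that returns seq >= since
}

func (m *toyPersister) Persist(_ context.Context, seq int64) error {
	m.seqs = append(m.seqs, seq)
	return nil
}

func (m *toyPersister) Playback(_ context.Context, since int64, cb func(int64) error) error {
	for _, s := range m.seqs {
		// Skip older events, and skip since itself unless inclusive.
		if s < since || (s == since && !m.inclusive) {
			continue
		}
		if err := cb(s); err != nil {
			return err
		}
	}
	return nil
}

// checkPlaybackContract persists seqs 1..5, plays back from since=3,
// and requires exactly [4 5].
func checkPlaybackContract(p Persister) error {
	ctx := context.Background()
	for seq := int64(1); seq <= 5; seq++ {
		if err := p.Persist(ctx, seq); err != nil {
			return err
		}
	}
	var got []int64
	if err := p.Playback(ctx, 3, func(seq int64) error {
		got = append(got, seq)
		return nil
	}); err != nil {
		return err
	}
	if len(got) != 2 || got[0] != 4 || got[1] != 5 {
		return fmt.Errorf("Playback(since=3) returned %v, want [4 5]", got)
	}
	return nil
}

func main() {
	fmt.Println(checkPlaybackContract(&toyPersister{}))                // <nil>
	fmt.Println(checkPlaybackContract(&toyPersister{inclusive: true})) // Playback(since=3) returned [3 4 5], want [4 5]
}
```

Running the same check from each implementation's test file would turn any future disagreement about `since` into a test failure rather than a silently duplicated event.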