bluesky-social / jetstream

A simplified JSON event stream for AT Proto
MIT License

Periodic control message with time_usec #18

Closed: ngerakines closed this issue 1 week ago

ngerakines commented 1 week ago

I'd like to introduce a feature that periodically sends a WebSocket data message whose kind is "cursor" and that carries a time_us key and value:

{
  "time_us": 1725911162329308,
  "kind": "cursor"
}
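A consumer could tell these messages apart from regular events by checking the kind field before doing anything else with the frame. The following is a minimal sketch that assumes the proposed shape above; the cursorMessage type and parseCursor function are hypothetical names used only for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cursorMessage mirrors the proposed periodic message (hypothetical client-side type).
type cursorMessage struct {
	TimeUS int64  `json:"time_us"`
	Kind   string `json:"kind"`
}

// parseCursor returns the time_us value and true if the frame is a proposed
// cursor message. Frames with any other kind return false; extra fields are ignored.
func parseCursor(frame []byte) (int64, bool, error) {
	var msg cursorMessage
	if err := json.Unmarshal(frame, &msg); err != nil {
		return 0, false, err
	}
	if msg.Kind != "cursor" {
		return 0, false, nil
	}
	return msg.TimeUS, true, nil
}

func main() {
	frame := []byte(`{"time_us": 1725911162329308, "kind": "cursor"}`)
	timeUS, ok, err := parseCursor(frame)
	fmt.Println(timeUS, ok, err) // 1725911162329308 true <nil>
}
```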

Rationale

Having a stream of cursors provides several benefits to consumers who rely on Jetstream.

First, with Jetstream running in multiple regions, consumers can keep low-cost connections open that remain active for the purpose of hot failover.

An added benefit is that consumers with connections open to several regions get a low-cost (in terms of bandwidth and compute) way of knowing whether any Jetstream instance is lagging behind or exhibiting unexpected behavior.

Second, this allows clients that subscribe to low-frequency events to receive up-to-date cursor information so they can reconnect at the latest possible point.

Use case: Smoke Signal events don't occur often, but I record the time_us value (cursor) so I can reconnect after service changes. If a considerable amount of time passes after the last Smoke Signal event I saw and I then reconnect at that old cursor, Jetstream has to cycle through a backlog of events that aren't relevant to the connection's filters, which could put unnecessary strain on the service.
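The reconnect side of this use case might look roughly like the sketch below: keep the newest time_us seen on the connection (whether from a matching event or a periodic cursor message) and pass it back as the cursor query parameter. The cursorTracker type, the host, and the collection name are hypothetical placeholders; cursor and wantedCollections are Jetstream subscribe parameters.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// cursorTracker keeps the newest time_us seen on a connection so a restart can
// resume from there instead of from the last matching event (hypothetical type).
type cursorTracker struct {
	latest int64
}

// observe records timeUS if it is newer than anything seen so far.
func (c *cursorTracker) observe(timeUS int64) {
	if timeUS > c.latest {
		c.latest = timeUS
	}
}

// reconnectURL builds a subscribe URL that resumes at the tracked cursor.
func (c *cursorTracker) reconnectURL(host, collection string) string {
	q := url.Values{}
	q.Set("wantedCollections", collection)
	if c.latest > 0 {
		q.Set("cursor", strconv.FormatInt(c.latest, 10))
	}
	u := url.URL{Scheme: "wss", Host: host, Path: "/subscribe", RawQuery: q.Encode()}
	return u.String()
}

func main() {
	var c cursorTracker
	c.observe(1725911100000000) // last matching event
	c.observe(1725911162329308) // a later periodic cursor message
	// wss://jetstream.example.com/subscribe?cursor=1725911162329308&wantedCollections=com.example.event
	fmt.Println(c.reconnectURL("jetstream.example.com", "com.example.event"))
}
```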

Compatibility

Because this is an opt-in feature, the addition would be backwards compatible with existing clients and consumers, without disruption.

Configuration

This optional feature can be enabled in either of the following ways:

The informCursorSeconds query string parameter, set to a number in the range 60 to 600. Values outside that range will be clamped to it.

The informCursorSeconds attribute of the SubscriberOptionsUpdatePayload:

{
    "informCursorSeconds": 60
}

Set Once

This value can only be set once, when the request is first made. Changes to the value in SubscriberOptionsUpdatePayload messages sent after the response loop has been created will be ignored.
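The set-once rule could be enforced with a flag that flips when the response loop starts. This is a sketch only: optionsUpdate models just the proposed field (the real SubscriberOptionsUpdatePayload carries other options not shown), and subscriberSettings, loopStarted, and applyUpdate are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// optionsUpdate models only the proposed informCursorSeconds field.
// A pointer distinguishes "not sent" from an explicit value.
type optionsUpdate struct {
	InformCursorSeconds *int `json:"informCursorSeconds,omitempty"`
}

// subscriberSettings is a hypothetical holder for the per-connection value.
type subscriberSettings struct {
	informCursorSeconds int
	loopStarted         bool // set once the response loop has been created
}

// clamp forces n into the proposed 60-600 range.
func clamp(n int) int {
	if n < 60 {
		return 60
	}
	if n > 600 {
		return 600
	}
	return n
}

// applyUpdate honors informCursorSeconds only before the response loop starts;
// later changes are ignored, per the "Set Once" rule.
func (s *subscriberSettings) applyUpdate(raw []byte) error {
	var upd optionsUpdate
	if err := json.Unmarshal(raw, &upd); err != nil {
		return err
	}
	if upd.InformCursorSeconds != nil && !s.loopStarted {
		s.informCursorSeconds = clamp(*upd.InformCursorSeconds)
	}
	return nil
}

func main() {
	var s subscriberSettings
	_ = s.applyUpdate([]byte(`{"informCursorSeconds": 60}`))
	s.loopStarted = true
	_ = s.applyUpdate([]byte(`{"informCursorSeconds": 300}`)) // ignored
	fmt.Println(s.informCursorSeconds) // 60
}
```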

Runtime

A new model will be introduced:

type InformCursorEvent struct {
    TimeUS  int64   `json:"time_us"`
    Kind    string  `json:"kind"`
}

The emit event function will be updated to set sub.seq = timeUS earlier, before any code path that returns either a message to emit or nil. This gives each subscriber an up-to-date sequence value that stays in sync with the firehose consumer, even for events that its filters drop.
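The ordering change can be illustrated with a stripped-down emit function. Everything here is hypothetical (the event and subscriber types, the collection filter); it also swaps the plain sub.seq field for an atomic.Int64, an assumption made because the emit path and the response loop would read and write it from different goroutines.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// event is a stripped-down stand-in for a firehose event (hypothetical).
type event struct {
	TimeUS     int64
	Collection string
}

// subscriber holds the per-connection cursor and filter (hypothetical).
type subscriber struct {
	seq               atomic.Int64
	wantedCollections map[string]bool
}

// emit records the cursor before any early return, so filtered-out events
// still advance seq. It returns nil when the event does not match the filter.
func (s *subscriber) emit(ev event) *event {
	s.seq.Store(ev.TimeUS)
	if len(s.wantedCollections) > 0 && !s.wantedCollections[ev.Collection] {
		return nil
	}
	return &ev
}

func main() {
	sub := &subscriber{wantedCollections: map[string]bool{"com.example.event": true}}
	out := sub.emit(event{TimeUS: 1725911162329308, Collection: "app.bsky.feed.post"})
	fmt.Println(out == nil, sub.seq.Load()) // true 1725911162329308
}
```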

With that value set, the main server response loop at https://github.com/bluesky-social/jetstream/blob/main/pkg/server/server.go#L241-L268 will be modified with the following behavior:

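// value is the informCursorSeconds setting, already clamped to 60-600 (0 = disabled).
var tickerChannel <-chan time.Time
if value > 0 {
    ticker := time.NewTicker(time.Duration(value) * time.Second)
    defer ticker.Stop()
    tickerChannel = ticker.C
}
// When this feature isn't enabled, tickerChannel stays nil. Receiving from a nil
// channel blocks forever, so that case is a no-op in the loop.
for {
    select {
    case <-ctx.Done():
        log.Info("shutting down subscriber")
        return nil
    case <-tickerChannel:
        cursorInformMsg := InformCursorEvent{Kind: "cursor", TimeUS: sub.seq}
        // WriteMessage takes raw bytes, so encode the event as JSON first.
        msgBytes, err := json.Marshal(cursorInformMsg)
        if err != nil {
            log.Error("failed to marshal cursor message", "error", err)
            continue
        }
        if compress {
            // Compressed subscribers receive binary frames; msgBytes would need the
            // same compression as other outbound events (not shown here).
            if err := sub.WriteMessage(websocket.BinaryMessage, msgBytes); err != nil {
                log.Error("failed to write message to websocket", "error", err)
                return nil
            }
            continue
        }
        if err := sub.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
            log.Error("failed to write message to websocket", "error", err)
            return nil
        }

    case msg := <-sub.outbox:
        // ...
    }
}
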
joncar commented 1 week ago

Because you always receive account and identity messages, this effectively exists today (as long as people keep signing up for Bluesky!). They arrive often enough that my measured cursor latency is always under a minute.

ericvolp12 commented 1 week ago

FWIW I'm not super compelled to add this right now. I've been trying to keep Jetstream's footprint and complexity (and maintenance load) pretty relaxed, and I think joncar's point that there are events with seqs on them that you can use as milestones is fair.

ngerakines commented 1 week ago

Yeah, I think @joncar brought up a really good point and I'm using that recommendation currently. Will close out this issue.