superfly / corrosion

Gossip-based service discovery (and more) for large distributed systems.
https://superfly.github.io/corrosion/
Apache License 2.0

Deduplication of the subscriptions #215

Open gedw99 opened 1 month ago

gedw99 commented 1 month ago

Here is an example setup.

You have a 3-node corrosion cluster running and 3 golang servers subscribing using https://superfly.github.io/corrosion/api/subscriptions.html

If, in the golang servers, the data feed from corrosion triggers a side effect, for example sending an email, how can I ensure only 1 email is sent instead of 3?

This is why I called the issue de-duplication, as it’s the closest name for this problem space.

I am looking for architecture advice and to see if we can use corrosion for this type of work pattern.

I guess I could slap in a nats Jetstream queue that ensures only 1 email job runs, but I don’t want to as it’s more moving parts :) Less is more :)

jeromegn commented 1 month ago

Exactly-once delivery is pretty hard to achieve in distributed systems.

There are ways around the issue, but there are caveats!

You could use consistent hashing so the same node always processes its own partition of emails. However, if you change the number of nodes ingesting the subscription, you'll have to update the processing logic so new nodes get to send emails too.

If you have numerical, unique IDs, then you could try SQLite's mod() math function:

This would have the effect of creating different subscriptions for each client node (your golang servers). It has the same issue as consistent hashing where you need to change the query if you add or remove nodes. This only really works if you precisely control the golang nodes and can assign them indexes without gaps between 0 and len - 1.

These are just ideas. It would probably be best to put the subscription in a proper queue and consume that queue.

gedw99 commented 1 month ago

Thank you @jeromegn

I appreciate the response.

It’s funny, but at the end of the day we both sort of hit the same old answer: use a queue.

Imagine you have a golang server farm with a GUI written in htmx. Every server would try to push html to the browser as a result of a subscription feed. The current user is only connected over SSE to one server, so it would be de-duped by the fact that the human is on one device at a time. The other golang servers would do nothing with the subscription feed in this case.

Complex stuff.

Corrosion could ideally model subscriptions with an ACK. Under the hood, it would save that the subscription sequence was handled and distribute that to the other corrosion servers. There is a timing race condition here of course, so it would again be "at least once" queue semantics.

Either way, all devs using corrosion will hit this problem.

Does the rust client for corrosion handle this problem using hashes, etc.?

gedw99 commented 1 month ago

NATS JetStream with its native de-duplication looks like a solution I will try:

https://nats.io/blog/new-per-subject-discard-policy/

I still need to write a golang client for corrosion to easily digest the corrosion stream. I raised another issue in this repo's issue list about other clients, like golang.

jeromegn commented 1 month ago

I think that's a hard problem that Corrosion is not well suited to solve. It's a strongly eventually consistent database; it can't offer those kinds of guarantees.

You can't use Corrosion as a queue if you need exactly-once guarantees. The pub/sub functionality is designed so that every client gets to the same state eventually. You need a different system if you need some kind of "locking" to make sure an entry is processed only once.

If you do need to achieve this, then I suggest using a single client to send the emails. That would require something like leader election if you need it to be resilient.

gedw99 commented 1 month ago

I agree fully, and I appreciate the brainstorm.

I need exactly-once processing. https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#exactly-once-semantics

I will take the output from the Corrosion subscription and feed it into NATS.

So it is a combination of de-duplication by the server when receiving a published message, as well as a double-ack call by the subscriber that received the message (plus retries if necessary).

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func failOnErr(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // Connect and get the JetStream context.
    nc, _ := nats.Connect(nats.DefaultURL)
    defer nc.Close()
    js, _ := nc.JetStream()

    // Create a test stream.
    _, err := js.AddStream(&nats.StreamConfig{
        Name:       "test",
        Storage:    nats.MemoryStorage,
        Subjects:   []string{"test.>"},
        Duplicates: time.Minute,
    })
    failOnErr(err)

    defer js.DeleteStream("test")

    // Publish some messages with duplicates.
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))

    // Create an explicit pull consumer on the stream.
    _, err = js.AddConsumer("test", &nats.ConsumerConfig{
        Durable:       "test",
        AckPolicy:     nats.AckExplicitPolicy,
        DeliverPolicy: nats.DeliverAllPolicy,
    })
    failOnErr(err)
    defer js.DeleteConsumer("test", "test")

    // Create a subscription on the pull consumer.
    // Subject can be empty since it defaults to all subjects bound to the stream.
    sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
    failOnErr(err)

    // Only two should be delivered.
    batch, _ := sub.Fetch(10)
    log.Printf("%d messages", len(batch))

    // AckSync both to ensure the server received the ack.
    batch[0].AckSync()
    batch[1].AckSync()

    // Should be zero.
    batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
    log.Printf("%d messages", len(batch))
}

Pushing that then to web clients uses out-of-order streaming based on DSD Web Components (zero javascript needed). So the GUI just hangs waiting for more data.

https://scottnath.com/blahg/profile-components--dsd/

A transaction from the GUI goes to NATS and then Corrosion. Any other web component fragment just updates off the subscription at the same time. So PUSH and PULL are 100% decoupled. A new page load is just a corrosion query, and if I over-subscribe then the GUI hides it, as it does not care about over-fetching.

So the Web GUI is an exact mirror of the DB. Kind of like the old "Naked Objects" manifesto.
If I want to change the GUI, I actually change the SQL Data Schema.

Still need to test live data schema changes with CRDTs. I read how it works; it's freaky ingenious...