matryer / vice

Go channels at horizontal scale (powered by message queues)
https://medium.com/@matryer/introducing-vice-go-channels-across-many-machines-bcac1147d7e2
Apache License 2.0
1.54k stars 79 forks source link

context #10

Closed matryer closed 7 years ago

matryer commented 7 years ago

I toyed with the idea of adding context for cancellation, but it meant that the idea wasn't explicitly represented in the interface. I'd imagine the context would go into the New functions in the implementations, rather than Send and Receive? That's not a deal breaker, but that's why I left Stop and Done in there. But since context is in the std lib, it's definitely still on the table.

We could have a context passed in to the New functions of the transport implementations to handle stopping, instead of a Stop function.

I think we'd still want Done() because it's important that the implementations are given enough time to properly de-register their interest in the underlying queue. Or is there a more elegant way to do that?

matryer commented 7 years ago

@HeavyHorst @GeorgeMac What do you think?

GeorgeMac commented 7 years ago

Have you considered removing the concept of stopping and signaling finished from the interface altogether? Often this detail is implementation specific and not crucial to core functionality of interfacing with a queue.

When an implementation of the interface is instantiated, take for example the NSQ implementation (https://github.com/matryer/vice/blob/master/queues/nsq/nsq.go#L50) you return a pointer to the struct type. So you can still have concurrency control semantics on there, like a Stop or a Done func. But you can also omit them where necessary. You then don't need to pollute the interface. The interface should be designed from the consumers perspective. Not the programmer, but the code that interfaces with the Send and Receive functions. Most concrete code that would consume this interface should be focussed on send and receiving or maybe even just one of those semantics.

You could go as far as to say:

type Sender interface{
    Send(ctxt context.Context, name string, msgs <-chan []byte)
}

type Receiver interface{
    Receive(ctxt context.Context, name string) <-chan []byte
}

Where on Send ctxt.Done() should be closed when it has stopped consuming from msgs chan. Which you can assert in your test harness.

Again though, this is dependent on your audience. Is this repo to be a set of small interface contracts for queue adapters to be written against. Or is it more educational and have a single interface which defines a contract that all users should be aware of (sending, receiving, errors on those operations, graceful shutdown).

I hope this is useful and not completely missing the point ๐Ÿ‘

matryer commented 7 years ago

One of the design goals was to allow users to swap out queues without touching any other code (for most cases). Thatโ€™s why I like error handling and graceful shutdown part of the interface.

Instead of Sender and Receiver interfaces, I think users achieve the same this by passing the channels around.

On 30 Jul 2017, at 13:26, George notifications@github.com wrote:

Have you considered removing the concept of stopping and signaling finished from the interface altogether? Often this detail is implementation specific and not crucial to core functionality of interfacing with a queue.

When an implementation of the interface is instantiated, take for example the NSQ implementation (https://github.com/matryer/vice/blob/master/queues/nsq/nsq.go#L50 https://github.com/matryer/vice/blob/master/queues/nsq/nsq.go#L50) you return a pointer to the struct type. So you can still have concurrency control semantics on there, like a Stop or a Done func. But you can also omit them where necessary. You then don't need to pollute the interface. The interface should be designed from the consumers perspective. Not the programmer, but the code that interfaces with the Send and Receive functions. Most concrete code that would consume this interface should be focussed on send and receiving or maybe even just one of those semantics.

You could go as far as to say:

type Sender interface{ Send(ctxt context.Context, name string, msgs <-chan []byte) }

type Receiver interface{ Receive(ctxt context.Context, name string) <-chan []byte } Where on Send ctxt.Done() should be closed when it has stopped consuming from msgs chan. Which you can assert in your test harness.

Again though, this is dependent on your audience. Is this repo to be a set of small interface contracts for queue adapters to be written against. Or is it more educational and have a single interface which defines a contract that all users should be aware of (sending, receiving, errors on those operations, graceful shutdown).

I hope this is useful and not completely missing the point ๐Ÿ‘

โ€” You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/matryer/vice/issues/10#issuecomment-318898540, or mute the thread https://github.com/notifications/unsubscribe-auth/AAGNG9EmnZFUAzz9YymQwot0fUfeMQemks5sTHZngaJpZM4OnnCy.

HeavyHorst commented 7 years ago

I see one problem if we put the context into the Send and Receive methods.

Receive(ctx context.Context, name string) <-chan []byte
Send(ctx context.Context, name string) chan<- []byte

Send and Receive reuses the channels so it could get tricky to unsubscribe the right ones.

ctx1, cancel1 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancel(context.Background())

// same channels
chan1 := transport.Receive(ctx1, "vicechannel1")
chan2 := transport.Receive(ctx2, "vicechannel1")

cancel2() //this should do nothing because transport.Receive(ctx1, "vicechannel1") is still active

We could have a context passed in to the New functions of the transport implementations to handle stopping, instead of a Stop function.

I think we'd still want Done() because it's important that the implementations are given enough time to properly de-register their interest in the underlying queue. Or is there a more elegant way to do that?

I think this approach is fine.

GeorgeMac commented 7 years ago

Depends on the intention of Receive. You actually make this distinction in your other issue https://github.com/matryer/vice/issues/19

Depending on the message semantics. The result of calling Receive could be a subscription to every message or a subset of the stream as in a work queue. If you made the default that every call to Receive gave you a subscription, you can still distribute work by sharing that resulting channel. By forcing the same semantics as a channel on the Receive action, you are limiting consumers of this API to that messaging pattern. If they want copy the stream, doing so will require they implement it on top of a channel. Whereas if receive was a distinct subscription it is trivial to get work queue behaving by sharing the queue.

However, this abstraction might not make sense for both types of message queue. Maybe it shouldn't be painted with the same brush and they should be separate interfaces?

matryer commented 7 years ago

re context, I tried implementing it in nsq and sadly it failed for some reason. See context branch if interested.

Tbh, I'm not too worried about it having an explicit Stop function; so unless it's trivial to implement, I'll probably end up leaving it. If anyone feels strongly either way, you are more than welcome to have a look.