pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License

Add zmq.ChannelPair #81

Closed abligh closed 8 years ago

abligh commented 8 years ago

Add an (experimental) zmq.ChannelPair class which wraps a zmq socket into a TX and RX channel.

Currently ZMQ sockets are not threadsafe. They are difficult to use in combination with golang channels, as you can poll on a set of sockets or select on a set of channels, but not both. This creates problems if you want to use conventional Go techniques, e.g. using a chan bool to end goroutines.

This experimental patch provides an optional means of wrapping a ZMQ socket into a ChannelPair, which provides an Rx and Tx channel (as well as an error channel). This is loosely based on the idea of another go binding: http://github.com/vaughan0/go-zmq but works with ZMQ 4.x.
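
As a rough illustration of the idea (not the code from this patch), the sketch below wraps a socket in Rx, Tx and error channels, with a single goroutine owning the socket. The `socket` interface, the `loopback` fake, `channelPair` and `echoOnce` are all hypothetical names invented for the example and are not part of zmq4. A short ticker with a non-blocking receive stands in for a real Poll()-based loop.

```go
package main

import (
	"fmt"
	"time"
)

// socket is a hypothetical stand-in for a ZMQ socket; it is NOT the zmq4 API.
// TryRecv must not block: ok is false when no message is waiting.
type socket interface {
	Send(msg [][]byte) error
	TryRecv() (msg [][]byte, ok bool, err error)
}

// channelPair exposes a socket as Go channels. Only the run goroutine
// touches the socket, so the socket is never shared between goroutines.
type channelPair struct {
	Rx   <-chan [][]byte
	Tx   chan<- [][]byte
	Errs <-chan error
	stop chan struct{}
	done chan struct{}
}

func newChannelPair(s socket) *channelPair {
	rx := make(chan [][]byte, 16)
	tx := make(chan [][]byte, 16)
	errs := make(chan error, 1)
	cp := &channelPair{Rx: rx, Tx: tx, Errs: errs,
		stop: make(chan struct{}), done: make(chan struct{})}
	go cp.run(s, rx, tx, errs)
	return cp
}

func (cp *channelPair) run(s socket, rx chan<- [][]byte, tx <-chan [][]byte, errs chan<- error) {
	defer close(cp.done)
	tick := time.NewTicker(time.Millisecond) // assumption: polling replaces Poll()
	defer tick.Stop()
	for {
		select {
		case <-cp.stop:
			return
		case msg := <-tx:
			if err := s.Send(msg); err != nil {
				errs <- err // buffered; we exit right after the first error
				return
			}
		case <-tick.C:
			msg, ok, err := s.TryRecv()
			if err != nil {
				errs <- err
				return
			}
			if ok {
				select {
				case rx <- msg:
				case <-cp.stop:
					return
				}
			}
		}
	}
}

// Close stops the owning goroutine and waits for it to exit.
func (cp *channelPair) Close() {
	close(cp.stop)
	<-cp.done
}

// loopback is a fake socket for the demo: whatever is sent is received back.
type loopback struct{ queue [][][]byte }

func (l *loopback) Send(msg [][]byte) error { l.queue = append(l.queue, msg); return nil }

func (l *loopback) TryRecv() ([][]byte, bool, error) {
	if len(l.queue) == 0 {
		return nil, false, nil
	}
	msg := l.queue[0]
	l.queue = l.queue[1:]
	return msg, true, nil
}

// echoOnce sends one single-part message and selects on the result,
// the error channel and a timeout, as ordinary channel code would.
func echoOnce(text string) string {
	cp := newChannelPair(&loopback{})
	defer cp.Close()
	cp.Tx <- [][]byte{[]byte(text)}
	select {
	case msg := <-cp.Rx:
		return string(msg[0])
	case err := <-cp.Errs:
		return "error: " + err.Error()
	case <-time.After(time.Second):
		return "timeout"
	}
}

func main() { fmt.Println(echoOnce("hello")) }
```

The point of the shape is that callers can mix Rx, Tx, timers and their own cancellation channels in one select, which is not possible with a bare socket.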

This is currently lightly tested / experimental, and is submitted to solicit feedback.

Signed-off-by: Alex Bligh alex@alex.org.uk

pebbe commented 8 years ago

My initial response...

I don't feel like adding something like this to the package. It is on a higher level than what zmq4 is supposed to be.

Also, a user may want to modify this for specific needs.

If this is something more people may find useful, a good place for this could be the wiki, under design patterns:

https://github.com/pebbe/zmq4/wiki/DesignPatterns

aletheia7 commented 8 years ago

Looks good.

Some of my preferences in implementation would be:

  1. Use context.WithCancel() from package context instead of sync.WaitGroup/wg.Add()/wg.Wait(). I have used wg.Wait() extensively. sync.WaitGroup is a reference counting system. If someone forgets to wg.Add() before a go, the program hangs. I think context.WithCancel() provides for simpler, and easier to maintain, code. For example, at https://github.com/vaughan0/go-zmq/blob/master/channels.go#L10, the Channel struct would use a c.ctx context.Context and c.cancel context.CancelFunc. Channel.Close() would call c.cancel() to close all of the sockets. processOutgoing() and processSockets() would use case <-c.ctx.Done() instead of case <-c.stopch. If a problem occurs in a receiver, just call c.cancel().
  2. Use []string instead of [][]byte for In/Out channels. string involves an extra copy, but when someone sends Out <- [][]byte, the buffer must never be used again, or else the real fun begins when sent [][]byte buffers are accidentally reused. Using channels with ctx.MakePair() means extra copies of arrays are being made anyway. This feature is balanced more towards usability than performance. string is a []byte and easier to use.
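
A minimal sketch of what suggestion (1) might look like, combined with the []string channels from (2). The `channel` type, `newChannel`, `process` and `roundTrip` are hypothetical names for illustration only; the socket handling is replaced by a plain echo loop, and none of this is taken from go-zmq or zmq4.

```go
package main

import (
	"context"
	"fmt"
)

// channel is a hypothetical, trimmed-down analogue of the wrapper discussed
// above. The real socket work is replaced by echoing Out back to In.
type channel struct {
	ctx    context.Context
	cancel context.CancelFunc
	In     chan []string
	Out    chan []string
	done   chan struct{}
}

func newChannel() *channel {
	ctx, cancel := context.WithCancel(context.Background())
	c := &channel{
		ctx:    ctx,
		cancel: cancel,
		In:     make(chan []string),
		Out:    make(chan []string),
		done:   make(chan struct{}),
	}
	go c.process()
	return c
}

// process runs until the context is cancelled.
func (c *channel) process() {
	defer close(c.done)
	for {
		select {
		case <-c.ctx.Done(): // replaces a dedicated stop channel
			return
		case msg := <-c.Out:
			select {
			case c.In <- msg:
			case <-c.ctx.Done():
				return
			}
		}
	}
}

// Close cancels the context and waits for process to return.
func (c *channel) Close() {
	c.cancel()
	<-c.done
}

// roundTrip sends one message through the wrapper and then closes it.
func roundTrip(msg []string) ([]string, error) {
	c := newChannel()
	c.Out <- msg
	got := <-c.In
	c.Close()
	return got, c.ctx.Err() // non-nil once Close has cancelled the context
}

func main() {
	got, err := roundTrip([]string{"a", "b"})
	fmt.Println(got, err)
}
```

Note that cancelling a context only signals the goroutine. Waiting for it to actually exit still needs something extra, here the `done` channel.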

abligh commented 8 years ago

@pebbe the truth is that I first wrote it as it seemed the natural 'go friendly' way to use ZMQ channels that I'd use in a separate application I'd written. Then I thought 'no, I should stick to the way the repo does it at the moment'. But I found that doing anything non-trivial in a golang friendly manner is really quite awkward. The classic case is doing a little work in a goroutine with a cancellation channel, which is a pretty standard way to do things in go as I'm sure you know; in essence you have to manually provision a zmq PAIR and move to using the Poll() interface. So I've found it a pretty helpful abstraction (more so than the reactor, for instance). And it is in the vaughan0 wrapper. On the other hand, I can see it isn't a faithful port of the C API (which this repo currently is, save for the Reactor). I'd prefer it was included (as it would make using the API easier in my view), but if you'd rather not, I can always wrap it up as a separate library.

abligh commented 8 years ago

@aletheia7 re (1), I don't think I agree there. context adds a whole pile of additional stuff, and appears to be more focused on passing context data with a request. I just need the waitgroups for cancellation, and (given this is the way they are actually used in the go tutorials) I think their use is pretty idiomatic for that.
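
For comparison, the WaitGroup-based pattern referred to here usually pairs a stop channel (to signal cancellation) with a sync.WaitGroup (to wait for the goroutines to finish). The sketch below is a generic illustration; `runWorkers` is a hypothetical name and the workers do no real work.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines that idle until stop is closed, then
// returns how many of them reported exiting.
func runWorkers(n int) int {
	var wg sync.WaitGroup
	stop := make(chan struct{})
	exited := make(chan int, n)

	for i := 0; i < n; i++ {
		wg.Add(1) // must happen before the goroutine starts
		go func(id int) {
			defer wg.Done()
			<-stop // a real worker would select on stop and its work channels
			exited <- id
		}(i)
	}

	close(stop) // broadcast cancellation to every worker
	wg.Wait()   // block until each worker has called Done
	return len(exited)
}

func main() { fmt.Println(runWorkers(3)) }
```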

@aletheia7 re (2), I half agree. What I would like is an idiomatic route to do both. My particular use case wanted to use [][]byte, but it would be better if it could use [][]byte or []string channels. I struggled to do that without a lot of code duplication. I'm using [][]byte, which is exactly what the existing API offers, and can be used with exactly the same constraints (re changing underlying data). Of course the existing API also offers []string, and you need to be careful about changing underlying data there too I believe. On the copying point, my understanding is that chan [][]byte only copies the outer slice header. chan string copies the entire string. But what we're actually comparing is chan [][]byte and chan []string, which will copy the slice header only in either case. So whilst I don't think there is any copying going on in either case, you are right that the copying element is a wash (but for a different reason) - in both cases it's copying only a slice header. Given retrieving from a chan [][]byte only requires a string conversion, it's not an enormous amount of effort, but I suppose some helper functions or an alternate pair of channels might be useful.
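
The buffer-reuse concern can be seen in a small standalone sketch (the function name `sendAndMutate` is made up for the example). Sending a [][]byte on a channel copies only the slice header, so sender and receiver share the underlying bytes. Converting to string before sending makes a copy, so later writes to the buffer are not visible to the receiver.

```go
package main

import "fmt"

// sendAndMutate sends the same data as [][]byte and as []string, then
// overwrites the sender's buffer before the receiver reads either message.
func sendAndMutate() (asBytes string, asString string) {
	bytesCh := make(chan [][]byte, 1)
	stringCh := make(chan []string, 1)

	buf := []byte("hello")
	bytesCh <- [][]byte{buf}          // shares buf's backing array
	stringCh <- []string{string(buf)} // the conversion copies the bytes

	copy(buf, "XXXXX") // sender reuses its buffer after sending

	asBytes = string((<-bytesCh)[0])
	asString = (<-stringCh)[0]
	return asBytes, asString
}

func main() {
	b, s := sendAndMutate()
	fmt.Println(b, s) // prints "XXXXX hello"
}
```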

pebbe commented 8 years ago

Since there have been no more responses for a while, I am closing this pull request without merging.

If you still feel this pull request is useful, please rewrite it as a design pattern, and add it to the wiki: https://github.com/pebbe/zmq4/wiki/DesignPatterns