splunk / pipelines

Concurrent processing pipelines in Go.
MIT License

Consider adding a Merge stage. #13

Open kalexmills-splunk opened 2 years ago

kalexmills-splunk commented 2 years ago

Currently, Combine requires the caller to structure their code so as to unify the types involved. That is just one option; another is to use a signature like the one below:

func Merge[S, T any](ctx context.Context, in1 <-chan T, in2 <-chan S) <-chan Pair[S, T]

...along with a generic Pair type....

type Pair[S, T any] struct {
	First  S
	Second T
}

But I don't think this package should introduce a generic Pair type. This would also require blocking one half of a combined pipeline in order to allow the other to catch up, which can, of course, get a teeny bit fraught.

Yet another, more viable option is:

// Merge combines values from in1 and in2. Each pair of values received is passed to combiner. After in1 is closed,
// all values received on in2 are passed to combiner, paired with a nil value of *T. Likewise, after in2 is closed, all
// values received on in1 are passed to combiner, paired with a nil value of *S.
func Merge[S, T, U any](ctx context.Context, in1 <-chan T, in2 <-chan S, combiner func(*S, *T) U) <-chan U

It's unclear whether either of these API options is better than having the caller align on a unified type in upstream pipeline stages. Both introduce additional complexity.

kalexmills-splunk commented 2 years ago

I don't think either of these is particularly useful, except as type conversions. The name should change to Merge. Open to other thoughts.

One annoyance to note about the Combine[S,T,U any] func is that the author of combiner has to handle missing values of S or T whenever one of the channels closes before the other.

I would close this issue, but I'll leave it open in case anyone else has something to add.