haxetink / tink_streams

Streams from the future. With lasers, of course ... whoaaaaa!!!!

Stream merge #4

Closed kevinresol closed 8 years ago

kevinresol commented 8 years ago

While map() is a 1-to-1 mapping, merge() is an N-to-1 mapping; it also behaves something like a partial fold.

Maybe there is a better name, but I can't think of one yet.
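
A rough sketch of the idea (hypothetical usage only; this assumes merge(f) collects items into a buffer and emits whenever f returns Some, clearing the buffer):

// Hypothetical: turn every 2 consecutive ints into their sum (N-to-1 with N = 2).
var sums:Stream<Int> = ints.merge(function (buf:Array<Int>)
  return if (buf.length == 2) Some(buf[0] + buf[1]) else None);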

back2dos commented 8 years ago

Very nice :)

kevinresol commented 8 years ago

Seems like 'compact' or 'condense' would be good names to use:

Definition: compact, condense Synonyms: abbreviate, abridge, abstract, bind, boil down, coagulate, concentrate, consolidate, constrict, contract, cram, cramp, crowd, crush, decrease, dehydrate, densen, densify, epitomize, force into space, make brief, narrow, pack, press, press together, ram, reduce, restrict, shorten, shrink, shrivel, squash, squeeze, stuff, summarize, syncopate, telescope, tighten, wedge, wrap

back2dos commented 8 years ago

Maybe it should just be called regroup and perform N-to-M transformations. Interestingly, map and filter then become special cases of it.
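
For instance (just a sketch, assuming a hypothetical per-item regroup whose callback returns the array of outputs for each input):

// Sketch only: map and filter as special cases of a hypothetical regroup.
function map<In, Out>(s:Stream<In>, f:In->Out):Stream<Out>
  return s.regroup(function (i) return [f(i)]); // exactly one output per input

function filter<T>(s:Stream<T>, p:T->Bool):Stream<T>
  return s.regroup(function (i) return if (p(i)) [i] else []); // zero or one output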

One problem with the current implementation is that you need a way for the buffer to be flushed at the end. Just consider a stupid example:

function chunk<T>(s:Stream<T>, size:Int)
  return s.merge(function (buf) return if (buf.length == size) Some(buf) else None); // emit once `size` items have accumulated

So if you want to chunk a stream of 13 items into chunks of 5, you will get 2 chunks and lose the last 3 items, since the buffer never reaches length 5 again.

One alternative would be to have a stateful 1-to-M transformation that is aware of ends.

class Shatter<In, Out> extends StreamBase<Out> {
  var source:Stream<In> = _;
  var worker:Maybe<In>->Null<Array<Out>> = _; // receives Maybe.None once the source ends
  var buf:Array<Out>; // outputs produced but not yet delivered downstream

  function consume(?next) {
    if (next != null)
      buf = next;
    if (buf != null) {
      for (i in 0...buf.length)
        if (!item(buf[i])) {
          buf = buf.slice(i + 1); // downstream stopped: keep the undelivered rest
          return false;
        }
      buf = null; // everything delivered
    }
    return true;
  }

  override public function forEach(item:Out->Bool) {
    if (!consume()) // first deliver anything left over from an earlier run
      return Future.sync(Success(false));

    return source.forEach(function (i) {
      return consume(worker(Maybe.Some(i)));
    }) >> function (done:Bool)
      return done && consume(worker(Maybe.None)); // flush at the end //TODO: make eager
  }
  //TODO: forEachAsync - make async consume and the rest shouldn't be too hard
}

On top of this, map, filter and your merge/compact would be easy to implement (in the last case the pure transformation would be wrapped in a worker that owns the buffer).
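
Something along these lines (a sketch only; compactWorker is a made-up name, and Maybe is assumed to be an Option-like enum with Some/None constructors, as used above):

// Sketch: wrap a pure Array<T>->Option<Out> (as passed to merge/compact) into a
// stateful worker for Shatter. The worker owns the buffer; Maybe.None marks the
// end of the stream and gives one last chance to flush.
function compactWorker<T, Out>(f:Array<T>->Option<Out>):Maybe<T>->Null<Array<Out>> {
  var buf = [];
  return function (next:Maybe<T>) return switch next {
    case Some(v):
      buf.push(v);
      switch f(buf) {
        case Some(out): buf = []; [out]; // group complete: emit it and reset
        case None: [];                   // keep buffering, emit nothing yet
      }
    case None:
      switch f(buf) {                    // end of stream
        case Some(out): [out];
        case None: [];                   // or emit the partial buffer, by policy
      }
  };
}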

kevinresol commented 8 years ago

Honestly I read the code a few times and I still can't comprehend it. But it seems that if map/filter are implemented on top of this, they will allocate some unnecessary arrays? (Correct me if I'm wrong)

back2dos commented 8 years ago

Honestly I read the code a few times and I still can't comprehend it.

My bad :D ... the item function needs to be passed to consume for it to work at all (or consume could just be a local function within forEach for that matter). I'll just make a working version of this and then we can take it from there.

Yes, it will allocate arrays, but those are cheap on most platforms. For the async version the cost is negligible anyway, since futures have a higher footprint by comparison. But it would be best to just measure it.

kevinresol commented 8 years ago

Oh, I understand the item thing now. I am just stuck on the slice part and not sure what's going on there. Is it discarding "old" items?

Yeah maybe a working example is better