haxetink / tink_streams

Streams from the future. With lasers, of course ... whoaaaaa!!!!
The Unlicense
12 stars 9 forks source link

Recursiveness and php #20

Open benmerckx opened 5 years ago

benmerckx commented 5 years ago

I've been trying to debug an issue where tink_sql craps out after a certain amount of rows on php. I'm not getting to any conclusion, my only hunch is the recursiveness of Generator is causing this (since it all works fine on smaller samples). Either an error is getting swallowed or it actually silently stops resuming. I wrote a simple Iterator stream with a little less recursive calls (below). In this particular situation I was limited to 164 rows, with the stream below 495. Not sure where to go from here and if the recursion can be limited even more. Maybe the escape hatch is giving some private access method in the php driver directly to the iterator and skipping the stream abstraction...

Now I don't know if this is desirable to have in tink_streams (also not sure if streams are supposed to be immutable, this is not).

class IteratorStream<Item, Quality> extends StreamBase<Item, Quality> {
    var iterator: Iterator<Item>;

    public function new(iterator)
        this.iterator = iterator;

    override function next():Future<Step<Item, Quality>>
        return switch iterator.hasNext() {
            case true: Link(iterator.next(), this);
            case false: End;
        }

    override public function forEach<Safety>(handler:Handler<Item, Safety>) {
        return Future.async(function step(cb:Conclusion<Item, Safety, Quality>->Void) 
            switch iterator.hasNext() {
                case true: 
                    handler.apply(iterator.next()).handle(function (s) {
                        switch s {
                            case BackOff:
                                cb(Halted(this));
                            case Finish:
                                cb(Halted(this));
                            case Resume:
                                step(cb);
                            case Clog(e):
                                cb(Clogged(e, this));
                        }
                    });
                case false: cb(Depleted);
            }, true);
    }
}
back2dos commented 5 years ago

Can you please add a failing case here?

benmerckx commented 5 years ago

Problem is I don't know what's going on :) I'm only guessing at recursiveness being at fault. What boggles me is the php script keeps executing after a certain amount of rows were processed. But the handle of the collect call is not called. I can get a test to fail with an actual 'Segmentation fault' message but since I'm not getting that in my scenario I'm not sure how useful this is:

import tink.streams.*;

@:asserts
class Issue20 {
  public function new() {}

  public function testIssue() {
    var max = 80000; // Adjust until it fails
    var s: RealStream<Int> = Stream.ofIterator(0...max);
    s.collect().handle(function (res) {
      switch res {
        case Success(all): 
          asserts.assert(all.length == max);
        case Failure(e):
          throw e;
      }
      asserts.done();
    });
    return asserts;
  }
}
benmerckx commented 5 years ago

I'm assuming recursiveness because:

But I'm clueless as to why it silently stops iterating and never resolves the collect promise. I would expect a "Segmentation fault" as above...