crystal-lang / crystal

The Crystal Programming Language
https://crystal-lang.org
Apache License 2.0

[RFC] Implement Ruby's Enumerator Class #6357

Open felixbuenemann opened 6 years ago

felixbuenemann commented 6 years ago

I really like Ruby's Enumerator class and I think it would be great to have something similar in Crystal.

The Enumerator class can be used to implement generators.

Here's an example from the Ruby docs:

fib = Enumerator.new do |y|
  a = b = 1
  loop do
    y << a
    a, b = b, a + b
  end
end

p fib.take(10) # => [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

The y block parameter in the example is called the yielder. Each time the enumerator returned by new is asked for a value, it blocks until the yielder receives one or the block ends.
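This is a small Ruby sketch (plain Ruby, not the proposed Crystal class) of that lazy, pausing behavior. A log records how far the block has run. It shows that the block does not start until the first next, that it stops at each y <<, and that next raises StopIteration once the block ends.

```ruby
log = []
numbers = Enumerator.new do |y|
  log << :started
  y << 1            # the first `next` returns here
  log << :resumed
  y << 2            # the second `next` returns here
  log << :finished
end

log_before_next = log.dup   # the block has not run yet
first = numbers.next        # runs the block up to `y << 1`
log_after_first = log.dup
second = numbers.next       # resumes after `y << 1`, pauses at `y << 2`
finished = begin
  numbers.next              # the block ends, so `next` raises StopIteration
  false
rescue StopIteration
  true
end
```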

Since I was porting some Ruby code that depended on Enumerator to Crystal, I went ahead and implemented my own generic Enumerator class, using the Iterator module and a Channel to implement the concurrent behavior:

class Enumerator(T)
  include Iterator(T)

  def initialize(&block : Yielder(T) -> Nil)
    @channel = Channel(T).new
    @block = block
    @block_called = false
    spawn_block
  end

  def next
    @channel.receive
  rescue Channel::ClosedError
    stop
  end

  def rewind
    if @block_called
      @channel = Channel(T).new
      @block_called = false
      spawn_block
    end
    self
  end

  private def spawn_block : Nil
    spawn do
      begin
        @block.call(Yielder(T).new(@channel))
      ensure
        @channel.close
        @block_called = true
      end
    end
  end

  private struct Yielder(T)
    def initialize(@channel : Channel(T))
    end

    def <<(data : T)
      @channel.send(data)
      self
    end
  end
end

The Fibonacci generator could now be implemented similar to the Ruby example:

fib = Enumerator(UInt32).new do |y|
  a = b = 1_u32
  loop do
    y << a
    a, b = b, a + b
  end
end

p fib.first(10).to_a # => [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

Please let me know if you think this is a useful addition to the Crystal Standard Library.

If you do, I would be happy to polish the code up into a PR along with documentation and specs.

I've also implemented Enumerator#peek locally. It lets you look at the next value without advancing the iterator by buffering the value on peek, but I left it out of the example for simplicity.
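Ruby's built-in Enumerator#peek behaves this way. This Ruby sketch mirrors the #peek spec proposed later in the thread. Repeated peeks return the buffered value, next consumes it, and peeking past the end raises StopIteration.

```ruby
words = Enumerator.new do |y|
  y << "hello"
  y << "world"
end

peek1 = words.peek   # buffered, not consumed
peek2 = words.peek   # same buffered value
next1 = words.next   # consumes the buffered value
peek3 = words.peek
next2 = words.next
peek_at_end_stops = begin
  words.peek
  false
rescue StopIteration
  true
end
```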

asterite commented 6 years ago

Yup, it's been discussed in the past:

https://groups.google.com/forum/#!searchin/crystal-lang/Enumerator|sort:date/crystal-lang/L08Hz8oXuIU/WqIDLxzKDgAJ

https://play.crystal-lang.org/#/r/aob

The main issue is that it's super slow. Like, about 1000x slower than Iterator. And we don't want to encourage slow code, much less in the standard library...

Well, at least that was the last time I benchmarked it, maybe it should be benchmarked again.

By the way, fibonacci can be currently implemented with Iterator, and it's super fast, no need for fibers nor channels:

a = b = 1
fib = Iterator.of do
  a.tap { a, b = b, a + b }
end

p fib.first(10).to_a

https://play.crystal-lang.org/#/r/4gz8

But, of course, more complex enumerators which yield in the middle of a method and continue from there can't be implemented with Iterator.of.
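Here is a Ruby sketch of the kind of enumerator that Iterator.of cannot express directly. The value is yielded from deep inside a recursive method, and the enumerator must resume there on the next call. Nested arrays stand in for a tree, and each_leaf is a hypothetical helper written for this illustration.

```ruby
# Yields every non-array element depth-first. The `<<` happens inside the recursion.
def each_leaf(node, yielder)
  if node.is_a?(Array)
    node.each { |child| each_leaf(child, yielder) }
  else
    yielder << node
  end
end

tree = [1, [2, [3, 4]], [[5]]]
leaves = Enumerator.new { |y| each_leaf(tree, y) }

first_two = [leaves.next, leaves.next] # pauses inside the recursion between calls
rest = []
loop { rest << leaves.next }           # `loop` stops when `next` raises StopIteration
```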

bew commented 6 years ago

Maybe similar to #4438?

@felixbuenemann fun fact: your implementation reminds me of #4198 ;)

felixbuenemann commented 6 years ago

@asterite Yeah, my actual use case is much more complicated. I have a nested while loop with XML::Reader that builds records from expanded sub nodes and yields them into the iterator.

It can certainly be implemented with a simple yield and no iterator, but that makes it harder to do things like each_slice, which you get for free with iterators.
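This Ruby sketch shows what "for free" means here. Once records come out of an enumerator, batching is a single each_slice call. The record hashes are hypothetical stand-ins for records built from XML nodes.

```ruby
records = Enumerator.new do |y|
  # Stand-in for a nested reader loop that builds one record per sub-node.
  %w[a b c d e].each_with_index { |name, i| y << { id: i, name: name } }
end

# each_slice comes from Enumerable, so no extra code is needed in the generator.
batches = records.each_slice(2).map { |batch| batch.map { |r| r[:name] } }
```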

In my case the performance of the enumerator shouldn't really matter, since most of the processing happens in libxml2 anyway.

I'm not attached to the Ruby-specific implementation, but some form of generator abstraction without Iterator's limitations would be great.

@bew Thanks for the link, I searched the existing issues for "enumerator" but not for "generator".

asterite commented 6 years ago

@felixbuenemann You can implement Iterator and use a stack. That's a way to remove recursion (for example to implement Iterator for a binary tree).
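This Ruby sketch illustrates the explicit-stack technique for an in-order binary tree iterator. InOrderIterator and Node are hypothetical names. The stack holds the path of nodes whose left subtrees are still being visited, so no recursion or fiber is needed.

```ruby
Node = Struct.new(:value, :left, :right)

class InOrderIterator
  def initialize(root)
    @stack = []
    push_left(root)
  end

  def next
    raise StopIteration, "iteration reached an end" if @stack.empty?
    node = @stack.pop
    push_left(node.right) # the leftmost path of the right subtree comes next
    node.value
  end

  private

  # Pushes `node` and all of its left descendants.
  def push_left(node)
    while node
      @stack.push(node)
      node = node.left
    end
  end
end

#       4
#     /   \
#    2     6
#   / \   /
#  1   3 5
root = Node.new(4,
                Node.new(2, Node.new(1), Node.new(3)),
                Node.new(6, Node.new(5), nil))

iter = InOrderIterator.new(root)
values = []
loop { values << iter.next }
```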

I just tried to implement Enumerator with just fibers, no channels, like in Ruby. It's just a bad implementation because I'm confused about where a fiber should resume, but this basic example works:

class Enumerator(T)
  include Iterator(T)

  struct Yielder(T)
    getter! enumerator

    def initialize(@enumerator : Enumerator(T))
    end

    def <<(value : T)
      @enumerator.value = value
    end
  end

  getter! fiber : Fiber
  getter! yielder : Yielder(T)

  def initialize(&@block : Yielder(T) ->)
    @current_fiber = Fiber.current
    @yielder = Yielder(T).new(self)
    @fiber = Fiber.new do
      @block.call(@yielder.not_nil!)
    end
  end

  @value : T?

  def value=(@value : T)
    @current_fiber.resume
  end

  def next
    # This is probably not correct
    @current_fiber = Fiber.current

    return stop unless fiber.alive?

    fiber.resume

    fiber.alive? ? @value.as(T) : stop
  end
end

fib = Enumerator(Int32).new do |y|
  3.times do |i|
    y << i
  end
end

p fib.next
p fib.next
p fib.next

That is, using some sort of cooperative fibers, instead of the non-cooperative fibers we have now. But that @current_fiber.resume should probably be Ruby's Fiber.yield.
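For comparison, here is a minimal Ruby sketch of a generator built only on Ruby's cooperative Fiber#resume and Fiber.yield. It is not Ruby's actual Enumerator. FiberGenerator is a hypothetical name. A sentinel marks the end of the block so that nil can be yielded like any other value, and a @done flag replaces any liveness check on the fiber. rewind simply starts a fresh fiber, even mid-iteration.

```ruby
class FiberGenerator
  DONE = Object.new.freeze # end-of-block sentinel, distinct from a yielded nil

  # Hands each value back to whichever fiber called `resume`.
  class Yielder
    def <<(value)
      Fiber.yield(value) # suspend here until the next `resume`
      self
    end
  end

  def initialize(&block)
    @block = block
    start
  end

  def next
    raise StopIteration, "iteration reached an end" if @done
    value = @fiber.resume # runs the block until the next `<<` or its end
    if value.equal?(DONE)
      @done = true
      raise StopIteration, "iteration reached an end"
    end
    value
  end

  # Restarts from the beginning; a paused fiber is simply dropped.
  def rewind
    start
    self
  end

  private

  def start
    @done = false
    block = @block
    @fiber = Fiber.new do
      block.call(Yielder.new)
      DONE
    end
  end
end

gen = FiberGenerator.new do |y|
  y << 1 << nil
  y << 3
end
```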

But still, doing a simple benchmark of that implementation using Fibers vs Iterator gives 0.37s for Enumerator vs. 0.005s for Iterator. It's a dramatic difference. The code is something like this:

e = Enumerator(Int32).new do |y|
  loop do
    y << 1
  end
end

time = Time.now
10_000_000.times do
  e.next
end
puts Time.now - time

But then in Ruby that same example takes 12 seconds... so maybe it's not that bad? :-)

If we can figure out how to implement it with fibers and context switching, without channels, it might be a good fit for the standard library, even if it's slower than a manually written Iterator (a hand-written Iterator can always be written if Enumerator becomes the bottleneck of your program).

asterite commented 6 years ago

Oh, and in the example above I defined Fiber#@alive and Fiber#alive? for it to work (just a boolean that's true when the fiber starts, and false when it ends). But for some reason once the fiber finishes, everything crashes with a segfault... probably because the fiber is not registered in the Scheduler.

asterite commented 6 years ago

(using the implementation in the first comment in this issue, using channels, the benchmark takes 0.6 seconds)

felixbuenemann commented 6 years ago

Hey, I was also looking for a way to find out whether a fiber was still running, because at first I tried to save the fiber returned by spawn in an instance variable and then restart it if needed during rewind.

So I think adding such a flag to Fiber would be generally useful.

I'll see if I can get your improved version running with my code.

Can you post the patch you did to the Fiber class?

asterite commented 6 years ago

It's just setting @alive = true in Fiber, then setting it to false in the ensure of run.

felixbuenemann commented 6 years ago

@asterite I tried your implementation, but it just hangs.

Here's the test suite I used (quick conversion from minitest.cr to crystal spec):

require "spec"
require "../src/enumerator"

describe Enumerator do

  describe "#initialize" do
    it "returns an instance of Enumerator" do
      instance = Enumerator(String).new {}
      instance.should be_a(Enumerator(String))
    end
  end

  describe "#next" do
    it "returns the values in the order they were yielded" do
      enumerator = Enumerator(String?).new do |yielder|
        yielder << "hello"
        yielder << nil
        yielder << "world"
      end

      enumerator.next.should eq("hello")
      enumerator.next.should be_nil
      enumerator.next.should eq("world")
      enumerator.next.should be_a(Iterator::Stop)
    end
  end

  describe "#peek" do
    it "peeks at the next value without affecting next" do
      next # Not implemented
      enumerator = Enumerator(String?).new do |yielder|
        yielder << "hello"
        yielder << "world"
      end

      enumerator.peek.should eq("hello")
      enumerator.peek.should eq("hello")
      enumerator.next.should eq("hello")
      enumerator.peek.should eq("world")
      enumerator.peek.should eq("world")
      enumerator.next.should eq("world")
      enumerator.peek.should be_a(Iterator::Stop)
      enumerator.next.should be_a(Iterator::Stop)
      enumerator.rewind
      enumerator.peek.should eq("hello")
    end
  end

  describe "#each" do
    it "iterates the yielded values" do
      enumerator = Enumerator(String).new do |yielder|
        yielder << "hello"
        yielder << "world"
      end

      values = [] of String
      enumerator.each { |value| values << value }
      values.should eq(["hello", "world"])
    end
  end

  describe "Enumerable" do
    it "responds to Enumerable methods" do
      enumerator = Enumerator(String).new do |yielder|
        yielder << "hello"
        yielder << "world"
      end

      enumerator.to_a.should eq(["hello", "world"])
    end
  end

  describe "#rewind" do
    it "rewinds the iterator" do
      next # Not implemented
      enumerator = Enumerator(String).new do |yielder|
        yielder << "hello"
        yielder << "world"
      end
      # We rewind first to check that block isn't called multiple
      # times if it was not enumerated before calling #rewind.
      enumerator.rewind.should be_a(Enumerator(String))

      enumerator.to_a.should eq(["hello", "world"])

      # Must be rewound, should be empty.
      enumerator.to_a.should eq([] of String)
      enumerator.rewind.should be_a(Enumerator(String))
      enumerator.to_a.should eq(["hello", "world"])

      # And make sure rewind properly resets state.
      enumerator.to_a.should eq([] of String)
      enumerator.rewind.should be_a(Enumerator(String))
      enumerator.rewind.should be_a(Enumerator(String))
      enumerator.to_a.should eq(["hello", "world"])
    end
  end

  describe "Nil type support" do
    it "works when the type is Nil" do
      enumerator = Enumerator(Nil).new do |yielder|
        yielder << nil
        yielder << nil
      end

      enumerator.to_a.should eq([nil, nil])
    end
  end

  describe "yielder chaining" do
    it "allows to chain yielder calls" do
      enumerator = Enumerator(String).new do |yielder|
        yielder << "hello" << "world"
      end

      enumerator.to_a.should eq(["hello", "world"])
    end
  end

  describe "Fibonacci enumerator" do
    it "generates the first 10 numbers in the Fibonacci sequence" do
      # Example adapted from Ruby Enumerator docs.
      fib = Enumerator(UInt32).new do |y|
        a = b = 1_u32
        loop do
          y << a
          a, b = b, a + b
        end
      end

      fib.first(10).to_a.should eq([1, 1, 2, 3, 5, 8, 13, 21, 34, 55])
    end
  end

end

asterite commented 6 years ago

I know it hangs. I'm not sure cooperative and non-cooperative fibers can coexist. I believe maybe @waj will know how to implement this, but I don't think he'll have time.

felixbuenemann commented 6 years ago

@asterite I got it working!

The problem with your implementation was that it didn't resume the @current_fiber after the fiber had finished.

I came up with the following implementation based on your code, but using a Deque for storage so that it works with nil values. I also added #rewind and #peek and replaced the Fiber#alive? checks with a simple @done flag.

class Enumerator(T)
  include Iterator(T)

  def initialize(&@block : Yielder(T) ->)
    @current_fiber = Fiber.current
    @stack = Deque(T).new(1)
    @yielder = Yielder(T).new(self)
    run
  end

  def next
    fetch_next
    @stack.shift { stop }
  end

  def rewind
    run if @done
    self
  end

  def peek
    fetch_next
    if @stack.empty?
      stop
    else
      @stack[0]
    end
  end

  protected def <<(value : T)
    @stack.push(value)
    @current_fiber.resume
  end

  private def fetch_next : Nil
    @current_fiber = Fiber.current
    @done || @fiber.not_nil!.resume if @stack.empty?
  end

  private def run
    @stack.clear
    @done = false
    @fiber = Fiber.new do
      begin
        @block.call(@yielder.not_nil!)
      ensure
        @done = true
        @current_fiber.resume
      end
    end
  end

  private struct Yielder(T)
    def initialize(@enumerator : Enumerator(T))
    end

    def <<(value : T)
      @enumerator << value
      self
    end
  end
end

Results:

Iterator 333.58M (   3.0ns) (± 3.15%)  0 B/op        fastest
   Fiber  31.65M (  31.6ns) (± 4.48%)  0 B/op  10.54× slower
 Channel  19.21M ( 52.07ns) (± 5.16%)  0 B/op  17.37× slower

Let me know what you think!

asterite commented 6 years ago

Awesome! If it's just 10 times slower than an Iterator I think it's a really good candidate for the std.

By the way, this doesn't work as expected:

e = Enumerator(Int32).new do |y|
  y << 1
end

spawn do
  p e.next
end

Fiber.yield

The problem is that the current fiber is saved in initialize, but when next is called we are in another fiber. The solution is simple: set @current_fiber = Fiber.current upon entering next. I think that will always work, but I'm not sure.
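Ruby avoids this problem because Fiber.yield always returns control to whichever fiber made the latest resume call, not to the fiber that created the enumerator. This Ruby sketch takes the first value from inside a different fiber and the second from the main fiber. Setting @current_fiber on every next approximates that rule in the Crystal version.

```ruby
e = Enumerator.new do |y|
  y << 1
  y << 2
end

from_other_fiber = Fiber.new { e.next }.resume # consumed inside a different fiber
from_main = e.next                              # consumed back in the main fiber
```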

I'd say let's go for this, if it's solid. Sometimes writing an Iterator is very boring, and the 10x slowness of some parts of the code will probably not be the bottleneck of a program, and this boosts productivity a lot.

felixbuenemann commented 6 years ago

Thanks for the explanation. I was wondering why you were setting @current_fiber inside #next.

I think I'll have some time this weekend to package it all up into a PR and add some polish to the specs.

felixbuenemann commented 6 years ago

I've updated the code above to set @current_fiber = Fiber.current inside #fetch_next so your example now works with both #peek and #next.

asterite commented 6 years ago

By the way, maybe a Deque is a bit overkill for this. We can use a union type for @value, or maybe even uninitialized if we really need to. I'm also not sure what the use of peek is, or why we'd want to add it, given that Iterator doesn't have a peek method. We could always have a PeekIterator(T) type that peeks a value and stores it.
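Here is a Ruby sketch of such a wrapper, assuming only that the wrapped source responds to next and raises StopIteration at the end. The name PeekIterator is taken from the comment, but the code is illustrative. A private sentinel distinguishes "nothing buffered" from a buffered nil, which is the case the Deque was handling.

```ruby
class PeekIterator
  NONE = Object.new.freeze # "nothing buffered", distinct from a buffered nil

  def initialize(source)
    @source = source # anything with `next` that raises StopIteration at the end
    @buffered = NONE
  end

  # Fetches at most one value ahead; StopIteration from the source propagates.
  def peek
    @buffered = @source.next if @buffered.equal?(NONE)
    @buffered
  end

  def next
    value = peek
    @buffered = NONE
    value
  end
end

iter = PeekIterator.new([nil, 2].each)
```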

Then, rewind doesn't seem to work completely well. It works only if you already consumed the entire iterator. For example this is how it works in Ruby:

e = Enumerator.new do |y|
  y << 1
  y << 2
  y << 3
end

p e.next # => 1
p e.next # => 2

e.rewind

p e.next # => 1
p e.next # => 2

But I'm not sure how we would implement it. So maybe for now rewind can raise until we figure it out.
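The Ruby behavior above comes from restarting the block. After rewind, the next call to next runs the block again from the top, so side effects before the first y << happen twice. This Ruby sketch counts block starts to make that visible.

```ruby
runs = 0
e = Enumerator.new do |y|
  runs += 1 # counts how many times the block has been started
  y << 1
  y << 2
  y << 3
end

before = [e.next, e.next] # the block is paused after `y << 2`
e.rewind                  # discards the paused state
after = [e.next, e.next]  # the block was started a second time
```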

asterite commented 6 years ago

There's another problem. This code doesn't free the memory:

loop do
  e1 = Enumerator(Int32).new do |y|
    3.times do |i|
      y << i
    end
  end
  e1.next
end

Fibers are created and maintained in a stack. I think these particular fibers should not be placed in that stack, so when they are not referenced anymore they are GCed. The source code in fiber.cr should probably be modified for this...

felixbuenemann commented 6 years ago

Peek is useful when you don't yet know what the next value is and want to do different things based on it. I agree that it would make sense to have this for all Iterators.

Regarding the rewind edge case: I think we can handle it by adding another flag that signals to the yielder that it should abort execution.

I didn't know that you can check if an instance variable is initialized. Can you post an example?

RX14 commented 6 years ago

The fiber linked list is required for the GC, you can't just remove a fiber from it: https://github.com/crystal-lang/crystal/blob/master/src/fiber.cr#L325

You need to do this with def finalize.

felixbuenemann commented 6 years ago

The leak described by @asterite is actually the same problem as #rewind when the block isn't @done.

I plan to fix it by raising a YielderStopped exception in Yielder#<< which is captured in the Fiber.new block. The method that stops the yielder could then also be called from Enumerator#finalize.

The user could still do something stupid like rescuing from that exception inside the yielder block, but we could at least raise an exception in that case, since it can be detected by checking if the yielder transitioned to done after being told to stop by the enumerator.
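Here is a Ruby sketch of the stop-by-exception idea, built on Fiber rather than Ruby's Enumerator. StoppableGenerator and Stopped are hypothetical names in the spirit of the proposed YielderStopped. stop resumes the paused fiber with :stop. The pending Fiber.yield then returns :stop, and the yielder raises, so the user block unwinds and its ensure clauses run. Stopped inherits from Exception so that a bare rescue in user code won't swallow it, although an explicit rescue Exception still would.

```ruby
class StoppableGenerator
  class Stopped < Exception; end # bypasses bare `rescue`, which only catches StandardError

  class Yielder
    def <<(value)
      signal = Fiber.yield(value) # nil on a normal `next`, :stop when stopping
      raise Stopped if signal == :stop
      self
    end
  end

  def initialize(&block)
    @started = false
    @done = false
    @fiber = Fiber.new do
      begin
        block.call(Yielder.new)
      rescue Stopped
        # being stopped is a normal way for the generator to end
      end
      @done = true
      nil
    end
  end

  def next
    raise StopIteration, "iteration reached an end" if @done
    @started = true
    value = @fiber.resume # runs until the next `<<` or the end of the block
    raise StopIteration, "iteration reached an end" if @done
    value
  end

  # Unwinds a paused block so its `ensure` clauses run (e.g. closing files).
  def stop
    if !@started
      @done = true # never started: nothing to unwind
    elsif !@done
      @fiber.resume(:stop)
    end
    nil
  end
end

log = []
gen = StoppableGenerator.new do |y|
  begin
    y << 1
    y << 2
  ensure
    log << :cleaned_up
  end
end

first = gen.next
log_before_stop = log.dup
gen.stop
```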

ysbaddaden commented 6 years ago

@RX14 But the GC will find the fiber reference in that linked list, thus never try to collect it, and will never invoke finalize. A fiber that may never finish will never be collected.

We can't just keep a list of active stack roots, instead of fibers, because the GC could then collect all fibers (unless I'm wrong?).

Maybe we need 2 lists: one for "attached" fibers (default), and one to keep the stack roots of "detached" fibers? Once the fiber is no longer referenced, the GC collects it and invokes its finalizer that should return the stack root back to the pool.

asterite commented 6 years ago

@ysbaddaden I think he meant a finalize on Enumerator: once the enumerator isn't referenced anymore, it will delete the Fiber and remove it from that linked list.

RX14 commented 6 years ago

You just need to make sure the fiber doesn't have any references to the enumerator, which is... difficult. You'll probably need to employ WeakRef(T)

asterite commented 6 years ago

Why would the fiber have a reference to the Enumerator?

felixbuenemann commented 6 years ago

Probably because the @yielder is passed to the @block that the @fiber wraps and the yielder has a reference to the @enumerator.

asterite commented 6 years ago

I'm also not sure what happens if you call an IO method inside Enumerator. That would make the fiber be rescheduled, not sure when and how will that resume. I did a few tests and it seems to work, but mixing cooperative and non-cooperative fibers will be tough.

RX14 commented 6 years ago

@asterite but all fibers are by definition cooperative because there's no method to preempt them. I'm not sure what you mean. I don't see much special about the way this fiber is used or scheduled.

asterite commented 6 years ago

@RX14 In Ruby, Fiber#resume starts/resumes a fiber, and Fiber.yield yields control back to the fiber that resumed it. That's known as cooperative fibers. In Crystal it's different because there's no Fiber#resume. Well, there is, but it's for implementation purposes; you are not supposed to use it like that. And Fiber.yield just tries to find a fiber that can run; it doesn't resume the one that called resume.
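The Ruby semantics described here fit in a few lines. In this sketch, the argument to the first resume becomes the block argument, and each Fiber.yield hands a value back to the resumer. The argument of the following resume becomes the return value of that Fiber.yield, and the block's final value is returned by the last resume.

```ruby
trace = []
worker = Fiber.new do |first|
  trace << [:got, first]
  second = Fiber.yield(:paused) # control goes back to whoever called `resume`
  trace << [:got, second]
  :finished                     # returned by the last `resume`
end

r1 = worker.resume(:a) # starts the fiber; :a is the block argument
r2 = worker.resume(:b) # :b is what `Fiber.yield` returns inside the fiber
```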

Anyway, Enumerator has indeed a reference to the fiber, which in turn has a reference to the enumerator through a closure, so finalize is never invoked. I'm not sure how it can be implemented, because the reference from fiber to enumerator happens through a closure, not something we can change to be WeakRef. Or maybe it needs to be a special Fiber, I don't know.

RX14 commented 6 years ago

the fiber has the reference to the yielder through the closure, the weakref should be from the yielder to the enumerator, surely?

And to me, anything that's not preemptible is cooperative, which makes Crystal's fibers cooperative, just with a different implementation from Ruby's. It's just a terminology thing though, I get what you mean.

asterite commented 6 years ago

@RX14 The fiber is in the linked list, so it can't be GCed. Then the fiber has a reference to the Enumerator through the closure, which can't be turned into a WeakRef. So I can't see how to implement this.

I guess it would be nice to have, but it seems kind of impossible to implement this right now...

RX14 commented 6 years ago

@asterite why does the fiber have a reference to the enumerator and not just the yielder?

asterite commented 6 years ago

@RX14 This is part of Enumerator's code:

  private def run
    @stack.clear
    @done = false
    @fiber = Fiber.new do
      begin
        @block.call(@yielder.not_nil!)
      ensure
        @done = true
        @current_fiber.resume
      end
    end
  end

Here the fiber is accessing instance variables from Enumerator, which means self is being captured. self is Enumerator.

We could move all the instance vars to local variables (or maybe not, @done needs to be set to true, the current fiber must be resumed (and this is another leak, the fiber created by Enumerator never ends and never gets removed from the linked list!)), but still, @yielder has a reference to Enumerator, so the reference will still be there.

RX14 commented 6 years ago

we can proxy all these calls through @yielder, and use yielder = @yielder to make the only thing in the closure the yielder.

asterite commented 6 years ago

But yielder has a reference to Enumerator...

RX14 commented 6 years ago

@asterite that's where the weakref goes
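This Ruby sketch uses the standard weakref library to show the direction of that reference. Buffer and WeakYielder are hypothetical names. The yielder holds only a WeakRef to the object that owns the buffered values, so the yielder alone cannot keep that object alive. If the target has already been collected, the value is dropped. The sketch does not force a GC, so it only checks delegation while the target is alive.

```ruby
require "weakref"

# Stand-in for the enumerator side that owns the buffered values.
class Buffer
  attr_reader :items

  def initialize
    @items = []
  end

  def push(value)
    @items << value
  end
end

class WeakYielder
  def initialize(target)
    @target = WeakRef.new(target) # weak: does not keep `target` alive
  end

  def <<(value)
    @target.push(value) # WeakRef delegates method calls to the live target
    self
  rescue WeakRef::RefError
    self # target already collected: drop the value
  end
end

buffer = Buffer.new
yielder = WeakYielder.new(buffer)
yielder << 1 << 2
```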

asterite commented 6 years ago

@RX14 You are right!

I experimented with this a bit and I almost got it, but there's still something not working well.

The Enumerator code is this one:

require "weak_ref"

class Enumerator(T)
  include Iterator(T)

  @fiber : Fiber?

  def initialize(&@block : Yielder(T) ->)
    @current_fiber = Fiber.current
    @stack = Deque(T).new(1)
    @yielder = Yielder(T).new(self)
    run
  end

  def next
    fetch_next
    @stack.shift { stop }
  end

  def rewind
    run if @done
    self
  end

  def peek
    fetch_next
    if @stack.empty?
      stop
    else
      @stack[0]
    end
  end

  protected def <<(value : T)
    @stack.push(value)
    @current_fiber.resume
  end

  private def fetch_next : Nil
    @current_fiber = Fiber.current
    @done || @fiber.not_nil!.resume if @stack.empty?
  end

  private def run
    @stack.clear
    @done = false

    block = @block
    yielder = @yielder.not_nil!

    fiber = Fiber.new do
      block.call(yielder)
    end
    fiber.on_finish do
      yielder.done
    end
    @fiber = fiber
  end

  protected def done
    @done = true
    @current_fiber.resume
  end

  def finalize
    LibC.printf "finalize\n"
  end

  private struct Yielder(T)
    @enumerator : WeakRef(Enumerator(T))

    def initialize(enumerator : Enumerator(T))
      @enumerator = WeakRef.new(enumerator)
    end

    def <<(value : T)
      @enumerator.value.try &.<< value
      self
    end

    # :nodoc:
    def done
      @enumerator.value.try &.done
    end
  end
end

You'll also need this diff applied:

diff --git a/src/fiber.cr b/src/fiber.cr
index bc90d8790..7b2b16185 100644
--- a/src/fiber.cr
+++ b/src/fiber.cr
@@ -116,6 +116,9 @@ class Fiber
     end
   end

+  def on_finish(&@on_finish)
+  end
+
   def run
     @proc.call
   rescue ex
@@ -145,7 +148,11 @@ class Fiber
     # Delete the resume event if it was used by `yield` or `sleep`
     @resume_event.try &.free

-    Scheduler.reschedule
+    if on_finish = @on_finish
+      on_finish.call
+    else
+      Scheduler.reschedule
+    end
   end

   @[NoInline]

Now, if you run this:

puts "Running..."
loop do
  e1 = Enumerator(Int32).new do |y|
    y << 1
  end
end

you'll see a list of "finalize" being printed to the console. That means the Enumerator is being collected. The memory still grows because at that point we must somehow tell the Fiber to remove itself from the linked list, but that's another thing to do.

Now, if you run this:

puts "Running..."
loop do
  e1 = Enumerator(Int32).new do |y|
    y << 1
  end
  e1.next
  e1.next
end

it also works: a list of "finalize" lines gets printed, and this time memory doesn't grow because the fiber ends.

But if you run this:

puts "Running..."
loop do
  e1 = Enumerator(Int32).new do |y|
    y << 1
  end
  e1.next
end

then "finalize" never gets printed... so for some reason someone is still holding a reference to Enumerator. I can't understand why starting the fiber and interrupting it in the middle causes this.

So someone will have to figure out those two things, but we are closer :-)

asterite commented 6 years ago

I found the reason!

It's because Yielder#<< is doing:

@enumerator.value.try &.<< value

and that will eventually switch to the caller fiber. But try will put the enumerator in a local variable, so it will be on the fiber's stack, and that is a strong reference (bypassing the weak ref). The solution is not to keep the enumerator on the stack:

require "weak_ref"

class Enumerator(T)
  include Iterator(T)

  @fiber : Fiber?

  def initialize(&@block : Yielder(T) ->)
    @current_fiber = Fiber.current
    @stack = Deque(T).new(1)
    @yielder = Yielder(T).new(self)
    run
  end

  def current_fiber
    @current_fiber.not_nil!
  end

  def next
    fetch_next
    @stack.shift { stop }
  end

  def rewind
    run if @done
    self
  end

  def peek
    fetch_next
    if @stack.empty?
      stop
    else
      @stack[0]
    end
  end

  protected def <<(value : T)
    @stack.push(value)
  end

  private def fetch_next : Nil
    @current_fiber = Fiber.current
    @done || @fiber.not_nil!.resume if @stack.empty?
  end

  private def run
    @stack.clear
    @done = false

    block = @block
    yielder = @yielder.not_nil!

    fiber = Fiber.new do
      block.call(yielder)
    end
    fiber.on_finish do
      yielder.done
    end
    @fiber = fiber
  end

  protected def done
    @done = true
    @current_fiber.resume
  end

  def finalize
    LibC.printf "finalize\n"
  end

  private struct Yielder(T)
    @enumerator : WeakRef(Enumerator(T))

    def initialize(enumerator : Enumerator(T))
      @enumerator = WeakRef.new(enumerator)
    end

    def <<(value : T)
      # Try to do it without keeping a reference to the enumerator here
      pass_to_enumerator(value)
      current_fiber = get_current_fiber
      current_fiber.try &.resume
      self
    end

    def pass_to_enumerator(value : T)
      @enumerator.value.try &.<< value
    end

    def get_current_fiber
      @enumerator.value.try &.current_fiber
    end

    # :nodoc:
    def done
      @enumerator.value.try &.done
    end
  end
end

Now I just need to kill a fiber on finalize. Of course this can't be done right there, because it might allocate memory (the fiber's stack is put back in a stack pool), but I bet it can be done in the fiber we already run every 5 seconds to clean the stack pool.

asterite commented 6 years ago

Okay, got it :-)

The code is a bit ugly, but I wrote it like that to try it out. The infinite loop above never releases memory because it never gives the scheduler a chance to run the clean-up fiber, but that's also what happens when you do loop { spawn { } }, so it's not a problem: real programs don't look like this and will probably give control back to the scheduler. So my example now is this:

loop do
  e = Enumerator(Int32).new do |y|
    y << 1
  end
  e.next
  sleep 0.001
end

memory will get high at first but then it stabilizes around 20~40 MB.

Here's the diff. It's a bit ugly that it requires modifying Fiber... maybe there's another way to implement it.

diff --git a/src/enumerator.cr b/src/enumerator.cr
new file mode 100644
index 000000000..2884f0f10
--- /dev/null
+++ b/src/enumerator.cr
@@ -0,0 +1,100 @@
+require "weak_ref"
+
+class Enumerator(T)
+  include Iterator(T)
+
+  @fiber : Fiber?
+
+  def initialize(&@block : Yielder(T) ->)
+    @current_fiber = Fiber.current
+    @stack = Deque(T).new(1)
+    @yielder = Yielder(T).new(self)
+    run
+  end
+
+  def current_fiber
+    @current_fiber.not_nil!
+  end
+
+  def next
+    fetch_next
+    @stack.shift { stop }
+  end
+
+  def rewind
+    @fiber.not_nil!.kill
+    run
+    self
+  end
+
+  def peek
+    fetch_next
+    if @stack.empty?
+      stop
+    else
+      @stack[0]
+    end
+  end
+
+  protected def <<(value : T)
+    @stack.push(value)
+  end
+
+  private def fetch_next : Nil
+    @current_fiber = Fiber.current
+    @done || @fiber.not_nil!.resume if @stack.empty?
+  end
+
+  private def run
+    @stack.clear
+    @done = false
+
+    block = @block
+    yielder = @yielder.not_nil!
+
+    fiber = Fiber.new do
+      block.call(yielder)
+    end
+    fiber.on_finish do
+      yielder.done
+    end
+    @fiber = fiber
+  end
+
+  protected def done
+    @done = true
+    @current_fiber.resume
+  end
+
+  def finalize
+    @fiber.not_nil!.kill
+  end
+
+  private struct Yielder(T)
+    @enumerator : WeakRef(Enumerator(T))
+
+    def initialize(enumerator : Enumerator(T))
+      @enumerator = WeakRef.new(enumerator)
+    end
+
+    def <<(value : T)
+      pass_to_enumerator(value)
+      current_fiber = get_current_fiber
+      current_fiber.try &.resume
+      self
+    end
+
+    def pass_to_enumerator(value : T)
+      @enumerator.value.try &.<< value
+    end
+
+    def get_current_fiber
+      @enumerator.value.try &.current_fiber
+    end
+
+    # :nodoc:
+    def done
+      @enumerator.value.try &.done
+    end
+  end
+end
diff --git a/src/fiber.cr b/src/fiber.cr
index bc90d8790..6ecfb0f82 100644
--- a/src/fiber.cr
+++ b/src/fiber.cr
@@ -22,6 +22,8 @@ class Fiber
   protected property next_fiber : Fiber?
   protected property prev_fiber : Fiber?
   property name : String?
+  @must_die = false
+  @@fibers_that_must_die = 0

   def initialize(@name : String? = nil, &@proc : ->)
     @stack = Fiber.allocate_stack
@@ -116,6 +118,27 @@ class Fiber
     end
   end

+  def self.remove_dead
+    return if @@fibers_that_must_die == 0
+
+    fiber = @@first_fiber
+    while fiber
+      if fiber.@must_die
+        fiber.die
+        @@fibers_that_must_die -= 1
+      end
+      fiber = fiber.next_fiber
+    end
+  end
+
+  def on_finish(&@on_finish)
+  end
+
+  def kill
+    @must_die = true
+    @@fibers_that_must_die += 1
+  end
+
   def run
     @proc.call
   rescue ex
@@ -127,6 +150,16 @@ class Fiber
     ex.inspect_with_backtrace STDERR
     STDERR.flush
   ensure
+    die
+
+    if on_finish = @on_finish
+      on_finish.call
+    else
+      Scheduler.reschedule
+    end
+  end
+
+  def die
     @@stack_pool << @stack

     # Remove the current fiber from the linked list
@@ -144,8 +177,6 @@ class Fiber

     # Delete the resume event if it was used by `yield` or `sleep`
     @resume_event.try &.free
-
-    Scheduler.reschedule
   end

   @[NoInline]
diff --git a/src/kernel.cr b/src/kernel.cr
index 8ce3509ca..17aaab9ff 100644
--- a/src/kernel.cr
+++ b/src/kernel.cr
@@ -473,6 +473,7 @@ end
   spawn do
     loop do
       sleep 5
+      Fiber.remove_dead
       Fiber.stack_pool_collect
     end
   end
asterite commented 6 years ago

Oh, and I forgot to say that the above diff also makes rewind work well now :-)

felixbuenemann commented 6 years ago

Cool that you figured it out! 🎉

My only other idea was to raise an exception from the << method in the yielder causing the code in the block to blow up and rescuing from that exception in the wrapping fiber block.

I'm not sure I fully understand your changes. Does killing the fiber interrupt it mid-execution or does it still complete the block and killing it only marks it for garbage collection?

asterite commented 6 years ago

Killing the fiber will make it so that later (there's a fiber that runs every 5 seconds) it will get deleted... so yeah, it is stopped at the middle of execution.
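The pattern in the patch is "mark now, sweep later". kill only flags the fiber and bumps a counter, and the periodic Fiber.remove_dead sweep unlinks flagged fibers, returning early when the counter is zero. This Ruby sketch shows only that bookkeeping, with plain objects standing in for fibers. FiberRegistry is a hypothetical name. Unlike the patch, it ignores a second kill of the same entry so the counter cannot drift.

```ruby
class FiberRegistry
  Entry = Struct.new(:name, :must_die)

  def initialize
    @entries = []
    @pending_kills = 0
  end

  def register(name)
    entry = Entry.new(name, false)
    @entries << entry
    entry
  end

  # Only marks the entry; nothing is removed yet.
  def kill(entry)
    return if entry.must_die
    entry.must_die = true
    @pending_kills += 1
  end

  # Periodic sweep: cheap when nothing was killed since the last run.
  def remove_dead
    return 0 if @pending_kills.zero?
    removed = @entries.count(&:must_die)
    @entries.reject!(&:must_die)
    @pending_kills = 0
    removed
  end

  def names
    @entries.map(&:name)
  end
end

registry = FiberRegistry.new
a = registry.register(:a)
registry.register(:b)
registry.kill(a)
registry.kill(a) # ignored: already marked
names_before_sweep = registry.names
removed = registry.remove_dead
```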

felixbuenemann commented 6 years ago

I benchmarked the updated version and performance regressed a bit due to the added complexity:

  Iterator 332.06M (  3.01ns) (± 3.15%)  0 B/op        fastest
Enumerator  26.09M ( 38.34ns) (± 4.39%)  0 B/op  12.73× slower

I don't think it really matters, since these were microbenchmarks with minimal work in the enumerator block, and in most cases where an enumerator is needed instead of an iterator, that overhead is small compared to the computation in the block.

On my machine (Broadwell i7-5557U 3.1GHz) 10,000,000 iterations with minimal load take only 400ms.
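The 400 ms figure is consistent with the per-iteration time reported for Enumerator in the benchmark:

$$10^{7} \times 38.34\,\mathrm{ns} = 3.834 \times 10^{8}\,\mathrm{ns} \approx 0.38\,\mathrm{s}$$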

straight-shoota commented 6 years ago

@asterite looks awesome!

But simply unscheduling a fiber won't unwind its stack, so it can't release its resources (file descriptors etc.).

RX14 commented 6 years ago

If it ends up that close to the performance of a channel, why not just use a channel?

felixbuenemann commented 6 years ago

I had the same thought yesterday. It would be interesting to see how it handles all the edge cases of the fiber implementation.

asterite commented 6 years ago

@straight-shoota I don't follow

Enumerator can't be implemented with channels and spawn. If you do that, the scheduler might start running the fiber before you call next, which is not desired behavior (I tried it, and of course that's what happens). The fiber must only run when you invoke next.
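This Ruby sketch shows the same eager-versus-lazy difference, with a Thread and a SizedQueue standing in for spawn and a Channel. SizedQueue needs a capacity of at least 1, unlike Crystal's unbuffered Channel. The producer thread starts running and produces its first value before anyone asks for one. The Enumerator block does not run until the first next.

```ruby
# Eager: the producer runs as soon as the thread is scheduled.
eager_log = Queue.new
handoff = SizedQueue.new(1)
producer = Thread.new do
  eager_log << :started
  handoff << 1
  handoff << :done
end
sleep 0.001 until handoff.size == 1 # a value exists before any consumer call
eager_started = eager_log.size == 1
first_eager = handoff.pop
handoff.pop # drain :done so the producer can finish
producer.join

# Lazy: the Enumerator block only runs when `next` is called.
lazy_log = []
lazy = Enumerator.new do |y|
  lazy_log << :started
  y << 1
end
lazy_started_before_next = !lazy_log.empty?
first_lazy = lazy.next
```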

felixbuenemann commented 6 years ago

Good point, I didn't even think about that.

I'll add a spec to make sure that doesn't happen.

felixbuenemann commented 6 years ago

To make discussion of the implementation easier, I pushed a first version in #6385.

Let me know what you think!

I would also be grateful for a short example that can only be implemented with Enumerator.

I tried to describe the use cases for both Iterator and Enumerator as best as I can, but the current Fibonacci generator example is not so great, since it can also be implemented with Iterator.

asterite commented 6 years ago

@felixbuenemann There's no such example. Anything can be implemented with Iterator, without needing channels at all. That's what C# does with IEnumerable and automatic rewriting of yield. So Enumerator is just a convenience for when you don't want to write that Iterator. So... I'm not sure about the documentation you wrote for Enumerator. Channels, blocking, etc. shouldn't be mentioned at all. It should just mention that it uses a fiber under the hood, plus context switching, which is the reason for the slowness.
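This Ruby sketch shows the rewriting idea by hand. DoubledWithMarkers is a hypothetical hand-written iterator equivalent to a small generator. Its @state field records where the generator would resume, and @index replaces the loop variable. It illustrates the technique only, not what the C# compiler emits.

```ruby
# Generator form, for comparison.
def doubled_with_markers_gen(items)
  Enumerator.new do |y|
    y << :header
    items.each { |i| y << i * 2 }
    y << :footer
  end
end

# Hand-written state machine producing the same sequence.
class DoubledWithMarkers
  def initialize(items)
    @items = items
    @state = :header
    @index = 0
  end

  def next
    case @state
    when :header
      @state = :body
      :header
    when :body
      if @index < @items.size
        value = @items[@index] * 2
        @index += 1
        value
      else
        @state = :done
        :footer
      end
    else
      raise StopIteration, "iteration reached an end"
    end
  end
end

hand = DoubledWithMarkers.new([1, 2, 3])
by_hand = []
loop { by_hand << hand.next }
by_generator = doubled_with_markers_gen([1, 2, 3]).to_a
```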

straight-shoota commented 6 years ago

@asterite With your patch, this ensure block won't execute:

loop do
  e = Enumerator(Int32).new do |y|
    y << 1
  ensure
    puts "ensure"
  end
  e.next
  sleep 0.001
end

Such ensures are often used to clean up open resource handles, such as closing files, network connections, etc.

The enumerating fiber shouldn't just be terminated without unwinding its stack. If the fiber is cancelled, it should behave as if an exception were raised in Yielder#<<.

asterite commented 6 years ago

@straight-shoota I just tried it in Ruby, and the ensure is never called there either. I don't think what you want can be implemented (otherwise, why doesn't Ruby implement it?)

For reference:

loop do
  e = Enumerator.new do |y|
    begin
      y << 1
    ensure
      puts "BYE"
    end
  end
  e.next
end

Or:

loop do
  e = Enumerator.new do |y|
    begin
      y << 1
    ensure
      puts "BYE"
    end
  end
end