crystal-lang / crystal

The Crystal Programming Language
https://crystal-lang.org
Apache License 2.0

Gracefully stopping fibers #3561

Open Papierkorb opened 7 years ago

Papierkorb commented 7 years ago

Hi,

While working on the torrent shard, I regularly came across the need to tell fibers that would otherwise run forever to stop. This is often solved by other means, such as a running bool, but I came across the following use case where that is not easily doable:

  1. Have a sorted list of requests to be made
  2. Spawn a fixed amount of fibers
  3. Each fiber takes out a request from the list. If the list is empty, stop the fiber. If all are stopped, break.
  4. Do the requests. Each response may yield either new requests (which are then added to the list) OR the result.
  5. If a result was found, stop all fibers and return it. Otherwise, each fiber goes back to step 3.
  6. Once every fiber has run a request, each one sends its most promising request (through a Channel) to another "control" fiber. The control fiber then checks whether the best of these requests is better than the previously known best. If it is not, break with no result.

The control fiber is basically doing `fiber_count.times { candidates.receive }`.

The issue is that if the result is found, or the list is empty, the control fiber will wait in the loop above forever, effectively leaking it. Another approach would be to have yet another Channel that the control fiber monitors (through select), and which halts the control fiber when it receives something.
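A minimal sketch of that second approach, assuming a dedicated `stop` channel of `Nil`. The names `candidates`, `stop` and `result`, and the use of `Int32` as a stand-in for a request, are illustrative and not taken from the torrent shard. The control fiber keeps the shape of `fiber_count.times { candidates.receive }`, but each wait goes through `select`, so a stop signal can release it instead of leaving it blocked.

```crystal
# Illustrative names: candidates, stop, result; Int32 stands in for a request.
fiber_count = 3
candidates = Channel(Int32).new
stop = Channel(Nil).new
result = Channel(Symbol).new

# Control fiber: like `fiber_count.times { candidates.receive }`,
# but every wait can also be ended by a message on `stop`.
spawn do
  outcome = :completed
  fiber_count.times do
    select
    when candidate = candidates.receive
      puts "candidate: #{candidate}"
    when stop.receive
      outcome = :stopped
      break # leave the loop so the fiber can finish instead of leaking
    end
  end
  result.send(outcome)
end

# Only one worker reports before a result is found elsewhere...
candidates.send(7)
# ...so tell the control fiber to stop instead of leaving it blocked.
stop.send(nil)
final = result.receive
puts final
```

Sending on `stop` wakes exactly one waiting fiber; if several fibers need to be released at once, closing a shared channel is the usual alternative.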

However, I'd love to have a primitive that lets me send a message to any Fiber, asking it to stop itself.

A solution employed by languages like Java is to inject an exception: java.lang.Thread#interrupt() makes blocking calls such as Thread.sleep() or Object.wait() in the target thread throw an InterruptedException. Applied to fibers this is quite nice, as properly implemented Fibers would then release resources through ensure/rescue. It would also kick a sleeping or waiting fiber out of a blocking call like Channel#receive or IO::FileDescriptor#read.
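Crystal currently offers no way to inject an exception into another fiber, so the closest approximation today is cooperative. A minimal sketch under that assumption: the hypothetical `CancelToken` and `Cancelled` types below are not part of Crystal's standard library. The worker checks the token at safe points and raises on its own, so its `ensure` block still releases resources. Unlike Java's interrupt, this does not wake a fiber that is blocked in Channel#receive or IO::FileDescriptor#read.

```crystal
# Hypothetical types, not part of Crystal's standard library.
class Cancelled < Exception
end

class CancelToken
  @cancelled = false

  # Request cancellation; the target notices at its next check!.
  def cancel
    @cancelled = true
  end

  # Raise Cancelled if cancellation has been requested.
  def check!
    raise Cancelled.new if @cancelled
  end
end

token = CancelToken.new
log = Channel(String).new(10)

spawn do
  begin
    log.send("acquired resource")
    loop do
      token.check! # safe point where cancellation can take effect
      Fiber.yield  # stand-in for one unit of real work
    end
  rescue Cancelled
    log.send("cancelled")
  ensure
    log.send("released resource") # runs however the fiber exits
  end
end

Fiber.yield # let the worker start
token.cancel
events = Array.new(3) { log.receive }
puts events
```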

A second solution could be a Fiber#close method that sets a flag, which is checked in Fiber#resume and then runs all ensure blocks.

Ideas?

RX14 commented 7 years ago

My first impression would be to assume that the fiber in question is not currently running when we issue the stop request (as is always the case when single-threaded) and simply attempt to stop it from resuming. Then the problem comes down to 2 things:

andrewhamon commented 7 years ago

Golang has a lot of good resources on concurrency patterns that you might be interested in reading. I believe the semantics of goroutines and channels are similar enough to Crystal's for them to still apply.

Check out the pipelining blog post, which I think would work well for your application. It also covers cancellation.
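The cancellation part of the Go pipelines post relies on closing a shared `done` channel, which releases every stage that is waiting on it. A rough Crystal translation of that idea, assuming Crystal's `select` accepts `send` and `receive?` actions and that a closed channel makes `receive?` ready with `nil`. The two producers and the names `done`, `output` and `exited` are illustrative.

```crystal
done = Channel(Nil).new # closed to cancel every stage; never sent on
output = Channel(Int32).new
exited = Channel(Nil).new

# Two producer stages. Each waits until it can either deliver a value
# or observe that `done` has been closed.
2.times do
  spawn do
    n = 0
    loop do
      select
      when output.send(n)
        n += 1
      when done.receive?
        break # done was closed: stop producing
      end
    end
    exited.send(nil)
  end
end

# The consumer only wants three values.
taken = Array.new(3) { output.receive }
done.close # a single close cancels both producers
2.times { exited.receive }
puts taken
```

The design choice mirrors the Go post: closing is a broadcast, so the code that cancels does not need to know how many fibers are listening.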

DougEverly commented 7 years ago

You can send a sentinel value on a channel for each fiber and break out of the loop when it arrives.

The done method in the example signals the fiber to exit the next time it reads from the channel.

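```crystal
# Sentinel value telling a worker to stop.
enum Control
  Done
end

class Worker(T)
  def initialize
    # Work items and the Done sentinel share one buffered channel.
    @channel = Channel(T | Control).new(10)
    # Used by the worker to acknowledge that it has stopped.
    @done = Channel(Control).new
  end

  def add(item : T)
    @channel.send(item)
  end

  def run
    spawn do
      loop do
        item = @channel.receive
        case item
        when T
          puts "I got #{item}"
        when Control
          # Sentinel received: acknowledge it and leave the loop, ending the fiber.
          @done.send(item)
          break
        end
      end
    end
  end

  # Ask the worker to stop after everything queued before this call.
  def done
    @channel.send(Control::Done)
  end

  # Block until the worker acknowledges the sentinel.
  def join
    puts "waiting"
    @done.receive
  end
end

w = Worker(Int32).new
w.run
w.add 2
w.add 1
w.add 3
w.done
w.join
puts "Done"
```
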
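For comparison, a sketch of the same worker without the `Control` sentinel. It swaps in a different technique from the one proposed in that comment: the producer calls `Channel#close`, and the worker uses `Channel#receive?`, which returns `nil` once the channel is closed, to end its loop. The channel is unbuffered here so every item has been handed over before `close` runs; the names `work`, `finished` and `processed` are illustrative.

```crystal
# Unbuffered, so each send completes only when the worker takes the item.
work = Channel(Int32).new
finished = Channel(Nil).new
processed = [] of Int32

spawn do
  # receive? returns nil once the channel is closed, ending the loop.
  while item = work.receive?
    processed << item
    puts "I got #{item}"
  end
  finished.send(nil)
end

[2, 1, 3].each { |i| work.send(i) }
work.close # plays the role of Control::Done
finished.receive
puts "Done"
```

Closing also works with several workers reading the same channel, since every one of them sees `nil` once the channel is closed and empty, whereas the sentinel approach needs one `Control::Done` per fiber.
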
HCLarsen commented 2 years ago

I know this is an old discussion, but I came across a need to stop a fibre externally from another fibre while working on chronos. I've found a way around it, but it still illustrates a need for this functionality.

In my use case, I made a @fiber.resume call from one fibre. As the Fiber documentation states, the scheduler doesn't go back to the fibre that the resume call was made from; unless you explicitly return to it with @other_fibre.enqueue, it just sits there as a memory leak. Being able to call other_fibre.cancel would certainly make this easier.

yxhuvud commented 2 years ago

@HCLarsen resume assumes there is already a return path to the originating fiber set up. In practice that means an enqueue, an event loop interaction, or a channel wait has been set up, and while resume is public, there is rarely any need to call it directly.

Is there any reason it would not be good enough to do @fiber.enqueue instead, and then let the source fiber be destroyed as normal by reaching the end of its code (using return or otherwise)?
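A minimal sketch of that suggestion, assuming the default single-threaded scheduler: the target fiber is scheduled with `Fiber#enqueue` rather than switched to with `resume`, and the spawning fiber simply runs to the end of its block, so nothing is left suspended. The `events` channel is illustrative and exists only to observe which fibers ran.

```crystal
# `events` only records which fibers ran.
events = Channel(String).new(2)

# A fiber that has been created but not yet scheduled.
target = Fiber.new do
  events.send("target ran")
end

spawn do
  target.enqueue # schedule target; execution stays in this fiber
  events.send("source finished")
end # the source fiber returns here and is cleaned up normally

a = events.receive
b = events.receive
puts a, b
```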

That said, your problem statement is simpler than what this issue is about: what you are asking about is destroying a fiber that you know is not only not executing, but also not going to be woken up by the event loop or anything else. To do that, Fiber#run would essentially need to be split into #run and #cleanup (and #cleanup would be what you are asking for). Fiber#cleanup would be something like https://github.com/yxhuvud/nested_scheduler/blob/main/src/nested_scheduler/monkeypatch/fiber.cr#L24 but without line 27.

HCLarsen commented 2 years ago

@yxhuvud that does seem moot, since I've already solved my issue.

I merely shared it to show another use case for a Fiber#close method.

stakach commented 1 year ago

@caspiano should have a crack at this issue