Araq / malebolgia

Malebolgia creates new spawns. Structured concurrency. Thread pools and related APIs.

Control of non-blocking threads on the background (start and cancel) #6

Closed: demetera closed this issue 1 year ago

demetera commented 1 year ago

Any ideas for adding support for spawning child threads without blocking the main thread? The current implementation reminds me of async, which blocks the main thread with awaitAll.

With a threadpool I could send a message to the child thread and break out of its endless loop using Channels. Use cases would be recording an audio/video stream, or endlessly performing periodic checks in the background.

I've managed to spawn a thread without awaitAll, but I can't stop it with abort() or panicStop(), apparently because of joinThreads.

Araq commented 1 year ago

Start a background thread with createThread and inside that you use the master and the structured concurrency? Not sure what you're after.

demetera commented 1 year ago

I meant start (spawn) a thread that runs until it is stopped manually from the main process. Currently awaitAll blocks the main thread.

Araq commented 1 year ago

Well that's the point of structured concurrency. awaitAll doesn't block, use it like so:


m.awaitAll:
  m.spawn backgroundThread()
  mainThreadLogicHere()
  if cond: m.abort()

demetera commented 1 year ago

In this piece of code, awaitAll blocks the user's input. Without awaitAll nothing blocks and the thread spawns, but the abort proc doesn't trigger.

while true:
  stdout.write "Enter command: "
  let userInput = readLine(stdin)
  case userInput:
    of "start":
      m.awaitAll:
        m.spawn worker()
        if cond: m.abort()
    of "stop":
      echo "Trying to stop"
      cond = true

That was my question: is it possible to implement this kind of thread handling in malebolgia?

Araq commented 1 year ago

What about:


var alreadyStarted = false
m.awaitAll:
  while true:
    stdout.write "Enter command: "
    let userInput = readLine(stdin)
    case userInput:
      of "start":
        if alreadyStarted:
          echo "already started"
        else:
          m.spawn worker()
          alreadyStarted = true
      of "stop":
        echo "Trying to stop"
        m.abort()

demetera commented 1 year ago

Thank you! Unfortunately, "stop" doesn't react. I'm pretty sure I'm doing something wrong here because I'm a rookie, but I'm trying to get "the rhythm".

import std/os
import malebolgia

proc worker()=
  while true:
    echo "Triggered"
    os.sleep(1000)

var m: Master = createMaster()

var alreadyStarted = false
m.awaitAll:
  while true:
    stdout.write "Enter command: "
    let userInput = readLine(stdin)
    case userInput:
      of "start":
        if alreadyStarted:
          echo "already started"
        else:
          m.spawn worker()
          alreadyStarted = true
      of "stop":
        echo "Trying to stop"
        m.abort()

Araq commented 1 year ago

Ah, this cannot work as you only have a single worker spawned which ignores the abort command. Needs an API extension.
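
To illustrate why a worker that never checks for a stop request cannot be interrupted, here is a minimal sketch of cooperative cancellation using only the Nim standard library. It is not malebolgia's implementation: the `stopRequested` flag, the `worker` body, and the timings are hypothetical and chosen only for illustration. It assumes a build with threads enabled.

```nim
import std/[atomics, os]

# Hypothetical shared stop flag; zero-initialised, so it starts as false.
var stopRequested: Atomic[bool]

proc worker() {.thread.} =
  # The loop ends only because the worker itself polls the flag.
  # Without this check, nothing outside the thread could stop it.
  while not stopRequested.load(moRelaxed):
    echo "Triggered"
    sleep(100)

var t: Thread[void]
createThread(t, worker)

sleep(350)                  # let the worker run for a few iterations
stopRequested.store(true)   # request a cooperative stop
joinThread(t)               # returns once the worker has seen the flag
```

The main thread can only request a stop; the worker decides when to honour it, which is why the polling has to live inside the worker's loop.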

demetera commented 1 year ago

Thanks a lot for your feedback. If possible, please consider implementing this.

Araq commented 1 year ago

Too simple not to do it.

demetera commented 1 year ago

Very nice. Thanks! Controlling 2 independent threads also works:

import std/os
import malebolgia

proc doer1(w: ptr Master)=
  while not cancelled(w):
    echo "1 Triggered"
    os.sleep(1000)

proc doer2(w: ptr Master)=
  while not cancelled(w):
    echo "2 Triggered"
    os.sleep(2000)

var m1: Master = createMaster()
var m2: Master = createMaster()

var alreadyStarted1 = false
var alreadyStarted2 = false

m1.awaitAll:
  m2.awaitAll:
    while true:
      stdout.write "Enter a command: "
      let userInput = readLine(stdin)
      case userInput
      of "start1":
        if alreadyStarted1:
          echo "1 already started"
        else:
          m1.spawn doer1(addr m1) # Starting doer1
          alreadyStarted1 = true
      of "start2":
        if alreadyStarted2:
          echo "2 already started"
        else:
          m2.spawn doer2(addr m2) # Starting doer2
          alreadyStarted2 = true
      of "stop1":
        echo "Trying to stop doer 1 "
        m1.cancel() # Stopping doer1
      of "stop2":
        echo "Trying to stop doer 2"
        m2.cancel() # Stopping doer2
      of "quit": break

Araq commented 1 year ago

Great, but use the new MasterHandle API for it.
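
A minimal sketch of what passing a handle instead of `ptr Master` could look like. It assumes malebolgia exposes `getHandle(m)` returning a `MasterHandle`, together with `cancelled` on that handle, and should be checked against the installed version. The `doer` proc, its parameters, and the fixed sleep that stands in for the interactive command loop are hypothetical.

```nim
import std/os
import malebolgia

proc doer(h: MasterHandle; id, delayMs: int) =
  # Poll the handle so the loop ends soon after cancel() is called.
  while not cancelled(h):
    echo id, " triggered"
    sleep(delayMs)

var m = createMaster()

m.awaitAll:
  # Assumption: getHandle wraps the master so the task needs no raw pointer.
  m.spawn doer(m.getHandle, 1, 200)
  sleep(700)    # stand-in for the interactive command loop
  m.cancel()    # request a cooperative stop; awaitAll then waits for doer
```

Compared with `addr m`, the handle keeps the raw pointer out of user code while still letting the worker observe cancellation.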