gleam-lang / erlang

🐙 Extra code for when running on Erlang
Apache License 2.0
85 stars 35 forks source link

Adding possibility to remove a ProcessDown monitor from a selector #58

Closed karlsson closed 4 days ago

karlsson commented 1 week ago

Hi,

One can add process down messages from monitored processes to a selector with process.selecting_process_down(selector, monitor, fn(proc_down){...}) but there is no way to remove it. If you start temporary worker processes, upon incoming requests, that you monitor and add to the selector it will grow all the time. Would it be good to have a process.selecting_process_down_remove(selector, monitor) as well? Since you also can demonitor the process it could maybe be good for that case too.

I am not sure this is a common case and maybe it can be solved in some other way avoiding the problem. As an example:

import gleam/erlang
import gleam/erlang/process.{type Subject}
import gleam/function
import gleam/int
import gleam/io
import gleam/list

// -------- Messages ----------
type StartReq {
  Start(index: Int)
  SomethingFromWorker(WorkerReq)
  Down(monitor: process.ProcessMonitor, index: Int)
}

type WorkerReq {
  VeryImportantMessage(mess: String, index: Int)
}

type Manager {
  StartSubject(service_subject: Subject(StartReq))
}

// -------------------------------

pub fn main() {
  let manager_subject = process.new_subject()

  process.start(fn() { service(manager_subject) }, True)
  let assert Ok(StartSubject(sss)) = process.receive(manager_subject, 1000)

  list.map(list.range(1, 5), fn(x) {
    process.start(fn() { client(sss, x, 10) }, True)
    process.sleep(1000)
  })
  process.sleep(20_000)
}

fn service(manager_subject) {
  let start_subject: Subject(StartReq) = process.new_subject()
  process.send(manager_subject, StartSubject(start_subject))
  let worker_subject: Subject(WorkerReq) = process.new_subject()

  let selector =
    process.new_selector()
    |> process.selecting(start_subject, function.identity)
    |> process.selecting(worker_subject, fn(a) { SomethingFromWorker(a) })
  loop(selector, worker_subject)
}

fn loop(selector, worker_subject) {
  io.println(erlang.format(selector))
  case selector |> process.select_forever {
    Start(index) -> {
      io.println(int.to_string(index) <> ": Service starts worker")

      let monitor =
        process.start(fn() { worker(worker_subject, index) }, False)
        |> process.monitor_process

      selector
      |> process.selecting_process_down(monitor, fn(procdown) {
        let process.ProcessDown(pid, _) = procdown
        io.println(
          int.to_string(index) <> ": ProcessDown from " <> erlang.format(pid),
        )
        Down(monitor, index)
      })
      |> loop(worker_subject)
    }

    SomethingFromWorker(VeryImportantMessage(mess, index)) -> {
      io.println(int.to_string(index) <> ": NOTICE - " <> mess)
      loop(selector, worker_subject)
    }
    // Remove monitor from selector whenever process is down
    // otherwise will it grow by one item for every new process.
    Down(monitor, index) -> {
      io.println(int.to_string(index) <> ": Got Down")
      selector
      |> process.selecting_process_down_remove(monitor)
      |> loop(worker_subject)
    }
  }
}

fn worker(worker_subject, index) {
  process.sleep(1200)
  process.send(
    worker_subject,
    VeryImportantMessage("Worker sent: Hey there!", index),
  )
}

// ------- Client -----------
fn client(service: Subject(StartReq), index: Int, n: Int) {
  case n > 0 {
    True -> {
      io.println(int.to_string(index) <> ": Client sends start")
      process.send(service, Start(index))
      process.sleep(2000)
      client(service, index, n - 1)
    }
    False -> Nil
  }
}

Adding to process.gleam:

pub fn selecting_process_down_remove(
  selector: Selector(payload),
  monitor: ProcessMonitor,
) -> Selector(payload) {
  remove_selector_handler(selector, monitor.tag)
}

@external(erlang, "gleam_erlang_ffi", "remove_selector_handler")
fn remove_selector_handler(
  a: Selector(payload),
  for for: tag,
) -> Selector(payload)

And to gleam_erlang_ffi.erl:

remove_selector_handler({selector, Handlers}, Tag) ->
        {selector, maps:remove(Tag, Handlers)}.
lpil commented 1 week ago

Sounds like a useful capability to have!

karlsson commented 1 week ago

Yes, doesn't it!? :smiley: That said, I do appreciate gleams restrictive approach to pull in "features", but I think you can argue that this is "almost" necessary... For the naming I guess that a function name remove_selecting_process_down is more natural, but if the functions are presented in alphabetical order in the documentation selecting_process_down_remove will be better placed. I can make a pull request, let me know if you prefer the other name of the function.

lpil commented 1 week ago

I'm not entirely sure about the name, but we can bike-shed on that one later!

karlsson commented 4 days ago

I made a pull request and took the liberty to use yet another function name, so that you can have more options to choose between when bike-shedding! Let me know if I missed something else in the PR.

lpil commented 4 days ago

Love the name!