snivilised / lorax

🌟 reactive extensions for go (a pseudo rxgo version 3)
MIT License

monitor output channel and shutdown if not consumed #263

Closed plastikfan closed 3 months ago

plastikfan commented 3 months ago

This is a continuation of item #68

Instead of canceling the parent context, we can use a separate channel to signal the worker pool to shut down gracefully when the output channel becomes full.

Here's some pseudocode to illustrate this approach:

import (
    "context"
    "errors"
    "time"
)

// Job and Result are assumed to be defined elsewhere.

func workerPool(parentCtx context.Context, jobs []Job, outputChan chan Result) error {
    ctx, cancel := context.WithCancel(parentCtx)
    defer cancel()

    // Buffered (capacity 1) so the monitor can signal without blocking, even if
    // workerPool has already returned. It is never closed, so a late send from
    // the monitor cannot panic.
    shutdownChan := make(chan struct{}, 1)

    // Start the monitoring goroutine
    go monitorOutputChannel(ctx, outputChan, shutdownChan)

    // Start the worker pool and workers
    // ...

    // Wait for either a shutdown signal or the context to be canceled
    select {
    case <-shutdownChan:
        // Output channel is full, gracefully shut down the worker pool
        // and return an error to the client
        return errors.New("no consumer is reading from the channel")
    case <-ctx.Done():
        // Parent context was canceled, gracefully shut down the worker pool
        return ctx.Err()
    }
}

func monitorOutputChannel(ctx context.Context, outputChan chan Result, shutdownChan chan<- struct{}) {
    // Check periodically instead of spinning
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Only meaningful for a buffered channel: an unbuffered channel
            // always has len == cap == 0.
            if cap(outputChan) > 0 && len(outputChan) == cap(outputChan) {
                // Output channel is full, signal the worker pool to shut down
                shutdownChan <- struct{}{}
                return
            }
        }
    }
}

In this approach, the monitorOutputChannel goroutine sends a signal on shutdownChan if the output channel becomes full. The workerPool function listens on both shutdownChan and the Done() channel of the context it derives from the parent. If a signal is received on shutdownChan, it gracefully shuts down the worker pool and returns an error to the client indicating that no consumer is reading from the channel.

This solution doesn't require passing a cancel function to a child goroutine, nor does it involve a child goroutine canceling the parent context. The worker pool itself is responsible for gracefully shutting down based on the signals it receives from the monitoring goroutine or the parent context.

By using this approach, you can gracefully shut down the worker pool and provide a helpful error message to the client if they forget to define a consumer for the output channel, without violating the principle of not allowing child goroutines to cancel the parent context.

plastikfan commented 3 months ago

I have just discovered another library with extensive, battle-hardened worker pool functionality: ants. It would probably be best to abandon the worker pool functionality in boost.

The problem with the boost worker pool was that it was based on the idea that it is ok to cancel a parent context from a child, which I have just discovered is not a good practice, so all existing code needs to be junked.

Instead of starting from scratch, it would be better to use a package that is already well tested and already in production. Currently I'm not 100% sure that ants will provide all the features I require, but the intention is to create a new sub-package in lorax (awe) that is a facade wrapper around ants. No snivilised project will use ants directly; only lorax will.
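To illustrate the facade idea in general terms: the thread does not describe what the wrapper will look like, so everything in this sketch is an assumption. `Pool`, `NewPool`, `stdPool` and `sumWithPool` are hypothetical names, and a small standard-library pool stands in for ants so that the example is self-contained. The point is only that callers depend on the facade's interface, so the backend can be swapped without touching them.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is a hypothetical facade interface; client code depends on this only.
type Pool interface {
	Submit(task func()) error
	Release()
}

// stdPool is a stand-in backend built on the standard library. In the plan
// described above, this is where the third-party pool would sit instead.
type stdPool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

// NewPool starts size workers and returns them behind the facade interface.
func NewPool(size int) Pool {
	p := &stdPool{tasks: make(chan func())}
	p.wg.Add(size)
	for i := 0; i < size; i++ {
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// Submit hands a task to a free worker, blocking until one is available.
// It must not be called after Release.
func (p *stdPool) Submit(task func()) error {
	p.tasks <- task
	return nil
}

// Release stops accepting tasks and waits for running ones to finish.
func (p *stdPool) Release() {
	close(p.tasks)
	p.wg.Wait()
}

// sumWithPool adds 1..n using the pool, as a tiny usage example.
func sumWithPool(size, n int) int {
	pool := NewPool(size)
	var mu sync.Mutex
	sum := 0
	for i := 1; i <= n; i++ {
		v := i // capture the current value for the closure
		_ = pool.Submit(func() {
			mu.Lock()
			sum += v
			mu.Unlock()
		})
	}
	pool.Release() // all submitted tasks have finished after this returns
	return sum
}

func main() {
	fmt.Println(sumWithPool(3, 5)) // prints 15
}
```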

Also see:

plastikfan commented 3 months ago

Abandoned