defenseunicorns / pkg

Common Go modules maintained by Defense Unicorns
Apache License 2.0
5 stars 4 forks source link

Indefinite Blocking in WaitWithProgress Due to Insufficient Message Count #62

Closed naveensrinivasan closed 3 months ago

naveensrinivasan commented 7 months ago

Environment

Device and OS: App version: Kubernetes distro being used: Other:

Steps to reproduce

The WaitWithProgress function in our ConcurrencyTools struct has a potential issue where it can be blocked indefinitely if the number of messages sent to either ProgressChan or ErrorChan is less than routineCount. This situation arises when, for example, only one progress update is sent, but the function expects two based on the routineCount.

package main

import (
    "context"
    "fmt"
)

type ConcurrencyTools[P any, E any] struct {
    ProgressChan chan P
    ErrorChan    chan E
    context      context.Context
    Cancel       context.CancelFunc
    routineCount int
}

func NewConcurrencyTools[P any, E any](length int) *ConcurrencyTools[P, E] {
    ctx, cancel := context.WithCancel(context.Background())
    progressChan := make(chan P, length)
    errorChan := make(chan E, length)

    return &ConcurrencyTools[P, E]{
        ProgressChan: progressChan,
        ErrorChan:    errorChan,
        context:      ctx,
        Cancel:       cancel,
        routineCount: length,
    }
}

func (ct *ConcurrencyTools[P, E]) WaitWithProgress(onProgress func(P, int), onError func(E) error) error {
    for i := 0; i < ct.routineCount; i++ {
        select {
        case err := <-ct.ErrorChan:
            ct.Cancel()
            return onError(err)
        case progress := <-ct.ProgressChan:
            onProgress(progress, i)
        }
    }
    return nil
}

func main() {
    ct := NewConcurrencyTools[string, string](2) // Expecting 2 messages, but will only send 1

    go func() {
        ct.ProgressChan <- "50% complete"
    }()

    onProgress := func(progress string, index int) {
        fmt.Printf("Progress at index %d: %s\n", index, progress)
    }

    onError := func(err string) error {
        fmt.Println("Error:", err)
        return fmt.Errorf(err)
    }

    err := ct.WaitWithProgress(onProgress, onError)
    if err != nil {
        fmt.Println("Finished with error:", err)
    } else {
        fmt.Println("Finished successfully")
    }
}

Expected Behavior

The WaitWithProgress function should not block indefinitely and should have a mechanism to handle situations where fewer messages are sent than expected.

Actual Behavior

When fewer messages are sent than routineCount, the WaitWithProgress function blocks indefinitely, waiting for messages that will never arrive.

Possible Solution

A potential solution could involve implementing a timeout or cancellation mechanism that allows the function to exit gracefully when it becomes apparent that not all expected messages will be received.

Steps to Reproduce

  1. Initialize ConcurrencyTools with a routineCount of 2.
  2. Send only one progress update message.
  3. Observe that the WaitWithProgress function blocks indefinitely.

    Visual Proof (screenshots, videos, text, etc)

Severity/Priority

Additional Context

Add any other context or screenshots about the technical debt here.

naveensrinivasan commented 7 months ago

I want to help address this issue.

Noxsios commented 3 months ago

Closed by #70