FunctionStream / function-stream

Function Stream - Event-streaming function platform based on Apache Pulsar and WebAssembly
Apache License 2.0

Multiple closes on the same result channel in MemoryQueueFactory #150

Closed wyyolo closed 6 months ago

wyyolo commented 6 months ago

Why is "defer close(result)" placed inside the goroutine? A goroutine is started for each topic, so with more than one topic several goroutines may try to close the same result channel, and closing an already closed channel causes a panic. https://github.com/FunctionStream/function-stream/blob/main/fs/contube/memory.go#L80 Perhaps the close can be managed with "sync.WaitGroup" instead, for example:

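func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) {
    config := NewSourceQueueConfig(configMap)
    result := make(chan Record)
    var wg sync.WaitGroup // tracks the forwarding goroutines (needs the "sync" import)
    for _, topic := range config.Topics {
        wg.Add(1)
        t := topic // per-iteration copy for the closures below
        // Release the topic's queue once the context is cancelled.
        go func() {
            <-ctx.Done()
            f.release(t)
        }()

        // Forward events from the topic's channel to the shared result channel.
        go func() {
            defer wg.Done()
            c := f.getOrCreateChan(t)
            for {
                select {
                case <-ctx.Done():
                    return
                case event := <-c:
                    // Also watch ctx here so the send cannot block forever
                    // after the consumer has stopped reading.
                    select {
                    case result <- event:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }

    // Close result exactly once, after every forwarding goroutine has returned.
    go func() {
        wg.Wait()
        close(result)
    }()

    return result, nil
}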
RobertIndie commented 6 months ago

Thanks for reporting. This looks like a bug. Using WaitGroup looks good to me.

Would you like to fix this issue?
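
For anyone who wants to confirm the failure mode first, the sketch below reproduces it in isolation. It is an illustration, not code from the repository: countClosePanics is a hypothetical helper, and the recover call is there only so that the panics can be counted instead of crashing the program.

```go
package main

import (
	"fmt"
	"sync"
)

// countClosePanics starts n goroutines that each defer close(result), as the
// issue describes, and returns how many of those closes panicked.
func countClosePanics(n int) int {
	result := make(chan int)
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		panics int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Recover so the demo can count panics instead of crashing.
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					panics++
					mu.Unlock()
				}
			}()
			// Deferred last, so it runs first: only one close can succeed.
			defer close(result)
		}()
	}
	wg.Wait()
	return panics
}

func main() {
	fmt.Println(countClosePanics(3)) // prints 2
}
```

The first close succeeds and every later one panics with "close of closed channel", so without the recover a source configured with two or more topics would crash the process as soon as a second goroutine returned.
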

wyyolo commented 6 months ago

Of course, I will fix this bug.