rosedblabs / rosedb

Lightweight, fast and reliable key/value storage engine based on Bitcask.
https://rosedblabs.github.io
Apache License 2.0

panic: send on closed channel #283

Closed microyahoo closed 7 months ago

microyahoo commented 10 months ago
~/go/src/deeproute.ai/rosedb (main ✗) go run examples/watch/main.go
The db is closed, so the watch channel is closed.
panic: send on closed channel

goroutine 6 [running]:
github.com/rosedblabs/rosedb/v2.(*Watcher).sendEvent(0x0?, 0x0?)
    /root/go/src/deeproute.ai/rosedb/watch.go:70 +0x45
created by github.com/rosedblabs/rosedb/v2.Open
    /root/go/src/deeproute.ai/rosedb/db.go:123 +0x5cd
exit status 2

How to reproduce:

package main

import (
    "fmt"
    "time"

    "github.com/rosedblabs/rosedb/v2"
    "github.com/rosedblabs/rosedb/v2/utils"
)

// this file shows how to use the Watch feature of rosedb.

func main() {
    // specify the options
    options := rosedb.DefaultOptions
    options.DirPath = "/tmp/rosedb_watch"
    options.WatchQueueSize = 1000

    // open a database
    db, err := rosedb.Open(options)
    if err != nil {
        panic(err)
    }
    // defer func() { // <- commented out for this reproduction; Close is called explicitly below
    //  _ = db.Close()
    // }()

    // run a new goroutine to handle db event.
    go func() {
        eventCh, err := db.Watch()
        if err != nil {
            return
        }
        for {
            event := <-eventCh
            // when the db is closed, the watch channel is closed and the receive yields nil.
            if event == nil {
                fmt.Println("The db is closed, so the watch channel is closed.")
                return
            }
            // events can be captured here for processing
            fmt.Printf("Get a new event: key%s \n", event.Key)
        }
    }()

    // write some data
    for i := 0; i < 10; i++ {
        _ = db.Put(utils.GetTestKey(i), utils.RandomValue(64))
    }
    // delete some data
    for i := 0; i < 10/2; i++ {
        _ = db.Delete(utils.GetTestKey(i))
    }

    _ = db.Close() // <- close db

    // wait for watch goroutine to finish.
    time.Sleep(1 * time.Second)
}

microyahoo commented 10 months ago

The PR https://github.com/rosedblabs/rosedb/pull/252/files makes three main changes:

  1. Fix a potential panic: send on closed channel.
  2. Use a wait group to wait for the goroutine in the example.
  3. Refactor event watching.
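
The first two items can be illustrated with a minimal sketch. This is not the PR's code: notifier, newNotifier and drain are hypothetical names. The idea is that only the sending goroutine ever closes the event channel, so Close can never race with an in-flight send, and the consumer is awaited with a sync.WaitGroup instead of time.Sleep.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for the database's event sender;
// it is not rosedb's actual type.
type notifier struct {
	events chan int      // delivered to the consumer
	done   chan struct{} // closed to ask the sender to stop
	once   sync.Once
	wg     sync.WaitGroup
}

func newNotifier(pending []int) *notifier {
	n := &notifier{events: make(chan int), done: make(chan struct{})}
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()
		// Only this goroutine closes events, so no send can hit a closed channel.
		defer close(n.events)
		for _, e := range pending {
			select {
			case n.events <- e:
			case <-n.done:
				return
			}
		}
	}()
	return n
}

// Close asks the sender to stop and waits until it has exited.
func (n *notifier) Close() {
	n.once.Do(func() { close(n.done) })
	n.wg.Wait()
}

// drain consumes events until the channel is closed. The WaitGroup returns
// exactly when the consumer goroutine exits, with no fixed sleep.
func drain(n *notifier) []int {
	var got []int
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range n.events {
			got = append(got, e)
		}
	}()
	wg.Wait()
	return got
}

func main() {
	n := newNotifier([]int{1, 2, 3})
	fmt.Println(drain(n)) // prints [1 2 3]
	n.Close()
}
```

Calling Close before any consumer is attached is also safe in this sketch: the sender observes done, exits, and closes events itself.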

It adds a Watcher interface, modeled mainly on etcd's WatchStream and watchStream, with two methods: Watch and Close.

type Watcher interface {
    Watch() WatchChan
    Close() error
}
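
A rough sketch of how a caller might consume this interface. The definitions of Event and WatchChan are assumptions (only Event.Key appears in the example above, and WatchChan's definition is not shown here), and staticWatcher is a hypothetical implementation that simply replays fixed keys.

```go
package main

import "fmt"

// Event and WatchChan are assumed shapes for illustration only.
type Event struct{ Key []byte }
type WatchChan <-chan *Event

type Watcher interface {
	Watch() WatchChan
	Close() error
}

// staticWatcher is a hypothetical Watcher that replays a fixed list of keys.
type staticWatcher struct{ keys []string }

func (s *staticWatcher) Watch() WatchChan {
	ch := make(chan *Event, len(s.keys))
	for _, k := range s.keys {
		ch <- &Event{Key: []byte(k)}
	}
	close(ch) // every event is already buffered, so closing here is safe
	return ch
}

func (s *staticWatcher) Close() error { return nil }

// consume ranges over the watch channel until it is closed, so the caller
// needs no nil-event check to detect shutdown.
func consume(w Watcher) []string {
	defer w.Close()
	var keys []string
	for ev := range w.Watch() {
		keys = append(keys, string(ev.Key))
	}
	return keys
}

func main() {
	fmt.Println(consume(&staticWatcher{keys: []string{"a", "b"}})) // prints [a b]
}
```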

The watcher struct implements the Watcher interface as follows.

type watcher struct {
    queue   eventQueue
    mu      sync.RWMutex
    watchCh chan *Event // newly added: the event channel returned by Watch()

    cond     *sync.Cond
    capacity uint64 // newly added: tracks the current number of events; never exceeds the event queue's capacity

    closeMu    sync.Mutex
    notifyCh   chan struct{} // notification channel; signals that the watch channel has been closed
    cancelFunc func()        // invoked by Close()
}
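
To show how these fields could fit together, here is a simplified sketch, not the PR's implementation. It assumes a plain slice in place of eventQueue, a sync.Mutex in place of the RWMutex, and omits capacity and closeMu; newWatcher, run and putEvent are hypothetical names. Close cancels the sender goroutine and then waits on notifyCh, and the sender is the only goroutine that closes watchCh.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type Event struct{ Key []byte }

// watcher is a simplified, hypothetical variant of the struct above.
type watcher struct {
	mu         sync.Mutex
	cond       *sync.Cond
	queue      []*Event      // stands in for eventQueue
	watchCh    chan *Event   // returned by Watch()
	notifyCh   chan struct{} // closed after watchCh has been closed
	cancelFunc func()        // invoked by Close()
}

func newWatcher() *watcher {
	ctx, cancel := context.WithCancel(context.Background())
	w := &watcher{
		watchCh:    make(chan *Event),
		notifyCh:   make(chan struct{}),
		cancelFunc: cancel,
	}
	w.cond = sync.NewCond(&w.mu)
	go w.run(ctx)
	return w
}

// run forwards queued events to watchCh until the context is cancelled.
func (w *watcher) run(ctx context.Context) {
	defer close(w.notifyCh) // runs second: announces that watchCh is closed
	defer close(w.watchCh)  // runs first: only the sender closes watchCh
	for {
		w.mu.Lock()
		for len(w.queue) == 0 && ctx.Err() == nil {
			w.cond.Wait()
		}
		if ctx.Err() != nil {
			w.mu.Unlock()
			return
		}
		ev := w.queue[0]
		w.queue = w.queue[1:]
		w.mu.Unlock()

		select {
		case w.watchCh <- ev:
		case <-ctx.Done():
			return
		}
	}
}

func (w *watcher) putEvent(e *Event) {
	w.mu.Lock()
	w.queue = append(w.queue, e)
	w.mu.Unlock()
	w.cond.Signal()
}

func (w *watcher) Watch() <-chan *Event { return w.watchCh }

// Close cancels the sender, wakes it if it is waiting, and blocks until
// the watch channel has been closed.
func (w *watcher) Close() error {
	w.cancelFunc()
	w.mu.Lock()
	w.cond.Broadcast()
	w.mu.Unlock()
	<-w.notifyCh
	return nil
}

func main() {
	w := newWatcher()
	w.putEvent(&Event{Key: []byte("k1")})
	fmt.Printf("got %s\n", (<-w.Watch()).Key)
	_ = w.Close()
	w.putEvent(&Event{Key: []byte("late")}) // queued but never sent; no panic
	_, ok := <-w.Watch()
	fmt.Println("open:", ok)
}
```

In this sketch an event put after Close is only appended to the queue and never sent, so it cannot trigger the panic from the report.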

@roseduan