joncrlsn / dque

dque is a fast, embedded, durable queue for Go
MIT License
767 stars 46 forks source link

invalid memory address or nil pointer dereference on dequeue #22

Open dllz opened 4 years ago

dllz commented 4 years ago

Hey, just started using this library and I have run into an issue dequeuing a queue.


[signal SIGSEGV: segmentation violation code=0x1 addr=0x58 pc=0x181f8b1]

goroutine 10 [running]:
sync.(*Mutex).Lock(...)
    /usr/local/Cellar/go/1.13.6/libexec/src/sync/mutex.go:74
github.com/joncrlsn/dque.(*DQue).Dequeue(0x0, 0x0, 0x0, 0x0, 0x0)
    /Users/dlotz/go/pkg/mod/github.com/joncrlsn/dque@v0.0.0-20200308203223-fe4f81ffd1dc/queue.go:240 +0x51

I am not sure if you have any thoughts on the cause of this, my googling does not seem to turn anything up.  
joncrlsn commented 4 years ago

Hi, is this something you can recreate at will? That would help.

@Kriechi

dllz commented 4 years ago

I can only get this error, I do not seem able to successfully dequeue. The program starts up, all the queues are created. Then I have set up a blocking pop on the queue along the lines of:

Try dequeue, if nothing in the queue sleep for 0.5 seconds and then try dequeue again. Repeat till you can get something off the queue.

And I immediately get that error and the program crashes

joncrlsn commented 4 years ago

Until we get this figured out, can you use the v2.1 tag? I'm at work today and won't be able to try until later this evening.

dllz commented 4 years ago

No problem, if it helps debuging I am using go 1.14 but I downgraded to 1.12 and the issue persisted

Kriechi commented 4 years ago

@dllz would you mind sharing a snippet of your code? How do you open the queue? Maybe you even got a minimal reproducible example that shows the error? Or is it deep-down in your application?

dllz commented 4 years ago

Happy to share some, I tried a bunch of other releases and I ran into the issue so I think this is very much a me being stupid.

type Dq struct {
    low  *dque.DQue
    med  *dque.DQue
    high *dque.DQue
    now  *dque.DQue
}

type Holder struct {
    Item interface{}
    Time int64
    Id   string
}

func HolderBuilder() interface{} {
    return &Holder{}
}

func (d Dq) Push(id string, item interface{}, priority Priority) error {
    holder := Holder{
        Item: item,
        Time: time.Now().Unix(),
    }
    switch priority {
    case Now:
        err := d.now.Enqueue(holder)
        if err != nil {
            util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    case High:
        err := d.high.Enqueue(holder)
        if err != nil {
            util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    case Medium:
        err := d.med.Enqueue(holder)
        if err != nil {
            util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    case Low:
        err := d.low.Enqueue(holder)
        if err != nil {
            util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    default:
        return util.HandleErrorErr("No matching priority", errors.New("No matching priority for "+string(priority)))
    }
    return nil
}

func (d Dq) Pop() interface{} {
    for {
        temp, err := d.now.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            util.HandleError("Error fetching now priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        temp, err = d.high.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            util.HandleError("Error fetching high priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        temp, err = d.med.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            util.HandleError("Error fetching med priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        temp, err = d.low.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            util.HandleError("Error fetching low priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        time.Sleep(time.Millisecond * time.Duration(viper.GetInt(util.QueueDequeueEmptyWaitTime)))
    }
}

func (d Dq) Connect() interface{} {
    var err error
    d.low, err = dque.New(string(Low), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
    if err != nil {
        d.now, err = dque.Open(string(Low), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
        if err != nil {
            util.HandleError("Error creating low client", err)
            return nil
        }
    }
    d.med, err = dque.New(string(Medium), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
    if err != nil {
        d.now, err = dque.Open(string(Medium), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
        if err != nil {
            util.HandleError("Error creating med client", err)
            return nil
        }
    }
    d.high, err = dque.New(string(High), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
    if err != nil {
        d.high, err = dque.Open(string(High), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
        if err != nil {
            util.HandleError("Error creating high client", err)
            return nil
        }
    }
    d.now, err = dque.New(string(Now), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
    if err != nil {
        d.now, err = dque.Open(string(Now), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
        if err != nil {
            util.HandleError("Error creating now client", err)
            return nil
        }
    }
    return d
}

func (d Dq) ListenOnQueue(handler func(item interface{}), stop *bool) {
    go func() {
        for {
            if !*stop {
                item := d.Pop()
                handler(item)
            }
        }
    }()
}

And then it is being initialized with


        dq := queue.Dq{}
    qh := QueueHandler{
        Queue: dq,
    }
    dq.Connect()
    var stop bool
    dq.ListenOnQueue(func(item interface{}) {
        fmt.Println(item)
    }, &stop)
Kriechi commented 4 years ago

Quick one without thinking too much:

you might want to try using "methods on pointers":

func (d Dq) Connect() interface{} {

vs.

func (d *Dq) Connect() interface{} {

Also, dque has a nice NewOrOpen which might save you a bit of error handling code.

dllz commented 4 years ago

Issue persists with pointers sadly, Thanks for the NewOrOpen, that will definitely come in handy

joncrlsn commented 4 years ago

Hi dllz, Nothing jumps out at me right away either. Could you maybe change your comment above to be the contents of a stand-alone go file that can be run to show the problem? Then I can copy it into test.go and run go run test.go. For example,

Whenever I have had to show a problem with the simplest possible problem demonstration program on any code I've written, I find it really helps my understanding of things, as well as helping others help me find the problem.

Thanks!

dllz commented 4 years ago

Hi Jon.

Sorry about that, I have cleaned up the code to hopefully solve your problem.

package queue

import (
    "errors"
    "fmt"
    "github.com/joncrlsn/dque"
    "time"
)

type Priority string

const (
    Low         Priority = "low"
    Medium      Priority = "med"
    High        Priority = "high"
    Now         Priority = "now"
    SegmentSize          = 50
)

type Dq struct {
    low  *dque.DQue
    med  *dque.DQue
    high *dque.DQue
    now  *dque.DQue
}

type Holder struct {
    Item interface{}
    Time int64
    Id   string
}

func HolderBuilder() interface{} {
    return &Holder{}
}

func (d *Dq) Push(id string, item interface{}, priority Priority) error {
    holder := Holder{
        Item: item,
        Time: time.Now().Unix(),
    }
    switch priority {
    case Now:
        err := d.now.Enqueue(holder)
        if err != nil {
            fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    case High:
        err := d.high.Enqueue(holder)
        if err != nil {
            fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    case Medium:
        err := d.med.Enqueue(holder)
        if err != nil {
            fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    case Low:
        err := d.low.Enqueue(holder)
        if err != nil {
            fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
            return err
        }
        break
    default:
        fmt.Println("No matching priority")
        return errors.New("No matching priority for "+string(priority))
    }
    return nil
}

func (d *Dq) Pop() interface{} {
    for {
        temp, err := d.now.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            fmt.Println("Error fetching now priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        temp, err = d.high.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            fmt.Println("Error fetching high priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        temp, err = d.med.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            fmt.Println("Error fetching med priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        temp, err = d.low.Dequeue()
        if err != nil && err != dque.ErrEmpty {
            fmt.Println("Error fetching low priority", err)
        }
        if temp != nil && err == nil {
            return temp
        }
        time.Sleep(time.Millisecond * 500)
    }
}

func (d *Dq) Connect() interface{} {
    var err error
    d.low, err = dque.New(string(Low), "queue", SegmentSize, HolderBuilder)
    if err != nil {
        d.now, err = dque.Open(string(Low), "queue", SegmentSize, HolderBuilder)
        if err != nil {
            fmt.Println("Error creating low client", err)
            return nil
        }
    }
    d.med, err = dque.New(string(Medium), "queue", SegmentSize, HolderBuilder)
    if err != nil {
        d.now, err = dque.Open(string(Medium), "queue", SegmentSize, HolderBuilder)
        if err != nil {
            fmt.Println("Error creating med client", err)
            return nil
        }
    }
    d.high, err = dque.New(string(High), "queue", SegmentSize, HolderBuilder)
    if err != nil {
        d.high, err = dque.Open(string(High), "queue", SegmentSize, HolderBuilder)
        if err != nil {
            fmt.Println("Error creating high client", err)
            return nil
        }
    }
    d.now, err = dque.New(string(Now), "queue", SegmentSize, HolderBuilder)
    if err != nil {
        d.now, err = dque.Open(string(Now), "queue", SegmentSize, HolderBuilder)
        if err != nil {
            fmt.Println("Error creating now client", err)
            return nil
        }
    }
    return d
}

func (d *Dq) ListenOnQueue(handler func(item interface{}), stop *bool) {
    go func() {
        for {
            if !*stop {
                item := d.Pop()
                handler(item)
            }
        }
    }()
}

The calling code:

package webservice

import (
    "canvas-data-management-go/pkg/export"
    "canvas-data-management-go/pkg/queue"
    "canvas-data-management-go/pkg/util"
    "encoding/json"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
)

type QueueJson struct {
    Item     string         `json:item`
    Id       string         `json:id`
    Priority queue.Priority `json:Priority`
}

type QueueHandler struct {
    Queue queue.Dq
}
func Run(ip string, port string) error {
    dq := queue.Dq{}
    qh := QueueHandler{
        Queue: dq,
    }
    dq.Connect()

    http.HandleFunc("/queue", qh.handleQueue)
    hostAddress := "localhost" + ":" + "12345"
    var stop bool
    dq.ListenOnQueue(func(item interface{}) {
        fmt.Println(item)
    }, &stop)
        err := http.ListenAndServe(hostAddress, nil)
    if err != nil{
        fmt.Println("Error hosting http listener "+hostAddress)
        return err
    }
    return nil
}
func (qh *QueueHandler) handleQueue(w http.ResponseWriter, r *http.Request) {
    fmt.Println("Received queue request")
    var item QueueJson
    err := json.Unmarshal(getBody(r.Body, w), &item)
    if err != nil {
        fmt.Println("Error un-marshalling json from http server", err)
        http.Error(w, "Bad Request", http.StatusBadRequest)
        return
    }
    err = qh.Queue.Push(item.Id, item.Item, item.Priority)
    if err != nil {
        fmt.Println("Error queuing data: "+item.Id, err)
        http.Error(w, "Error queuing data for ID: "+item.Id+" "+err.Error(), http.StatusInternalServerError)
        return
    }
    response := ResponseJson{Message: "Successfully processed"}
    encoder := json.NewEncoder(w)
    err = encoder.Encode(response)
}

Webservice caller

package cmd

import (
    ws "canvas-data-management-go/pkg/webservice"
    "fmt"
    "github.com/spf13/cobra"
)

var (
    webserviceCommand = &cobra.Command{
        Use:   "web",
        Short: "Put the cli a mode that it receives commands from the internet",
        Run: func(cmd *cobra.Command, args []string) {
            err := webservice(ip, port)
            if err != nil {
                fmt.Print("Error running command", err)
            }
        },
    }
    port string
    ip   string
)

func webservice(ip string, port string) error {
    err := ws.Run()
    if err != nil {
        return err
    }
    return nil
}

root.go

package cmd

import (
    "github.com/spf13/cobra"
)

var (
    rootCmd = &cobra.Command{
        Use:              "cdm",
        TraverseChildren: true,
    }
)

func Execute() error {
    return rootCmd.Execute()
}

func init() {

    //Web
    webserviceCommand.Flags().StringVar(&ip, "ip", "127.0.0.1", "Specify the ip address to listen on")
    webserviceCommand.Flags().StringVar(&port, "port", "2364", "Specify the ip address to listen on")
    rootCmd.AddCommand(webserviceCommand)
}

main.go


package main

import (
    "canvas-data-management-go/cmd"
    "fmt"
)

func main() {
    err := cmd.Execute()
    if err != nil {
        fmt.Println("Error starting up", err)
    }
}