joncrlsn / dque

dque is a fast, embedded, durable queue for Go
MIT License
767 stars 46 forks source link

Corrupted data when opening existing queue #33

Open baryluk opened 2 years ago

baryluk commented 2 years ago

I took an example_test.go and just run two goroutines one enqueueing consecutive integers, another doing blocking dequeue and just print them from time to time.

Segment size 50, then switches to 100000.

Interrupting the program, and the starting it again, causes it to read corrupted data:

2022/02/23 15:26:49 Error creating new dque unable to create queue segment in /tmp/item-queue: unable to load queue segment in /tmp/item-queue: segment file /tmp/item-queue/0000000000041.dque is corrupted: error reading gob data from file: EOF exit status 1

Source:

package main

import (
    "fmt"
    "log"

    "github.com/joncrlsn/dque"
)

func main() {
    ExampleDQue()
}

// Item is what we'll be storing in the queue.  It can be any struct
// as long as the fields you want stored are public.
type Item struct {
    Name string
    Id   int
}

// ItemBuilder creates a new item and returns a pointer to it.
// This is used when we load a segment of the queue from disk.
func ItemBuilder() interface{} {
    return &Item{}
}

// ExampleDQue shows how the queue works
func ExampleDQue() {
    qName := "item-queue"
    qDir := "/tmp"
    segmentSize := 100000

    q, err := dque.NewOrOpen(qName, qDir, segmentSize, ItemBuilder)
    if err != nil {
        log.Fatal("Error creating new dque ", err)
    }

    go func() {
        i := 0
        for {
            err := q.Enqueue(&Item{"Joe", i})
            if err != nil {
                log.Fatal("Error enqueueing", err)
            }

            i++
            //log.Println("Queue size:", q.Size())
        }
    }()

    func() {
        for {
            var iface interface{}

            // Dequeue the next item in the queue and block until one is available
            if iface, err = q.DequeueBlock(); err != nil {
                log.Fatal("Error dequeuing item ", err)
            }

            // Assert type of the response to an Item pointer so we can work with it
            item, ok := iface.(*Item)
            if !ok {
                log.Fatal("Dequeued object is not an Item pointer")
            }

            doSomething(item)
        }
    }()
}

func doSomething(item *Item) {
    if item.Id % 100000 == 0 {
        fmt.Println("Dequeued:", item)
    }
}