nsqio / go-diskqueue

A Go package providing a filesystem-backed FIFO queue
MIT License
471 stars 101 forks source link

proper handling of maxBytesPerFile for reads #23

Closed mreiferson closed 3 years ago

mreiferson commented 3 years ago

fixes up and replaces #15

ploxiln commented 3 years ago

I attempted to write a test that fails without this branch. On the current master branch I can get it to log some warnings and errors, but the reads still succeed, so I had the test check for leftover `*.bad` files at the end:

    diskqueue_test.go:76: ERROR: DISKQUEUE(test_disk_queue_resize1609301793) reading at 112 of /tmp/nsq-test-1609301793668354309126785195/test_disk_queue_resize1609301793.diskqueue.000000.dat - EOF
    diskqueue_test.go:76: WARNING: DISKQUEUE(test_disk_queue_resize1609301793) jump to next file and saving bad file as /tmp/nsq-test-1609301793668354309126785195/test_disk_queue_resize1609301793.diskqueue.000000.dat.bad
    diskqueue_test.go:76: INFO: DISKQUEUE(test_disk_queue_resize1609301793): readOne() opened /tmp/nsq-test-1609301793668354309126785195/test_disk_queue_resize1609301793.diskqueue.000001.dat
    diskqueue_test.go:76: ERROR: DISKQUEUE(test_disk_queue_resize1609301793) reading at 140 of /tmp/nsq-test-1609301793668354309126785195/test_disk_queue_resize1609301793.diskqueue.000001.dat - EOF
    diskqueue_test.go:76: WARNING: DISKQUEUE(test_disk_queue_resize1609301793) jump to next file and saving bad file as /tmp/nsq-test-1609301793668354309126785195/test_disk_queue_resize1609301793.diskqueue.000001.dat.bad
    diskqueue_test.go:76: INFO: DISKQUEUE(test_disk_queue_resize1609301793): closing
    diskqueue_test.go:76: INFO: DISKQUEUE(test_disk_queue_resize1609301793): closing ... ioLoop
    diskqueue_test.go:23: diskqueue_test.go:184:

               []string{} (expected)

            != []string{"/tmp/nsq-test-1609301793668354309126785195/test_disk_queue_resize1609301793.diskqueue.000000.dat.bad", "/tmp/nsq-test-1609301793668354309126785195/test_disk_queue_resize1609301793.diskqueue.000001.dat.bad"} (actual)

and here's the test:

// TestDiskQueueResize checks that a queue written with one per-file size
// limit can be reopened with a larger limit and still be read back in order,
// without the reader marking any data file as ".bad".
//
// The third argument to New is the value that changes between the two
// openings (presumably maxBytesPerFile, per the change under review — confirm
// against New's signature).
func TestDiskQueueResize(t *testing.T) {
	l := NewTestLogger(t)
	dqName := "test_disk_queue_resize" + strconv.Itoa(int(time.Now().Unix()))
	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
	if err != nil {
		// Fail only this test instead of panicking the whole test binary.
		t.Fatalf("creating temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	msg := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	ml := int64(len(msg))

	// First opening: size the limit so that exactly 8 messages fill a file.
	// NOTE(review): ml+4 presumably is the on-disk record size (payload plus a
	// 4-byte length prefix) — confirm against the writer.
	dq := New(dqName, tmpDir, 8*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)
	NotNil(t, dq)
	Equal(t, int64(0), dq.Depth())

	for i := 0; i < 8; i++ {
		msg[0] = byte(i)
		err := dq.Put(msg)
		Nil(t, err)
	}
	// The 8th message filled file 0, so the writer has rolled to file 1.
	Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
	Equal(t, int64(0), dq.(*diskQueue).writePos)
	Equal(t, int64(8), dq.Depth())

	// Second opening: same queue on disk, but now 10 messages fit per file.
	dq.Close()
	dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)

	for i := 0; i < 10; i++ {
		// Offset the first byte by 20 to tell this batch apart from the first.
		msg[0] = byte(20 + i)
		err := dq.Put(msg)
		Nil(t, err)
	}
	// File 1 took all 10 messages under the new limit; the writer is on file 2.
	Equal(t, int64(2), dq.(*diskQueue).writeFileNum)
	Equal(t, int64(0), dq.(*diskQueue).writePos)
	Equal(t, int64(18), dq.Depth())

	// Read everything back in FIFO order: the 8 messages from the smaller file
	// first, then the 10 from the larger one.
	for i := 0; i < 8; i++ {
		msg[0] = byte(i)
		Equal(t, msg, <-dq.ReadChan())
	}
	for i := 0; i < 10; i++ {
		msg[0] = byte(20 + i)
		Equal(t, msg, <-dq.ReadChan())
	}
	Equal(t, int64(0), dq.Depth())
	dq.Close()

	// make sure there aren't "bad" files due to read logic errors
	files, err := filepath.Glob(filepath.Join(tmpDir, dqName+"*.bad"))
	Nil(t, err)
	// empty files slice is actually nil, length check is less confusing
	if len(files) > 0 {
		Equal(t, []string{}, files)
	}
}
mreiferson commented 3 years ago

thanks, added that test here