aerospike / aerospike-client-go

Aerospike Client Go
Apache License 2.0
429 stars 199 forks source link

different iteration over scan results give different result #346

Closed Boklazhenko closed 3 years ago

Boklazhenko commented 3 years ago
package main

import (
    as "github.com/aerospike/aerospike-client-go"
    "log"
)

func f(handler func(rs *as.Recordset) int) int {
    asClient, err := as.NewClientWithPolicy(nil, "localhost", 3000)
    if err != nil {
        panic(err)
    }

    defer asClient.Close()

    rs, err := asClient.ScanAll(nil, "test", "pairs", "resp")
    if err != nil {
        panic(err)
    }

    //log.Printf("recs cap %v, len %v", cap(rs.Results()), len(rs.Results()))

    return handler(rs)
}

func main() {
    log.Println(f(func(rs *as.Recordset) int {
        counter := 0
        for range rs.Results() {
            counter++
        }
        return counter
    }))

    log.Println(f(func(rs *as.Recordset) int {
        counter := 0
        for {
            _, ok := <-rs.Results()
            if !ok {
                break
            }
            counter++
        }
        return counter
    }))
}

Hi, it is all code

simple test. aerospike containts 1 set with 10 records

first call return 10 ( always) second call return 3-4(stable). not 7, not 8. always 3-4

but that's not all if uncomment row with logging in 'f' function first call 3-4 second 2-3

i am confused... seems it is matter only go language, but i see first time this with aerospike

Boklazhenko commented 3 years ago
func (rcs *Recordset) Results() <-chan *Result {
    recCap := cap(rcs.Records)
    if recCap < 1 {
        recCap = 1
    }
    res := make(chan *Result, recCap)

    select {
    case <-rcs.cancelled:
        // Bail early and give the caller a channel for nothing -- it's
        // functionally wasted memory, but the caller did something
        // after close, so it's their own doing.
        close(res)
        return res
    default:
    }

    go func(cancelled <-chan struct{}) {
        defer close(res)
        for {
            record, err := rcs.Read()
            if err == types.ErrRecordsetClosed {
                return
            }

            result := &Result{Record: record, Err: err}
            select {
            case <-cancelled:
                return
            case res <- result:

            }
        }
    }(rcs.cancelled)

    return res
}

with logging - understanding Results() create each time new results channel maybe it's wrong , but understandable

But with different cycles steel need help

Boklazhenko commented 3 years ago

damn! this is the same thing!. i have not questions. thank you

khaf commented 3 years ago

Did your issue resolve? The total number of the record received should be correct. Keep in mind that opening a client every time is very expensive and should be avoided. It pools the connections internally.

Boklazhenko commented 3 years ago

yes. few times Results() call was issue wrong behavior

khaf commented 3 years ago

I'm going to go ahead and close this issue. Feel free to reopen or file new tickets if you encounter any problems or have new questions.