aerospike / aerospike-client-go

Aerospike Client Go

ScanAllObjects error processing #319

Closed un000 closed 3 years ago

un000 commented 3 years ago

Hi!

  1. What's the idiomatic way to handle errors with the ScanAllObjects method? We have an objChan and the returned Recordset, which provides rs.Results(). How should we combine receiving results from objChan with the deprecated .Errors channel on the Recordset?

  2. When a record fails to unmarshal the library will throw a panic like:

    panic: interface conversion: interface {} is int, not string

    Maybe it shouldn't, since errors can already be reported through the err returned by ScanAllObjects and through rs.Results() or rs.Errors. As it stands, any corrupted data in the KV store can bring down the whole application with a panic.

khaf commented 3 years ago
  1. Here's how you use the results:

        for res := range recordset.Results() {
            if res.Err != nil {
                // handle error here
            }
            rec := res.Record
            // your logic goes here
        }
  2. Could you paste the whole panic stack trace so I can find where the error is coming from? Have you verified that this is due to a corrupted record in the database?

un000 commented 3 years ago

@khaf

  1. ScanAllObjects has the objChan which receives records. Should I iterate over recordset too?

    func (c *Client) ScanAllObjects(ctx context.Context, namespace, set string, objectChan interface{}) (<-chan error, error) {
        rs, err := c.client.ScanAllObjects(nil, objectChan, namespace, set)
        if err != nil {
            return nil, fmt.Errorf("error executing ScanAllObjects: %w", err)
        }

        errs := make(chan error)
        go func() {
            for {
                select {
                case <-ctx.Done():
                    _ = rs.Close()
                    return
                case res, ok := <-rs.Results():
                    if !ok {
                        return
                    }
                    if res.Err != nil {
                        // forward the per-record error; the outer err is nil here
                        errs <- res.Err
                    }
                }
            }
        }()

        return errs, nil
    }
    
    // ...
    for rec := range objectChan {
        // process
    }
  2. I changed the type of a bin in my objectChan's struct:
    LastMessageAt int64             `as:"lm"`
    -> 
    LastMessageAt string             `as:"lm"`
panic: interface conversion: interface {} is int, not string

goroutine 49 [running]:
github.com/aerospike/aerospike-client-go.setValue(0x181f1c0, 0xc0003241a0, 0x198, 0x181e6c0, 0x1ef63c0, 0x1, 0x181f1c0, 0xc0003241a0)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/read_command_reflect.go:182 +0x4719
github.com/aerospike/aerospike-client-go.setObjectField(0xc0003642d0, 0x1913920, 0xc000324150, 0x199, 0xc0004a5758, 0x2, 0x181e6c0, 0x1ef63c0, 0x1, 0x0, ...)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/read_command_reflect.go:139 +0xff
github.com/aerospike/aerospike-client-go.batchParseObject(0xc000306000, 0x17f7f20, 0xc000324150, 0x16, 0xa, 0x3, 0x27829c00000001, 0xc00053e100, 0xc0004de000)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/batch_command_reflect.go:82 +0x2d4
github.com/aerospike/aerospike-client-go.(*baseMultiCommand).parseRecordResults(0xc000306000, 0x1a758c0, 0xc000306000, 0x1f8, 0x203000000000100, 0x0, 0x0)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/multi_command.go:319 +0x230
github.com/aerospike/aerospike-client-go.(*baseMultiCommand).parseResult(0xc000306000, 0x1a758c0, 0xc000306000, 0xc0000b0500, 0x0, 0x0)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/multi_command.go:164 +0x27c
github.com/aerospike/aerospike-client-go.(*scanObjectsCommand).parseResult(0xc000306000, 0x1a758c0, 0xc000306000, 0xc0000b0500, 0x5b, 0x0)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/scan_objects_command.go:65 +0x4b
github.com/aerospike/aerospike-client-go.(*baseCommand).executeAt(0xc000306000, 0x1a758c0, 0xc000306000, 0xc00037c000, 0xc00037c001, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/command.go:1920 +0x8a8
github.com/aerospike/aerospike-client-go.(*baseCommand).execute(0xc000306000, 0x1a758c0, 0xc000306000, 0x1010001, 0x140, 0x18eb720)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/command.go:1795 +0xda
github.com/aerospike/aerospike-client-go.(*baseMultiCommand).execute(0xc000306000, 0x1a758c0, 0xc000306000, 0xc000306001, 0x1965dda, 0x9)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/multi_command.go:361 +0x149
github.com/aerosp(0xc000306000, 0x0, 0x0)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/scan_objects_command.go:70 +0x74
github.com/aerospike/aerospike-client-go.(*Client).scanNodeObjects(0xc000350000, 0xc00037c000, 0xc00042e000, 0xc000324070, 0x1965dda, 0x9, 0x1968756, 0xd, 0x98e40a138a056343, 0x0, ...)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/client_reflect.go:215 +0xdb
github.com/aerospike/aerospike-client-go.(*Client).ScanAllObjects.func1(0xc000350000, 0xc00037c000, 0xc000324070, 0x1965dda, 0x9, 0x1968756, 0xd, 0x98e40a138a056343, 0xc000114098, 0x0, ...)
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/client_reflect.go:167 +0xeb
created by github.com/aerospike/aerospike-client-go.(*Client).ScanAllObjects
        /Users/un0/go/pkg/mod/github.com/aerospike/aerospike-client-go@v3.1.0+incompatible/client_reflect.go:165 +0x3d4

So I would expect an error to be sent to the errors channel rather than a panic.

khaf commented 3 years ago

Sorry, I read that wrong. For ScanAllObjects:

    rs, err := c.client.ScanAllObjects(nil, objectChan, namespace, set)
    if err != nil {
        return nil, fmt.Errorf("error executing ScanAllObjects: %w", err)
    }

    for rec := range objectChan {
        select {
        case e := <-rs.Errors:
            // handle error
        default:
        }

        // process record
    }

    // make sure you exhaust all the errors in rs.Errors
    // in case the records channel closes but there are still errors left
    for err := range rs.Errors {
        // handle
    }
khaf commented 3 years ago

Regarding your type conversion issue, I'll have to take a look. It has been a gray area for me, since I'm not sure if the client should attempt an automatic type conversion, or just return an error. I welcome feedback and some discussion regarding the use cases and consequences of each choice.
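
To make the "just return an error" option concrete, here is a minimal sketch that uses only the standard reflect package. The `setField` helper and the `profile` struct are hypothetical illustrations, not the client's internal code. The sketch checks assignability before setting a struct field and reports a mismatch as an error instead of panicking.

```go
package main

import (
	"fmt"
	"reflect"
)

// profile is a hypothetical target struct, mirroring the bin type change above.
type profile struct {
	LastMessageAt string
}

// setField assigns value to the named field of the struct that obj points to.
// It returns an error instead of panicking when the types do not match.
func setField(obj interface{}, name string, value interface{}) error {
	v := reflect.ValueOf(obj)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("setField: want pointer to struct, got %T", obj)
	}
	f := v.Elem().FieldByName(name)
	if !f.IsValid() || !f.CanSet() {
		return fmt.Errorf("setField: no settable field %q", name)
	}
	val := reflect.ValueOf(value)
	if !val.IsValid() || !val.Type().AssignableTo(f.Type()) {
		return fmt.Errorf("setField: cannot assign %T to field %q of type %s", value, name, f.Type())
	}
	f.Set(val)
	return nil
}

func main() {
	var p profile
	// An int stored in the bin, a string expected by the struct: error, no panic.
	fmt.Println(setField(&p, "LastMessageAt", 1600000000))
	// Matching types are assigned normally.
	fmt.Println(setField(&p, "LastMessageAt", "2020-09-13"), p.LastMessageAt)
}
```

Automatic conversion would replace the `AssignableTo` check with a set of explicit conversion rules. That is where the ambiguity comes from: an int bin could plausibly map to a string in more than one way.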

un000 commented 3 years ago

Thank you! I have rewritten my wrapper to this:

func (c *Client) ScanAllObjects(ctx context.Context, namespace, set string, objectChan interface{}) (<-chan error, error) {
    rs, err := c.client.ScanAllObjects(nil, objectChan, namespace, set)
    if err != nil {
        return nil, fmt.Errorf("error executing ScanAll: %w", err)
    }

    errs := make(chan error)
    go func() {
        defer func() {
            for err := range rs.Errors {
                errs <- err
            }
            close(errs)
        }()
        for {
            select {
            case <-ctx.Done():
                _ = rs.Close()
                return
            case err, ok := <-rs.Errors:
                if !ok {
                    return
                }
                errs <- err
            }
        }
    }()

    return errs, nil
}

I was disappointed by this comment: image

As for the second problem, it's fine that the library does not attempt automatic type conversion, since that would introduce undefined behaviour and extra complexity. My point is that an application must not be shut down by a panic caused by a type conversion error. The failure should be reported through rs.Errors instead.

GetObject also panics on a type error, although it could return an error instead.
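
As a caller-side stopgap for a synchronous call such as GetObject, a panic can be converted into an error with defer and recover. The sketch below is an assumption-laden illustration: `safely` is a hypothetical helper, and the failing type assertion only imitates the panic from the stack trace above.

```go
package main

import "fmt"

// safely runs fn and converts a panic raised inside it into an error.
// Hypothetical helper; it only covers panics in the calling goroutine.
func safely(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered panic: %v", r)
		}
	}()
	fn()
	return nil
}

func main() {
	var bin interface{} = 42
	err := safely(func() {
		_ = bin.(string) // panics: the stored value is an int, not a string
	})
	fmt.Println(err)
}
```

This does not help with ScanAllObjects: recover only catches panics in the goroutine where the deferred function runs, and the stack trace shows the panic happening in a goroutine created inside the library.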

un000 commented 3 years ago

Also I got EOF here:

qp := aerospike.NewScanPolicy()
qp.IncludeBinData = true
qp.Priority = aerospike.HIGH
qp.RecordQueueSize = 512

rs, err := c.client.ScanAll(qp, namespace, set, bins...)
for res := range rs.Results() {
    if res.Err != nil { // << EOF at the first iteration

Why does this happen?

About 3 in 10 scans return this error. I'm using Aerospike Server 4.9.0.11 and client library 3.1.0 on two different clusters.

khaf commented 3 years ago

This means at least one node experienced a connection error and the connection was dropped. Either you have a very unstable connection, or more likely you don't use your connections nearly enough and they end up staying in the queue while they are dropped on the server-side because they are idle. I think the default server idle timeout is 10 seconds.

un000 commented 3 years ago

@khaf I've got:

proto-fd-idle-ms                 :   60000                          60000

And my client policy looks like this:

    const minConnections = 10
    cp := aerospike.NewClientPolicy()
    cp.Timeout = 5 * time.Second
    cp.MinConnectionsPerNode = minConnections
    cp.IdleTimeout = 5 * time.Second
    cp.ConnectionQueueSize = 512

    _, err = c.client.WarmUp(minConnections)
    if err != nil {
        return fmt.Errorf("error warming up connections: %w", err)
    }

5s < 60s, so a connection should still be alive when Scan takes it from the pool. Maybe the client should reopen idle connections even when they fall within the MinConnections range. https://github.com/aerospike/aerospike-client-go/blob/master/connection_heap.go#L229-L239

Removing MinConnectionsPerNode fixed this issue.

khaf commented 3 years ago

I suspect this happens because the scans are not retried and the connection is defective in the first place. I'll see if that's the case, and if so, will release a fix soon.

khaf commented 3 years ago

This should have been addressed via the new PartitionScans, supported by the Aerospike Servers v4.9+ and Client v4.2+. Let me know if that addresses your issue.

un000 commented 3 years ago

All is ok now. Thank you!