aerospike / aerospike-client-go

Aerospike Client Go
Apache License 2.0

Why is ClientPolicy.Timeout used to create new connections during reads instead of basePolicy.connectTimeout #408

Open juanma9613 opened 1 year ago

juanma9613 commented 1 year ago

Hi, I would like to know why clientPolicy.Timeout is applied when creating new connections for the pool during a read operation when the pool is empty or exhausted. This is the line where it is happening.

It happens exactly when this line is executed and there are no available connections:

record, err := c.Aerospike.Get(readPolicy, key)

I ask because the Java client's read policy has a dedicated connectTimeout that is used to create new connections during read operations.

Is there a way to achieve this same behavior without modifying the clientPolicy.Timeout?

Thank you very much for your response.

khaf commented 1 year ago

Hi, the link you posted goes to the Go Client v4, while the latest version is v6, residing in the v6 branch. Just a heads up.

The Go client creates connections asynchronously, meaning that connections are not created inline in the transaction. That's why it does not take the connectTimeout option in the base policy.

What are you trying to achieve that makes this a problem?

juanma9613 commented 1 year ago

Hi, I'm indeed using Go client v6 and reading the code and docs from v6. Maybe I sent the wrong link by mistake; could you please tell me which link points to v4? I'll fix it.

My problem is that during some traffic spikes in my app, the number of connections opened and closed per second in the Aerospike cluster increases sharply (check image 1). This pushes the p99 latency of the Aerospike reads way beyond 5ms inside my app (check image 3), and it never goes down again (a kind of snowball effect, check image 2; I even had to restart my app at 2am).

I suspect that what's causing the issue is the sudden spike in writes at 23:30 together with my BasePolicy.TotalTimeout of only 3ms. During this spike, maybe the Aerospike server becomes slightly slower, causing too many connections to close, as described here. The problem is that I need my app to respond in under 3ms. Something I don't understand is why the number of opening and closing connections in image 1 doesn't stabilize when the writes from the other app stop, and why there's a snowball effect.

Additional context:

image 1: increasing number of connections during latency spike

image 2: fetch latency inside my app

image 3: reads from aerospike cluster from my app + other apps

image 4: writes to aerospike cluster from other apps

These are the parameters I have used for my initial tests:

ClientPolicy.ConnectionQueueSize = 1024
ClientPolicy.LoginTimeout = time.Duration(6) * time.Second
ClientPolicy.OpeningConnectionThreshold = 256

policy = MyClient.DefaultPolicy // base read policy
policy.TotalTimeout = 3 * time.Millisecond // I blame this very short timeout for the opening connections problem
policy.SocketTimeout = time.Duration(100) * time.Millisecond
policy.ReplicaPolicy = aerospike.MASTER_PROLES // read from any replica to mitigate hotkey a little bit

juanma9613 commented 1 year ago

@khaf, we looked at the server side and indeed found some timeouts when the issue started. So the cause could be a hot key combined with a too aggressive timeout on our client side. I'm going to increase the timeout on my side.

But that doesn't explain why the latency in image 2 above went up and stayed up. (I saw that CPU usage reached 100% during that time, so I guess the app was creating a lot of goroutines that tried to open TCP connections but couldn't do so in time.)

I also need my client.Get call to return within 3ms, without leaking goroutines and without closing the connection when there's no response. Do you think that's possible? Do you have any insight you can share?


khaf commented 1 year ago

Hi @juanma9613 , sorry for my late reply. It is this link that goes to master, which is the v4.x branch.

I suspect your issue is indeed the very short timeout, especially if your app is in a hosted datacenter (cloud environment). Over the years, we have observed a lot of inconsistency in both CPU and I/O scheduling on cloud instances. On top of that, other issues like hot keys can affect latency, as you suspect in your case.

Regarding your settings, ClientPolicy.OpeningConnectionThreshold = 256 can cause issues by itself, since the client will attempt to reopen up to 256 connections at the same time, which could in theory put too much stress on the server node, depending on the size and resources of the instance.

It seems to me that during the time of prolonged latency (image 2), you also had a corresponding connection churn period (image 1). I suspect that could explain the considerable latency increase at that same time frame. Connection churn is the root of all evil when it comes to the Aerospike clients, and a point against setting the timeout values too aggressively.

juanma9613 commented 1 year ago

@khaf, I'm going to do the following to make sure my 3ms timeout is respected in my app:

enclosing_fn_with_3ms_timeout():
    go call aerospike asynchronously (basepolicy.totaltimeout 100ms -> basepolicy.sockettimeout 100ms)
    wait for results up to 3ms
    return results or timeoutErr if aerospike didn't respond in 3ms

This may lead to some goroutines outliving the call for a while, but the maximum number of open goroutines isn't too high.

Regarding ClientPolicy.OpeningConnectionThreshold = 256, what do you recommend as a setting for this value? I see that the default value is 0, and I thought that was worse because the client could try to create a lot of connections if the nodes don't acknowledge the opening connections quickly. That's why I set it to 256, to limit this edge case a little.

khaf commented 1 year ago

I have seen some users do this by wrapping the client inside an API that takes a context.Context. It's not a bad compromise as far as I can tell, though I believe every use case has to be evaluated in its own environment. I have so far resisted that solution myself, since it imposes an extra goroutine on the user per API call, which is a price some of our performance-conscious customers would not like to pay.

Another thing you could do is set up the circuit breaker (ClientPolicy.MaxErrorRate and ClientPolicy.ErrorRateWindow) to divert your reads from the node with hot keys to other nodes. This could potentially work if those nodes are not affected by the other program.

Regarding the value of ClientPolicy.OpeningConnectionThreshold, I cannot tell you an exact value. I'd start by limiting it to 32 or 64 and see how that affects my program, increasing it only if necessary. The reality is that the values in ClientPolicy have a significant effect on the behavior of your overall application in times of crisis. You need to pay extra attention to them, and possibly research your environment (by instrumenting your live app) to find the optimal values.

We are open and very accommodating to user feedback and requests in improving the client behavior. Let us know about your discoveries and observations and we'll try our best to architect reasonable and general solutions into the client.

juanma9613 commented 1 year ago

@khaf I was able to solve the issue by first limiting OpeningConnectionThreshold to 64, then increasing the timeouts, and finally, and much more importantly:

I had to stop using Error.Matches, Error.Is and Error.As when this library returns an error from Client.Get. I just took the string of your error and created a new error myself.

Do you think it's possible that there's some circular reference in your errors or a very long error stack?

khaf commented 1 year ago

@juanma9613 Thanks for the follow-up. Do you mean that Error.Is and Error.As are taking so long that they affect the performance of your app?!

juanma9613 commented 1 year ago

@khaf, either what you said, or there's an infinite loop in the error checking. The only fix I found was to stop using the errors from the library for these error comparisons. It immediately solved the problem for me, and I didn't change anything else in my app.

juanma9613 commented 1 year ago

@khaf For your debugging, I can share a minimal working example that I used to reproduce the issue. Some of the TIC prints below have no corresponding TOC print, which indicates a goroutine leak when using your error. Specifically, check the definition of the 'assing' variable under the second case statement of the GetValuesWithTimeout function. The line Error(unwrappedErr) runs forever:

package main

import (
    "errors"
    "fmt"
    "runtime"
    "strings"
    "sync"
    "time"

    aerospike "github.com/aerospike/aerospike-client-go/v6"
    "github.com/aerospike/aerospike-client-go/v6/types"
)

type QueryInfo struct {
    Namespace string
    Set       string
    Key       string
}

type ResultType struct {
    id    int
    value map[string]interface{}
    err   error
}

func main() {
    concurrency := 40
    var wg sync.WaitGroup
    nTest := 100000000
    timeout := 100 * time.Microsecond // deadline passed to GetValuesWithTimeout

    clientPolicy := aerospike.NewClientPolicy()
    clientPolicy.ConnectionQueueSize = 1024
    clientPolicy.LoginTimeout = time.Duration(1) * time.Second
    clientPolicy.OpeningConnectionThreshold = 128
    hosts := make([]*aerospike.Host, 0)
    // TODO: add host here
    hosts = append(hosts, aerospike.NewHost("localhost", 30000)) // 
    username := "asdasdsaddadasd"
    password := "adasdasdasdasdas"
    clientPolicy.User = username
    clientPolicy.Password = password

    aerospikeClient, err := aerospike.NewClientWithPolicyAndHost(clientPolicy, hosts...)
    if err != nil {
        fmt.Println("found error in creation", err)
        return // the client is unusable without a connection
    }

    go func() {
        for {
            fmt.Println("GOROUTINES: ", runtime.NumGoroutine())
            time.Sleep(2 * time.Second)
        }
    }()

    for i := 0; i < concurrency; i++ {
        wg.Add(1) // one Done per runTest goroutine
        go runTest(aerospikeClient, i, nTest, timeout, &wg)
    }
    wg.Wait()
}

func runTest(client *aerospike.Client, idx int, nTest int, timeout time.Duration, wg *sync.WaitGroup) {
    queries := []QueryInfo{{Namespace: "test", Set: "asd", Key: "asdasdas"}} 
    for i := 0; i < nTest; i++ {
        // run the lookup under the caller-supplied timeout
        GetValuesWithTimeout(client, timeout, queries)
        // do other stuff
        time.Sleep(time.Duration(100) * time.Microsecond)
    }
    wg.Done()

}

// GetValuesWithTimeout fetches all queries concurrently and gives up once timeout elapses.
func GetValuesWithTimeout(client *aerospike.Client, timeout time.Duration, queries []QueryInfo) ([]map[string]interface{}, error) {
    size := len(queries)
    result := make([]map[string]interface{}, size)
    resChan := make(chan *ResultType, size)
    for id, query := range queries {
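        go func(id int, query QueryInfo) {
            lres := &ResultType{
                id:    id,
                value: nil,
            }
            // Always report the outcome; resChan is buffered, so this never blocks.
            defer func() { resChan <- lres }()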
            key, _ := aerospike.NewKey(query.Namespace, query.Set, query.Key)

            //
            // IMPORTANT: READ POLICY PARAMETERS
            // Copy the default policy so concurrent goroutines do not mutate the shared one.
            readPolicy := *client.DefaultPolicy
            readPolicy.TotalTimeout = time.Duration(50) * time.Microsecond
            readPolicy.ReplicaPolicy = aerospike.MASTER_PROLES

            record, err := client.Get(&readPolicy, key)
            if err != nil {
                fmt.Println("err getting key", err)
                switch {
                case err.Matches(types.KEY_NOT_FOUND_ERROR):
                    // Aerospike.KEY_NOT_FOUND_ERROR is treated as a nil value, no error passed.
                    lres.value = nil
                case err.Matches(types.TIMEOUT) || err.Matches(types.QUERY_TIMEOUT):
                    lres.value = nil
                    lres.err = errors.New("timeout aerospike " + err.Error())
                    unwrappedErr := err.Unwrap()
                    if unwrappedErr != nil {
                        fmt.Println("Unwrapped err aerospike time: ", unwrappedErr.Error()) // if this is the error, then try to do things on it
                        lres.err = unwrappedErr
                        fmt.Println("TIC")
                        assing := Error(unwrappedErr)
                        fmt.Println("TOC")
                        runtime.KeepAlive(assing)

                    } else {
                        fmt.Println("Error in aerospike: ", err.Error())
                        lres.err = errors.New("Error in aerospike" + err.Error())
                    }

                default:
                    lres.value = nil
                    unwrappedErr := err.Unwrap()
                    if unwrappedErr != nil {
                        fmt.Println("Unwrapped err aerospike: ", unwrappedErr.Error()) // if this is the error, then try to do things on it
                        lres.err = unwrappedErr
                        fmt.Println("TiC")
                        assing := Error(unwrappedErr)
                        fmt.Println("TOC")
                        runtime.KeepAlive(assing)

                    } else {
                        fmt.Println("Error in aerospike: ", err.Error())
                        lres.err = errors.New("Error in aerospike" + err.Error())
                    }
                }
                return
            }
            lres.value = map[string]interface{}(record.Bins)
        }(id, query)
    }

    count := 0
    t := time.NewTimer(timeout)
    defer t.Stop() // release the timer when a result arrives first
    for {
        select {
        case r := <-resChan:
            if r.err != nil {
                if strings.Contains(r.err.Error(), "timeout") {
                    return nil, errors.New("aerospike get timeout")
                }
                return nil, r.err
            }

            result[r.id] = r.value
            if count++; count >= size {
                return result, nil
            }

        case <-t.C:
            return nil, errors.New("this function timedout")
        }
    }
}

Also the additional serving error code I used in my app is below:

package main

import (
    "fmt"

    "github.com/go-errors/errors"
)

const (
    Unknown = -1
)

type ServingError struct {
    err   *errors.Error
    cause Cause
    code  int
    fatal bool
}

type Cause interface {
    Error() string
    Trace() string
    GetCode() int
    IsFatal() bool
}

type wrapError interface {
    Error() string
    Unwrap() error
}

func Error(v interface{}) *ServingError {
    return wrap(v, 1)
}

func Errorf(format string, a ...interface{}) *ServingError {
    return wrap(fmt.Errorf(format, a...), 1)
}

func wrap(v interface{}, skip int) *ServingError {
    if v == nil {
        return nil
    }

    res := &ServingError{
        cause: nil,
        err:   nil,
        code:  Unknown,
        fatal: false,
    }

    switch e := v.(type) {
    case *ServingError:
        return e

    case *errors.Error:
        res.err = errors.Wrap(e, skip+1)
        res.cause = Cause(res)

    case wrapError:
        var latest *ServingError
        res.err = errors.Wrap(e, skip+1)
        res.cause, latest = findCause(e, skip+1)
        if latest != nil {
            res.code = latest.code
            res.fatal = latest.fatal
        }

    default:
        res.err = errors.Wrap(e, skip+1)
        res.cause = Cause(res)
    }

    return res
}

func findCause(err error, skip int) (cause Cause, latest *ServingError) {
    cur := err

    for {
        switch err := cur.(type) {
        case *ServingError:
            return err.cause, err

        case wrapError:
            cur = err.Unwrap()
            if cur == nil {
                return Cause(wrap(err, skip+1)), nil
            }

        default:
            return Cause(wrap(cur, skip+1)), nil
        }
    }
}

func (e *ServingError) Cause() Cause {
    return e.cause
}

func (e *ServingError) Error() string {
    return e.err.Error()
}

func (e *ServingError) Unwrap() error {
    return e.err.Unwrap()
}

func (e *ServingError) Code(code int) *ServingError {
    if e.code < code {
        e.code = code
    }

    return e
}

func (e *ServingError) Fatal(enable bool) *ServingError {
    e.fatal = enable
    return e
}

func (e *ServingError) Trace() string {
    return string(e.err.Stack())
}

func (e *ServingError) GetCode() int {
    return e.code
}

func (e *ServingError) IsFatal() bool {
    return e.fatal
}

juanma9613 commented 1 year ago

https://github.com/golang/go/issues/34957

Hi @khaf, I think that something like this could be triggered by the type of errors that client.Get returns, which have the Unwrap method as well. Do you think there's a possibility of that happening?