Closed NadavkOptimalQ closed 3 years ago
Hi, thank you for your report. Could you provide a simple gist so that I can reproduce the issue?
Hi,
I tried creating a simplified version of my code. I'll add some explanation so it is clear. My system is a real-time scoring engine. To be more efficient, we calculate the scores for 3 hours into the future. If I get any new data, I calculate the scores for the next 3 hours again and overwrite any existing scores for that id. Since there is a size limit on maps, we divide each timestamp into 10 buckets.
Please let me know if there is something else I can provide you with.
import (
	"hash/fnv"

	aero "github.com/aerospike/aerospike-client-go"
)

// aerospikeClient is the shared client, assumed to be initialized elsewhere.
var aerospikeClient *aero.Client

// DeleteScores starts one goroutine per timestamp to delete the given ids.
func DeleteScores(name string, timestamps []int, externalIds []string) {
	bucketToExternalIds, _ := getBucketToExternalId(externalIds)
	for _, timestamp := range timestamps {
		go DeleteScoresForTimestamp(name, timestamp, bucketToExternalIds)
	}
}

// DeleteScoresForTimestamp starts one more goroutine per bucket.
func DeleteScoresForTimestamp(name string, timestamp int, bucketToExternalIds map[int][]string) {
	for bucket, externalIds := range bucketToExternalIds {
		go deleteScoresForBucket(name, timestamp, bucket, externalIds)
	}
}

// deleteScoresForBucket removes all ids of one bucket in a single Operate call.
func deleteScoresForBucket(name string, timestamp int, bucket int, externalIds []string) {
	deleteOperations := make([]*aero.Operation, 0, len(externalIds))
	for _, externalId := range externalIds {
		deleteOperations = append(deleteOperations, aero.MapRemoveByKeyOp("externalIdScore", externalId, aero.MapReturnType.NONE))
	}
	key, _ := aero.NewKey("test", "scores", name)
	aerospikeClient.Operate(aerospikeClient.DefaultWritePolicy, key, deleteOperations...)
}

// getBucket hashes the id into one of 10 buckets.
func getBucket(externalId string) (int, error) {
	h := fnv.New32a()
	h.Write([]byte(externalId))
	// Take the modulo before converting so the result is never negative.
	return int(h.Sum32() % 10), nil
}

// getBucketToExternalId groups the ids by bucket.
func getBucketToExternalId(externalIds []string) (map[int][]string, error) {
	bucketToExternalIds := make(map[int][]string)
	for _, externalId := range externalIds {
		bucket, err := getBucket(externalId)
		if err != nil {
			return nil, err
		}
		bucketToExternalIds[bucket] = append(bucketToExternalIds[bucket], externalId)
	}
	return bucketToExternalIds, nil
}
I see. Since you are launching a new goroutine for each timestamp, launching too many of them at the same time will exhaust the client's connection pool, and requests will end up waiting for a new connection. You can use Client.WarmUp to open all required connections right away.
You can also enlarge the connection pool (ClientPolicy.ConnectionQueueSize) to help mitigate the issue to some degree. If the issue persists, you need to rate limit your DB requests to ensure maximum performance.
You have not included all the timings from the pprof, so I don't have the context to compare the library time vs other parts of the code. Are these times cumulative?
I'm going to go ahead and close this ticket. Feel free to reopen or file a new issue.
Hi, I'm running Go version 1.15.3 and client v3.1.1+incompatible.
I'm using Aerospike to save a lot of data in maps, and I am seeing some slowness when deleting keys from the maps.
I ran pprof and the parts that took the most time were:
12.29s 20.27% github.com/aerospike/aerospike-client-go.(*connectionHeap).Poll
21.81s 35.98% github.com/aerospike/aerospike-client-go.(*baseCommand).executeAt
13.86s 22.86% github.com/aerospike/aerospike-client-go.(*singleCommand).getConnection
Any idea what I should do differently? Is there more information I can provide?
Thank you