googleapis / google-cloud-go

Google Cloud Client Libraries for Go.
https://cloud.google.com/go/docs/reference
Apache License 2.0

datastore: Parallel Transaction on same kind with unique keys giving concurrent transaction error #10068

Open mehulparmariitr opened 2 months ago

mehulparmariitr commented 2 months ago

Client

datastore

Environment

Go executable on Linux

Go Environment

$ go version
go version go1.21.0 darwin/amd64

$ go env
GO111MODULE='on'
GOARCH='amd64'
GOBIN=''
GOCACHE='/Users/mehparmar/Library/Caches/go-build'
GOENV='/Users/mehparmar/Library/Application Support/go/env'
GOEXE=''
GOEXPERIMENT=''
GOFLAGS=''
GOHOSTARCH='amd64'
GOHOSTOS='darwin'
GOINSECURE=''
GOMODCACHE='/Users/mehparmar/go/pkg/mod'
GONOPROXY=''
GONOSUMDB=''
GOOS='darwin'
GOPATH='/Users/mehparmar/go'
GOPRIVATE=''
GOPROXY='https://proxy.golang.org,direct'
GOROOT='/usr/local/go'
GOSUMDB='sum.golang.org'
GOTMPDIR=''
GOTOOLCHAIN='auto'
GOTOOLDIR='/usr/local/go/pkg/tool/darwin_amd64'
GOVCS=''
GOVERSION='go1.21.0'
GCCGO='gccgo'
GOAMD64='v1'
AR='ar'
CC='clang'
CXX='clang++'
CGO_ENABLED='1'
GOMOD='/Users/mehparmar/Repos/mehparmar/abcserv/go.mod'
GOWORK=''
CGO_CFLAGS='-O2 -g'
CGO_CPPFLAGS=''
CGO_CXXFLAGS='-O2 -g'
CGO_FFLAGS='-O2 -g'
CGO_LDFLAGS='-O2 -g'
PKG_CONFIG='pkg-config'
GOGCCFLAGS='-fPIC -arch x86_64 -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -ffile-prefix-map=/var/folders/6n/kcp2mfmx0s9_lqw9y30yqq500000gq/T/go-build1900003096=/tmp/go-build -gno-record-gcc-switches -fno-common'

Code

e.g.


func processBatch(ctx context.Context, logger *l.Logger, desQSp *gds.QuerySpecs, tableName string, batch []dataanalytics.AggregationEntity, wg *sync.WaitGroup) {
    defer wg.Done() // Signal that this goroutine is done

    // Upsert calls GetMulti on the keys passed here, then decides per key whether it is an insert or an update.
    err, _, _, _ := dataanalytics.Upsert(ctx, logger, (*gds2.QuerySpecs)(desQSp), tableName, batch)
    if err != nil {
        // Handle the error appropriately (logging, retries, etc.)
        log.Printf("Error during Upsert: %v", err)
    }
}

// Code in the main function:
// goroutines write data in parallel for unique keys.
var wg sync.WaitGroup // To wait for goroutines to finish
// At most 1000 keys can be queried from Google Datastore in one call, hence batches of 1000.
for i := 0; i < len(msgs); i += 1000 {
    end := i + 1000
    if end > len(msgs) {
        end = len(msgs)
    }
    batch := msgs[i:end]
    // Register the goroutine before starting it; processBatch calls wg.Done(),
    // which panics on a negative counter if this Add is skipped.
    wg.Add(1)
    go processBatch(ctx, l.New("abc-services"), (*gds.QuerySpecs)(desQSp), preAggregationRequest.DestinationKind, batch, &wg)
}

wg.Wait() // Wait for all goroutines to complete
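
The batching loop above can also be sketched with the standard library alone. This is a minimal sketch, not the reporter's code: `splitBatches`, `runBatches`, `errShortBatch`, and the `process` callback are hypothetical names, and `process` stands in for a call such as `dataanalytics.Upsert`. It calls `wg.Add(1)` before each goroutine starts and keeps one error slot per batch, so failed batches can be identified afterwards instead of only being logged.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errShortBatch is a hypothetical error used only to simulate a failing batch.
var errShortBatch = errors.New("short batch")

// splitBatches cuts items into consecutive slices of at most size elements.
// It returns nil when size is not positive.
func splitBatches(items []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var batches [][]string
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

// runBatches runs process on every batch in its own goroutine and returns
// one error per batch (nil on success), in batch order.
func runBatches(batches [][]string, process func([]string) error) []error {
	errs := make([]error, len(batches))
	var wg sync.WaitGroup
	for i, batch := range batches {
		wg.Add(1) // register before the goroutine starts
		go func(i int, batch []string) {
			defer wg.Done()
			errs[i] = process(batch) // each goroutine writes only its own slot
		}(i, batch)
	}
	wg.Wait()
	return errs
}

func main() {
	msgs := make([]string, 2500)
	batches := splitBatches(msgs, 1000)
	errs := runBatches(batches, func(b []string) error {
		if len(b) < 1000 {
			return errShortBatch // simulated failure for the last, smaller batch
		}
		return nil
	})
	for i, err := range errs {
		fmt.Printf("batch %d: size=%d err=%v\n", i, len(batches[i]), err)
	}
}
```

Passing `i` and `batch` as goroutine arguments keeps the sketch correct on Go versions before 1.22, where loop variables are shared across iterations.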

Expected behavior

All entities should be written. There are around 275,000 of them.

Actual behavior

2024/04/30 05:06:42 Error during Upsert: datastore: concurrent transaction
UUID-NOT-COLLECTED API-NOT-COLLECTED class.Upsert tx.Commit: datastore: concurrent transaction
2024/04/30 05:06:43 Error during Upsert: datastore: concurrent transaction
UUID-NOT-COLLECTED API-NOT-COLLECTED class.Upsert tx.Commit: rpc error: code = InvalidArgument desc = Invalid transaction.
2024/04/30 05:06:43 Error during Upsert: rpc error: code = InvalidArgument desc = Invalid transaction.
UUID-NOT-COLLECTED API-NOT-COLLECTED class.Upsert tx.Commit: rpc error: code = InvalidArgument desc = Invalid transaction.
2024/04/30 05:06:43 Error during Upsert: rpc error: code = InvalidArgument desc = Invalid transaction.

Additional context

If the keys are unique, why is the transaction failing?

bhshkh commented 2 months ago

On which version of Datastore client library are you facing this issue?

mehulparmariitr commented 2 months ago

@bhshkh I see it's 1.0.0.

But we have also replaced this library in go.mod:

replace cloud.google.com/go/datastore => github.com/googleapis/google-cloud-go/datastore v1.0.0

mehulparmariitr commented 2 months ago

Even with the latest version the error is still there. It seems like a logical error, but if the keys are unique, it shouldn't throw a concurrent transaction error, right?

go 1.12

require (
    cloud.google.com/go/bigquery v1.60.0
    cloud.google.com/go/datastore v1.16.0
    cloud.google.com/go/logging v1.9.0
    cloud.google.com/go/profiler v0.1.0
    cloud.google.com/go/pubsub v1.37.0
    cloud.google.com/go/storage v1.39.1
    github.com/dimfeld/httptreemux v5.0.1+incompatible
    github.com/dimfeld/httptreemux/v5 v5.0.2
    github.com/evanphx/json-patch v4.5.0+incompatible
    github.com/getlantern/deepcopy v0.0.0-20160317154340-7f45deb8130a
    github.com/go-redis/redis v6.15.7+incompatible
    github.com/gomodule/redigo v2.0.0+incompatible
    github.com/kelseyhightower/envconfig v1.4.0
    github.com/mitchellh/mapstructure v1.1.2
    github.com/oleiade/reflections v1.0.1
    github.com/openzipkin/zipkin-go v0.2.5
    github.com/pborman/uuid v1.2.1
    github.com/pkg/errors v0.9.1
    github.com/schwarmco/go-cartesian-product v0.0.0-20180515110546-d5ee747a6dc9
    github.com/signalfx/golib/v3 v3.3.19
    github.com/spf13/cobra v1.5.0
    github.com/stretchr/testify v1.9.0
    github.com/thedevsaddam/gojsonq v2.3.0+incompatible
    go.opencensus.io v0.24.0
    google.golang.org/api v0.176.1
    google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda
    gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
    gopkg.in/go-playground/validator.v8 v8.18.2
    gopkg.in/yaml.v2 v2.4.0
)

replace cloud.google.com/go/datastore => github.com/googleapis/google-cloud-go/datastore v1.16.0

The error:

UUID-NOT-COLLECTED API-NOT-COLLECTED abc.Upsert tx.Commit: datastore: concurrent transaction
2024/05/01 02:46:59 Error during Upsert: datastore: concurrent transaction
UUID-NOT-COLLECTED API-NOT-COLLECTED abc.Upsert tx.Commit: datastore: concurrent transaction
2024/05/01 02:46:59 Error during Upsert: datastore: concurrent transaction
UUID-NOT-COLLECTED API-NOT-COLLECTED abc.Upsert tx.Commit: rpc error: code = InvalidArgument desc = The referenced transaction has expired or is no longer valid.
2024/05/01 02:47:00 Error during Upsert: rpc error: code = InvalidArgument desc = The referenced transaction has expired or is no longer valid.

mehulparmariitr commented 2 months ago

This is the method I am using to upsert into the DB:

// Upsert inserts entries into the GDS aggregation table inside a transaction; an entry that already exists with the same MdInstanceHashKey is updated instead.
func Upsert(ctx context.Context, logger *slog.Logger, qSp *gds.QuerySpecs, tableName string, msgs []AggregationEntity) (error, []*datastore.Key, []*datastore.Key, []*AggregationEntity) {
    ctx, span := trace.StartSpan(ctx, "internal.dataanalytics.Update")
    defer span.End()
    logger.Infof(ctx, "Started putting entries in GDS(aggregation) in %v table", tableName)

    client, err := gds.GetDatastoreClient(ctx, logger, qSp)
    if err != nil {
        logger.Errorf(ctx, "Failed to create GDS client: %v", err)
        return err, nil, nil, nil
    }

    // make keys array use MdInstanceHashKey column
    var keys []*datastore.Key
    for id := 0; id < len(msgs); id++ {
        newKey := datastore.NameKey(tableName, msgs[id].MdInstanceHashKey, nil)
        newKey.Namespace = qSp.Namespace
        keys = append(keys, newKey)
    }
    logger.Infof(ctx, "Length of keys to insert in data store are : %v", len(keys))
    tx, err := client.NewTransaction(ctx)
    if err != nil {
        logger.Errorf(ctx, "client.NewTransaction: %v", err)
        return err, nil, nil, nil
    }
    gdsEntry := make([]AggregationEntity, len(msgs))
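    // GetMulti reports per-key failures as a datastore.MultiError. A missing
    // entity (ErrNoSuchEntity) is expected here and means the key is created below.
    if err := tx.GetMulti(keys, gdsEntry); err != nil {
        merr, ok := err.(datastore.MultiError)
        if !ok {
            tx.Rollback()
            logger.Errorf(ctx, "tx.GetMulti: %v", err)
            return err, nil, nil, nil
        }
        for _, e := range merr {
            if e != nil && e != datastore.ErrNoSuchEntity {
                tx.Rollback()
                logger.Errorf(ctx, "tx.GetMulti: %v", e)
                return e, nil, nil, nil
            }
        }
    }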

    var createKeys []*datastore.Key
    var updateKeys []*datastore.Key
    var createValues []*AggregationEntity
    // checking if entry with same key exists or not if it exists then update it
    for i := 0; i < len(msgs); i++ {

        if gdsEntry[i].GitTestRepoURL == "" {
            //create
            gdsEntry[i] = msgs[i]
            createKeys = append(createKeys, keys[i])
            createValues = append(createValues, &msgs[i])
        } else {
            //update
            if gdsEntry[i].Type == Pass {
                gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Passed)
                updateKeys = append(updateKeys, keys[i])
            }
            if gdsEntry[i].Type == Fail {
                gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Failed)
                updateKeys = append(updateKeys, keys[i])
            }
            if gdsEntry[i].Type == Skip {
                gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Skipped)
                updateKeys = append(updateKeys, keys[i])
            }
            if gdsEntry[i].Type == MostCommonFailure {
                gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Failed)
                updateKeys = append(updateKeys, keys[i])
            }
            if gdsEntry[i].Type == MostFailed {
                gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Failed)
                updateKeys = append(updateKeys, keys[i])
            }
            if gdsEntry[i].Type == MostSkipped {
                gdsEntry[i] = AggregateUpdateSum(logger, gdsEntry[i], msgs[i], Skipped)
                // Record the key as updated, as every other branch does.
                updateKeys = append(updateKeys, keys[i])
            }
            if gdsEntry[i].Type == MostDuration {
                gdsEntry[i] = AggregateUpdateMax(logger, gdsEntry[i], msgs[i], Duration)
                updateKeys = append(updateKeys, keys[i])
            }
        }
    }

    // batch operation to put in GDS using transaction
    if _, e := tx.PutMulti(keys, gdsEntry); e != nil {
        tx.Rollback()
        logger.Errorf(ctx, "tx.PutMulti: %v", e)
        //logger.Errorf(ctx, "gdsEntry1 are: %v", gdsEntry)
        //logger.Errorf(ctx, "keys1 are: %v", gdsEntry)
        return e, nil, nil, nil
    }
    if _, er := tx.Commit(); er != nil {
        tx.Rollback()
        logger.Errorf(ctx, "tx.Commit: %v", er) // error: concurrent transaction
        //logger.Errorf(ctx, "gdsEntry2 are: %v", gdsEntry)
        //logger.Errorf(ctx, "keys2 are: %v", gdsEntry)
        return er, nil, nil, nil
    }

    logger.Infof(ctx, "Entries stored successfully")
    return nil, createKeys, updateKeys, createValues
}
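
The `tx.Commit` failure in the method above is the contention error the client library exposes as `datastore.ErrConcurrentTransaction`. The library's `RunInTransaction` retries its function with a new transaction when a commit fails because of a conflicting transaction. The following is a minimal, standard-library-only sketch of that retry pattern, not the library's implementation: `errConcurrentTransaction` is a stand-in for `datastore.ErrConcurrentTransaction`, and `retryOnConflict`, its parameters, and the backoff values are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errConcurrentTransaction is a stand-in for datastore.ErrConcurrentTransaction.
var errConcurrentTransaction = errors.New("datastore: concurrent transaction")

// retryOnConflict calls attempt up to maxAttempts times (at least once).
// It retries only when attempt fails with errConcurrentTransaction, doubling
// the wait between tries. Success or any other error is returned immediately.
func retryOnConflict(maxAttempts int, initialBackoff time.Duration, attempt func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	backoff := initialBackoff
	var err error
	for i := 0; i < maxAttempts; i++ {
		err = attempt()
		if !errors.Is(err, errConcurrentTransaction) {
			return err // nil on success, or a non-retryable error
		}
		if i < maxAttempts-1 {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return err // still the conflict error after the final attempt
}

func main() {
	calls := 0
	err := retryOnConflict(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errConcurrentTransaction // simulated conflict at commit
		}
		return nil // simulated successful commit
	})
	fmt.Printf("calls=%d err=%v\n", calls, err)
}
```

Applied to the reported code, `attempt` would wrap the whole read-modify-write, from `client.NewTransaction` through `tx.Commit`, so that each retry starts a new transaction and re-reads the entities, as `RunInTransaction` does.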