aerospike / aerospike-client-go

Aerospike Client Go
Apache License 2.0
430 stars 199 forks source link

Race condition in primary index query with pagination #438

Open noam-ma-ma opened 1 month ago

noam-ma-ma commented 1 month ago

While running a primary index query with pagination (page size = 1, total records = 2), I hit a DATA RACE violation reported by the Go race detector. Client version: v7.1.0.

Full example to reproduce:

package bla

import (
    "github.com/aerospike/aerospike-client-go/v7"
    "log"
    "testing"
)

// TestRace reproduces the data race reported in this issue: it pages through
// a primary-index query (MaxRecords = 1) while reusing one PartitionFilter.
// The loop below is intentionally the racy pattern being reported; see the
// fixed version later in this issue for the correct pagination loop.
//
// Requires an Aerospike server on 127.0.0.1:3000 with namespace "myns".
func TestRace(t *testing.T) {
    // Establish a connection to the server. t.Fatal (unlike log.Fatal, which
    // calls os.Exit) still runs the deferred client.Close().
    client, err := aerospike.NewClient("127.0.0.1", 3000)
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()
    policy := aerospike.NewWritePolicy(0, 0)

    // Write two records so that a page size of 1 needs more than one page.
    key, err := aerospike.NewKey("myns", "ufodata", 5001)
    if err != nil {
        t.Fatal(err)
    }
    mybin := aerospike.NewBin("mybin", 20220531)
    if err = client.PutBins(policy, key, mybin); err != nil {
        t.Fatal(err)
    }
    key, err = aerospike.NewKey("myns", "ufodata", 5002)
    if err != nil {
        t.Fatal(err)
    }
    if err = client.PutBins(policy, key, mybin); err != nil {
        t.Fatal(err)
    }

    // Create the query policy: only records that have "mybin", one per page.
    queryPolicy := aerospike.NewQueryPolicy()
    queryPolicy.FilterExpression = aerospike.ExpBinExists("mybin")
    queryPolicy.MaxRecords = 1
    stmt := aerospike.NewStatement("myns", "ufodata")

    partitionFilter := aerospike.NewPartitionFilterAll()
    recordSet, err := client.QueryPartitions(queryPolicy, stmt, partitionFilter)
    if err != nil {
        t.Fatal(err)
    }

    // RACY on purpose: after QueryPartitions at the bottom of the loop starts
    // a new query (in a goroutine, per the race trace) that updates
    // partitionFilter, the loop condition immediately calls IsDone() on the
    // same filter. That read races with the query's write.
    for !partitionFilter.IsDone() {
        for result := range recordSet.Results() {
            if result.Err != nil {
                t.Fatal(result.Err)
            }
            if result.Record != nil {
                log.Printf("Record found: %v", result.Record.Bins)
            }
        }
        // Execute the query reusing the partitionFilter until all records returned.
        recordSet, err = client.QueryPartitions(queryPolicy, stmt, partitionFilter)
        if err != nil {
            t.Fatal(err)
        }
    }

    recordSet.Close()
}

go test -v -race race_test.go

=== RUN   TestRace
2024/06/05 12:55:25 Record found: map[mybin:20220531]
2024/06/05 12:55:25 Record found: map[mybin:20220531]
==================
WARNING: DATA RACE
Write at 0x00c0000d8180 by goroutine 43:
  github.com/aerospike/aerospike-client-go/v7.(*partitionTracker).isComplete()
      /partition_tracker.go:355 +0x910
  github.com/aerospike/aerospike-client-go/v7.(*partitionTracker).isClusterComplete()
      /partition_tracker.go:311 +0x6c
  github.com/aerospike/aerospike-client-go/v7.(*Client).queryPartitions()
      /query_executor.go:51 +0x334
  github.com/aerospike/aerospike-client-go/v7.(*Client).QueryPartitions.gowrap1()
      /client.go:1143 +0x64

Previous read at 0x00c0000d8180 by goroutine 7:
  github.com/aerospike/aerospike-client-go/v7.(*PartitionFilter).IsDone()
      /partition_filter.go:78 +0xa80
  command-line-arguments.TestRace()
      race_test.go:46 +0xa8c
  testing.tRunner()
      1.22.0/go/src/testing/testing.go:1689 +0x180
  testing.(*T).Run.gowrap1()
      1.22.0/go/src/testing/testing.go:1742 +0x40

Goroutine 43 (running) created at:
  github.com/aerospike/aerospike-client-go/v7.(*Client).QueryPartitions()
      /client.go:1143 +0x508
  command-line-arguments.TestRace()
      race_test.go:53 +0xac4
  testing.tRunner()
      1.22.0/go/src/testing/testing.go:1689 +0x180
  testing.(*T).Run.gowrap1()
      1.22.0/go/src/testing/testing.go:1742 +0x40

Goroutine 7 (running) created at:
  testing.(*T).Run()
      1.22.0/go/src/testing/testing.go:1742 +0x5e4
  testing.runTests.func1()
      1.22.0/go/src/testing/testing.go:2161 +0x80
  testing.tRunner()
      1.22.0/go/src/testing/testing.go:1689 +0x180
  testing.runTests()
      1.22.0/go/src/testing/testing.go:2159 +0x6e0
  testing.(*M).Run()
      1.22.0/go/src/testing/testing.go:2027 +0xb6c
  main.main()
      _testmain.go:47 +0x294
==================
    testing.go:1398: race detected during execution of test
--- FAIL: TestRace (0.10s)
FAIL
FAIL    command-line-arguments  0.362s
FAIL
khaf commented 1 month ago

PartitionFilter object is not designed nor intended to be used concurrently. It's a cursor, designed specifically to hold the state and progress of one ongoing query. You can reuse that object if the query is finished (for pagination) or if it errors out and terminates. Otherwise, the behavior is undefined. In your example, you reuse that object while the original query is ongoing, which is not how that object is intended to be used.

noam-ma-ma commented 1 month ago

> PartitionFilter object is not designed nor intended to be used concurrently. It's a cursor, designed specifically to hold the state and progress of one ongoing query. You can reuse that object if the query is finished (for pagination) or if it errors out and terminates. Otherwise, the behavior is undefined. In your example, you reuse that object while the original query is ongoing, which is not how that object is intended to be used.

The example in aerospike docs reuses the partitionFilter the same way I used in the code block provided. Do you have an example for proper pagination then?

khaf commented 1 month ago

Thanks for bringing this to my attention. I don't know what that code is trying to achieve. I'll contact my colleagues and get it fixed. You can take a look at the correct way of using Query/Scan filters, including pagination here.

khaf commented 1 month ago

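The following code should work without the race condition, because it only checks `partitionFilter.IsDone()` after the current page's results have been fully drained, not while the next query is already running: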

package bla

import (
    "github.com/aerospike/aerospike-client-go/v7"
    "log"
    "testing"
)

// TestRace pages through a primary-index query (MaxRecords = 1) while reusing
// a single PartitionFilter as the pagination cursor, without the data race:
// the filter is only inspected after the current page has been fully drained.
//
// Requires an Aerospike server on 127.0.0.1:3000 with namespace "myns".
func TestRace(t *testing.T) {
    // Establish a connection to the server. t.Fatal (unlike log.Fatal, which
    // calls os.Exit) still runs the deferred client.Close().
    client, err := aerospike.NewClient("127.0.0.1", 3000)
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()
    policy := aerospike.NewWritePolicy(0, 0)

    // Write two records so that a page size of 1 needs more than one page.
    key, err := aerospike.NewKey("myns", "ufodata", 5001)
    if err != nil {
        t.Fatal(err)
    }
    mybin := aerospike.NewBin("mybin", 20220531)
    if err = client.PutBins(policy, key, mybin); err != nil {
        t.Fatal(err)
    }
    key, err = aerospike.NewKey("myns", "ufodata", 5002)
    if err != nil {
        t.Fatal(err)
    }
    if err = client.PutBins(policy, key, mybin); err != nil {
        t.Fatal(err)
    }

    // Create the query policy: only records that have "mybin", one per page.
    queryPolicy := aerospike.NewQueryPolicy()
    queryPolicy.FilterExpression = aerospike.ExpBinExists("mybin")
    queryPolicy.MaxRecords = 1
    stmt := aerospike.NewStatement("myns", "ufodata")

    partitionFilter := aerospike.NewPartitionFilterAll()
    recordSet, err := client.QueryPartitions(queryPolicy, stmt, partitionFilter)
    if err != nil {
        t.Fatal(err)
    }

    for {
        // Drain the current page completely before touching partitionFilter.
        // Per the maintainer's guidance, the filter may only be read or reused
        // once the query using it has finished; the range ends only when the
        // Results() channel is closed.
        for result := range recordSet.Results() {
            if result.Err != nil {
                t.Fatal(result.Err)
            }
            if result.Record != nil {
                log.Printf("Record found: %v", result.Record.Bins)
            }
        }

        if partitionFilter.IsDone() {
            break
        }

        // Fetch the next page, reusing partitionFilter as the cursor.
        recordSet, err = client.QueryPartitions(queryPolicy, stmt, partitionFilter)
        if err != nil {
            t.Fatal(err)
        }
    }

    recordSet.Close()
}