olivere / elastic

Deprecated: Use the official Elasticsearch client for Go at https://github.com/elastic/go-elasticsearch
https://olivere.github.io/elastic/
MIT License
7.42k stars 1.15k forks source link

fatal error: concurrent map read and map write #1629

Closed ppg-coder closed 2 years ago

ppg-coder commented 2 years ago

I use github.com/olivere/elastic/v7 v7.0.22 with Go 1.15.

I use a bulk processor to batch index and update documents. My setup code looks like this:

// BulkProcessor configures and starts a bulk processor on i.EsClient and
// returns whatever Do(ctx) returns.
//
// The processor is named after i.EsIndexName and is set up with 5 workers,
// a 4096-action threshold, a 2<<20-byte (2 MiB) size threshold and a
// 2-second flush interval. An After hook logs failures via xlog.
//
// NOTE(review): the reported crash happens while a worker goroutine
// JSON-encodes a map document; presumably callers must not modify a map
// after handing it to the processor — confirm against the call sites.
func (i *SpuEs) BulkProcessor(ctx context.Context) (*elastic.BulkProcessor, error) {
    // logOutcome is the After hook. It reports either a whole-request
    // failure or the individual items that came back with an error.
    logOutcome := func(id int64, reqs []elastic.BulkableRequest, resp *elastic.BulkResponse, err error) {
        if err != nil {
            // The response may be nil when the request itself failed, so the
            // failed-item count defaults to zero.
            var failed int
            if resp != nil {
                failed = len(resp.Failed())
            }
            xlog.WithEvent(i.EsIndexName).Errorf("BulkProcessor failed, id:%v, req_num:%v, failed_num:%v, err:%v", id, len(reqs), failed, err)
            return
        }

        // Nothing to report unless the response flags item-level errors.
        if resp == nil || !resp.Errors {
            return
        }

        for _, entry := range resp.Items {
            for action, outcome := range entry {
                if outcome.Error != nil {
                    xlog.WithEvent(i.EsIndexName).Errorf("BulkProcessor failed, key:%v, res:%v, err:%v", action, outcome.Result, outcome.Error.Reason)
                }
            }
        }
    }

    svc := i.EsClient.BulkProcessor().
        Name(i.EsIndexName).
        Workers(5).
        BulkActions(4096).
        BulkSize(2 << 20).
        FlushInterval(2 * time.Second).
        After(logOutcome)

    return svc.Do(ctx)
}

After my service starts, the following fatal error occurs intermittently:

fatal error: concurrent map read and map write

goroutine 455 [running]:
runtime.throw(0x1d408db, 0x21)
    /usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc0034599c8 sp=0xc003459998 pc=0x437cb2
runtime.mapaccess2(0x1a1ac80, 0xc005cb8ff0, 0xc00aea08c0, 0xc00aea08c0, 0xc0055fe160)
    /usr/local/go/src/runtime/map.go:469 +0x25b fp=0xc003459a08 sp=0xc0034599c8 pc=0x40f91b
reflect.mapaccess(0x1a1ac80, 0xc005cb8ff0, 0xc00aea08c0, 0x1d2b0ae)
    /usr/local/go/src/runtime/map.go:1309 +0x3f fp=0xc003459a40 sp=0xc003459a08 pc=0x469d5f
reflect.Value.MapIndex(0x1a1ac80, 0xc005cb8ff0, 0x15, 0x196f860, 0xc00aea08c0, 0x98, 0x19e70e0, 0xc00aea09c0, 0x94)
    /usr/local/go/src/reflect/value.go:1189 +0x16e fp=0xc003459ab8 sp=0xc003459a40 pc=0x4a3d0e
encoding/json.mapEncoder.encode(0x1d88598, 0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x15, 0x1a10100)
    /usr/local/go/src/encoding/json/encode.go:801 +0x30d fp=0xc003459c30 sp=0xc003459ab8 pc=0x75bdad
encoding/json.mapEncoder.encode-fm(0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x15, 0x2bd0100)
    /usr/local/go/src/encoding/json/encode.go:777 +0x65 fp=0xc003459c70 sp=0xc003459c30 pc=0x768405
encoding/json.(*encodeState).reflectValue(0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x15, 0xc003450100)
    /usr/local/go/src/encoding/json/encode.go:358 +0x82 fp=0xc003459ca8 sp=0xc003459c70 pc=0x758f42
encoding/json.(*encodeState).marshal(0xc006186100, 0x1a1ac80, 0xc005cb8ff0, 0x100, 0x0, 0x0)
    /usr/local/go/src/encoding/json/encode.go:330 +0xf4 fp=0xc003459d08 sp=0xc003459ca8 pc=0x758b34
encoding/json.Marshal(0x1a1ac80, 0xc005cb8ff0, 0x45, 0xc000f29950, 0x45, 0x0, 0x0)
    /usr/local/go/src/encoding/json/encode.go:161 +0x52 fp=0xc003459d80 sp=0xc003459d08 pc=0x757fb2
github.com/olivere/elastic/v7.(*BulkIndexRequest).Source(0xc006bbaee0, 0xc003459ee0, 0x2, 0x0, 0x1, 0x1)
    /go/pkg/mod/github.com/olivere/elastic/v7@v7.0.22/bulk_index_request.go:240 +0x39a fp=0xc003459e68 sp=0xc003459d80 pc=0x147efba
github.com/olivere/elastic/v7.(*bulkWorker).work(0xc001994b80, 0x1f72580, 0xc0003c1a80)
    /go/pkg/mod/github.com/olivere/elastic/v7@v7.0.22/bulk_processor.go:484 +0x46b fp=0xc003459fc8 sp=0xc003459e68 pc=0x148260b
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:1374 +0x1 fp=0xc003459fd0 sp=0xc003459fc8 pc=0x4707e1
created by github.com/olivere/elastic/v7.(*BulkProcessor).Start
    /go/pkg/mod/github.com/olivere/elastic/v7@v7.0.22/bulk_processor.go:336 +0x1ce
ppg-coder commented 2 years ago

image