xitongsys / parquet-go

pure golang library for reading/writing parquet file
Apache License 2.0

Multiple ParquetWriters | concurrent map read and map write | error #510

Open shivakumarss opened 1 year ago

shivakumarss commented 1 year ago

I am facing an issue under load. The requirement is that different types of requests are received concurrently via REST, and for each request type I have a different schema.

At the implementation level, I have an internal map whose keys are source.ParquetFile and whose values are *writer.ParquetWriter.

Based on the request type, I first look up the source.ParquetFile; if it is not present, I create one, then create a *writer.ParquetWriter for it, write the data, and put it back in the map.
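
Roughly, the lookup works like the sketch below (placeholder names such as registry, writerFor, and newWriterFor, keyed by a request-type string for simplicity; the real code differs):

    import (
        "sync"

        "github.com/xitongsys/parquet-go/writer"
    )

    // Sketch of the registry described above (placeholder names): one writer
    // per request type, created lazily on first use. Only the map lookup is
    // guarded here; the writers themselves are shared across goroutines.
    type registry struct {
        mu      sync.RWMutex
        writers map[string]*writer.ParquetWriter
    }

    func (r *registry) writerFor(requestType string) (*writer.ParquetWriter, error) {
        r.mu.RLock()
        pw, ok := r.writers[requestType]
        r.mu.RUnlock()
        if ok {
            return pw, nil
        }

        r.mu.Lock()
        defer r.mu.Unlock()
        if pw, ok := r.writers[requestType]; ok { // re-check after taking the write lock
            return pw, nil
        }
        pw, err := newWriterFor(requestType) // placeholder: opens the file and builds the *writer.ParquetWriter
        if err != nil {
            return nil, err
        }
        r.writers[requestType] = pw
        return pw, nil
    }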

On average we receive around 30-40k requests per second per machine, which covers both request types, and the number of request types will grow going forward.

Under light load it works fine, but as the load increases the error below is thrown. I believe I am doing something fundamentally wrong with multiple writers and need assistance here.

fatal error: concurrent map read and map write
 goroutine 1266 [running]:
 github.com/xitongsys/parquet-go/writer.(*ParquetWriter).flushObjs(0xc0197fa0d0)
 /home/runner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.6.2/writer/writer.go:325 +0x4ea
 github.com/xitongsys/parquet-go/writer.(*ParquetWriter).Flush(0xc0197fa0d0, 0x0)
 /home/runner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.6.2/writer/writer.go:345 +0x3d
 github.com/xitongsys/parquet-go/writer.(*ParquetWriter).Write(0xc0197fa0d0, {0xf426c0?, 0xc0345e7970?})
 /home/runner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.6.2/writer/writer.go:226 +0x28c
 github.com/domain/application/logger._logParquetInternal({0x10d8858?, 0x7?}, {0xc00003e033?, 0x100?}, {0xf426c0, 0xc0345e7970}, {0x11126a7, 0x731}, {0x112b881, 0xedb2ea172, ...})
 /home/runner/work/application/package/logger/parquetLogger.go:116 +0x166
 github.com/domain/application/logger.LogrmoduleUserSyncP(...)
 /home/runner/work/application/package/logger/parquetLogger.go:68
 github.com/domain/application/controller.rmoduleUserSyncController({0x122e270, 0xc034595cc0}, 0xc0346a8300)
 /home/runner/work/application/package/controller/rmoduleUserSyncController.go:21 +0x17f
 net/http.HandlerFunc.ServeHTTP(0xc0346a8200?, {0x122e270?, 0xc034595cc0?}, 0x1221d28?)

Writer configuration

//write
    pw, err := writer.NewParquetWriter(fw, schema, 4)
    if err != nil {
        log.Println("Can't create parquet writer", err)
        return nil, err
    }
    pw.RowGroupSize = 128 * 1024 * 1024 //128M
    pw.CompressionType = parquet.CompressionCodec_SNAPPY
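
The teardown side is not shown above; presumably each writer is eventually finalized with WriteStop before its file is closed, along the lines of this sketch (fw being the source.ParquetFile the writer was created from):

    // Assumed teardown when a writer is retired from the map (sketch):
    // WriteStop flushes the remaining row groups and writes the file footer,
    // after which the underlying source.ParquetFile can be closed.
    if err := pw.WriteStop(); err != nil {
        log.Println("WriteStop error", err)
    }
    if err := fw.Close(); err != nil {
        log.Println("file close error", err)
    }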

EDIT 1: Go version

go1.19.4

shivakumarss commented 1 year ago

This bug is related to https://github.com/xitongsys/parquet-go/issues/241. I have multiple writers open, and Flush is being called internally in writer.go whenever ObjsSize exceeds criSize:


    if pw.CheckSizeCritical <= ln {
        pw.ObjSize = (pw.ObjSize+common.SizeOf(val))/2 + 1
    }
    pw.ObjsSize += pw.ObjSize
    pw.Objs = append(pw.Objs, src)

    criSize := pw.NP * pw.PageSize * pw.SchemaHandler.GetColumnNum()

    if pw.ObjsSize >= criSize {
        err = pw.Flush(false)
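
Since Write mutates the writer's internal buffers and can trigger this Flush, it looks like calling Write on the same *ParquetWriter from multiple request goroutines is what races. If that is the case, one workaround would be to serialize access per writer, for example (sketch with placeholder names, not part of the library):

    import (
        "sync"

        "github.com/xitongsys/parquet-go/writer"
    )

    // lockedWriter is a placeholder wrapper: every goroutine that shares one
    // *ParquetWriter goes through it, so Write (and any internal Flush it
    // triggers) never runs concurrently for that writer.
    type lockedWriter struct {
        mu sync.Mutex
        pw *writer.ParquetWriter
    }

    func (w *lockedWriter) Write(src interface{}) error {
        w.mu.Lock()
        defer w.mu.Unlock()
        return w.pw.Write(src)
    }

    // Close flushes the remaining buffered data and writes the parquet footer.
    func (w *lockedWriter) Close() error {
        w.mu.Lock()
        defer w.mu.Unlock()
        return w.pw.WriteStop()
    }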