xitongsys / parquet-go

pure golang library for reading/writing parquet file
Apache License 2.0

Panic in reading file with goroutine #299

Closed blackrez closed 4 years ago

blackrez commented 4 years ago

Hello,

Thanks for the library, it works great.

I have an issue with my code, but only with one file (out of 10 files, only the 9th fails). When I parse that file without a goroutine, it works.

go env

GO111MODULE=""
GOARCH="amd64"
GOBIN=""
GOCACHE="/home/nabil_servais_partner/.cache/go-build"
GOENV="/home/nabil_servais_partner/.config/go/env"
GOEXE=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="linux"
GOINSECURE=""
GOMODCACHE="/home/nabil_servais_partner/go/pkg/mod"
GONOPROXY=""
GONOSUMDB=""
GOOS="linux"
GOPATH="/home/nabil_servais_partner/go"
GOPRIVATE=""
GOPROXY="https://proxy.golang.org,direct"
GOROOT="/opt/go"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/opt/go/pkg/tool/linux_amd64"
GCCGO="gccgo"
AR="ar"
CC="gcc"
CXX="g++"
CGO_ENABLED="1"
GOMOD="/home/nabil_servais_partner/go/src/github.com/dkt/awscur2bq/go.mod"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/tmp/go-build367123898=/tmp/go-build -gno-record-gcc-switches"
package main

import (
    "context"
    //"os"
    //"io/ioutil"
    "fmt"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    s3aws "github.com/aws/aws-sdk-go/service/s3"
    curschema "github.com/dkt/awscur2bq/internal/schema"
    "github.com/google/uuid"
    "github.com/json-iterator/go"
    "github.com/xitongsys/parquet-go-source/s3"
    "github.com/xitongsys/parquet-go/reader"
    "log"
    "sync"
)

var (
    wg sync.WaitGroup
)

func deployWorker(ch <-chan []*genericRecord, workerID int, wg *sync.WaitGroup) {
    // Create BQ client and context
    log.Println("deployWorker")
    for {
        data, more := <-ch
        if !more {
            break
        } else {
            log.Println(data)
        }
    }
    client.Close()
    wg.Done()
}

func main() {
    var err error
    ctx := context.Background()
    // read the written parquet file
    // create new S3 file reader
    bucket := "report"
    sess, err := session.NewSession()
    svc := s3aws.New(sess)
    // test with problematic files
    prefix := "test-parquet-id/export-parquet/export-parquet/year=2020/month=7/export-parquet-00009.snappy.parquet"
    resp, err := svc.ListObjectsV2(&s3aws.ListObjectsV2Input{Bucket: aws.String(bucket), Prefix: &prefix})
    if err != nil {
        log.Printf("Unable to list items in bucket %q, %v", bucket, err)
        return
    }
    fmt.Println("Found", len(resp.Contents), "items in bucket", bucket)
    for _, item := range resp.Contents {

        fmt.Println("Name:         ", *item.Key)
        fmt.Println("Last modified:", *item.LastModified)
        fmt.Println("Size:         ", *item.Size)

        fmt.Println("")
        key := *item.Key
        fr, err := s3.NewS3FileReader(ctx, bucket, key)
        if err != nil {
            log.Println("Can't open file", err)
            return
        }

        pr, err := reader.NewParquetReader(fr, nil, 4)
        if err != nil {
            log.Println("Can't create parquet reader", err)
            return
        }
        log.Println("read ok")
        num := int(pr.GetNumRows())
        log.Println("read num", num)

        c1 := make(chan []*genericRecord)
        go func(chanData chan<- []*genericRecord, pr *reader.ParquetReader, item *string) {
            n := 0
            for n <= num/100 {
                log.Println(n)
                log.Println(item)
                res, _ := pr.ReadByNumber(100)
                pr.SkipRowsByIndex(int64(n)*100, 100)
                jason, _ := jsoniter.Marshal(res)
                var data []*genericRecord
                jsoniter.Unmarshal(jason, &data)
                chanData <- data
                n++
            }
            pr.ReadStop()
            close(chanData)
        }(c1, pr, item.Key)

        for idx := 0; idx < 10; idx++ {
            wg.Add(1)
            go deployWorker(c1, idx, &wg)
        }
        wg.Wait()

        fr.Close()
    }

}

The error:

2020/08/12 09:44:02 2110

goroutine 6609196 [running]:
github.com/xitongsys/parquet-go/encoding.ReadPlainFIXED_LEN_BYTE_ARRAY(0xc039f547b0, 0x1, 0xffffffff50000000, 0xc02c67ded0, 0x0, 0x0, 0x1f61, 0x4)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/encoding/encodingread.go:116 +0x99
github.com/xitongsys/parquet-go/encoding.ReadDeltaLengthByteArray(0xc039f547b0, 0xc02ea5c000, 0x1f60, 0x2600, 0x0, 0x0)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/encoding/encodingread.go:336 +0x10d
github.com/xitongsys/parquet-go/encoding.ReadDeltaByteArray(0xc039f547b0, 0x1f600, 0xdcc940, 0xc000121a68, 0x1000000ca3b0a, 0x1f600)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/encoding/encodingread.go:355 +0x85
github.com/xitongsys/parquet-go/layout.ReadDataPageValues(0xc039f547b0, 0x7, 0x6, 0x0, 0x1f60, 0x0, 0x0, 0x2, 0x0, 0xc000121b60, ...)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/layout/page.go:680 +0x21b
github.com/xitongsys/parquet-go/layout.ReadPage(0xc000adc060, 0xc000baaa80, 0xc0002628f0, 0x448b53, 0xc02d1b60b8, 0x10, 0x0, 0xc000121e88)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/layout/page.go:864 +0xac7
github.com/xitongsys/parquet-go/reader.(*ColumnBufferType).ReadPage(0xc000baaaf0, 0xc000121e88, 0x0)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/reader/columnbuffer.go:108 +0x85
github.com/xitongsys/parquet-go/reader.(*ColumnBufferType).ReadRows(0xc000baaaf0, 0x64, 0xc0013e0d80, 0x59)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/reader/columnbuffer.go:237 +0x4c
github.com/xitongsys/parquet-go/reader.(*ParquetReader).read.func1(0xc02d1b6060, 0xc0486bea80, 0xc00027a0a0, 0x64, 0xc02e117128, 0xc001235af0, 0xc02c7faf80)
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/reader/reader.go:284 +0x170
created by github.com/xitongsys/parquet-go/reader.(*ParquetReader).read
        /home/nabil_servais_partner/go/pkg/mod/github.com/xitongsys/parquet-go@v1.5.2/reader/reader.go:277 +0x2f2
blackrez commented 4 years ago

I also tried with go1.4, with the same result.

xitongsys commented 4 years ago

hi, @blackrez

pr.SkipRowsByIndex(int64(n)*100, 100)

This line is wrong, but it may not be the cause of the panic. Try deleting it first and running again.

blackrez commented 4 years ago

I removed the line and I still have the same issue. I don't understand why I have to remove this line. How can I read the files in increments? (These files are huge, so I can't read them in their entirety.)

xitongsys commented 4 years ago

Hi @blackrez, sorry for the late response. The reader is an incremental reader: each read continues from where the previous one stopped. The skip functions are used to skip over rows (they are not read, and the next read returns the rows after them).

This issue is weird. Could you provide a standalone example and a parquet file that reproduce it? (Something that reads a local file and does not need to connect to S3.)

blackrez commented 4 years ago

Hello,

I'm on vacation now; I will have access to the files next week. I will send you one of them, but privately.

How can I share it safely?

Thanks in advance.

xitongsys commented 4 years ago

Hi @blackrez, it would be best if you could provide a small sample file that reproduces the issue and send it to my email. If you can't produce a file that small, you could share it through Dropbox and email me the share link.

my email: xitongsys@gmail.com

Thanks.