xitongsys / parquet-go

pure golang library for reading/writing parquet file
Apache License 2.0

Parallelize reading parquet files #445

Closed · TechGeeky closed this issue 2 years ago

TechGeeky commented 2 years ago

I am working with parquet files, reading each file one by one sequentially in the outer for loop below and populating a concurrent map after going through all the rows in each file. There are 50 files in total, and each file is at most around 60 MB.

I need to parallelize that for loop so the parquet files are read in parallel and the maps are populated as the files are processed. The concurrent maps inside the data struct will be read concurrently by multiple reader goroutines and written by multiple writers in parallel inside the loop, so I want to make sure access to them is safe and each operation is atomic. I also have getter methods to access those concurrent maps.

Below is the code I have, but I am not sure whether this is the right way to parallelize it or whether I am missing something basic.

    import (
    //....
    pars3 "github.com/xitongsys/parquet-go-source/s3"
    "github.com/xitongsys/parquet-go/reader"
    cmap "github.com/orcaman/concurrent-map"
    //....
    )

    type Data interface {
        GetCustomerMap() *cmap.ConcurrentMap
        GetCustomerMetricsMap() *cmap.ConcurrentMap
    }

    type data struct {
        // getter methods below provide access to these maps
        customers          *cmap.ConcurrentMap
        customerMetrics    *cmap.ConcurrentMap
    }

    // loadParquet is called periodically by a background goroutine.
    func (r *data) loadParquet(path string, bucket string, key string) error {
        var err error
        var files []string
        files, err = r.s3Client.ListObjects(bucket, key, ".parquet")
        if err != nil {
            return err
        }

        var waitGroup sync.WaitGroup 
        // Set the number of goroutines we want to wait on
        waitGroup.Add(len(files)) 

        // How can I parallelize this loop so the maps are populated in a thread-safe way?
        // The same maps are also accessed by multiple reader goroutines.
        // Writes happen from this background goroutine, while many reader goroutines read from the maps.
        for _, file := range files {
            err = func() error {
                fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
                if err != nil {
                    return errs.Wrap(err)
                }
                defer xio.CloseIgnoringErrors(fr)

                pr, err := reader.NewParquetReader(fr, nil, 4)
                if err != nil {
                    return errs.Wrap(err)
                }

                // Confused about this inner loop: do we need to parallelize here too?
                for {
                    rows, err := pr.ReadByNumber(100)
                    if err != nil {
                        return errs.Wrap(err)
                    }

                    if len(rows) <= 0 {
                        break
                    }

                    byteSlice, err := json.Marshal(rows)
                    if err != nil {
                        return errs.Wrap(err)
                    }

                    // Decode the generic rows into typed ParquetProduct values
                    // (a distinct name avoids redeclaring rows from ReadByNumber above).
                    var products []ParquetProduct
                    err = json.Unmarshal(byteSlice, &products)
                    if err != nil {
                        return errs.Wrap(err)
                    }

                    // Walk the decoded structs and put them into the concurrent maps.
                    // The maps need to be populated so that writes from this loop and
                    // concurrent reads from the reader goroutines stay safe and atomic.
                    for i := range products {
                      // ...
                      // ...
                      r.customers.Set(.....)
                      r.customerMetrics.Set(....)
                    }
                }
                return nil
            }()

            if err != nil {
                return err
            }

            go task(&waitGroup) // Achieving maximum concurrency 
        }
        // Wait until all goroutines have completed execution. 
        waitGroup.Wait()  
        return nil
    }

    // GetCustomerMap is called by multiple reader goroutines to get data out of the map.
    func (r *data) GetCustomerMap() *cmap.ConcurrentMap {
        return r.customers
    }

    // GetCustomerMetricsMap is called by multiple reader goroutines to get data out of the map.
    func (r *data) GetCustomerMetricsMap() *cmap.ConcurrentMap {
        return r.customerMetrics
    }
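
For context, a reader goroutine consuming these getters might look like the sketch below. readCustomer, the id parameter, and the ParquetProduct value type are assumptions based on the snippet above, not part of the original code; concurrent-map v1 stores interface{} values, so a type assertion is needed.

    // Sketch of reader-side access (readCustomer is an assumed helper).
    func readCustomer(d Data, id string) (ParquetProduct, bool) {
        // cmap.Get is safe to call while the loader goroutines are writing.
        v, ok := d.GetCustomerMap().Get(id)
        if !ok {
            return ParquetProduct{}, false
        }
        // Values come back as interface{} in concurrent-map v1, so assert the type.
        p, ok := v.(ParquetProduct)
        return p, ok
    }
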
TechGeeky commented 2 years ago

@xitongsys Will you be able to help me here? I am just trying to see whether it makes sense to parallelize reading parquet files and, if it does, whether what I have is correct or anything else can be improved.
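
For reference, here is a minimal sketch of one way the per-file loop could be parallelized: each file is processed in its own goroutine via golang.org/x/sync/errgroup, which combines the WaitGroup bookkeeping with error propagation, and every goroutine writes straight into the concurrent maps, which is safe because concurrent-map locks per shard. ParquetProduct, r.s3Client, and the maps are taken from the snippet above; keyFor stands in for whatever key the elided Set(...) calls use, and the errs/xio helpers are dropped for brevity. This is illustrative, not a drop-in replacement.

    import (
        "context"
        "encoding/json"

        pars3 "github.com/xitongsys/parquet-go-source/s3"
        "github.com/xitongsys/parquet-go/reader"
        "golang.org/x/sync/errgroup"
    )

    // loadParquetParallel reads every parquet file in its own goroutine and
    // returns the first error encountered, if any.
    func (r *data) loadParquetParallel(ctx context.Context, bucket string, key string) error {
        files, err := r.s3Client.ListObjects(bucket, key, ".parquet")
        if err != nil {
            return err
        }

        g, ctx := errgroup.WithContext(ctx)
        for _, file := range files {
            file := file // capture the loop variable for the goroutine
            g.Go(func() error {
                fr, err := pars3.NewS3FileReader(ctx, bucket, file, r.s3Client.GetSession().Config)
                if err != nil {
                    return err
                }
                defer fr.Close()

                pr, err := reader.NewParquetReader(fr, nil, 4)
                if err != nil {
                    return err
                }
                defer pr.ReadStop()

                for {
                    rows, err := pr.ReadByNumber(100)
                    if err != nil {
                        return err
                    }
                    if len(rows) == 0 {
                        break
                    }

                    byteSlice, err := json.Marshal(rows)
                    if err != nil {
                        return err
                    }
                    var products []ParquetProduct
                    if err := json.Unmarshal(byteSlice, &products); err != nil {
                        return err
                    }

                    // concurrent-map shards its keys and locks per shard, so Set
                    // can be called from many goroutines at once.
                    for i := range products {
                        k := keyFor(products[i]) // keyFor is a placeholder for the real key
                        r.customers.Set(k, products[i])
                        r.customerMetrics.Set(k, products[i])
                    }
                }
                return nil
            })
        }

        // Wait blocks until every goroutine returns and reports the first error.
        return g.Wait()
    }

One note on the design: with only around 50 files this launches one goroutine per file; if the file count grows, a bounded worker pool (or errgroup's SetLimit in recent x/sync versions) keeps the number of simultaneous S3 reads under control.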