reproio / columnify

Converts record-oriented data to columnar format.
Apache License 2.0

Measure memprofile #44

Closed syucream closed 4 years ago

syucream commented 4 years ago

The current version requires a lot of RAM. I'd like to investigate the biggest RAM consumers and try to reduce usage. ref. https://golang.org/pkg/runtime/pprof/

syucream commented 4 years ago

I tried the following patch, which uses pprof to observe memory usage in the formatters.

diff --git a/cmd/columnify/columnify.go b/cmd/columnify/columnify.go
index 7ba91c9..7282536 100644
--- a/cmd/columnify/columnify.go
+++ b/cmd/columnify/columnify.go
@@ -4,6 +4,9 @@ import (
        "flag"
        "fmt"
        "log"
+       "os"
+       "runtime"
+       "runtime/pprof"

        "github.com/reproio/columnify/columnifier"
 )
@@ -23,8 +26,21 @@ func columnify(c columnifier.Columnifier, files []string) (err error) {
                err = c.Close()
        }()

+       // pprof
+       f, err := os.Create("/tmp/columnify.mem.prof")
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+
        _, err = c.WriteFromFiles(files)

+       runtime.GC()
+       err = pprof.WriteHeapProfile(f)
+       if err != nil {
+               return err
+       }
        return
 }
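
Note that the runtime.GC() right before WriteHeapProfile is deliberate: heap profiles default to the inuse_space sample index, i.e. live memory at profile time, so forcing a GC first drops garbage that would otherwise blur the numbers. To inspect total allocations instead of live memory, the sample index can be switched when opening the same profile:

$ go tool pprof -sample_index=alloc_space /tmp/columnify.mem.prof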

With a large Avro file.

$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file examples/primitives.avsc --count 1000000 tmp.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
test.seed=1594128766010
[ryo@Macintosh] $ ls -lah tmp.avro
-rw-r--r-- 1 ryo staff 80M  7  7 22:32 tmp.avro

Then I ran columnify and got the following result.

$ ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
$ go tool pprof /tmp/columnify.mem.prof
Type: inuse_space
Time: Jul 7, 2020 at 10:27pm (JST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 115.24MB, 97.43% of 118.27MB total
Dropped 41 nodes (cum <= 0.59MB)
Showing top 10 nodes out of 36
      flat  flat%   sum%        cum   cum%
   99.99MB 84.54% 84.54%   107.50MB 90.89%  github.com/xitongsys/parquet-go/layout.(*Page).DataPageCompress
    4.11MB  3.47% 88.02%     4.11MB  3.47%  github.com/klauspost/compress/zstd.encoderOptions.encoder
    2.50MB  2.11% 90.13%     4.58MB  3.87%  bytes.(*Buffer).grow
    2.08MB  1.76% 91.89%     2.08MB  1.76%  bytes.makeSlice
    1.50MB  1.27% 93.16%     1.50MB  1.27%  github.com/xitongsys/parquet-go/parquet.NewStatistics (inline)
    1.50MB  1.27% 94.43%     1.50MB  1.27%  github.com/xitongsys/parquet-go/parquet.NewPageHeader (inline)
    1.03MB  0.87% 95.30%     1.03MB  0.87%  fmt.Sprintf
       1MB  0.85% 96.15%        1MB  0.85%  github.com/xitongsys/parquet-go/parquet.NewDataPageHeader (inline)
       1MB  0.85% 96.99%        1MB  0.85%  strings.genSplit
    0.52MB  0.44% 97.43%     1.03MB  0.87%  github.com/xitongsys/parquet-go/writer.(*ParquetWriter).Write

The highest consumer was a struct in parquet-go, but its consumption is not that high. From this I see two possibilities: 1) during the Write() calls, the intermediate representation consumes a lot of memory; 2) after that, WriteStop() or Close() does.

syucream commented 4 years ago

Next, I ran the same examination for WriteStop() with the patch below:

diff --git a/columnifier/parquet.go b/columnifier/parquet.go
index 97f56a3..68b15bb 100644
--- a/columnifier/parquet.go
+++ b/columnifier/parquet.go
@@ -2,6 +2,9 @@ package columnifier

 import (
        "io/ioutil"
+       "os"
+       "runtime"
+       "runtime/pprof"

        "github.com/reproio/columnify/record"

@@ -117,9 +120,22 @@ func (c *parquetColumnifier) WriteFromFiles(paths []string) (int, error) {

 // Close stops writing parquet files ant finalize this conversion.
 func (c *parquetColumnifier) Close() error {
+       // pprof
+       f, err := os.Create("/tmp/columnify.mem.prof")
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+
        if err := c.w.WriteStop(); err != nil {
                return err
        }

+       runtime.GC()
+       err = pprof.WriteHeapProfile(f)
+       if err != nil {
+               return err
+       }
+
        return c.w.PFile.Close()
 }

This part was not a big consumer.

$ go tool pprof /tmp/columnify.mem.prof
Type: inuse_space
Time: Jul 7, 2020 at 10:41pm (JST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 4513.17kB, 100% of 4513.17kB total
Showing top 10 nodes out of 31
      flat  flat%   sum%        cum   cum%
 2802.47kB 62.10% 62.10%  2802.47kB 62.10%  github.com/klauspost/compress/zstd.encoderOptions.encoder
 1710.70kB 37.90%   100%  1710.70kB 37.90%  bytes.makeSlice
         0     0%   100%  1123.33kB 24.89%  bytes.(*Buffer).ReadFrom
         0     0%   100%   587.37kB 13.01%  bytes.(*Buffer).Write
         0     0%   100%  1710.70kB 37.90%  bytes.(*Buffer).grow
         0     0%   100%   587.37kB 13.01%  encoding/base64.(*encoder).Write
         0     0%   100%   587.37kB 13.01%  encoding/json.(*encodeState).marshal
         0     0%   100%   587.37kB 13.01%  encoding/json.(*encodeState).reflectValue
         0     0%   100%   587.37kB 13.01%  encoding/json.Marshal
         0     0%   100%   587.37kB 13.01%  encoding/json.encodeByteSlice

syucream commented 4 years ago

Hmm, by WriteStop() most of the conversion should already be finished. I think the intermediate data probably consumes the memory during the Write() calls.

syucream commented 4 years ago

One more try, this time checking the memory overhead of FormatToMap():

diff --git a/columnifier/parquet.go b/columnifier/parquet.go
index 97f56a3..4f9d79b 100644
--- a/columnifier/parquet.go
+++ b/columnifier/parquet.go
@@ -2,6 +2,9 @@ package columnifier

 import (
        "io/ioutil"
+       "os"
+       "runtime"
+       "runtime/pprof"

        "github.com/reproio/columnify/record"

@@ -66,6 +69,13 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi

 // Write reads, converts input binary data and write it to buffer.
 func (c *parquetColumnifier) Write(data []byte) (int, error) {
+       // pprof
+       f, err := os.Create("/tmp/columnify.mem.prof")
+       if err != nil {
+               return -1, err
+       }
+       defer f.Close()
+
        // Intermediate record type is map[string]interface{}
        c.w.MarshalFunc = parquet.MarshalMap
        records, err := record.FormatToMap(data, c.schema, c.rt)
@@ -73,6 +83,12 @@ func (c *parquetColumnifier) Write(data []byte) (int, error) {
                return -1, err
        }

+       runtime.GC()
+       err = pprof.WriteHeapProfile(f)
+       if err != nil {
+               return -1, err
+       }
+
        beforeSize := c.w.Size
        for _, r := range records {
                if err := c.w.Write(r); err != nil {

So I found a high memory consumer! Data generated by FormatToMap() still consumes memory. This is the Avro case, but I guess the other data formats have the same issue. In other words, the expanded intermediate data, typed as map[string]interface{}, consumes a lot of memory. That's a natural outcome, especially for Avro and msgpack: records binary-encoded in those formats are compressed and often carry memory-efficient schema info, while the map[string]interface{} data is too naive.

$ go tool pprof /tmp/columnify.mem.prof
Type: inuse_space
Time: Jul 7, 2020 at 11:36pm (JST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 543.88MB, 99.72% of 545.42MB total
Dropped 10 nodes (cum <= 2.73MB)
Showing top 10 nodes out of 31
      flat  flat%   sum%        cum   cum%
  312.08MB 57.22% 57.22%   312.08MB 57.22%  github.com/reproio/columnify/record.flattenAvroUnion
  101.09MB 18.53% 75.75%   101.09MB 18.53%  bytes.makeSlice
   45.30MB  8.31% 84.06%    57.30MB 10.51%  github.com/linkedin/goavro/v2.stringNativeFromBinary
   24.50MB  4.49% 88.55%    37.50MB  6.88%  github.com/linkedin/goavro/v2.bytesNativeFromBinary
      23MB  4.22% 92.77%       23MB  4.22%  github.com/linkedin/goavro/v2.longNativeFromBinary
   11.50MB  2.11% 94.88%    11.50MB  2.11%  github.com/linkedin/goavro/v2.doubleNativeFromBinary
    8.67MB  1.59% 96.47%   541.14MB 99.22%  github.com/reproio/columnify/record.FormatAvroToMap
       8MB  1.47% 97.93%        8MB  1.47%  github.com/linkedin/goavro/v2.intNativeFromBinary
       7MB  1.28% 99.22%        7MB  1.28%  github.com/linkedin/goavro/v2.floatNativeFromBinary
    2.74MB   0.5% 99.72%     2.74MB   0.5%  github.com/klauspost/compress/zstd.encoderOptions.encoder
(pprof) list github.com/reproio/columnify/record.flattenAvroUnion
Total: 545.42MB
ROUTINE ======================== github.com/reproio/columnify/record.flattenAvroUnion in /Users/ryo/.go/src/github.com/reproio/columnify/record/avro.go
  312.08MB   312.08MB (flat, cum) 57.22% of Total
         .          .      9:   "github.com/linkedin/goavro/v2"
         .          .     10:)
         .          .     11:
         .          .     12:// flattenAvroUnion flattens nested map type has only 1 element.
         .          .     13:func flattenAvroUnion(in map[string]interface{}) map[string]interface{} {
   42.50MB    42.50MB     14:   out := make(map[string]interface{})
         .          .     15:
         .          .     16:   for k, v := range in {
         .          .     17:           if m, ok := v.(map[string]interface{}); ok {
         .          .     18:                   // Flatten because Avro-JSON representation has redundant nested map type.
         .          .     19:                   // see also https://github.com/linkedin/goavro#translating-from-go-to-avro-data
         .          .     20:                   if len(m) == 1 {
         .          .     21:                           for _, vv := range m {
         .          .     22:                                   out[k] = vv
         .          .     23:                                   break
         .          .     24:                           }
         .          .     25:                   } else {
         .          .     26:                           out[k] = flattenAvroUnion(m)
         .          .     27:                   }
         .          .     28:           } else {
  269.57MB   269.57MB     29:                   out[k] = v
         .          .     30:           }
         .          .     31:   }
         .          .     32:
         .          .     33:   return out
         .          .     34:}
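
To make the overhead concrete, here is a small self-contained sketch (not columnify code; the record count and fields are made up) comparing the live heap held by per-record map[string]interface{} data against plain columnar slices. Each map re-stores its keys per record and boxes every value in an interface, which is roughly what the profile above shows:

// memsketch.go: rough comparison of per-record maps vs. columnar slices.
// Illustration only; numbers are approximate and machine-dependent.
package main

import (
	"fmt"
	"runtime"
)

// heapInUse forces a GC and returns the live heap, mirroring the
// runtime.GC() + heap profile pattern used in the patches above.
func heapInUse() uint64 {
	runtime.GC()
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	return ms.HeapInuse
}

func main() {
	const n = 1_000_000

	before := heapInUse()

	// Record-oriented: every row carries its own keys and boxed values.
	rows := make([]map[string]interface{}, n)
	for i := 0; i < n; i++ {
		rows[i] = map[string]interface{}{"id": int64(i), "score": float64(i)}
	}
	afterMaps := heapInUse()

	// Column-oriented: one slice per field, schema stored once.
	ids := make([]int64, n)
	scores := make([]float64, n)
	for i := 0; i < n; i++ {
		ids[i], scores[i] = int64(i), float64(i)
	}
	afterCols := heapInUse()

	fmt.Printf("maps:    ~%dMB\n", (afterMaps-before)>>20)
	fmt.Printf("columns: ~%dMB\n", (afterCols-afterMaps)>>20)

	runtime.KeepAlive(rows)
	runtime.KeepAlive(ids)
	runtime.KeepAlive(scores)
}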
syucream commented 4 years ago

I will

syucream commented 4 years ago

What if we replace the intermediate records typed as map[string]interface{} with Arrow records and use them directly ... ?

diff --git a/columnifier/parquet.go b/columnifier/parquet.go
index 97f56a3..6979635 100644
--- a/columnifier/parquet.go
+++ b/columnifier/parquet.go
@@ -1,7 +1,11 @@
 package columnifier

 import (
+       "fmt"
        "io/ioutil"
+       "os"
+       "runtime"
+       "runtime/pprof"

        "github.com/reproio/columnify/record"

@@ -66,6 +70,19 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi

 // Write reads, converts input binary data and write it to buffer.
 func (c *parquetColumnifier) Write(data []byte) (int, error) {
+       // Intermediate record type is wrapped Apache Arrow record
+       func() {
+               f, _ := os.Create("/tmp/columnify.mem.prof")
+               defer f.Close()
+
+       records, _ := record.FormatToArrow(data, c.schema, c.rt)  // <- what if we replace maps with Arrow?
+
+               runtime.GC()
+               pprof.WriteHeapProfile(f)
+
+               fmt.Println(records.Record.NumRows())
+       }()
+

The memory consumption is smaller than the current version! It's a rough examination, but memory consumption changed from 543.88MB to 179.58MB, down to about 33%! I guess the story is not so weird: the Arrow data representation keeps aggregated schema info (it doesn't carry a schema per record like map[string]interface{}) and has a column-oriented, memory-efficient layout.

$ ./columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType avro tmp.avro > /dev/null
$ go tool pprof /tmp/columnify.mem.prof
Type: inuse_space
Time: Jul 9, 2020 at 11:09pm (JST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 179.58MB, 100% of 179.58MB total
Dropped 4 nodes (cum <= 0.90MB)
Showing top 10 nodes out of 49
      flat  flat%   sum%        cum   cum%
   95.51MB 53.19% 53.19%    95.51MB 53.19%  github.com/apache/arrow/go/arrow/memory.(*GoAllocator).Allocate (inline)
   78.59MB 43.77% 96.95%    78.59MB 43.77%  bytes.makeSlice
    5.47MB  3.05%   100%     5.47MB  3.05%  github.com/klauspost/compress/zstd.encoderOptions.encoder
         0     0%   100%    78.59MB 43.77%  bytes.(*Buffer).Grow
         0     0%   100%    78.59MB 43.77%  bytes.(*Buffer).grow
         0     0%   100%    72.03MB 40.11%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Append
         0     0%   100%     8.02MB  4.47%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Reserve
         0     0%   100%     8.02MB  4.47%  github.com/apache/arrow/go/arrow/array.(*BinaryBuilder).Resize
         0     0%   100%     3.82MB  2.13%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewArray
         0     0%   100%     3.82MB  2.13%  github.com/apache/arrow/go/arrow/array.(*Float32Builder).NewFloat32Array
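
For reference, building a record directly against the Arrow builder API (the github.com/apache/arrow/go/arrow package already appearing in the profiles) looks roughly like the sketch below. The schema and values are invented for illustration; this is not the experimental FormatToArrow above:

// arrowsketch.go: minimal Arrow record construction.
package main

import (
	"fmt"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	// Schema info is stored once on the record, not per row as with
	// map[string]interface{} intermediates.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "name", Type: arrow.BinaryTypes.String},
	}, nil)

	b := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
	defer b.Release()

	// Values are appended column by column into packed buffers.
	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
	b.Field(1).(*array.StringBuilder).AppendValues([]string{"a", "b", "c"}, nil)

	rec := b.NewRecord()
	defer rec.Release()

	fmt.Println(rec.NumRows()) // 3
}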

So I think we should eventually switch to Arrow for the intermediate data representation. It would help resolve https://github.com/reproio/columnify/issues/27, and we might also be able to contribute solutions to the official Arrow repo.

And as a short-term workaround, we could 1) write temp files to the filesystem once, or 2) compress the map[string]interface{} typed data with a faster codec like lz4 or something similar. Any ideas?
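
For idea 2, here is a rough sketch of what compressing the intermediates could look like, using the klauspost/compress/zstd package that is already in the dependency tree (the JSON stand-in for a serialized batch is hypothetical; lz4 via a third-party package would look similar):

// zstdsketch.go: keep intermediate batches compressed in memory,
// expanding a batch only when it's handed to the parquet writer.
package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// SpeedFastest trades compression ratio for low CPU cost, which
	// fits a short-term workaround.
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
	if err != nil {
		panic(err)
	}
	dec, err := zstd.NewReader(nil)
	if err != nil {
		panic(err)
	}

	// Stand-in for a serialized intermediate record batch.
	batch := []byte(`{"id":1,"name":"a"}{"id":2,"name":"b"}`)

	compressed := enc.EncodeAll(batch, nil) // hold this instead of raw maps
	restored, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		panic(err)
	}

	fmt.Printf("%d -> %d bytes, roundtrip ok: %v\n",
		len(batch), len(compressed), string(restored) == string(batch))
}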

syucream commented 4 years ago

we should finally switch to arrow for intermediate data representation.

I created another issue, https://github.com/reproio/columnify/issues/45, to retry implementing Arrow intermediates with a migration plan!

syucream commented 4 years ago

This issue was just about measuring memory usage, and that's already finished :)