reproio / columnify

Makes record-oriented data into columnar format.
Apache License 2.0

Implement stream-based input record decoders instead of batch-based ones #52

Closed syucream closed 4 years ago

syucream commented 4 years ago

It should reduce memory usage in most cases! I checked RSS memory usage with a 270 MB msgpack-encoded file: the peak usage was 341,624 kB, versus 2,373,248 kB at the latest release version, v0.0.4. It's a dramatic reduction; the new peak is only about 14% of the old one (roughly a 7x reduction)!
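
To see why streaming helps, here is a minimal Go sketch of the two approaches (illustrative only, not columnify's actual code; batchDecode and streamDecode are made-up names, and JSON stands in for the real decoders): the batch version keeps every decoded record alive until the end, while the stream version holds one record at a time:

package example

import (
	"encoding/json"
	"io"
)

// Batch-based: decode the whole input into a slice before writing anything.
// Peak memory grows with the number of records.
func batchDecode(r io.Reader) ([]map[string]interface{}, error) {
	dec := json.NewDecoder(r)
	var records []map[string]interface{}
	for {
		var rec map[string]interface{}
		if err := dec.Decode(&rec); err == io.EOF {
			break
		} else if err != nil {
			return nil, err
		}
		records = append(records, rec) // every record stays live until the end
	}
	return records, nil
}

// Stream-based: hand each record to the consumer as soon as it is decoded.
// Peak memory stays proportional to a single record.
func streamDecode(r io.Reader, consume func(map[string]interface{}) error) error {
	dec := json.NewDecoder(r)
	for {
		var rec map[string]interface{}
		if err := dec.Decode(&rec); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		if err := consume(rec); err != nil {
			return err
		}
	}
}

With the second shape, the Parquet writer can consume records as they arrive instead of waiting for the whole input to be decoded.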

Additionally, it seems to accelerate the Parquet encoding as well (that might just be a side effect of the memory reduction, I think):

# v0.0.4
$ time ./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType msgpack ./tmp.msgpack > /dev/null
./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType   >   53.01s user 2.15s system 132% cpu 41.541 total

# with this patch
$ time ./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType msgpack ./tmp.msgpack > /dev/null
./columnify -schemaType avro -schemaFile ./rails-small.avsc -recordType   >   38.34s user 0.98s system 108% cpu 36.281 total

NOTE: this includes a breaking change; the Columnifier interface no longer supports io.WriteCloser
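
Roughly, the break looks like this (a hedged sketch; WriteFromReader is a hypothetical name for illustration, the real method set is in the diff):

package example

import "io"

// Before (batch-based): callers read the whole input themselves and
// pushed it in as bytes, so the interface embedded io.WriteCloser.
type OldColumnifier interface {
	io.WriteCloser
}

// After (stream-based): the columnifier pulls records from a reader
// incrementally; io.WriteCloser is no longer part of the contract.
type NewColumnifier interface {
	WriteFromReader(r io.Reader) (int, error)
	Close() error
}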

syucream commented 4 years ago

It's from https://github.com/reproio/columnify/issues/43

codecov-commenter commented 4 years ago

Codecov Report

Merging #52 into master will increase coverage by 12.71%. The diff coverage is 75.73%.


@@             Coverage Diff             @@
##           master      #52       +/-   ##
===========================================
+ Coverage   70.05%   82.77%   +12.71%     
===========================================
  Files          19       15        -4     
  Lines         875      505      -370     
===========================================
- Hits          613      418      -195     
+ Misses        203       65      -138     
+ Partials       59       22       -37     
Flag         Coverage Δ
#unittests   82.77% <75.73%> (+12.71%) ↑

Flags with carried forward coverage won't be shown.

Impacted Files               Coverage Δ
columnifier/columnifier.go    0.00% <ø> (ø)
parquet/stdio.go             80.00% <ø> (ø)
record/record.go             19.35% <27.27%> (-5.65%) ↓
record/avro.go               77.14% <63.63%> (+4.41%) ↑
columnifier/parquet.go       83.60% <78.57%> (-1.58%) ↓
record/jsonl.go              83.33% <83.33%> (+27.08%) ↑
record/ltsv.go               92.85% <90.00%> (+10.71%) ↑
record/csv.go                90.00% <94.28%> (+10.45%) ↑
record/msgpack.go           100.00% <100.00%> (+36.84%) ↑

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update f6d5b93...d2d8591.

t2y commented 4 years ago

Let's wait to merge this until the other reviewers approve, since the patch is large.

syucream commented 4 years ago

@t2y I added a CHANGELOG.md file to record the change history. :) (It's inspired by fluentd's changelog file.)

syucream commented 4 years ago

@abicky @okkez any comment?

okkez commented 4 years ago

I converted a 223 MB msgpack file to Parquet, the same as in #43. Columnify consumed around 330 MB of memory (RSS from the ps command).

syucream commented 4 years ago

@okkez That looks similar to my result. It's still not ideal, but it's better than the latest release anyway, so why don't we apply this change? If you're OK with it, I'd like to publish it as an improved v0.1.0 release. For the long term, I guess we need further improvements, including partial encoding to Parquet.

abicky commented 4 years ago

This change will reduce memory consumption a lot!

Before

% gtime -v columnify -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl > out.parquet
        Command being timed: "columnify -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl"
        User time (seconds): 26.14
        System time (seconds): 1.63
        Percent of CPU this job got: 120%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:23.00
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 1542252
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 1179
        Minor (reclaiming a frame) page faults: 445384
        Voluntary context switches: 419
        Involuntary context switches: 121936
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 2505
        Page size (bytes): 4096
        Exit status: 0

After

% gtime -v columnify-latest -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl > out.parquet
        Command being timed: "columnify-latest -schemaType avro -schemaFile docker_log.avsc -recordType jsonl input.jsonl"
        User time (seconds): 19.35
        System time (seconds): 0.71
        Percent of CPU this job got: 115%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:17.39
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 281776
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 70743
        Voluntary context switches: 388
        Involuntary context switches: 48274
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 2538
        Page size (bytes): 4096
        Exit status: 0
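
That is, peak RSS drops from 1,542,252 kB to 281,776 kB (to roughly 18% of the original), and wall-clock time from 23.0 s to 17.4 s.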

Here is docker_log.avsc:

{
  "type": "record",
  "name": "DockerLog",
  "fields" : [
    {"name": "log_time",       "type": "long"},
    {"name": "container_id",   "type": "string"},
    {"name": "container_name", "type": "string"},
    {"name": "source",         "type": "string"},
    {"name": "log",            "type": "string"}
  ]
}

I created the file input.jsonl using the following script:

# frozen_string_literal: true

require 'json'
require 'logger'
require 'securerandom' # SecureRandom is used below for container IDs

require 'faker'

N = 711150

CONTAINER_IDS = 100.times.map { SecureRandom.hex(32) }
PATHS = %w[/v1/foo /v1/bar /v1/baz]

$stdout.sync = true
logger = Logger.new($stdout)

prng = Random.new(42)
Faker::Config.random = prng

logger.info 'Start sending events'

base_time = Time.now.yield_self do |t|
  # noon tomorrow (note: t.day + 1 assumes the script isn't run on the last day of a month)
  Time.new(t.year, t.month, t.day + 1, 12)
end

delta = Rational(3600, 10_000_000)
File.open(ARGV[0], 'w') do |f|
  N.times do |i|
    now = base_time + delta * i
    event = {
      container_id: CONTAINER_IDS.sample(random: prng),
      container_name: '/test-container',
      source: 'stdout',
      log_time: now.to_i * 10**9 + now.nsec
    }
    if prng.rand(1000).zero?
      event[:log] = %Q|#{Faker::Internet.public_ip_v4_address} - - [#{now.strftime('%d/%b/%Y:%T %z')}] "GET /v2#{PATHS.sample(random: prng)} HTTP/1.1" 404 153 "-" "#{Faker::Internet.user_agent}" "-"|
    else
      event[:log] = %Q|#{Faker::Internet.public_ip_v4_address} - - [#{now.strftime('%d/%b/%Y:%T %z')}] "GET #{PATHS.sample(random: prng)} HTTP/1.1" 200 #{prng.rand(1000) + 100} "-" "#{Faker::Internet.user_agent}" "-"|
    end

    f.puts event.to_json
  end
end
logger.info 'Sending events is complete'

syucream commented 4 years ago

Applied some feedback :)

abicky commented 4 years ago

I made some suggestions, but they were based on my misunderstanding 💦 so I deleted them.

syucream commented 4 years ago

Thanks, all! I'll merge it and release the next version!

syucream commented 4 years ago

🎉 https://github.com/reproio/columnify/releases/tag/v0.1.0