apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Difficult to write Go SDK pipelines that stay within memory constraints #21817

Open gonzojive opened 2 years ago

gonzojive commented 2 years ago

What happened?

I am not a beam developer, just a user.

I wrote a Beam pipeline to download zip files over HTTP and write them to disk in a record-oriented format (Riegeli). The pipeline ends up using far more memory than expected (see the memory graphs and profile below).

Would it be possible to do something crude like return "Resource exhausted" codes from the Go harness after a certain memory threshold is reached? Would the Flink runner be smart enough to reduce the request rate and memory pressure?

memory graph part 1

memory graph part 2

memory profile

Launch commands:

Beam pipeline:

bazel run //:download_stuff -- --runner flink --endpoint localhost:8099 "--output_riegeli" "/tmp/map_output@20.riegeli" "--environment_type" "LOOPBACK" --alsologtostderr --beam_strict 

I can create a repo with a reproduction if the authors think that would be useful.

Flink

bazel run :job -- "--flink-conf-dir" /home/red/flink/

Where :job is defined by

java_binary(
    name = "job",
    main_class = "org.apache.beam.runners.flink.FlinkJobServerDriver",
    runtime_deps = [
        # Obtained by looking at results of
        # bazel query @maven//...
        "@maven//:org_apache_beam_beam_runners_flink_1_14",
        "@maven//:org_slf4j_slf4j_api",
        "@maven//:org_slf4j_slf4j_simple",
        "@maven//:org_apache_flink_flink_runtime_web_2_12",
    ],
)

See my flink-conf.yaml gist for how Flink is configured.

Issue Priority

Priority: 2

Issue Component

Component: sdk-go

lostluck commented 2 years ago

Thank you for filing the issue!

From your configuration, you've got 6 threads in parallel per worker.

The short-term fix is to process fewer bundles simultaneously, i.e. to reduce that number. The SDK largely expects the Runner to handle how work is scheduled and similar, so it doesn't have any way to deny the runner's request for processing other than failing the bundle.

At present the SDK isn't aware of how much memory the system is using at all, as it's unclear how the runner or the system could handle that.

After all, unless the downloaded files are being streamed straight to the output files in the same DoFn, they will have to be in memory for some time.


Is everything being executed on a single machine rather than a cluster? What does the pipeline look like? Separated into multiple DoFns? Any Aggregations?

How big is each of these files? I'll note that, short of streaming a download directly to a file output, there's going to be buffering of at least the size of the file in question.


I will note that the segment of the heap graph you've provided shows none of the places where allocations are occurring.


That said, here are some areas to look into, depending on the pipeline. TBH, as described, neither of these is likely to help.

As implemented, the SDK will buffer some number of elements per bundle being processed (see datamgr.go). After that, additional elements will not be accepted from the Runner until something has been processed through. This happens using standard channel blocking.
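
To illustrate the idea (just a sketch, not the actual datamgr.go code), the backpressure is conceptually a bounded channel between the data reader and the bundle processing:

package main

import "fmt"

type element struct{ data []byte }

func main() {
    const maxBuffered = 100 // illustrative cap on buffered elements per bundle
    ch := make(chan element, maxBuffered)

    // Producer: receives data from the runner and decodes elements.
    go func() {
        for i := 0; i < 1000; i++ {
            // This send blocks once the buffer is full, so no more data is
            // accepted from the runner until the consumer catches up.
            ch <- element{data: []byte(fmt.Sprintf("element-%d", i))}
        }
        close(ch)
    }()

    // Consumer: the bundle's DoFns process elements one at a time.
    for e := range ch {
        _ = e.data // process the element
    }
}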

The other place where memory might "back up" is the Combiner Lifting Cache, which currently uses a map and a fixed cap on eviction size. We would love to make it more memory aware, so that memory pressure evicts elements and allows things to GC. A good mechanism for this hasn't been determined, as in general there's value in keeping the cache as full as possible so that elements are combined before the shuffle.
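
Conceptually, that cache behaves something like the following sketch (illustrative only; the names and the eviction policy are simplifications, not the SDK's actual code):

package main

import "fmt"

// liftedCombineCache accumulates per-key sums and, once the fixed cap is hit,
// evicts an entry downstream so its memory can eventually be reclaimed.
type liftedCombineCache struct {
    maxEntries int
    accums     map[string]int
    emit       func(key string, accum int)
}

func (c *liftedCombineCache) add(key string, value int) {
    if _, ok := c.accums[key]; !ok && len(c.accums) >= c.maxEntries {
        // Cache is full and this is a new key: evict one entry to make room.
        for k, a := range c.accums {
            c.emit(k, a)
            delete(c.accums, k)
            break
        }
    }
    c.accums[key] += value // combine the new value into the key's accumulator
}

func main() {
    cache := &liftedCombineCache{
        maxEntries: 2,
        accums:     map[string]int{},
        emit:       func(k string, a int) { fmt.Println("evicted:", k, a) },
    }
    for _, kv := range []struct {
        k string
        v int
    }{{"a", 1}, {"b", 2}, {"a", 3}, {"c", 4}} {
        cache.add(kv.k, kv.v)
    }
}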

gonzojive commented 2 years ago

> Thank you for filing the issue!
>
> From your configuration, you've got 6 threads in parallel per worker.
>
> The short-term fix is to process fewer bundles simultaneously, i.e. to reduce that number. The SDK largely expects the Runner to handle how work is scheduled and similar, so it doesn't have any way to deny the runner's request for processing other than failing the bundle.

Reducing parallelism (using the --parallelism flag) worked. Thanks!

The unfortunate side effect is that processing slows down a lot, since the earlier stages (which have no memory issues) no longer get parallelized.
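
(For anyone else hitting this: I believe an equivalent way to cap parallelism globally for a local Flink cluster is parallelism.default in flink-conf.yaml; that's an alternative I'm noting here, not necessarily what I did.)

# flink-conf.yaml: cap the default parallelism for jobs that don't set one explicitly.
parallelism.default: 2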

> At present the SDK isn't aware of how much memory the system is using at all, as it's unclear how the runner or the system could handle that.
>
> After all, unless the downloaded files are being streamed straight to the output files in the same DoFn, they will have to be in memory for some time.
>
> Is everything being executed on a single machine rather than a cluster? What does the pipeline look like? Separated into multiple DoFns? Any Aggregations?

Single machine.

Basically, the pipeline downloads ~1000 files, each of which has key/value pairs with almost identical sets of keys. Values are accumulated per key across all the files to obtain what would be a map[key][]value in Go. For each key, one record containing the key and all of its values is output to a record-oriented file (Riegeli). There are ~10,000 keys (each 10 bytes) and 1000*50 values per key; each value is maybe 10-50 bytes.
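
For concreteness, the rough shape of the pipeline is something like this sketch (placeholder DoFn names and empty bodies, not my real code):

package main

import (
    "context"
    "flag"
    "log"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(downloadAndParseFn)
    beam.RegisterFunction(writeRecordsFn)
}

// downloadAndParseFn stands in for the real download step: it fetches one file
// over HTTP and emits the key/value pairs it contains.
func downloadAndParseFn(ctx context.Context, url string, emit func(key, value string)) error {
    // ... download the zip and emit each key/value pair ...
    return nil
}

// writeRecordsFn stands in for the Riegeli writer: for each key it receives an
// iterator over every value accumulated across all input files and writes one
// record containing the key and its values.
func writeRecordsFn(ctx context.Context, key string, values func(*string) bool) error {
    var v string
    for values(&v) {
        // ... append v to the output record for this key ...
    }
    return nil
}

func main() {
    flag.Parse()
    beam.Init()
    p, s := beam.NewPipelineWithRoot()

    urls := beam.CreateList(s, []string{"https://example.com/a.zip"}) // ~1000 URLs in practice
    kvs := beam.ParDo(s, downloadAndParseFn, urls)                    // PCollection of KV<string, string>
    grouped := beam.GroupByKey(s, kvs)                                // key -> iterator of values
    beam.ParDo0(s, writeRecordsFn, grouped)

    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatal(err)
    }
}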

See below for more information and a failed attempt to reproduce.

> How big is each of these files? I'll note that, short of streaming a download directly to a file output, there's going to be buffering of at least the size of the file in question.
>
> I will note that the segment of the heap graph you've provided shows none of the places where allocations are occurring.

I tried to reproduce at https://github.com/gonzojive/beam-go-bazel-example but didn't experience exactly the same issues.

Here is a table view of a similar memory problem. The memory explosion happens in the very last stage of the pipeline when trying to write to a sharded output file (Riegeli format):

Flat Flat% Sum% Cum Cum% Name Inlined?
0 0.00% 0.00% 32475.50MB 97.58% google.golang.org/protobuf/proto.UnmarshalOptions.unmarshal  
0 0.00% 0.00% 32474.97MB 97.57% google.golang.org/protobuf/internal/impl.(*MessageInfo).unmarshalPointer  
0 0.00% 0.00% 32474.97MB 97.57% google.golang.org/protobuf/internal/impl.(*MessageInfo).unmarshal  
0 0.00% 0.00% 32469.47MB 97.56% google.golang.org/protobuf/internal/impl.consumeMessageSliceInfo  
0 0.00% 0.00% 30612.01MB 91.98% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main.func4  
0 0.00% 0.00% 30612.01MB 91.98% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction  
0 0.00% 0.00% 30606.51MB 91.96% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic  
0 0.00% 0.00% 30606.51MB 91.96% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute  
0 0.00% 0.00% 30605MB 91.96% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process  
1.64MB 0.00% 0.00% 28125.61MB 84.51% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).makeReStream  
0 0.00% 0.00% 28123.98MB 84.50% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.readStreamToBuffer  
0.50MB 0.00% 0.01% 28123.98MB 84.50% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*customDecoder).Decode  
0 0.00% 0.01% 28123.48MB 84.50% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*customDecoder).DecodeTo  
28107.60MB 84.45% 84.46% 28107.60MB 84.45% reflect.New  
0 0.00% 84.46% 27749.84MB 83.38% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*decoderTTypex_TE).Call2x2  
0 0.00% 84.46% 27749.84MB 83.38% github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*decoder2x2).Decode  
0 0.00% 84.46% 27749.84MB 83.38% github.com/apache/beam/sdks/v2/go/pkg/beam.protoDec  
0 0.00% 84.46% 27749.34MB 83.38% google.golang.org/protobuf/proto.UnmarshalOptions.Unmarshal  
0 0.00% 84.46% 19103.76MB 57.40% google.golang.org/protobuf/internal/impl.consumeMessageInfo  
0 0.00% 84.46% 2660.91MB 7.99% google.golang.org/protobuf/proto.UnmarshalOptions.UnmarshalState  
0 0.00% 84.46% 2660.91MB 7.99% google.golang.org/grpc/encoding/proto.codec.Unmarshal

makeReStream looks like it is eating up a lot of memory. I think this is the function preparing data for the following DoFn:

type writeFileFn[T proto.Message] struct {
    Filename   string `json:"filename"`
    ShardCount int    `json:"shardCount"`
}

func (w *writeFileFn[T]) ProcessElement(ctx context.Context, shard int, protos func(*T) bool) error {
    fs, err := filesystem.New(ctx, w.Filename)
    if err != nil {
        return err
    }
    defer fs.Close()

    shardName := fmt.Sprintf("%05d-of-%05d", shard+1, w.ShardCount)

    fd, err := fs.OpenWrite(ctx, w.Filename+"-"+shardName)
    if err != nil {
        return err
    }
    buf := bufio.NewWriterSize(fd, 1000*1000*5) // use 5MB buffer
    recordWriter := riegeli.NewWriter(buf, nil)

    var elem T
    for protos(&elem) {
        if err := recordWriter.PutProto(elem); err != nil {
            return fmt.Errorf("error writing proto to riegeli file: %w", err)
        }
    }

    if err := recordWriter.Flush(); err != nil {
        return fmt.Errorf("error flushing bytes to riegeli file: %w", err)
    }

    if err := buf.Flush(); err != nil {
        return fmt.Errorf("error flushing bytes to riegeli file: %w", err)
    }
    if err := fd.Close(); err != nil {
        return fmt.Errorf("error closing riegeli file: %w", err)
    }
    return nil
}

I'm not sure how the streams work for the protos func(*T) bool iterator (though I'll dig into your comments more at some point to find out). I'm guessing either the runner or the harness is loading too many elements into the iterator stream.

> That said, here are some areas to look into, depending on the pipeline. TBH, as described, neither of these is likely to help.
>
> As implemented, the SDK will buffer some number of elements per bundle being processed (see datamgr.go). After that, additional elements will not be accepted from the Runner until something has been processed through. This happens using standard channel blocking.
>
> The other place where memory might "back up" is the Combiner Lifting Cache, which currently uses a map and a fixed cap on eviction size. We would love to make it more memory aware, so that memory pressure evicts elements and allows things to GC. A good mechanism for this hasn't been determined, as in general there's value in keeping the cache as full as possible so that elements are combined before the shuffle.

Very cool. Do you have any sort of debugging visualizer for the harness? It'd be interesting to see which bundles are active, how many elements there are in each, and what is known about the size of the elements.

Perhaps a structured log could be output and replayed using a visualizer.

gonzojive commented 2 years ago

Taking a look at datasource.go:

func (n *DataSource) Process(ctx context.Context) error {
    // ...
    var valReStreams []ReStream
    for _, cv := range cvs {
        values, err := n.makeReStream(ctx, pe, cv, &dataReaderCounted)
        if err != nil {
            return err
        }
        valReStreams = append(valReStreams, values)
    }

    if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
        return err
    }
}

Does values, err := n.makeReStream(ctx, pe, cv, &dataReaderCounted) load all of the values in the iterator? It seems to:

func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
    // TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
    size, err := coder.DecodeInt32(bcr.reader)
    if err != nil {
        return nil, errors.Wrap(err, "stream size decoding failed")
    }

    switch {
    case size >= 0:
        // Single chunk streams are fully read in and buffered in memory.
        buf := make([]FullValue, 0, size)
        buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
        if err != nil {
            return nil, err
        }
        return &FixedReStream{Buf: buf}, nil
    // ...
    }
}

Could FixedReStream be replaced with a ReStream that streams directly from the bcr *byteCountReader?
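
Roughly what I have in mind, sketched with simplified local stand-ins for the exec package's types (not a drop-in patch; the real FullValue/Stream/ReStream definitions differ in detail):

package sketch

import (
    "errors"
    "io"
)

// Simplified stand-ins for the exec package's types.
type FullValue struct{ Elm interface{} }

type Stream interface {
    Read() (*FullValue, error)
    Close() error
}

type ReStream interface {
    Open() (Stream, error)
}

// onDemandReStream decodes up to size elements lazily from r, instead of
// buffering them all into a []FullValue up front the way FixedReStream does.
// The catch: because it consumes r as it reads, it can only be opened once,
// so re-iteration (e.g. for CoGBK) would not work.
type onDemandReStream struct {
    r      io.Reader
    decode func(io.Reader) (*FullValue, error)
    size   int64
    opened bool
}

func (rs *onDemandReStream) Open() (Stream, error) {
    if rs.opened {
        return nil, errors.New("onDemandReStream: stream already consumed")
    }
    rs.opened = true
    return &onDemandStream{rs: rs, remaining: rs.size}, nil
}

type onDemandStream struct {
    rs        *onDemandReStream
    remaining int64
}

// Read decodes the next element directly from the underlying reader.
func (s *onDemandStream) Read() (*FullValue, error) {
    if s.remaining == 0 {
        return nil, io.EOF
    }
    s.remaining--
    return s.rs.decode(s.rs.r)
}

func (s *onDemandStream) Close() error {
    s.remaining = 0
    return nil
}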

lostluck commented 2 years ago

While that would probably help in your specific case, it's working as intended. Without benchmarking such a change at scale, it could have adverse effects on other users' jobs. The SDK is supposed to cache the iterable part of a GBK to improve throughput in the more common cases.

Beam does have a facility to page larger iterables in a smaller chunk at a time over the state API, but it needs to be supported by the runner in question. I know Dataflow supports it; I don't know about the open source runners. The paging doesn't break the semantics, but it does reduce the memory peak when applicable.

That's handled here: https://sourcegraph.com/github.com/apache/beam@de5c56a5b8a8a030e7e67323a696d52495e37f7f/-/blob/sdks/go/pkg/beam/core/runtime/exec/datasource.go?L219

I feel that the single-machine / small-scale case isn't one that Beam has put serious effort into, in either testing or production scenarios. Open source gets contributed to based on the needs of its users, so it's possible that need hasn't existed so far for the density of processing you're dealing with.

Personally, I'm hacking together a replacement for the Go Direct runner, but for the time being it has test-oriented goals rather than performance goals.

gonzojive commented 2 years ago

To support user override of the default buffering behavior, I updated the exec package so that it can spill to disk when the heap usage reported by runtime.ReadMemStats starts to exceed some threshold. Would something like what's in the pull request be acceptable? Hopefully it doesn't introduce too much complexity.

Justification for this approach: this change allows the harness to handle the disk spilling of a large iterator. It would probably be better if the runner handled this sort of spilling, but practically I found this faster to implement, because modifying the Flink runner is onerous (it requires learning Flink concepts and Gradle, and submitting a robust change upstream that I don't have time to put together). Having a hook into the harness code that constructs a ReStream is much faster for me to implement and should work with all runners. It also happens to achieve the same end result for a single-machine use case.
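
The memory check itself is simple; something along these lines (a sketch of the idea, not the exact code in the pull request):

package main

import (
    "fmt"
    "runtime"
)

// shouldSpill reports whether decoded elements should start spilling to disk,
// based on the Go heap size reported by runtime.ReadMemStats.
func shouldSpill(thresholdBytes uint64) bool {
    var m runtime.MemStats
    runtime.ReadMemStats(&m) // note: this briefly stops the world, so call it sparingly
    return m.HeapAlloc > thresholdBytes
}

func main() {
    const threshold = 4 << 30 // e.g. start spilling once the heap exceeds 4 GiB
    fmt.Println("spill to disk:", shouldSpill(threshold))
}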

lostluck commented 2 years ago

While it doesn't spill to disk, I do have an approach that decodes elements on demand from a GBK stream, which has demonstrated heap reductions in the cases tried so far.

The main catch is that it only works for GBK value iterators that are read once. This covers most GBK usage, including Reshuffle and lifted combine usage. It cannot cover general CoGBK cases due to the current re-iterator requirement, and it cannot cover post-GBK PCollections that are read by more than one DoFn. It would also not cover GBK re-iteration, but the Go SDK currently doesn't support that mode for GBKs, so that's a non-issue.

That specific work will be tracked in #22900.