airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[EPIC] scale warehouse destination connectors to handle arbitrary number of streams #10260

Open sherifnada opened 2 years ago

sherifnada commented 2 years ago

Tell us about the problem you're trying to solve

Today, destination warehouses and blob storages like S3 hold a constant-size, in-memory buffer for each stream they expect to see in the input catalog. For example, this comment in the S3 code illustrates that the more streams come in, the more memory the connector must hold for buffers.

This has two obvious problems. First, it does not scale. Second, it's unnecessary, because we often write files whose max size is no more than 1GB (which is optimal for consuming warehouses). So the current design doesn't really make sense, and we stand to gain a lot from changing it.

This behavior also really hurts resource utilization when scaling an Airbyte instance, because you can never reliably predict how much memory a worker will need.

Describe the solution you’d like

I want the destination connectors to be able to work with a preset amount of memory. For example, if I decide to give the connector only 1.5GB, it should still function correctly, perhaps trading off some speed, but it shouldn't fail outright. Buffer space should only be held while it is being used: we can lazily allocate space as records come in, release it when the buffer is flushed, and allocate it again if new records arrive for that buffer.
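A minimal sketch of that lazy, per-stream allocation idea (the class, method names, and flush threshold below are illustrative, not the actual connector code):

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a buffer is created only when a stream actually
// receives records, and its memory is released as soon as it is flushed.
public class LazyStreamBuffers {

  private static final int MAX_BUFFER_BYTES = 1024 * 1024; // hypothetical flush threshold

  private final Map<String, ByteArrayOutputStream> buffers = new HashMap<>();

  public void accept(String streamName, byte[] serializedRecord) {
    // Allocate lazily: only streams that actually produce records get a buffer.
    ByteArrayOutputStream buffer =
        buffers.computeIfAbsent(streamName, name -> new ByteArrayOutputStream());
    buffer.writeBytes(serializedRecord);

    if (buffer.size() >= MAX_BUFFER_BYTES) {
      flush(streamName, buffer);
    }
  }

  private void flush(String streamName, ByteArrayOutputStream buffer) {
    byte[] batch = buffer.toByteArray();
    // ... upload `batch` to the staging area / destination here ...

    // Release the memory; a new buffer is allocated only if more records arrive.
    buffers.remove(streamName);
  }
}
```

With this shape, memory usage tracks the number of streams actively producing records rather than the number of streams declared in the catalog.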

The most important destinations to do this for, in order, are:

Remaining pieces

Following the work on the first destinations, Snowflake and S3, an abstraction was built to serialize and compress records before buffering, while monitoring for reasonable memory and file-handle limits.

This mechanism can now be replicated across all other Java destinations by inheriting the same base classes and providing the proper BlobStorageOperations or SqlOperations implementation to interact with.

Any destination that needs to buffer a batch of records before sending them, instead of writing them one by one, can share these benefits.
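Very roughly, the reuse pattern looks like the sketch below, where something like BlobStorageOperations or SqlOperations plays the role of the operations hook. The interface and class names here are hypothetical, not the real Airbyte base classes:

```java
import java.nio.file.Path;

// Hypothetical sketch of the reuse pattern: the shared buffering/flushing logic
// lives in one place, and each destination only supplies the operation needed
// to ship a finished buffer to its staging area or table.
interface StagingOperations {
  void uploadBufferedFile(Path compressedBuffer, String streamName) throws Exception;
}

final class SharedBufferingStrategy {
  private final StagingOperations operations;

  SharedBufferingStrategy(StagingOperations operations) {
    this.operations = operations;
  }

  void flush(Path compressedBuffer, String streamName) throws Exception {
    // Shared concerns (serialization, compression, size limits, flush scheduling)
    // would live here; only the destination-specific upload step is delegated.
    operations.uploadBufferedFile(compressedBuffer, streamName);
  }
}
```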

Warehouse & blob storage destinations

It is a standard warehouse best practice to use staging areas where data is first buffered before being loaded into the final destination. Blob storage destinations naturally follow the same requirement, since they manipulate files.

Other destinations requiring buffering

The destinations in this section already seem to use, or need, some buffering mechanism, currently implemented on a case-by-case basis. Refactoring them to follow the same pattern will reduce the maintenance cost of the connectors and unify the buffering options, while bringing memory consumption under control in a common, more predictable manner:

Other destinations that may benefit from buffering

The destination connectors below send records to the destination as they flow in, so they may not have memory consumption issues with large streams. Still, it might be worth verifying whether batching records in a buffer and sending larger collections together would be beneficial:

(This may not be a completely exhaustive list: we (Airbyte or the community) regularly add more destination connectors, so some destinations may be missing here and would need to be added to the correct section, or disregarded if buffering does not represent a benefit or an issue for that particular connector. Python connectors are not listed in this issue.)

Related areas

These are related areas that don't cope well with large batches or an arbitrary number of streams:


tuliren commented 2 years ago

The Snowflake internal staging mode has stable memory usage across multiple streams.

One stream with tons of large records:

[Screenshot: memory usage, one stream with large records]

10 streams with smaller load:

[Screenshot: memory usage, 10 streams with smaller load]

I still need to figure out why it needs so much memory initially.

tuliren commented 2 years ago

Memory Usage from Byte Arrays

One major contributor to the high memory usage is ByteUtils#getSizeInBytesForUTF8CharSet. This method creates a byte array from a string just to get a precise byte size. Along the way, it creates lots of byte arrays.

In practice, we only need an estimation of the string size. Since a UTF-8 character is 1-4 bytes long, we can just use the string length times 4 to approximate the byte size. This gives an upper bound, which is even safer.
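As a rough illustration of the difference (not the actual ByteUtils code): the precise version materializes a throwaway byte array for every record, while the estimate is constant-time arithmetic.

```java
import java.nio.charset.StandardCharsets;

public class StringSizeEstimation {

  // Precise size: allocates a new byte[] for every record just to read its length.
  static long exactUtf8SizeInBytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8).length;
  }

  // Cheap upper bound: a UTF-8 character encodes to at most 4 bytes, so
  // length * 4 may over-estimate but never under-estimates the size.
  static long estimatedUtf8SizeInBytes(String s) {
    return (long) s.length() * 4;
  }

  public static void main(String[] args) {
    String record = "{\"id\": 42, \"name\": \"café\"}";
    System.out.println(exactUtf8SizeInBytes(record));     // exact byte count
    System.out.println(estimatedUtf8SizeInBytes(record)); // upper bound, no allocation
  }
}
```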

Memory usage with getSizeInBytesForUTF8CharSet

[Screenshots: memory usage with getSizeInBytesForUTF8CharSet]

We can see that:

Memory usage with simple estimation (string length x 4)

[Screenshots: memory usage with simple estimation (string length x 4)]

The differences are:

Conclusion

Removing the getSizeInBytesForUTF8CharSet method calls immediately halves the memory usage.

tuliren commented 2 years ago

Sampling the string size every N records turns out not to have that big an effect.

The heap size is kept at 500 MB.

Byte size (OOME with 500 MB heap size)

[Screenshot: byte size computed for every record, OOME]

Calculating the byte size for every record message results in an OOME within one minute. This confirms that the frequent byte conversion is the root cause of the large memory footprint.

String size (sample every record)

[Screenshot: string size sampled every record]

String size (sample every 20 records)

[Screenshot: string size sampled every 20 records]

String size (sample every 100 records)

[Screenshot: string size sampled every 100 records]

Sampling once every 100 records slightly reduces memory usage; there are fewer memory usage peaks in the chart above.

tuliren commented 2 years ago

When the max heap size is set to 300 MB, the sampling rate has a visible impact on heap usage. As the sampling rate decreases there are fewer heap peaks, especially when going from sampling every record to every 20 records. The impact is less significant when moving from every 20 to every 100 records.

String size (every record)

[Screenshot: 300 MB heap, string size sampled every record]

String size (every 20 records)

[Screenshot: 300 MB heap, string size sampled every 20 records]

String size (every 100 records)

[Screenshot: 300 MB heap, string size sampled every 100 records]

Conclusion

Will sample the string size every 20 records.
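A minimal sketch of what that sampling could look like (illustrative names, not the actual connector classes): the size is measured on every Nth record and the last measurement is reused in between.

```java
// Illustrative sketch of size sampling: measure the record size only once
// every SAMPLE_EVERY records and reuse the last measurement for the rest.
public class SampledSizeEstimator {

  private static final int SAMPLE_EVERY = 20; // the rate chosen in the conclusion above

  private long recordCount = 0;
  private long lastSampledSizeBytes = 0;

  public long estimateSizeBytes(String serializedRecord) {
    if (recordCount % SAMPLE_EVERY == 0) {
      // Cheap upper bound (see the earlier comment): UTF-8 chars are at most 4 bytes.
      lastSampledSizeBytes = (long) serializedRecord.length() * 4;
    }
    recordCount++;
    return lastSampledSizeBytes;
  }
}
```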

tuliren commented 2 years ago

300 MB seems to be the minimum heap size. With a lower -Xmx, the connector hit an OOME within one minute:

tuliren commented 2 years ago

The S3 staging mode is not scalable. When syncing 50 streams, each with 1K records, the connector hits an OOME with a 500 MB max heap size.

Something is clearly wrong: the number of live threads just keeps going up, and the majority of the heap is occupied by byte arrays. This mode requires more investigation.

[Screenshots: heap usage and live thread count]

It does work with fewer streams (e.g. 3 streams, each with 10K records).

tuliren commented 2 years ago

Summary

ChristopheDuong commented 2 years ago

Following up on @tuliren's investigation:

The S3 staging mode is not scalable. When syncing 50 streams, each with 1K records, the connector hits an OOME with a 500 MB max heap size. Something is clearly wrong: the number of live threads just keeps going up, and the majority of the heap is occupied by byte arrays. This mode requires more investigation.

PR airbytehq/airbyte#10866 solves the non-scalable aspects that were discovered so far with the Snowflake S3 staging mode.

So this statement is (partially?) not true anymore with the new implementation (destination-snowflake 0.4.21):

300 MB seems to be the minimum heap size. With a lower -Xmx, the connector hit an OOME within one minute:

The new approach implements a compressed serialized buffer strategy, with limits on memory per stream, memory overall, and the number of simultaneous buffers. Since records are now compressed, estimating their size beforehand is more complex, so we instead count bytes only after they have actually been written to the buffer. The following issue therefore no longer concerns us:
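A rough sketch of the count-after-compression idea, assuming a GZIP file buffer per stream (an illustrative class, not the actual serialized buffer implementation):

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPOutputStream;

// Illustrative sketch: compress records into a temporary file and measure the
// buffered size from what was actually written, instead of estimating the
// record size in memory beforehand.
public class CompressedFileBuffer implements AutoCloseable {

  private final Path file;
  private final OutputStream out;

  public CompressedFileBuffer() throws IOException {
    this.file = Files.createTempFile("stream-buffer", ".csv.gz");
    // syncFlush = true so that flush() pushes pending compressed bytes to disk.
    this.out = new GZIPOutputStream(new BufferedOutputStream(Files.newOutputStream(file)), true);
  }

  public void write(String serializedRecord) throws IOException {
    out.write(serializedRecord.getBytes(StandardCharsets.UTF_8));
    out.write('\n');
  }

  // Bytes actually sitting on disk after compression; this is the number that
  // would be compared against the per-stream and global buffer limits.
  public long compressedSizeBytes() throws IOException {
    out.flush();
    return Files.size(file);
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}
```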

One major contributor to the high memory usage is ByteUtils#getSizeInBytesForUTF8CharSet. This method creates a byte array from a string just to get a precise byte size. Along the way, it creates lots of byte arrays. In practice, we only need an estimation of the string size. Since a UTF-8 character is 1-4 bytes long, we can just use the string length times 4 to approximate the byte size.

The numbers below are from the new S3 staging mode, syncing 200 streams with 2K records each (4.24GB of data in total), running with a 200MB max heap size.

[Screenshots: memory, thread, and CPU usage]

S3-staging-200_streams_2000_records_1024mb_gzip_disk_buffer.log

This shows more stable and reduced consumption of memory, thread, and CPU resources, regardless of the sync size or the number of streams.

It also shows room for improvement: more CPU could be used in the future to optimize speed.

ChristopheDuong commented 2 years ago

PR https://github.com/airbytehq/airbyte/pull/10866 solves the non-scalable aspects discovered so far with the S3 staging mode. The numbers above are from the new S3 staging mode, syncing 200 streams with 2K records each (4.24GB of data in total), running with a 200MB max heap size.

However, these are not the optimal or recommended max heap sizes for running the new staging modes. (This comment applies to modes using the StreamTransferManager.)

When the same connector is run with the same configuration but with a larger amount of data per stream, keeping the max heap size at 200MB leads to an OOME...

For example, consider a scenario where each stream has 100K records, representing 750-800MB of uncompressed data per stream, or about 283MB per stream once compressed (gzip).

(For comparison, the previous sync with 200 streams was handling only 5MB of compressed data per stream.)

Even though the data is now being buffered and compressed on disk per stream, when a flush event occurs for a particular stream, the compressed data for that stream is fully loaded into memory (with extra memory overhead, as described here).

Note that the connector is capable of buffering multiple streams at once in different file buffers on disk (with some known limitations such as https://github.com/airbytehq/airbyte-internal-issues/issues/496), but it will load at most one compressed file of max 200MB into memory at a time when triggered to upload to staging (which represents at most 300-400MB of memory usage when using the alex.mojaki.s3upload.StreamTransferManager class).
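To make the memory behavior concrete, here is a generic illustration (not the connector's actual upload code) of why loading the whole compressed file dominates heap usage, compared to streaming it in fixed-size chunks:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class UploadStyles {

  // Loads the entire compressed file onto the heap before uploading:
  // a 200 MB file costs at least 200 MB of heap, plus whatever copies the
  // upload library makes on top of that.
  static void uploadByLoadingWholeFile(Path compressedFile, OutputStream uploadTarget)
      throws IOException {
    byte[] wholeFile = Files.readAllBytes(compressedFile);
    uploadTarget.write(wholeFile);
  }

  // Streams the file in small chunks: heap usage stays roughly constant
  // (one chunk) no matter how large the file is.
  static void uploadByStreaming(Path compressedFile, OutputStream uploadTarget)
      throws IOException {
    byte[] chunk = new byte[8 * 1024];
    try (InputStream in = Files.newInputStream(compressedFile)) {
      int read;
      while ((read = in.read(chunk)) != -1) {
        uploadTarget.write(chunk, 0, read);
      }
    }
  }
}
```

The internal staging mode discussed further down avoids the first pattern, which is part of why its heap requirements are lower.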

Memory consumption when buffering/processing the first 200K records on 2 streams:

[Screenshot: memory consumption]

Memory consumption in a "stable" state when reaching 10+GB of data transferred (1000+K records on 10+ streams)

(With these sync settings, 200MB of compressed data contains about 70K records. That leaves about 30K records, roughly 83MB compressed, remaining in the buffer after a stream is flushed. Once we reach 10 streams in the buffers, a "flush all" event is triggered, so these smaller files are uploaded during that phase, requiring less memory to load them for uploading.)

[Screenshot: memory consumption]

The total run time for 15.59GB over 15 streams with 100000 records per stream was 2 hrs 11 min 08 sec: S3-staging-15_streams_100000_records_1024mb_gzip_disk_buffer.log

In conclusion, for the moment, it is better to run with at least around 600MB of max heap size plus 1GB of disk storage.

These recommended limits are tied to the following settings:

These limits are configured here; see this comment for more details.
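As a rough picture of the knobs involved, the sketch below mirrors the limits discussed in this thread; the constant names and exact values are illustrative, not the real configuration:

```java
// Illustrative constants mirroring the limits discussed in this thread;
// the actual names and values live in the connector codebase.
public final class BufferingLimits {

  // At most ~200 MB of compressed data buffered per stream before it is
  // flushed and uploaded to the staging area.
  public static final long MAX_PER_STREAM_BUFFER_BYTES = 200L * 1024 * 1024;

  // Once this many streams have open file buffers, a "flush all" is triggered.
  public static final int MAX_CONCURRENT_STREAM_BUFFERS = 10;

  // Recommended resources for the S3 staging mode given the limits above.
  public static final long RECOMMENDED_MAX_HEAP_BYTES = 600L * 1024 * 1024;
  public static final long RECOMMENDED_DISK_BYTES = 1024L * 1024 * 1024;

  private BufferingLimits() {}
}
```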

In future versions, if the upload-to-staging implementation relies on multiple StreamTransferManager instances running in parallel, the connector may need additional memory to process such simultaneous upload tasks.

ChristopheDuong commented 2 years ago

Destination Snowflake with the internal staging mode does not have the same memory requirements as the S3 staging mode, because it does not rely on the alex.mojaki.s3upload.StreamTransferManager class and therefore does not load the buffered file into memory in order to upload it to staging.

Scenario 1: 4.24GB over 200 streams, 2K records (5MB compressed) per stream

[Screenshot: memory consumption]

To compare against S3 staging mode

Scenario 2: 15.59GB over 15 streams, 100K records (283MB compressed) per stream

Memory consumption when buffering/processing the first 200K records on 2 streams:

[Screenshot: memory consumption]

To compare against S3 staging mode

Memory consumption in a "stable" state when reaching 10+GB of data transferred (1000+K records on 10+ streams)

[Screenshot: memory consumption]

To compare against S3 staging mode

The total run time for 15.59GB over 15 streams with 100000 records per stream was 2 hrs 20 min 09 sec

internal-staging-15_streams_100000_records_1024mb_gzip_disk_buffer.log

Recap

| Criteria | Baseline (/dev/null) | Snowflake external S3 staging | Snowflake internal staging |
|---|---|---|---|
| Time to sync 200 streams of 2,000 records each | 29 min | 54 min | 1 hr 27 min |
| Time to sync 15 streams of 100,000 records each | 1 hr 55 min | 2 hr 11 min | 2 hr 20 min |
| Recommended max heap size | n/a | 600 MB | 200 MB |
| Recommended disk storage size | n/a | 1+ GB | 1+ GB |

(*FYI: these timings were collected over a single run; we should probably do multiple runs and average them for a more accurate view.)

olivermeyer commented 2 years ago

We're getting an OOM in the Redshift destination when trying to import a relatively large dataset (~6m rows, so nothing too wild). The logs are:

2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 - Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:81)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at alex.mojaki.s3upload.ConvertibleOutputStream.<init>(ConvertibleOutputStream.java:20)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at alex.mojaki.s3upload.MultiPartOutputStream.<init>(MultiPartOutputStream.java:74)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at alex.mojaki.s3upload.StreamTransferManager.getMultiPartOutputStreams(StreamTransferManager.java:338)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier.prepareStagingFile(S3StreamCopier.java:123)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$recordWriterFunction$0(CopyConsumerFactory.java:90)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory$$Lambda$178/0x0000000800e10c40.accept(Unknown Source)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.flushQueueToDestination(BufferedStreamConsumer.java:166)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:148)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:46)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:147)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128)
2022-03-31 13:10:58 ERROR i.a.c.i.LineGobbler(voidCall):82 -    at io.airbyte.integrations.destination.redshift.RedshiftDestination.main(RedshiftDestination.java:78)

I also see the memory usage by that pod hit the node's capacity just before it dies. Seems like it's related to this general issue, specifically this one. Any ETA for the Redshift destination fix? And any possible workarounds in the meantime?

ChristopheDuong commented 2 years ago

a relatively large dataset (~6m rows, so nothing too wild). I also see the memory usage by that pod hit the node's capacity just before it dies. Seems like it's related to this general issue,

How many tables are the 6M rows split across? If they come from a large number of tables, then yes, it might be related to what this epic and the issue you linked are targeting.

A workaround would be to split your connection and select only a few tables at once, because the connector allocates a large buffer for each stream all at once.

Having a large number of rows in one single table should not run into an OOM, though (unless it's due to another bug).

How much memory do you allocate to the worker node running the connector?

As for an ETA, I don't know; maybe @grishick can answer that part. (But I believe we might address those issues for the warehouse destinations in the near future anyway.)

olivermeyer commented 2 years ago

How many tables are the 6M rows split across?

It's actually just one table, so perhaps it's another bug. Should I open a new issue?

ChristopheDuong commented 2 years ago

How many tables are the 6M rows split across?

It's actually just one table, so perhaps it's another bug. Should I open a new issue?

Yes, please!

sherifnada commented 2 years ago

It could potentially be a wide-row issue.