apache / arrow-adbc

Database connectivity API standard and libraries for Apache Arrow
https://arrow.apache.org/adbc/
Apache License 2.0

go/adbc/driver/flightsql: Default Value (10 MB) For adbc.snowflake.rpc.ingest_target_file_size Not Used In 1.1.0 #1997

Closed Zan-L closed 1 month ago

Zan-L commented 1 month ago

What happened?

Jobs calling adbc_ingest() failed with a memory error. Upon checking, the data was split into {number of processors} parquet files, instead of ~10 MB files as in 1.0.0.

Stack Trace

adbc_driver_manager.InternalError: INTERNAL: unknown error type: cannot allocate memory
    cursor.adbc_ingest(table, data, mode)
  File "/usr/local/lib/python3.12/site-packages/adbc_driver_manager/dbapi.py", line 937, in adbc_ingest
    return _blocking_call(self._stmt.execute_update, (), {}, self._stmt.cancel)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "adbc_driver_manager/_lib.pyx", line 1569, in adbc_driver_manager._lib._blocking_call_impl
  File "adbc_driver_manager/_lib.pyx", line 1562, in adbc_driver_manager._lib._blocking_call_impl
  File "adbc_driver_manager/_lib.pyx", line 1295, in adbc_driver_manager._lib.AdbcStatement.execute_update
  File "adbc_driver_manager/_lib.pyx", line 260, in adbc_driver_manager._lib.check_error

How can we reproduce the bug?

Unfortunately, I cannot share the data. However, the behavior should be easy to observe: on a four-core VM, a dataset of moderate size (around 500 MB as parquet) is split into four ~125 MB files when adbc_ingest() is called to upload to Snowflake, instead of fifty ~10 MB files.

Environment/Setup

Packages: adbc-driver-manager==1.1.0 adbc-driver-snowflake==1.1.0

Operating system: Windows/Linux

Package manager: pip

lidavidm commented 1 month ago

@joellubi or @zeroshade, got any idea here?

joellubi commented 1 month ago

Just looking at the code changes between releases, it's not clear what would have caused this. I need to get a new Snowflake environment set up, and then I'll try to reproduce it.

@Zan-L Can you share the parameters used in the Python code calling the driver? For example, any options set on the ADBC database, connection, or statement objects?

zeroshade commented 1 month ago

With the way the code is currently written, here's the scenario I can think of:

  1. The default writer concurrency is runtime.NumCPU(), which would explain why {number of processors} files are being produced.
  2. The size limit for the writer is enforced in bulk_ingest.go by writing an entire record batch and then checking the size of the data after it has been written. The only check on the writer side is for a maximum number of rows, which isn't set by default. This means that if the stream of record batches produces large enough records, the whole record batch is written to the file before we check the file size. @Zan-L can you confirm whether your record batch reader is producing significantly large batches? Would it be possible for the provided record batch reader to slice the batches before passing them on, making them smaller and allowing the file sizes to be limited more easily (see the sketch after this list)?
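
A minimal sketch of the slicing idea from item 2, assuming the input is a pyarrow RecordBatchReader; sliced_reader and max_rows are illustrative names, not part of the driver API:

import pyarrow as pa

def sliced_reader(reader: pa.RecordBatchReader, max_rows: int = 65_536) -> pa.RecordBatchReader:
    """Re-emit the stream so that no batch exceeds max_rows rows (zero-copy slices)."""
    def generate():
        for batch in reader:
            for offset in range(0, batch.num_rows, max_rows):
                yield batch.slice(offset, max_rows)
    return pa.RecordBatchReader.from_batches(reader.schema, generate())

# e.g. cursor.adbc_ingest(table, sliced_reader(data.scanner().to_reader()), mode)
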
Zan-L commented 1 month ago

@zeroshade I can confirm something a bit different from what you asked in point 2, but more useful: the same data is split correctly into ~10 MB parquet files with ADBC 1.0.0, but into only 4 files with 1.1.0 on a 4-core VM. That should rule out a cause on the data side. To answer your specific question: the data object, as you can see from the code below, is usually the result of DeltaTable(path).to_pyarrow_dataset(). We have 180 tables, some of which could contain large batches, but definitely not all of them, and the behavior of splitting into only four parquet files on a 4-core VM is universal.

@joellubi I actually wanted to bring that up but held back to avoid complexity. First of all, no, I did not pass any ad-hoc parameters. However, I did try to do so in an attempt to fix the bug myself, without success. Here are the two ways I tried:

1.

from typing import Literal

import pyarrow as pa
import pyarrow.dataset as ds
import adbc_driver_snowflake.dbapi

def _arrow_to_snowflake(table: str, conn_uri: str, data: ds.Dataset | pa.RecordBatchReader, mode: Literal['append', 'create', 'replace', 'create_append']):
    with adbc_driver_snowflake.dbapi.connect(
        conn_uri,
        autocommit=True,
        db_kwargs={'adbc.snowflake.rpc.ingest_target_file_size': str(2**14)}
    ) as conn, conn.cursor() as cursor:
        cursor.adbc_ingest(table, data, mode)

In this setting, the data is still split into 4 parquet files, but a small number of tables made it to Snowflake while the rest still failed with OOM. So either this way of setting the parameter never works (it is set but not picked up by the code), or it interferes in some way (which would explain why a small number of jobs could succeed) but does not stop the data from being split into {number of processors} files.

2.

def _arrow_to_snowflake(table: str, conn_uri: str, data: ds.Dataset | pa.RecordBatchReader, mode: Literal['append', 'create', 'replace', 'create_append']):
    with adbc_driver_snowflake.dbapi.connect(conn_uri, autocommit=True) as conn, conn.cursor() as cursor:
        cursor.adbc_statement.set_options(**{'adbc.snowflake.rpc.ingest_target_file_size': str(2**14)})
        cursor.adbc_ingest(table, data, mode)

This is actually the recommended way to tune it, via the AdbcStatement, but it raises a NotImplementedError at set_options(), thus contradicting the doc.

zeroshade commented 1 month ago

@zeroshade I can confirm something a bit different from what you asked in point 2, but more useful: the same data is split correctly into ~10 MB parquet files with ADBC 1.0.0, but into only 4 files with 1.1.0 on a 4-core VM. That should rule out a cause on the data side.

Not necessarily. The big change we made between v1.0.0 and v1.1.0 was the switch from calling Write(rec) to WriteBuffered(rec) when writing record batches to the parquet files (to work around a bug on Snowflake's side when handling empty row groups in a parquet file). Depending on the size of the row groups we're talking about, the issue could be stats-related or something else. That said, it would be interesting if we could see where the memory is being used.

This is actually the recommended way to tune it, via the AdbcStatement, but it raises a NotImplementedError at set_options(), thus contradicting the doc.

This is because the doc appears to be incorrect (@joellubi we should fix this!): the actual option in the code is adbc.snowflake.statement.ingest_target_file_size. Can you try that instead and see if it helps? You could also try adbc.snowflake.statement.ingest_writer_concurrency to change the number of writers (which defaults to the number of CPUs available).
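
For reference, a sketch of how the corrected option names could be set inside the same _arrow_to_snowflake function shown above; the numeric values are just placeholders:

    with adbc_driver_snowflake.dbapi.connect(conn_uri, autocommit=True) as conn, conn.cursor() as cursor:
        cursor.adbc_statement.set_options(**{
            # "statement", not "rpc", is the namespace for these options
            'adbc.snowflake.statement.ingest_target_file_size': str(10 * 2**20),  # ~10 MB per file
            'adbc.snowflake.statement.ingest_writer_concurrency': '4',            # number of parallel writers
        })
        cursor.adbc_ingest(table, data, mode)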

adbc_driver_manager.InternalError: INTERNAL: unknown error type: cannot allocate memory

Have you checked what the actual memory usage of the container is when this happens? It might be https://github.com/apache/arrow-adbc/issues/1283 showing up again in a different context if we're using calloc somewhere that doesn't do a fallback check to malloc + memset.

Zan-L commented 1 month ago

@zeroshade Thanks for the insights. Here is my feedback:

  1. adbc.snowflake.statement.ingest_target_file_size and adbc.snowflake.statement.ingest_writer_concurrency finally worked with cursor.adbc_statement.set_options() (so there is indeed a mistake in the documentation). However, the data is still split into {number of concurrent writers} files despite adbc.snowflake.statement.ingest_target_file_size.
  2. I now tend to believe it is caused by the switch to WriteBuffered(rec). My data is written to a PyArrow Dataset with the Delta Lake PyArrow engine, which uses the default row group size of 1M rows. If this function writes at least one full row group of 1M rows at a time, then for a dataset of millions of rows with many columns, especially string ones, all of the data ends up in memory uncompressed.
  3. I don't have a way to monitor the memory usage of the containers yet, as they are ephemeral ECS tasks, though I did try increasing the memory from 16 GB to 32 GB, which didn't help. The data for the jobs that failed is on the scale of hundreds of megabytes, if that helps.

zeroshade commented 1 month ago

If the data that is failing is only in the hundreds of megabytes, this seems more related to the issues we've had in the past with calloc, though it's still weird that the buffered writing results in this issue with the file sizes, since we shouldn't be failing to allocate memory at the scale of hundreds of megabytes when you have 16-32 GB of memory available on the task. (Unless that "hundreds of megabytes" refers to the compressed size?)

Out of curiosity, would it be possible for you to log the sizes of the record batches it is producing, in terms of number of columns and rows, or otherwise estimate the memory size of the batches? Since you are unable to share the data itself, anything that could help us reproduce this would be beneficial.
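
A minimal sketch of how such logging might look for a pyarrow dataset, assuming the source data is readable locally (parquet_path is a placeholder):

import pyarrow.dataset as ds

data = ds.dataset(parquet_path)  # placeholder: the same dataset later passed to adbc_ingest
total_mib = 0.0
for i, batch in enumerate(data.to_batches()):
    size_mib = batch.nbytes / 2**20  # in-memory (uncompressed) size of the batch
    total_mib += size_mib
    print(f"batch {i}: {batch.num_rows} rows x {batch.num_columns} cols, ~{size_mib:.1f} MiB")
print(f"total in-memory size: ~{total_mib:.0f} MiB")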

adbc.snowflake.statement.ingest_target_file_size and adbc.snowflake.statement.ingest_writer_concurrency finally worked with cursor.adbc_statement.set_options() (so there is indeed a mistake in the documentation). However, the data is still split into {number of concurrent writers} files despite adbc.snowflake.statement.ingest_target_file_size.

Do you get more files with adbc.snowflake.statement.ingest_writer_concurrency?

Zan-L commented 1 month ago

"hundreds of megabytes" refers to parquet file size which is compressed.

adbc.snowflake.statement.ingest_writer_concurrency alone determines the number of files the data is split into.

I'll try to log the row group info. I don't know how yet because it's a bit too low-level for me, but I'll take a look.
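
A minimal sketch of one way to do this with pyarrow.parquet, assuming one of the source parquet files is accessible (parquet_path is a placeholder); it prints output in the same format as the dump in the next comment:

import pyarrow.parquet as pq

meta = pq.ParquetFile(parquet_path).metadata  # placeholder path to a source parquet file
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    print(f"Row Group {i}:")
    print(f"  Number of rows: {rg.num_rows}")
    print(f"  Total byte size: {rg.total_byte_size} bytes")
    print()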

Zan-L commented 1 month ago

I managed to log the row group size info for one of our datasets that failed with 1.1.0. See below:

Row Group 0:
  Number of rows: 65536
  Total byte size: 3967521 bytes

Row Group 1:
  Number of rows: 65536
  Total byte size: 3912539 bytes

Row Group 2:
  Number of rows: 65536
  Total byte size: 4123387 bytes

Row Group 3:
  Number of rows: 65536
  Total byte size: 4160283 bytes

Row Group 4:
  Number of rows: 65536
  Total byte size: 4192396 bytes

Row Group 5:
  Number of rows: 65536
  Total byte size: 4207327 bytes

Row Group 6:
  Number of rows: 65536
  Total byte size: 4270003 bytes

Row Group 7:
  Number of rows: 65536
  Total byte size: 4331440 bytes

Row Group 8:
  Number of rows: 65536
  Total byte size: 4478716 bytes

Row Group 9:
  Number of rows: 65536
  Total byte size: 4504929 bytes

Row Group 10:
  Number of rows: 65536
  Total byte size: 4290176 bytes

Row Group 11:
  Number of rows: 65536
  Total byte size: 4112182 bytes

Row Group 12:
  Number of rows: 65536
  Total byte size: 4120589 bytes

Row Group 13:
  Number of rows: 65536
  Total byte size: 3832213 bytes

Row Group 14:
  Number of rows: 65536
  Total byte size: 4129690 bytes

Row Group 15:
  Number of rows: 65536
  Total byte size: 4139314 bytes

Row Group 16:
  Number of rows: 65536
  Total byte size: 4184890 bytes

Row Group 17:
  Number of rows: 65536
  Total byte size: 4205800 bytes

Row Group 18:
  Number of rows: 65536
  Total byte size: 4222022 bytes

Row Group 19:
  Number of rows: 65536
  Total byte size: 4332252 bytes

Row Group 20:
  Number of rows: 65536
  Total byte size: 4439200 bytes

Row Group 21:
  Number of rows: 65536
  Total byte size: 4502127 bytes

Row Group 22:
  Number of rows: 65536
  Total byte size: 4426164 bytes

Row Group 23:
  Number of rows: 65536
  Total byte size: 4138475 bytes

Row Group 24:
  Number of rows: 65536
  Total byte size: 4271269 bytes

Row Group 25:
  Number of rows: 65536
  Total byte size: 4316339 bytes

Row Group 26:
  Number of rows: 65536
  Total byte size: 4432660 bytes

Row Group 27:
  Number of rows: 65536
  Total byte size: 4007802 bytes

Row Group 28:
  Number of rows: 65536
  Total byte size: 4198239 bytes

Row Group 29:
  Number of rows: 65536
  Total byte size: 3885470 bytes

Row Group 30:
  Number of rows: 65536
  Total byte size: 4085597 bytes

Row Group 31:
  Number of rows: 65536
  Total byte size: 4125362 bytes

Row Group 32:
  Number of rows: 65536
  Total byte size: 4159128 bytes

Row Group 33:
  Number of rows: 65536
  Total byte size: 4154620 bytes

Row Group 34:
  Number of rows: 65536
  Total byte size: 4273763 bytes

Row Group 35:
  Number of rows: 65536
  Total byte size: 4339534 bytes

Row Group 36:
  Number of rows: 65536
  Total byte size: 4481726 bytes

Row Group 37:
  Number of rows: 46145
  Total byte size: 3236035 bytes
Zan-L commented 1 month ago

@zeroshade @joellubi I created a script to generate dummy data for reproducing the issue:

import polars as pl
import pyarrow.dataset as ds
import adbc_driver_snowflake.dbapi

num_rows = 10_000_000
parquet_path = 
conn_uri = 

lf = pl.LazyFrame({'id': range(num_rows)})
lf = lf.with_columns(pl.lit('This is just a dummy test string.').alias(f"dummy_string_{i}") for i in range(30))
lf.sink_parquet(parquet_path)
data = ds.dataset(parquet_path)

# Splits into 83 files in 1.0 but only 4 in 1.1
with adbc_driver_snowflake.dbapi.connect(conn_uri, autocommit=True) as conn, conn.cursor() as cursor:
    cursor.adbc_statement.set_options(**{'adbc.snowflake.statement.ingest_target_file_size': str(2**14), 'adbc.snowflake.statement.ingest_writer_concurrency': '4'})
    cursor.adbc_ingest('Test', data, 'replace')

With ADBC 1.0.0, the data was split into 83 parquet files: [screenshot]

But only 4 with ADBC 1.1.0: [screenshot]

zeroshade commented 1 month ago

Despite the weird change in splitting, it looks like it's not erroring with out-of-memory anymore?

Zan-L commented 1 month ago

I tuned the number of string columns down to 30 to show you the write results. If you make it larger (say 100, depending on your RAM) you'll see OOM.

joellubi commented 1 month ago

I've been able to reproduce the issue, both via the script provided (thanks @Zan-L) and in a self-contained unit test as well. It does appear that switching to the BufferedWriter is in fact the culprit. The current approach counts the bytes written to the output buffer when deciding whether the writer should be closed and the file sent across the network. In the case of the buffered writer, no bytes are written to the output buffer at all until the writer's internal buffer is flushed. For this reason the initial set of concurrent writers will just keep writing until they hit EOF on the incoming record stream.

A better approach is to compare targetSize to pqWriter.RowGroupTotalBytesWritten() instead of the size of the output buffer. This unfortunately has some issues as well. It will stop the writer, but potentially after it's written much more than the targetSize. The reason for this is that the row group bytes only get updated after the constituent column writers flush their pages. The default data page size is 1MB, so with 100 columns this wouldn't occur until after 100MB were written to the file. I have successfully gotten this to flush closer to the targetSize by setting the max page size to a much lower value, so either exposing this setting to the end-user or coming up with a reasonable heuristic for page size seems like it could be a possible solution.

Another option that is perhaps simpler, and will help further reduce resource usage, would be to go back to the unbuffered writer and manually account for records with zero rows (the original issue that led us to use the buffered writer). I'm working this out and should have a PR with one of the approaches soon.

joellubi commented 1 month ago

@zeroshade I opened GH-43326 in Arrow to fix the byte counting when writing a parquet file. This will allow the PR that fixes this issue to drop the limitWriter we're using and make things a bit simpler.

joellubi commented 1 month ago

I've opened #2026 for review, which fixes this.

joellubi commented 1 month ago

Using the unbuffered writer should resolve the output file size issue and also decrease overall memory utilization, because we skip allocating an extra buffer for the writer.

They may be some additional improvements to gain by tuning the data page size, especially for very wide tables. However I'd prefer to follow up with that change separately as this fix resolves the current issue and the tradeoffs of smaller data pages may not always be favorable. The current changes will allow users to limit memory usage by using smaller batches for the input stream, in case the memory available is still to low or the table too wide.