
parquet file created by .sink_parquet got Invalid thrift: protocol error #16262

Open dui1234 opened 2 weeks ago

dui1234 commented 2 weeks ago

Checks

Reproducible example

import polars as pl

# Successful transactions with a non-null recipient in the target block range.
valid_trxn = (pl.scan_parquet(data_path)
              .filter(pl.col("block_number").is_between(2912406, 9354105))
              .filter((pl.col("success") == True) & (pl.col("to_address").is_not_null()))
              .sink_parquet("valid_trxn_160wk.parquet", compression="lz4")
             )

# Senders that appear in more than one valid transaction.
regular_sender = (pl.scan_parquet(data_path)
                  .filter(pl.col("block_number").is_between(2912406, 9354105))
                  .filter((pl.col("success") == True) & (pl.col("to_address").is_not_null()))
                  .group_by("from_address")
                  .agg(pl.len().alias("count"))
                  .filter(pl.col("count") > 1)
                  .sink_parquet("reg_sender_160wk.parquet", compression="lz4")
                 )

# Receivers that appear in more than one valid transaction.
regular_receiver = (pl.scan_parquet(data_path)
                    .filter(pl.col("block_number").is_between(2912406, 9354105))
                    .filter((pl.col("success") == True) & (pl.col("to_address").is_not_null()))
                    .group_by("to_address")
                    .agg(pl.len().alias("count"))
                    .filter(pl.col("count") > 1)
                    .sink_parquet("reg_receiver_160wk.parquet", compression="lz4")
                   )

# Keep only transactions whose sender and receiver are both regular.
considered_trxn = (pl.scan_parquet("valid_trxn_160wk.parquet")
                   .join(pl.scan_parquet("reg_sender_160wk.parquet"), on="from_address", how="inner")
                   .join(pl.scan_parquet("reg_receiver_160wk.parquet"), on="to_address", how="inner")
                   .sink_parquet("considered_trxn_160wk.parquet", compression="lz4")
                  )

Log output

No response

Issue description

I am trying to produce a joined parquet file in which the "from_address" and "to_address" columns of "valid_trxn_160wk.parquet" match the corresponding columns of "reg_sender_160wk.parquet" and "reg_receiver_160wk.parquet", respectively. However, when I try to access the result file, "considered_trxn_160wk.parquet", it shows me "Invalid thrift: protocol error". I checked whether the three input files are corrupted by reading each of them, and they all read fine. Also, scanning the three files and running the same query with

considered_trxn = (pl.scan_parquet("valid_trxn_160wk.parquet")
                   .join(pl.scan_parquet("reg_sender_160wk.parquet"), on="from_address", how="inner")
                   .join(pl.scan_parquet("reg_receiver_160wk.parquet"), on="to_address", how="inner")
                   .collect(streaming=True)
                  )

works perfectly fine, except that the memory usage is huge and almost consumes all of my memory capacity; that is why I sink instead of collecting. So I am not sure what happened here.
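To narrow it down, a footer check with pyarrow (15.0.2 is installed, per the versions below) should raise the same kind of error if the thrift-encoded metadata written by sink_parquet is corrupt; something like this sketch:

```python
import pyarrow.parquet as pq

# ParquetFile only parses the footer (thrift-encoded metadata), so this
# isolates whether the file's metadata itself is unreadable.
try:
    print(pq.ParquetFile("considered_trxn_160wk.parquet").metadata)
except Exception as exc:
    print(f"footer unreadable: {exc}")
```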

Error


```
ComputeError                              Traceback (most recent call last)
Cell In[4], line 1
----> 1 considered_trxn_r = (pl.scan_parquet("considered_trxn_160wk.parquet")
      2     .with_columns(('0x' + pl.col("transaction_hash").bin.encode("hex")).alias("encoded_th"))
      3     .with_columns(('0x' + pl.col("from_address").bin.encode("hex")).alias("from"))
      4     .with_columns(('0x' + pl.col("to_address").bin.encode("hex")).alias("to"))
      5     .select(pl.col("block_number", "encoded_th", "value_f64", "gas_used", "from", "to",
      6                    "n_input_bytes", "n_input_zero_bytes", "n_input_nonzero_bytes"))
      7     .collect(streaming=True)
      8 )

File ~/jax_env/lib/python3.10/site-packages/polars/_utils/deprecation.py:134, in deprecate_renamed_parameter.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
    129 @wraps(function)
    130 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
    131     _rename_keyword_argument(
    132         old_name, new_name, kwargs, function.__name__, version
    133     )
--> 134     return function(*args, **kwargs)

File ~/jax_env/lib/python3.10/site-packages/polars/_utils/deprecation.py:134, in deprecate_renamed_parameter.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
    129 @wraps(function)
    130 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
    131     _rename_keyword_argument(
    132         old_name, new_name, kwargs, function.__name__, version
    133     )
--> 134     return function(*args, **kwargs)

File ~/jax_env/lib/python3.10/site-packages/polars/io/parquet/functions.py:394, in scan_parquet(source, n_rows, row_index_name, row_index_offset, parallel, use_statistics, hive_partitioning, hive_schema, rechunk, low_memory, cache, storage_options, retries)
    391 else:
    392     source = [normalize_filepath(source) for source in source]
--> 394 return _scan_parquet_impl(
    395     source,
    396     n_rows=n_rows,
    397     cache=cache,
    398     parallel=parallel,
    399     rechunk=rechunk,
    400     row_index_name=row_index_name,
    401     row_index_offset=row_index_offset,
    402     storage_options=storage_options,
    403     low_memory=low_memory,
    404     use_statistics=use_statistics,
    405     hive_partitioning=hive_partitioning,
    406     hive_schema=hive_schema,
    407     retries=retries,
    408 )

File ~/jax_env/lib/python3.10/site-packages/polars/io/parquet/functions.py:454, in _scan_parquet_impl(source, n_rows, cache, parallel, rechunk, row_index_name, row_index_offset, storage_options, low_memory, use_statistics, hive_partitioning, hive_schema, retries)
    450 else:
    451     # Handle empty dict input
    452     storage_options = None
--> 454 pylf = PyLazyFrame.new_from_parquet(
    455     source,
    456     sources,
    457     n_rows,
    458     cache,
    459     parallel,
    460     rechunk,
    461     parse_row_index_args(row_index_name, row_index_offset),
    462     low_memory,
    463     cloud_options=storage_options,
    464     use_statistics=use_statistics,
    465     hive_partitioning=hive_partitioning,
    466     hive_schema=hive_schema,
    467     retries=retries,
    468 )
    469 return wrap_ldf(pylf)

ComputeError: parquet: File out of specification: Invalid thrift: protocol error
```

Expected behavior

The "considered_trxn_160wk.parquet" can be read and further processed with pl.scan_parquet or pl.read_parquet

Installed versions

```
--------Version info---------
Polars:               0.20.20
Index type:           UInt32
Platform:             Linux-6.5.0-28-generic-x86_64-with-glibc2.35
Python:               3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           3.8.4
nest_asyncio:         1.6.0
numpy:                1.23.5
openpyxl:             <not installed>
pandas:               1.4.4
pyarrow:              15.0.2
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.29
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
```