hellonarrativ / spectrify

Export Redshift data and convert to Parquet for use with Redshift Spectrum or other data warehouses.
https://aws.amazon.com/blogs/big-data/narrativ-is-helping-producers-monetize-their-digital-content-with-amazon-redshift/
MIT License

EOFError: Compressed file ended before the end-of-stream marker was reached #62

Open userkv04 opened 4 years ago

userkv04 commented 4 years ago

Description

An EOFError occurred while using ConcurrentManifestConverter to convert files to Parquet (spectrify/spectrify/convert.py).

What I Did

After exporting CSV files to S3, I tried converting the exported .gz files to Parquet format. I used a simple standalone Python script that calls ConcurrentManifestConverter methods, and got an EOFError intermittently. No errors occur when using SimpleManifestConverter instead, so the problem appears to be related to the multiprocessing pool.

The following call raised the error:

```python
convert = ConcurrentManifestConverter(sa_table, s3_config)
convert.convert_manifest()
```

The error does not occur on every run: sometimes the script succeeds, and sometimes it fails with the traceback below.

```
Converting file [s3://aaaa/bbbb/cccc/partition_key=2004-080000_part_00.gz] to [s3://xxxx/yyyy/zzzz/partition_key=2004-08/partition_key=2004-080000_part_00.parq]
Converting file [s3://aaaa/bbbb/cccc/partition_key=2004-080001_part_00.gz] to [s3://xxxx/yyyy/zzzz/partition_key=2004-08/partition_key=2004-080001_part_00.parq]
Done converting file [s3://aaaa/bbbb/cccc/partition_key=2004-080001_part_00.gz] to [s3://xxxx/yyyy/zzzz/partition_key=2004-08/partition_key=2004-080001_part_00.parq]

Aborted!
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "/usr/local/lib/python3.7/dist-packages/spectrify/convert.py", line 239, in _parallel_wrapper
    CsvConverter(sa_table, s3_config, delimiter, escapechar, quoting, unicode_csv).convert_csv(data_path)
  File "/usr/local/lib/python3.7/dist-packages/spectrify/convert.py", line 130, in convert_csv
    for chunk in self.columnar_data_chunks(file_path, self.sa_table, SPECTRIFY_ROWS_PER_GROUP):
  File "/usr/local/lib/python3.7/dist-packages/spectrify/convert.py", line 178, in columnar_data_chunks
    for row in reader:
  File "/usr/lib/python3.7/gzip.py", line 289, in read1
    return self._buffer.read1(size)
  File "/usr/lib/python3.7/_compression.py", line 68, in readinto
    data = self.read(len(byte_view))
  File "/usr/lib/python3.7/gzip.py", line 482, in read
    raise EOFError("Compressed file ended before the "
EOFError: Compressed file ended before the end-of-stream marker was reached
```
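For context, this EOFError comes from the stdlib gzip module and simply means the worker hit the end of the compressed data before reaching the gzip trailer, i.e. it read a truncated or partially downloaded stream. A minimal stdlib sketch (independent of spectrify) that reproduces the same error by cutting a gzip stream short:

```python
import gzip
import io

# Compress some data, then truncate the result to simulate an
# incomplete download or a file still being written.
payload = b"some,csv,rows\n" * 1000
complete = gzip.compress(payload)
truncated = complete[: len(complete) // 2]  # cut the stream short

try:
    gzip.GzipFile(fileobj=io.BytesIO(truncated)).read()
except EOFError as exc:
    # Compressed file ended before the end-of-stream marker was reached
    print(exc)
```

This suggests the intermittent failures are about *what bytes the workers see* rather than the gzip decoding itself, which would be consistent with the error disappearing under SimpleManifestConverter.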