airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source S3: ZIP file reading feature breaks stream on corrupt zip files #35321

Open makz81 opened 9 months ago

makz81 commented 9 months ago

Connector Name

source-s3

Connector Version

4.5.2

What step the error happened?

During the sync

Relevant information

When I search an S3 bucket for specific file extensions (e.g., */.json as the glob value in a JSONL stream), the connector also searches through ZIP files located anywhere in the search path, and this cannot be disabled. If a corrupt ZIP file exists anywhere in the bucket, the sync fails with an error.

This feature appears to have been introduced in https://github.com/airbytehq/airbyte/pull/31340 (@tolik0).

Expected behavior: skip corrupt ZIP files, or make it configurable whether ZIP archives are searched at all.
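For illustration, the skip-and-continue behavior requested here could be sketched with the stdlib zipfile module. This is a hypothetical helper, not the connector's actual code; the function and parameter names are made up:

```python
import io
import zipfile


def iter_zip_members(data: bytes, key: str):
    """Yield member names from a ZIP blob, skipping archives that are corrupt.

    `data` is the raw object body and `key` its S3 key; both names are
    illustrative, not part of the source-s3 connector.
    """
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            # testzip() returns the first bad member name, or None if all CRCs pass.
            if zf.testzip() is not None:
                print(f"Skipping ZIP with a corrupt member: {key}")
                return
            yield from zf.namelist()
    except zipfile.BadZipFile:
        # Truncated or non-ZIP payload: log and move on instead of failing the sync.
        print(f"Skipping corrupt ZIP file: {key}")


# A bogus payload is skipped rather than raising and aborting the listing.
members = list(iter_zip_members(b"not a zip at all", "bad.zip"))
```

The key design point is that a per-file failure is logged and swallowed, so one bad archive cannot abort the whole file listing.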

Relevant log output

2024-02-15 12:37:02 source > Received 1000 objects from S3 for prefix 'None'.
2024-02-15 12:37:02 source > Received 1000 objects from S3 for prefix 'None'.
2024-02-15 12:37:03 source > Received 1000 objects from S3 for prefix 'None'.
2024-02-15 12:37:03 source > Received 1000 objects from S3 for prefix 'None'.
2024-02-15 12:37:03 source > Encountered an exception while reading stream Video Jsons
Traceback (most recent call last):
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 135, in get_matching_files
    for remote_file in self._page(s3, globs, self.config.bucket, current_prefix, seen, logger):
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 207, in _page
    for remote_file in self._handle_file(file):
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 222, in _handle_file
    yield from self._handle_zip_file(file)
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 228, in _handle_zip_file
    zip_members, cd_start = zip_handler.get_zip_files(file["Key"])
  File "/airbyte/integration_code/source_s3/v4/zip_reader.py", line 134, in get_zip_files
    central_dir_start = self._get_central_directory_start(filename)
  File "/airbyte/integration_code/source_s3/v4/zip_reader.py", line 117, in _get_central_directory_start
    central_dir_start = struct.unpack_from("
TypeError: a bytes-like object is required, not 'NoneType'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 126, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 203, in _read_stream
    for record in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 239, in _read_incremental
    for record_data_or_message in stream_instance.read_incremental(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 137, in read_incremental
    slices = self.stream_slices(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py", line 112, in stream_slices
    return self.compute_slices()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py", line 65, in compute_slices
    all_files = self.list_files()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py", line 73, in list_files
    return list(self.get_files())
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 147, in get_matching_files
    self._raise_error_listing_files(globs, exc)
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 151, in _raise_error_listing_files
    raise ErrorListingFiles(
airbyte_cdk.sources.file_based.exceptions.ErrorListingFiles: Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files. Contact Support if you need assistance.
source=s3 bucket=test-development globs=['*/*/**/final.json'] endpoint=
2024-02-15 12:37:03 source > Marking stream Video Jsons as STOPPED
2024-02-15 12:37:03 source > Finished syncing Video Jsons
2024-02-15 12:37:03 source > SourceS3 runtimes:
Syncing stream Video Jsons 0:04:06.203762
2024-02-15 12:37:03 source > Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files. Contact Support if you need assistance.
source=s3 bucket=test-development globs=['*/*/**/final.json'] endpoint=
Traceback (most recent call last):
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 135, in get_matching_files
    for remote_file in self._page(s3, globs, self.config.bucket, current_prefix, seen, logger):
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 207, in _page
    for remote_file in self._handle_file(file):
  File "/airbyte/integration_code/source_s3/v4/stream_reader.py", line 222, in _handle_file
    yield from self._handle_zip_file(file)
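For context, the TypeError in the traceback above is what struct.unpack_from raises when it is handed None instead of a bytes buffer, which presumably happens when the ranged read for the ZIP's central-directory record returns nothing on a corrupt or truncated archive. A minimal stdlib-only reproduction (the format string and variable name are illustrative, since the connector's actual line is truncated in the log):

```python
import struct

# Simulates the central-directory lookup receiving no bytes back,
# e.g. a ranged S3 read on a truncated archive returning nothing.
eocd_chunk = None

try:
    # "<I" is an assumed format string; the real one is cut off in the log.
    central_dir_start = struct.unpack_from("<I", eocd_chunk, 0)
except TypeError as exc:
    # Same failure mode as the traceback: unpack_from needs a bytes-like object.
    print(exc)
```

Guarding the read result (or catching this TypeError per file) before unpacking would turn the hard failure into a skippable one.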


marcosmarxm commented 9 months ago

Thanks for reporting the issue, @makz81. I added it to the connector team backlog. @tolik0, can you take a look at this issue? Is it feasible to implement?