airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source SFTP Bulk: Timeout while reading large file #26021

Open jurgispods opened 1 year ago

jurgispods commented 1 year ago

Connector Name

source-sftp-bulk

Connector Version

0.1.2

At what step did the error happen?

During the sync

Relevant information

When using the SFTP Bulk source to read a fairly large CSV file (close to 1 GB), I run into a timeout. The timeout then triggers a second error, a `TypeError`, because the original exception does not seem to be handled correctly.

It would be nice if this could be fixed. I would also be very interested in a workaround, such as being able to configure the timeout to be higher than the default.
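One possible shape for such a workaround is sketched below. It relies on two paramiko calls: `SFTPClient.get_channel()`, which returns the underlying channel, and `Channel.settimeout()`, which sets the timeout for blocking reads. The helper name `raise_sftp_timeout` and the idea of feeding it a value from the connector config are assumptions for illustration, not part of the connector.

```python
def raise_sftp_timeout(sftp_client, seconds: float) -> None:
    """Set the read timeout on the channel behind an SFTP client.

    `sftp_client` is expected to behave like paramiko.SFTPClient, i.e. to
    expose get_channel(), whose result exposes settimeout().
    Hypothetical helper: the connector does not ship this function.
    """
    if seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    channel = sftp_client.get_channel()
    # Blocking recv() calls on this channel now wait up to `seconds`
    # before raising socket.timeout.
    channel.settimeout(seconds)
```

With a real connection this would be called once after the SFTP session is opened, for example `raise_sftp_timeout(sftp, 600)`, before any large file is read.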

Relevant log output

'SFTPFile' object is not subscriptable", "stack_trace": "Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/paramiko/channel.py", line 699, in recv
    out = self.in_buffer.read(nbytes, self.timeout)
  File "/usr/local/lib/python3.9/site-packages/paramiko/buffered_pipe.py", line 164, in read
    raise PipeTimeout()
paramiko.buffered_pipe.PipeTimeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/airbyte/integration_code/source_sftp_bulk/client.py", line 190, in fetch_file
    df = pd.read_csv(f, engine="python", sep=separator)
  File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 211, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 317, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 950, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 611, in _read
    return parser.read(nrows)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 1772, in read
    ) = self._engine.read(  # type: ignore[attr-defined]
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py", line 251, in read
    content = self._get_lines(rows)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py", line 1124, in _get_lines
    new_row = self._next_iter_line(row_num=self.pos + rows + 1)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py", line 787, in _next_iter_line
    line = next(self.data)
  File "/usr/local/lib/python3.9/site-packages/paramiko/file.py", line 125, in __next__
    line = self.readline()
  File "/usr/local/lib/python3.9/site-packages/paramiko/file.py", line 291, in readline
    new_data = self._read(n)
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp_file.py", line 185, in _read
    t, msg = self.sftp._request(
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 822, in _request
    return self._read_response(num)
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 852, in _read_response
    t, data = self._read_packet()
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp.py", line 201, in _read_packet
    x = self._read_all(4)
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp.py", line 185, in _read_all
    x = self.sock.recv(n)
  File "/usr/local/lib/python3.9/site-packages/paramiko/channel.py", line 701, in recv
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 13, in <module>
    launch(source, sys.argv[1:])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 122, in run
    for message in generator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 99, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File "/airbyte/integration_code/source_sftp_bulk/source.py", line 133, in streams
    json_schema = self._infer_json_schema(config, conn)
  File "/airbyte/integration_code/source_sftp_bulk/source.py", line 58, in _infer_json_schema
    df = connection.fetch_file(fn=files[-1], file_type=config["file_type"], separator=config.get("separator"))
  File "/usr/local/lib/python3.9/site-packages/backoff/_sync.py", line 94, in retry
    ret = target(*args, **kwargs)
  File "/airbyte/integration_code/source_sftp_bulk/client.py", line 206, in fetch_file
    logger.warning("Skipping %s file because it is unable to be read.", f["filepath"])
TypeError: 'SFTPFile' object is not subscriptable
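
The last frame suggests why the timeout surfaces as a `TypeError`: the warning at `client.py` line 206 indexes `f`, which is the open `SFTPFile` handle, rather than the file-info mapping passed in as `fn`. The sketch below reproduces that pattern with a stand-in class and shows one possible correction. `FakeRemoteFile`, both function names, and the assumption that `fn` carries a `"filepath"` key are hypothetical, not the connector's actual code.

```python
import logging
import socket

logger = logging.getLogger("sftp_bulk_sketch")


class FakeRemoteFile:
    """Stand-in for paramiko's SFTPFile: usable as a context manager, not subscriptable."""

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

    def read(self):
        # Simulate the channel timing out on a large file.
        raise socket.timeout()


def fetch_file_buggy(fn):
    with FakeRemoteFile() as f:
        try:
            return f.read()
        except Exception:
            # Same mistake as the failing line: `f` is the file handle,
            # so f["filepath"] raises TypeError and hides the timeout.
            logger.warning("Skipping %s file because it is unable to be read.", f["filepath"])


def fetch_file_fixed(fn):
    with FakeRemoteFile() as f:
        try:
            return f.read()
        except socket.timeout:
            # Index the file-info mapping instead of the handle.
            logger.warning("Skipping %s file because reading timed out.", fn["filepath"])
            return None
```

Whether a timed-out file should be skipped, retried, or fail the sync is a separate design choice; the sketch only shows how to keep the original timeout from being masked.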

octavia-squidington-iii commented 3 months ago

At Airbyte, we seek to be clear about the project priorities and roadmap. This issue has not had any activity for 180 days, suggesting that it's not as critical as others. It's possible it has already been fixed. It is being marked as stale and will be closed in 20 days if there is no activity. To keep it open, please comment to let us know why it is important to you and if it is still reproducible on recent versions of Airbyte.