apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow SFTP Operator cannot connect to a remote endpoint because "expanduser" is not used to resolve the key file path #35377

Open MemphisMeng opened 1 year ago

MemphisMeng commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

I tried to set up a task that uploads files to a remote endpoint via SFTP, using the SFTPOperator from the sftp Airflow provider. My code looks like this:

    from airflow.providers.sftp.hooks.sftp import SFTPHook
    from airflow.providers.sftp.operators.sftp import SFTPOperator

    sftp_task = SFTPOperator(
        task_id="sftp",
        sftp_hook=SFTPHook(
            remote_host="my-host.net",
            username="username",
            password="password",
            port=22,
        ),
        local_filepath=["my-file.csv"],
        remote_filepath=["/"],
    )

It failed every time because Python does not automatically expand "~", the abbreviation for the user's home directory on macOS, when a file path contains it.
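The failure mode can be reproduced in plain Python, independent of Airflow: `open()` treats "~" as a literal directory name, while `os.path.expanduser()` resolves it to the home directory. A minimal illustration:

```python
import os

path = "~/.ssh/id_rsa"

# open(path) would fail with FileNotFoundError, because "~" is treated
# as a literal directory name unless something expands it first.

# expanduser() replaces the leading "~" with the user's home directory
expanded = os.path.expanduser(path)
assert not expanded.startswith("~")
assert expanded == os.path.join(os.path.expanduser("~"), ".ssh", "id_rsa")
```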

This is my detailed traceback:

155.1.168.192.in-addr.arpa
*** Found local files:
***   * /Users/memphismeng/airflow/logs/dag_id=ads_report-f2289a39-ae88-4a2c-880b-2db79c150990/run_id=scheduled__2023-10-30T00:00:00+00:00/task_id=deliver.sftp/attempt=1.log
[2023-10-30, 20:42:40 EDT] {logging_mixin.py:150} WARNING - /Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/utils/sqlalchemy.py:124 DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
[2023-10-30, 20:42:40 EDT] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: ads_report-f2289a39-ae88-4a2c-880b-2db79c150990.deliver.sftp scheduled__2023-10-30T00:00:00+00:00 [queued]>
[2023-10-30, 20:42:40 EDT] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: ads_report-f2289a39-ae88-4a2c-880b-2db79c150990.deliver.sftp scheduled__2023-10-30T00:00:00+00:00 [queued]>
[2023-10-30, 20:42:40 EDT] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-10-30, 20:42:40 EDT] {taskinstance.py:1327} INFO - Executing <Task(SFTPOperator): deliver.sftp> on 2023-10-30 00:00:00+00:00
[2023-10-30, 20:42:40 EDT] {standard_task_runner.py:57} INFO - Started process 8261 to run task
[2023-10-30, 20:42:40 EDT] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'ads_report-f2289a39-ae88-4a2c-880b-2db79c150990', 'deliver.sftp', 'scheduled__2023-10-30T00:00:00+00:00', '--job-id', '219', '--raw', '--subdir', 'DAGS_FOLDER/ads_report_dag.py', '--cfg-path', '/var/folders/x_/p80z9h2x3z1b2lsgrk4p3q2h0000gn/T/tmps80qracv']
[2023-10-30, 20:42:40 EDT] {standard_task_runner.py:85} INFO - Job 219: Subtask deliver.sftp
[2023-10-30, 20:42:40 EDT] {logging_mixin.py:150} WARNING - /Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/settings.py:195 DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
[2023-10-30, 20:42:40 EDT] {task_command.py:410} INFO - Running <TaskInstance: ads_report-f2289a39-ae88-4a2c-880b-2db79c150990.deliver.sftp scheduled__2023-10-30T00:00:00+00:00 [running]> on host 155.1.168.192.in-addr.arpa
[2023-10-30, 20:42:40 EDT] {logging_mixin.py:150} WARNING - /Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/settings.py:346 DeprecationWarning: The sql_engine_encoding option in [core] has been moved to the sql_engine_encoding option in [database] - the old setting has been used, but please update your config.
[2023-10-30, 20:42:40 EDT] {logging_mixin.py:150} WARNING - /Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/utils/sqlalchemy.py:124 DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
[2023-10-30, 20:42:40 EDT] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='ads_report-f2289a39-ae88-4a2c-880b-2db79c150990' AIRFLOW_CTX_TASK_ID='deliver.sftp' AIRFLOW_CTX_EXECUTION_DATE='2023-10-30T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-10-30T00:00:00+00:00'
[2023-10-30, 20:42:40 EDT] {sftp.py:187} INFO - Starting to transfer file from output/f2289a39-ae88-4a2c-880b-2db79c150990/ads_report_results.csv to /
[2023-10-30, 20:42:40 EDT] {ssh.py:300} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
[2023-10-30, 20:42:41 EDT] {transport.py:1893} INFO - Connected (version 2.0, client AWS_SFTP_1.1)
[2023-10-30, 20:42:41 EDT] {ssh.py:342} INFO - Failed to connect. Sleeping before retry attempt 1
[2023-10-30, 20:42:45 EDT] {transport.py:1893} INFO - Connected (version 2.0, client AWS_SFTP_1.1)
[2023-10-30, 20:42:45 EDT] {ssh.py:342} INFO - Failed to connect. Sleeping before retry attempt 2
[2023-10-30, 20:52:39 EDT] {transport.py:1893} INFO - Connected (version 2.0, client AWS_SFTP_1.1)
[2023-10-30, 20:52:40 EDT] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/providers/sftp/operators/sftp.py", line 188, in execute
    self.sftp_hook.store_file(_remote_filepath, _local_filepath, confirm=self.confirm)
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/providers/sftp/hooks/sftp.py", line 249, in store_file
    conn = self.get_conn()
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/providers/sftp/hooks/sftp.py", line 120, in get_conn
    self.conn = super().get_conn().open_sftp()
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 346, in get_conn
    for attempt in Retrying(
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/Users/memphismeng/.pyenv/versions/3.8.16/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/Users/memphismeng/.pyenv/versions/3.8.16/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 353, in get_conn
    client.connect(**connect_kwargs)
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/paramiko/client.py", line 485, in connect
    self._auth(
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/paramiko/client.py", line 730, in _auth
    key = self._key_from_filepath(
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/paramiko/client.py", line 638, in _key_from_filepath
    key = klass.from_private_key_file(key_path, password)
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/paramiko/pkey.py", line 421, in from_private_key_file
    key = cls(filename=filename, password=password)
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/paramiko/rsakey.py", line 64, in __init__
    self._from_private_key_file(filename, password)
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/paramiko/rsakey.py", line 196, in _from_private_key_file
    data = self._read_private_key_file("RSA", filename, password)
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/paramiko/pkey.py", line 494, in _read_private_key_file
    with open(filename, "r") as f:
FileNotFoundError: [Errno 2] No such file or directory: '~/.ssh/id_rsa'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/Users/memphismeng/.pyenv/versions/3.8.16/envs/airflow/lib/python3.8/site-packages/airflow/providers/sftp/operators/sftp.py", line 191, in execute
    raise AirflowException(f"Error while transferring {file_msg}, error: {e}")
airflow.exceptions.AirflowException: Error while transferring from output/f2289a39-ae88-4a2c-880b-2db79c150990/ads_report_results.csv to /, error: [Errno 2] No such file or directory: '~/.ssh/id_rsa'
[2023-10-30, 20:52:40 EDT] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=ads_report-f2289a39-ae88-4a2c-880b-2db79c150990, task_id=deliver.sftp, execution_date=20231030T000000, start_date=20231031T004240, end_date=20231031T005240
[2023-10-30, 20:52:40 EDT] {standard_task_runner.py:104} ERROR - Failed to execute job 219 for task deliver.sftp (Error while transferring from output/f2289a39-ae88-4a2c-880b-2db79c150990/ads_report_results.csv to /, error: [Errno 2] No such file or directory: '~/.ssh/id_rsa'; 8261)
[2023-10-30, 20:52:40 EDT] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-10-30, 20:52:40 EDT] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

What you think should happen instead

My guess is that if we modified the codebase so that paramiko always receives the fully expanded user path instead of "~", the issue would be resolved.
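As a sketch of what such a fix could look like, the connection kwargs could be normalized before they reach paramiko's `SSHClient.connect()`. The helper below is hypothetical, not actual Airflow code; `key_filename` is paramiko's parameter name for a private key path (a string or a list of strings):

```python
import os


def expand_key_paths(connect_kwargs: dict) -> dict:
    """Hypothetical pre-processing step: expand "~" in any key file paths
    before the kwargs are handed to paramiko's SSHClient.connect()."""
    key = connect_kwargs.get("key_filename")
    if isinstance(key, str):
        connect_kwargs = {**connect_kwargs, "key_filename": os.path.expanduser(key)}
    elif isinstance(key, (list, tuple)):
        connect_kwargs = {
            **connect_kwargs,
            "key_filename": [os.path.expanduser(k) for k in key],
        }
    return connect_kwargs
```

With this in place, a value such as "~/.ssh/id_rsa" would be rewritten to an absolute path before paramiko ever tries to open it.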

How to reproduce

Prepare a working SFTP endpoint, along with the credentials (username, password, and port) required to connect to it. Set up a task like the one shown above, then trigger the DAG that includes it.

Operating System

macOS 14.0 (23A344)

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.3.4
apache-airflow-providers-common-sql==1.5.2
apache-airflow-providers-ftp==3.4.2
apache-airflow-providers-http==4.4.2
apache-airflow-providers-imap==3.2.2
apache-airflow-providers-sftp==4.7.0
apache-airflow-providers-sqlite==3.4.2
apache-airflow-providers-ssh==3.8.0

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

jscheffl commented 1 year ago

Do I see it right that your machine does not have a ~/.ssh/id_rsa file, and that the SSH client fails on this instead of falling back to the provided username/password credentials? Is the problem resolved, as a workaround, if you create an empty ~/.ssh/id_rsa on your system? The bug might then also be in paramiko; or do you think Airflow should implement a workaround for such a failure?

Bisk1 commented 1 year ago

@MemphisMeng I tried to reproduce this, and in my case ~ is correctly replaced with the user home dir by expanduser here: https://github.com/paramiko/paramiko/blob/66117732de6de03914308f9a21b05b50a781d13c/paramiko/client.py#L774

So I compared your stack trace with mine, and it seems your code reached https://github.com/paramiko/paramiko/blob/66117732de6de03914308f9a21b05b50a781d13c/paramiko/client.py#L730, which is only possible if you provided key_file in your connection params. Your code sample doesn't mention it; is it possible that some details are missing from your sample?

anthavio commented 1 year ago

I'm using apache-airflow-providers-sftp==4.2.4. I had just tried to move to SFTPHook instead of the deprecated SSHHook and ran into this problem. Rolling back to ssh_hook=SSHHook(...) in SFTPOperator "fixed" it. I hope this helps until a proper fix is released.
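The workaround described above would look roughly like this. This is a sketch based on the comment, assuming the provider import paths of that era; it is a DAG task fragment, not runnable outside a live Airflow environment:

```python
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

# Workaround: pass an SSHHook via ssh_hook instead of sftp_hook, which
# reportedly avoids the key-file lookup that fails on "~"
sftp_task = SFTPOperator(
    task_id="sftp",
    ssh_hook=SSHHook(
        remote_host="my-host.net",
        username="username",
        password="password",
        port=22,
    ),
    local_filepath=["my-file.csv"],
    remote_filepath=["/"],
)
```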

potiuk commented 1 year ago

Marked it as a good first issue; maybe someone will be able to reproduce and fix it.

MemphisMeng commented 10 months ago

@Bisk1 I probably did miss something. Can you tell me what key type yours is?

MemphisMeng commented 10 months ago

Do I see it right that your machine does not have a ~/.ssh/id_rsa file, and that the SSH client fails on this instead of falling back to the provided username/password credentials? Is the problem resolved, as a workaround, if you create an empty ~/.ssh/id_rsa on your system? The bug might then also be in paramiko; or do you think Airflow should implement a workaround for such a failure?

I do have ~/.ssh/id_rsa on my system, and it's unlikely that I would empty it. I'm sure this is a built-in bug in paramiko. Given my experience with and contributions to Airflow, I can't answer your second question.

Bisk1 commented 10 months ago

@Bisk1 I probably did miss something. Can you tell me what key type yours is?

I didn't provide this param at all when trying to reproduce. I meant that it would be helpful if you could update your example with the relevant details.

pateash commented 4 months ago

@eladkal feel free to assign this to me, if no one is checking.

pateash commented 3 months ago

I was not able to reproduce this either. @MemphisMeng, could you please provide more details?