apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

HttpToS3Operator throws exception if s3_bucket parameter is not passed #43379

Open kostiantyn-lab opened 5 days ago

kostiantyn-lab commented 5 days ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.1

What happened?

When using HttpToS3Operator without the s3_bucket parameter, I get this error:

[2024-10-25, 15:05:43 EEST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-10-25, 15:05:43 EEST] {http_to_s3.py:165} INFO - Calling HTTP method
[2024-10-25, 15:05:43 EEST] {base.py:84} INFO - Retrieving connection 'http_conn'
[2024-10-25, 15:05:44 EEST] {base.py:84} INFO - Retrieving connection 'aws_conn'
[2024-10-25, 15:05:44 EEST] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/transfers/http_to_s3.py", line 168, in execute
    self.s3_hook.load_bytes(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 158, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 132, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 1205, in load_bytes
    self._upload_file_obj(f, key, bucket_name, replace, encrypt, acl_policy)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 1255, in _upload_file_obj
    client.upload_fileobj(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/boto3/s3/inject.py", line 635, in upload_fileobj
    future = manager.upload(
             ^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3transfer/manager.py", line 323, in upload
    self._validate_if_bucket_supported(bucket)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3transfer/manager.py", line 492, in _validate_if_bucket_supported
    match = pattern.match(bucket)
            ^^^^^^^^^^^^^^^^^^^^^
TypeError: expected string or bytes-like object, got 'NoneType'
[2024-10-25, 15:05:44 EEST] {taskinstance.py:1225} INFO - Marking task as UP_FOR_RETRY. dag_id=test, task_id=download, run_id=manual__2024-10-25T12:05:38.785000+00:00, execution_date=20241025T120538, start_date=20241025T120543, end_date=20241025T120544
[2024-10-25, 15:05:44 EEST] {taskinstance.py:340} ▶ Post task execution logs

What you think should happen instead?

The operator should work without errors, since S3Hook can take the S3 bucket name from the service_config in the connection's extra information.
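
For reference, that fallback comes from the service_config block in the AWS connection's extra field. A hypothetical aws_conn carrying a default bucket could look like this (the bucket name is illustrative; the same JSON goes into the connection's Extra field in the UI):

from airflow.models.connection import Connection

# Hypothetical aws_conn with a default S3 bucket in service_config
aws_conn = Connection(
    conn_id="aws_conn",
    conn_type="aws",
    extra='{"service_config": {"s3": {"bucket_name": "my-fallback-bucket"}}}',
)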

How to reproduce

Create and run this simple DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(year=2019, month=1, day=1),
    'email': ['noreply@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
        dag_id='http-to-s3-test',
        default_args=default_args,
        description='http-to-s3-test',
        catchup=False,
        schedule_interval=None) as dag:

    download = HttpToS3Operator(
        task_id='download',
        aws_conn_id='aws_conn',
        http_conn_id='http_conn',
        method='GET',
        extra_options={'check_response': True},
        endpoint='/test.txt',
        s3_key='test.txt',
        replace=True,
    )

Operating System

Amazon Linux 2023.5.20240916

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.28.0
apache-airflow-providers-apache-spark==4.10.0
apache-airflow-providers-atlassian-jira==2.7.0
apache-airflow-providers-celery==3.8.1
apache-airflow-providers-common-compat==1.2.0
apache-airflow-providers-common-io==1.4.0
apache-airflow-providers-common-sql==1.16.0
apache-airflow-providers-fab==1.3.0
apache-airflow-providers-ftp==3.11.0
apache-airflow-providers-http==4.13.0
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-microsoft-mssql==3.9.0
apache-airflow-providers-postgres==5.12.0
apache-airflow-providers-sftp==4.11.0
apache-airflow-providers-slack==8.9.0
apache-airflow-providers-smtp==1.8.0
apache-airflow-providers-sqlite==3.9.0
apache-airflow-providers-ssh==3.13.1

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

potiuk commented 5 days ago

Maybe you would like to fix it and contribute a PR @kostiantyn-lab? If not, I've marked it as a good first issue and it will wait for someone who would like to fix it.

ANIR1604 commented 5 days ago

@potiuk I would like to work on this, if that's okay.

potiuk commented 5 days ago

> @potiuk I would like to work on this, if that's okay.

Feel free.

kostiantyn-lab commented 4 days ago

> @potiuk I would like to work on this, if that's okay.

It looks like we need to change this condition to something like this.

Before:

if "bucket_name" not in bound_args.arguments:

After:

# check that bucket_name is passed in the parameters and is not None or empty
if not bound_args.arguments.get("bucket_name"):
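
To make the fallback logic concrete, here is a minimal, self-contained sketch of the decorator pattern under discussion (simplified and hypothetical; the real implementation lives in airflow/providers/amazon/aws/hooks/s3.py and handles more cases, and the service_config lookup here is illustrative):

import inspect
from functools import wraps

def provide_bucket_name(func):
    sig = inspect.signature(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        bound_args = sig.bind(*args, **kwargs)
        # Today the fallback only fires when the argument is missing entirely.
        # HttpToS3Operator always passes bucket_name=self.s3_bucket, so an
        # explicit None is "present" and slips through to boto3, which raises
        # the TypeError shown above. Checking the value also covers None and "":
        if not bound_args.arguments.get("bucket_name"):
            hook = args[0]
            bound_args.arguments["bucket_name"] = hook.service_config.get("bucket_name")
        return func(*bound_args.args, **bound_args.kwargs)

    return wrapper
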
potiuk commented 4 days ago

Just make a PR - discussing it on the changed code is so much easier.

ANIR1604 commented 3 days ago

> > @potiuk I would like to work on this, if that's okay.
>
> It looks like we need to change this condition to something like this.
>
> Before:
>
> if "bucket_name" not in bound_args.arguments:
>
> After:
>
> # check that bucket_name is passed in the parameters and is not None or empty
> if not bound_args.arguments.get("bucket_name"):

Can you help me in creating the unit test case? I have already modified the code ...

potiuk commented 3 days ago

> Can you help me in creating the unit test case? I have already modified the code ...

Generally - look at the other, related unit test cases. Use them as learning examples. Adding unit tests as part of the fix is an integral part of the code change.

kostiantyn-lab commented 2 days ago

> Can you help me in creating the unit test case? I have already modified the code ...

You can use this test as an example: https://github.com/apache/airflow/blob/main/providers/tests/amazon/aws/hooks/test_s3.py#L1143
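
For instance, a test along these lines might work (a rough sketch in the style of the linked tests; FakeS3Hook, the test name, and the connection values are illustrative, not the final code):

from unittest import mock

from airflow.models.connection import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook, provide_bucket_name


@mock.patch.object(
    S3Hook,
    "get_connection",
    return_value=Connection(
        conn_type="aws",
        extra='{"service_config": {"s3": {"bucket_name": "bucket-from-extra"}}}',
    ),
)
def test_bucket_name_none_falls_back_to_service_config(mock_get_connection):
    class FakeS3Hook(S3Hook):
        @provide_bucket_name
        def test_function(self, bucket_name=None):
            return bucket_name

    # With the fix, an explicit bucket_name=None is replaced by the bucket
    # configured in the connection's service_config instead of reaching boto3.
    assert FakeS3Hook().test_function(bucket_name=None) == "bucket-from-extra"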

ANIR1604 commented 2 days ago

> > Can you help me in creating the unit test case? I have already modified the code ...
>
> Generally - look at the other, related unit test cases. Use them as learning examples. Adding unit tests as part of the fix is an integral part of the code change.

Please review and merge my code; I have also added unit test cases. @potiuk

potiuk commented 2 days ago

> > > Can you help me in creating the unit test case? I have already modified the code ...
> >
> > Generally - look at the other, related unit test cases. Use them as learning examples. Adding unit tests as part of the fix is an integral part of the code change.
>
> Please review and merge my code; I have also added unit test cases. @potiuk

Once you fix the failing test cases, sure. Also, if you want a review, ping a general "can you please review" in the PR or ask for a review in the #new-contributors Slack channel. There are many reviewers here who can review and approve your code. It does not have to be me.

ANIR1604 commented 1 day ago

> > > > Can you help me in creating the unit test case? I have already modified the code ...
> > >
> > > Generally - look at the other, related unit test cases. Use them as learning examples. Adding unit tests as part of the fix is an integral part of the code change.
> >
> > Please review and merge my code; I have also added unit test cases. @potiuk
>
> Once you fix the failing test cases, sure. Also, if you want a review, ping a general "can you please review" in the PR or ask for a review in the #new-contributors Slack channel. There are many reviewers here who can review and approve your code. It does not have to be me.

You have already reviewed and approved my code. When will the PR be merged? Actually, it's my first PR...

potiuk commented 1 day ago

> You have already reviewed and approved my code. When will the PR be merged? Actually, it's my first PR...

The tests need to pass. As a first-time contributor, someone needs to approve your CI workflows - that's why pinging in your PR is the best way to draw attention to it. A simple "please approve and run my workflow here" in the PR will do. And again - any one of the maintainers can do it; it does not have to be me. I might be on holidays or simply busy with 100 other PRs :)

And it will be merged when it gets green - then you can also write "Hey, my PR is green, can it be merged?" - either in the PR or in Slack, with a link to your PR. Doing it in the PR is easier than in the issue, because here you might not easily see which PR is being asked about - you would have to scroll the conversation all the way up to find "may be fixed by ...." - so pinging in the PR is just more efficient.