apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Status of testing Providers that were prepared on August 10, 2022 #25640

Closed - potiuk closed this 2 years ago

potiuk commented 2 years ago


I have a kind request for all the contributors to the latest provider packages release. Could you please help us to test the RC versions of the providers?

Let us know in a comment whether the issue is addressed.

Those are providers that require testing as there were some substantial changes introduced:

Provider amazon: 5.0.0rc3

The guidelines on how to test providers can be found in Verify providers by contributors


gmcrocetti commented 2 years ago

All good regarding extra['host'] deprecation for amazon
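
For reference, a rough sketch of how this can be checked (the connection id below is made up, and exactly where/how the deprecation warning is emitted may differ):

# Rough sketch: resolve an AWS connection whose extra JSON still contains the
# legacy "host" key (hypothetical connection id) and print any warnings raised.
import warnings

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    AwsBaseHook(aws_conn_id="aws_with_host_in_extra", client_type="s3").get_conn()

for w in caught:
    print(w.category.__name__, w.message)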

phanikumv commented 2 years ago

Tested the two below and they work fine with microsoft.azure: 4.2.0rc3

https://github.com/apache/airflow/pull/25235: @phanikumv

Screenshot 2022-08-11 at 2 12 49 PM

https://github.com/apache/airflow/pull/25362: @phanikumv

Screenshot 2022-08-11 at 2 31 24 PM

bharanidharan14 commented 2 years ago

Tested both Azure Service Bus Subscription Operators (Update and Receive); working fine 👍

Screenshot 2022-08-11 at 3 16 52 PM

https://github.com/apache/airflow/pull/25029: @bharanidharan14

jgr-trackunit commented 2 years ago

Hi!

I've found an issue with the Databricks provider; at first glance it looks like it's related to https://github.com/apache/airflow/pull/25115

@alexott I think it might be interesting for you.

More info below:

[2022-08-11, 11:10:43 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 34817 for task xxxxx
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
    error_file=args.error_file,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/operators/databricks.py", line 374, in execute
    self.run_id = self._hook.submit_run(self.json)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks.py", line 152, in submit_run
    response = self._do_api_call(SUBMIT_RUN_ENDPOINT, json)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 493, in _do_api_call
    headers = {**self.user_agent_header, **aad_headers}
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 36, in __get__
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 136, in user_agent_header
    return {'user-agent': self.user_agent_value}
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 36, in __get__
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 144, in user_agent_value
    if provider.is_source:
AttributeError: 'ProviderInfo' object has no attribute 'is_source'

I ran it on MWAA == 2.2.2 with the configuration below:

new_cluster = {
    "autoscale": {"min_workers": 1, "max_workers": 2},
    "cluster_name": "",
    "spark_version": get_spark_version(),
    "spark_conf": Variable.get("SPARK_CONF", deserialize_json=True, default_var="{}"),
    "aws_attributes": {
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK",
        "zone_id": "auto",
        "instance_profile_arn": Variable.get("E2_INSTANCE_PROFILE_ARN", default_var=""),
        "spot_bid_price_percent": 100,
    },
    "enable_elastic_disk": True,
    "node_type_id": "r5a.xlarge",
    "ssh_public_keys": [],
    "custom_tags": {"Application": "databricks", "env": env, "AnalyticsTask": "task name"},
    "spark_env_vars": {},
    "cluster_source": "JOB",
    "init_scripts": [],
}

with DAG(
    dag_id="dag id",
    description="desc",
    default_args=default_args,
    schedule_interval="0 2 * * *",  # Every night at 02:00
    catchup=False,
    max_active_runs=1,
    concurrency=1,
    is_paused_upon_creation=dag_is_paused_upon_creation,
) as dag:

    task = DatabricksSubmitRunOperator(
        task_id="task-name",
        databricks_conn_id="connection-name",
        new_cluster=new_cluster,
        notebook_task="notebook task",
        timeout_seconds=3600 * 4,  # 4 hours
        polling_period_seconds=30,
        retries=1,
    )

Tell me if you need more detail.

alexott commented 2 years ago

@jgr-trackunit oh, this field was introduced in 2.3.0 :-( I think that I need to fix it before releasing

alexott commented 2 years ago

@potiuk unfortunately, the Databricks provider became incompatible with 2.2. I'm preparing a fix for it, but it will be a separate release. Sorry for adding more work for you :-(
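
For context, the failing check reads provider.is_source, which only exists on Airflow 2.3.0+. A minimal sketch of a backwards-compatible guard (purely illustrative, not necessarily what the actual fix will do):

# Purely illustrative: ProviderInfo.is_source was only added in Airflow 2.3.0,
# so on 2.2.x the attribute is missing and `if provider.is_source:` raises
# AttributeError. getattr with a default degrades gracefully instead.
from collections import namedtuple

# Stand-in for the 2.2.x ProviderInfo shape, which has no is_source field.
LegacyProviderInfo = namedtuple("LegacyProviderInfo", ["version", "provider_info"])
provider = LegacyProviderInfo(version="x.y.z", provider_info={})

if getattr(provider, "is_source", False):
    print("provider installed from sources")
else:
    print("provider installed as a package")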

potiuk commented 2 years ago

No problem. Good to know :). This is what testing is about @alexott :).

potiuk commented 2 years ago

Thanks @jgr-trackunit for spotting it.

LaPetiteSouris commented 2 years ago

@potiuk I am not sure if this is the right place to put it, but here is the deal:

https://github.com/apache/airflow/pull/24554 added the num_batches parameter to SQSSensor in the Amazon provider.

You asked me, as the contributor of the feature, to test it for AWS provider amazon: 4.1.0rc1 in https://github.com/apache/airflow/issues/25037#event-7006630694

Unfortunately I did not have time to do so in time for the release.

Taking advantage of this 5.0.0 release of the Amazon provider, I tested the feature.

What has been tested

Given SQSSensor without num_batches and the same sensor with num_batches, ensure that both behave as expected.

Result

  1. The sensors
    read_from_queue = SqsSensor(
        aws_conn_id="aws_sqs_test",
        task_id="read_from_queue",
        sqs_queue=sqs_queue,
    )
    # Retrieve multiple batches of messages from SQS.
    # The SQS API only returns a maximum of 10 messages per poll.
    read_from_queue_in_batch = SqsSensor(
        aws_conn_id="aws_sqs_test",
        task_id="read_from_queue_in_batch",
        sqs_queue=sqs_queue,
        # Get a maximum of 3 messages each poll
        max_messages=3,
        # Combine 3 polls before returning results
        num_batches=3,
    )
  2. The result of task execution (success)

running

  3. The result of SQSSensor without num_batches enabled. Only a few messages are available in xcom

xcom_normal

  4. The result of SQSSensor with num_batches=3 and max_messages=3. It does get 3 x 3 = 9 messages for each execution.

xcom_batch

alexott commented 2 years ago

Opened #25674 to fix the issue with the Databricks provider

potiuk commented 2 years ago

@LaPetiteSouris - thank you! This is cool to get it confirmed even now!

alexott commented 2 years ago

Found another issue with the Databricks provider - the DBSQL operator doesn't work anymore, most probably caused by #23971 - it looks like split_sql_string doesn't correctly handle simple SQL queries (like select * from default.a_events limit 10, which I'm using in tests). For this one I need more time to debug it.

potiuk commented 2 years ago

> Found another issue with the Databricks provider - the DBSQL operator doesn't work anymore, most probably caused by #23971 - it looks like split_sql_string doesn't correctly handle simple SQL queries (like select * from default.a_events limit 10, which I'm using in tests). For this one I need more time to debug it.

No worries @alexott - I will have to wait with rc4 for databricks until this voting completes anyhow.

alexott commented 2 years ago

My concern is that this change in common-sql may affect other packages - I see it in Drill, Exasol, Presto, ...

pdebelak commented 2 years ago

Hashicorp provider change appears to be working as expected for me.

eladkal commented 2 years ago

> My concern is that this change in common-sql may affect other packages - I see it in Drill, Exasol, Presto, ...

If there is a DAG/operator you want to verify with Presto, you can add it here and I'll check

alexott commented 2 years ago

I don't have anything to test, but I'm concerned: if it broke Databricks SQL, might it break others as well?

ankurbajaj9 commented 2 years ago

Webhdfs worked for me

potiuk commented 2 years ago

> I don't have anything to test, but I'm concerned: if it broke Databricks SQL, might it break others as well?

@alexott Can you make a PR fixing it in databricks so that we can see how the problem manifests? I can take a look at the others and assess whether there is potential for breaking other providers.

alexott commented 2 years ago

Yes, will do, most probable on Saturday...

potiuk commented 2 years ago

Just looked it up @alexott -> I do not think it is breaking other providers (@kazanzhy to confirm).

Only the Databricks and Snowflake hooks have split_statements set to True by default (because historically they were doing the split). All the others have split_statements = False as the default - so the change is not breaking for them. It might not work to split some statements from some DBs using the common method introduced by Dmitro - but that is not breaking, as this is a new feature for them. The Snowflake one actually does not use this new method - it continues to use its own snowflake-internal "util.split_statements", so it is unaffected. The Databricks one is the only one with the default split_statements = True that uses this common method.

BTW. Looking at the change, I think the problem might be when the query contains ; followed by whitespace and an EOL. The old regexp and .strip() would remove such an "empty" statement, whereas the new one would likely not.

This is the method introduced:

    @staticmethod
    def split_sql_string(sql: str) -> List[str]:
        """
        Splits string into multiple SQL expressions

        :param sql: SQL string potentially consisting of multiple expressions
        :return: list of individual expressions
        """
        splits = sqlparse.split(sqlparse.format(sql, strip_comments=True))
        statements = [s.rstrip(';') for s in splits if s.endswith(';')]
        return statements
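
To make the point about defaults concrete, here is a small self-contained sketch (run below is a simplified stand-in for the common-sql hook flow, not the actual provider code):

import sqlparse

def split_sql_string(sql):
    # Same logic as the method quoted above.
    splits = sqlparse.split(sqlparse.format(sql, strip_comments=True))
    return [s.rstrip(';') for s in splits if s.endswith(';')]

def run(sql, split_statements=False):
    # Splitting only happens when split_statements is True, which in this
    # release is the default only for the Databricks SQL hook.
    return split_sql_string(sql) if split_statements else [sql]

print(run("select * from default.a_events limit 10"))                         # ['select * from default.a_events limit 10']
print(run("select * from default.a_events limit 10", split_statements=True))  # [] -> the failure hit by the Databricks operator
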
alexott commented 2 years ago

Thank you for looking into it. I'll debug it. My query is just a single select without ;, and split_sql_string returns an empty list for it.

potiuk commented 2 years ago

Right - I see. I think the mistake is that it should be (@kazanzhy?):

statements = [s.rstrip(';') if s.endswith(';') else s.strip() for s in splits if s.strip() != ""]

That would actually make me think we should remove common.sql and all the dependent packages and release rc4 together, because indeed any query without ";" passed with "split_statement" will not work, which makes it quite problematic.

Update: added handling of whitespace that "potentially" might be returned (though this is just defensive -> sqlparse.split() should handle it, but better to be safe than sorry). Also, whether this is a bug or not depends a bit on sqlparse's behaviour.
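
For clarity, a self-contained sketch of the method with that suggestion folded in (just the one-line change above applied to the method quoted earlier - a sketch, not the final released code):

from typing import List

import sqlparse


def split_sql_string(sql: str) -> List[str]:
    """Split a SQL string into individual statements (sketch of the suggested fix)."""
    splits = sqlparse.split(sqlparse.format(sql, strip_comments=True))
    # Keep statements without a trailing ';' as well, and drop empty/whitespace-only splits.
    return [s.rstrip(';') if s.endswith(';') else s.strip() for s in splits if s.strip() != ""]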

potiuk commented 2 years ago

Yep. Confirmed this looks like a bug for all SQL - probably safer to make rc4 for all of them. Thanks @alexott for being vigilant :) - @kazanzhy - will you have time to take a look and double-check my findings and fix it before Monday?

>>> sqlparse.split(sqlparse.format('select * from 1', strip_comments=True))
['select * from 1']
>>> splits = sqlparse.split(sqlparse.format('select * from 1', strip_comments=True))
>>> print(splits)
['select * from 1']
>>> splits = sqlparse.split(sqlparse.format('select * from 1', strip_comments=True))
>>> print(splits)
['select * from 1']
>>> [s.rstrip(';') for s in splits if s.endswith(';')]
[]
>>> [s.rstrip(';') if s.endswith(';') else s.strip() for s in splits if s.strip() != ""]
['select * from 1']
>>> 
potiuk commented 2 years ago

I tested all my changes (mostly checking if the code I moved around is there). Looking for more tests :)

potiuk commented 2 years ago

@kazanzhy -> I will remove the common.sql and related providers to get RC4, and if I don't hear from you by Monday, I will attempt to fix the problem found by @alexott and prepare an RC4

alexott commented 2 years ago

@potiuk is the PR open for it? If yes, I can test it tomorrow morning...

pankajastro commented 2 years ago

Tested https://github.com/apache/airflow/pull/25619 - working as expected

dwreeves commented 2 years ago

I'm currently testing my implementation for the Amazon Provider package change #25432.

The one thing I have noticed so far is that the Connection object returned is missing a conn_id. Oops! I will open a PR that adds it in, plus tests that it's there.

I didn't notice any other issues.

Another thing I learned while testing: MWAA (the AWS managed Airflow service) locks the provider package to version 2.4.0. It may be worth double-checking that the documentation clarifies that the requirement not to URL-encode is a new 5.0.0 feature. There may be a lot of overlap between people using the provider package and people using MWAA, and they may find it confusing when the latest version of the documentation mentions something that doesn't work in their environment.
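
As a side note, one quick way to check which Amazon provider version a given environment (for example MWAA) actually has installed, so you know which docs version applies - just a sketch, pip freeze works too:

# Sketch: print the installed Amazon provider version, e.g. from a Python
# shell or a small task on the environment in question.
from airflow.providers_manager import ProvidersManager

print(ProvidersManager().providers["apache-airflow-providers-amazon"].version)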

potiuk commented 2 years ago

> Another thing I learned while testing: MWAA (the AWS managed Airflow service) locks the provider package to version 2.4.0. It may be worth double-checking that the documentation clarifies that the requirement not to URL-encode is a new 5.0.0 feature. There may be a lot of overlap between people using the provider package and people using MWAA, and they may find it confusing when the latest version of the documentation mentions something that doesn't work in their environment.

The nice thing is that the provider docs are linked in the UI to the version that is installed. I think we also have a nice changelog describing the differences; I am not sure we need to do more. But any docs clarifications are welcome :)

potiuk commented 2 years ago

FYI. The vote is closed. I will be removing databricks and also the other SQL providers (that depend on common-sql-1.1) from the vote and will prepare RC4 after fixing the problems found

potiuk commented 2 years ago

Closing the issue - thanks for the help, everyone!