
google.api_core.exceptions.Unknown: None Stream removed (Snowflake and GCP Secret Manager) #16590

Closed nazarsh closed 3 years ago

nazarsh commented 3 years ago

Apache Airflow version: 2.1.0

Kubernetes version (if you are using kubernetes) (use kubectl version): N/A

Environment:

What happened:

Having configured a Snowflake connection and pointed the secrets backend at GCP Secret Manager (AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend), I am getting a fairly consistent error that traces all the way down to gRPC:

    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Stream removed"
    debug_error_string = "{"created":"@1624370913.481874500","description":"Error received from peer ipv4:172.xxx.xx.xxx:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Stream removed","grpc_status":2}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/qe/weekly.py", line 63, in snfk_hook
    df = hook.get_pandas_df(sql)
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 116, in get_pandas_df
    with closing(self.get_conn()) as conn:
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 220, in get_conn
    conn_config = self._get_conn_params()
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 152, in _get_conn_params
    self.snowflake_conn_id  # type: ignore[attr-defined] # pylint: disable=no-member
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/base.py", line 67, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/connection.py", line 376, in get_connection_from_secrets
    conn = secrets_backend.get_connection(conn_id=conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/secrets/base_secrets.py", line 64, in get_connection
    conn_uri = self.get_conn_uri(conn_id=conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 134, in get_conn_uri
    return self._get_secret(self.connections_prefix, conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 170, in _get_secret
    return self.client.get_secret(secret_id=secret_id, project_id=self.project_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/_internal_client/secret_manager_client.py", line 86, in get_secret
    response = self.client.access_secret_version(name)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/secretmanager_v1/gapic/secret_manager_service_client.py", line 968, in access_secret_version
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.Unknown: None Stream removed
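
For reference, the failing call can be isolated outside of a task by exercising the secrets backend directly. Below is a minimal sketch (my-gcp-project is a placeholder, the conn_id is the one from the DAG further down, and everything else relies on the backend's defaults); running it in the same container as the worker would help tell whether the failure is specific to the task-execution path:

# Call the Google Secret Manager secrets backend directly, outside of any DAG,
# to check whether the same gRPC "Stream removed" error reproduces.
from airflow.providers.google.cloud.secrets.secret_manager import CloudSecretManagerBackend

backend = CloudSecretManagerBackend(project_id="my-gcp-project")  # placeholder project id
conn_uri = backend.get_conn_uri(conn_id="snowflake_conn")         # same call path as in the traceback
print("retrieved connection URI?", conn_uri is not None)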

What you expected to happen:

The DAG successfully retrieves the configured Snowflake connection from GCP Secret Manager and executes a query, returning a result.

How to reproduce it:

  1. Configure Google Cloud Secret Manager as the secrets backend: AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
  2. Configure a Snowflake connection (requirements.txt has apache-airflow-providers-snowflake); a sketch of storing it as a secret follows the DAG below
  3. Create a DAG which uses SnowflakeHook similar to this:
import logging

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

args = {"owner": "Airflow", "start_date": airflow.utils.dates.days_ago(2)}

dag = DAG(
    dag_id="snowflake_automation", default_args=args, schedule_interval=None
)

snowflake_query = [
    """create table public.test_employee (id number, name string);""",
    """insert into public.test_employee values(1, “Sam”),(2, “Andy”),(3, “Gill”);""",
]

def get_row_count(**context):
    dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    result = dwh_hook.get_first("select count(*) from public.test_employee")
    logging.info("Number of rows in `public.test_employee`  - %s", result[0])

with dag:
    create_insert = SnowflakeOperator(
        task_id="snowfalke_create",
        sql=snowflake_query ,
        snowflake_conn_id="snowflake_conn",
    )

    get_count = PythonOperator(task_id="get_count", python_callable=get_row_count)

    create_insert >> get_count
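
For step 2 above, the connection has to exist as a secret the backend can resolve. A hedged sketch of creating it with the google-cloud-secret-manager client (the project id and connection URI are placeholders; the secret name assumes the backend's default airflow-connections prefix and "-" separator):

# Hypothetical sketch: store the Snowflake connection URI in Secret Manager so that
# CloudSecretManagerBackend can resolve conn_id "snowflake_conn".
# Assumes the default connections_prefix ("airflow-connections") and "-" separator,
# i.e. the secret must be named "airflow-connections-snowflake_conn".
from google.cloud import secretmanager_v1

PROJECT_ID = "my-gcp-project"  # placeholder
SECRET_ID = "airflow-connections-snowflake_conn"
# Placeholder Airflow connection URI; see the Snowflake provider docs for the exact fields.
CONN_URI = "snowflake://my_user:my_password@/?account=my_account&warehouse=my_wh&database=my_db"

client = secretmanager_v1.SecretManagerServiceClient()
parent = f"projects/{PROJECT_ID}"

# Create the secret container, then add the connection URI as the first version.
secret = client.create_secret(
    parent=parent,
    secret_id=SECRET_ID,
    secret={"replication": {"automatic": {}}},
)
client.add_secret_version(parent=secret.name, payload={"data": CONN_URI.encode("utf-8")})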

Anything else we need to know:

I looked around to see whether this is an issue with Google's api-core, and it seems someone has already researched it and suggests it may be a downstream implementation issue rather than an api-core issue: https://stackoverflow.com/questions/67374613/why-does-accessing-this-variable-fail-after-it-is-used-in-a-thread
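
If that theory applies here (which is only my assumption), the shape of the failure would be roughly what is sketched below, with placeholder project and secret names: a client whose gRPC channel was created in one execution context is reused from another (e.g. after a fork), and the broken channel surfaces as StatusCode.UNKNOWN / "Stream removed" instead of a clearer error.

# Purely illustrative, not a verified reproduction: reusing a gRPC-backed client
# across a fork, i.e. the class of problem the linked thread seems to point at.
import os

from google.cloud import secretmanager_v1

client = secretmanager_v1.SecretManagerServiceClient()  # gRPC channel created in the parent

if os.fork() == 0:
    # Child process (analogous to a forked task process) reusing the inherited channel.
    name = "projects/my-gcp-project/secrets/airflow-connections-snowflake_conn/versions/latest"
    client.access_secret_version(name=name)  # may fail here with "Stream removed"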

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

potiuk commented 3 years ago

I am afraid there isn't much we can do in Airflow to fix it. Closing it.