dbt-labs / dbt-redshift

dbt-redshift contains all of the code enabling dbt to work with Amazon Redshift
https://getdbt.com
Apache License 2.0

[ADAP-670] [Regression] `communication error` in 1.5 #518

Closed: falonso-alo closed this issue 1 year ago

falonso-alo commented 1 year ago

Is this a regression in a recent version of dbt-redshift?

Current Behavior

`dbt compile` fails with a "communication error" (`OSError`) using dbt 1.5.

[screenshot: `dbt compile` failing with the communication error]

Expected/Previous Behavior

`dbt compile` succeeds using dbt 1.4.

[screenshot: `dbt compile` succeeding on dbt 1.4]

Steps To Reproduce

  1. Install dbt 1.5
  2. Run `dbt compile`

Relevant log output

No response

Environment

- OS: MacOS 13.3.1 (a) and Ubuntu
- Python: 3.10.5
- dbt-core (working version): 1.4.6
- dbt-redshift (working version): 1.4.6
- dbt-core (regression version): 1.5.2
- dbt-redshift (regression version): 1.5.0
pip packages (working)

```
Package                  Version
------------------------ --------
agate                    1.7.0
annotated-types          0.5.0
appdirs                  1.4.4
asn1crypto               1.5.1
attrs                    23.1.0
Babel                    2.12.1
beautifulsoup4           4.12.2
betterproto              1.2.5
black                    23.3.0
boto3                    1.26.165
botocore                 1.29.165
certifi                  2023.5.7
cffi                     1.15.1
chardet                  5.1.0
charset-normalizer       3.1.0
click                    8.1.3
click-config-file        0.6.0
colorama                 0.4.6
configobj                5.0.8
dataclasses-json         0.5.4
dbt-core                 1.4.6
dbt-extractor            0.4.1
dbt-postgres             1.4.6
dbt-redshift             1.4.0
decorator                5.1.1
diff-cover               7.6.0
exceptiongroup           1.1.2
future                   0.18.3
graphql-core             3.2.3
grpclib                  0.4.5
gunicorn                 20.1.0
h2                       4.1.0
hologram                 0.0.16
hpack                    4.0.0
hyperframe               6.0.1
idna                     3.4
iniconfig                2.0.0
isodate                  0.6.1
Jinja2                   3.1.2
jinja2-simple-tags       0.5.0
jmespath                 1.0.1
jsonschema               4.17.3
leather                  0.3.4
Logbook                  1.5.3
lxml                     4.9.3
markdown-it-py           3.0.0
MarkupSafe               2.0.1
marshmallow              3.19.0
marshmallow-enum         1.5.1
mashumaro                3.3.1
mdurl                    0.1.2
minimal-snowplow-tracker 0.0.2
montecarlodata           0.40.2
msgpack                  1.0.5
multidict                6.0.4
mypy-extensions          1.0.0
networkx                 2.8.8
packaging                23.1
parsedatetime            2.4
pathspec                 0.10.3
pip                      22.0.4
platformdirs             3.8.0
pluggy                   1.2.0
protobuf                 4.23.3
psycopg2-binary          2.9.6
py                       1.11.0
pycarlo                  0.7.1
pycparser                2.21
pydantic                 2.0.2
pydantic_core            2.1.2
Pygments                 2.15.1
pyrsistent               0.19.3
pytest                   7.4.0
python-box               7.0.1
python-dateutil          2.8.2
python-slugify           8.0.1
pytimeparse              1.1.8
pytz                     2023.3
PyYAML                   6.0
redshift-connector       2.0.912
regex                    2023.6.3
requests                 2.31.0
retry                    0.9.2
rich                     13.4.2
ruamel.yaml              0.17.32
ruamel.yaml.clib         0.2.7
s3transfer               0.6.1
scramp                   1.4.4
setuptools               58.1.0
sgqlc                    14.1
shandy-sqlfmt            0.19.0
six                      1.16.0
soupsieve                2.4.1
sqlfluff                 2.1.2
sqlfluff-templater-dbt   2.1.2
sqlparse                 0.4.3
stringcase               1.2.0
tabulate                 0.9.0
tblib                    2.0.0
text-unidecode           1.3
toml                     0.10.2
tomli                    2.0.1
tqdm                     4.65.0
typing_extensions        4.7.1
typing-inspect           0.9.0
urllib3                  1.26.16
Werkzeug                 2.1.2
```
pip packages (regression)

```
Package                  Version
------------------------ --------
agate                    1.7.0
annotated-types          0.5.0
appdirs                  1.4.4
asn1crypto               1.5.1
attrs                    23.1.0
Babel                    2.12.1
beautifulsoup4           4.12.2
betterproto              1.2.5
black                    23.3.0
boto3                    1.26.165
botocore                 1.29.165
certifi                  2023.5.7
cffi                     1.15.1
chardet                  5.1.0
charset-normalizer       3.1.0
click                    8.1.3
click-config-file        0.6.0
colorama                 0.4.6
configobj                5.0.8
dataclasses-json         0.5.4
dbt-core                 1.5.2
dbt-extractor            0.4.1
dbt-postgres             1.5.2
dbt-redshift             1.5.0
decorator                5.1.1
diff-cover               7.6.0
exceptiongroup           1.1.2
future                   0.18.3
graphql-core             3.2.3
grpclib                  0.4.5
gunicorn                 20.1.0
h2                       4.1.0
hologram                 0.0.16
hpack                    4.0.0
hyperframe               6.0.1
idna                     3.4
iniconfig                2.0.0
isodate                  0.6.1
Jinja2                   3.1.2
jinja2-simple-tags       0.5.0
jmespath                 1.0.1
jsonschema               4.17.3
leather                  0.3.4
Logbook                  1.5.3
lxml                     4.9.3
markdown-it-py           3.0.0
MarkupSafe               2.0.1
marshmallow              3.19.0
marshmallow-enum         1.5.1
mashumaro                3.6
mdurl                    0.1.2
minimal-snowplow-tracker 0.0.2
montecarlodata           0.40.2
msgpack                  1.0.5
multidict                6.0.4
mypy-extensions          1.0.0
networkx                 2.8.8
packaging                23.1
parsedatetime            2.4
pathspec                 0.10.3
pip                      22.0.4
platformdirs             3.8.0
pluggy                   1.2.0
protobuf                 4.23.3
psycopg2-binary          2.9.6
py                       1.11.0
pycarlo                  0.7.1
pycparser                2.21
pydantic                 2.0.2
pydantic_core            2.1.2
Pygments                 2.15.1
pyrsistent               0.19.3
pytest                   7.4.0
python-box               7.0.1
python-dateutil          2.8.2
python-slugify           8.0.1
pytimeparse              1.1.8
pytz                     2023.3
PyYAML                   6.0
redshift-connector       2.0.912
regex                    2023.6.3
requests                 2.31.0
retry                    0.9.2
rich                     13.4.2
ruamel.yaml              0.17.32
ruamel.yaml.clib         0.2.7
s3transfer               0.6.1
scramp                   1.4.4
setuptools               58.1.0
sgqlc                    14.1
shandy-sqlfmt            0.19.0
six                      1.16.0
soupsieve                2.4.1
sqlfluff                 2.1.2
sqlfluff-templater-dbt   2.1.2
sqlparse                 0.4.3
stringcase               1.2.0
tabulate                 0.9.0
tblib                    2.0.0
text-unidecode           1.3
toml                     0.10.2
tomli                    2.0.1
tqdm                     4.65.0
typing_extensions        4.7.1
typing-inspect           0.9.0
urllib3                  1.26.16
Werkzeug                 2.1.2
```

Additional Context

Tested in a venv.

fivetran-joemarkiewicz commented 1 year ago

Just wanted to chime in to say that I am seeing this error as well, and I can confirm that downgrading to 1.4 makes the communication error go away.

fabientra commented 1 year ago

Hello, I am seeing the same issue without having upgraded anything dbt-side: it was working perfectly fine on 1.5 until a few hours ago. Taking a closer look at the installed packages, the only difference I see is that redshift-connector changed from 2.0.911 to 2.0.912.

EDIT: I confirm that downgrading to redshift-connector==2.0.911 after installing dbt-redshift==1.5.5 fixed the issue.
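
For anyone applying the same workaround via a requirements file, a sketch using the versions above (pinning both so pip doesn't re-resolve the connector):

```
dbt-redshift==1.5.5
redshift-connector==2.0.911
```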

dataders commented 1 year ago

@brooke-white, do you have any idea what might have changed in the latest patch that was released today?

david-beallor commented 1 year ago

> Hello, I am seeing the same issue without having upgraded anything dbt-side: it was working perfectly fine on 1.5 until a few hours ago. Taking a closer look at the installed packages, the only difference I see is that redshift-connector changed from 2.0.911 to 2.0.912.
>
> EDIT: I confirm that downgrading to redshift-connector==2.0.911 after installing dbt-redshift==1.5.5 fixed the issue.

Confirming that downgrading to redshift-connector==2.0.911 worked for my team too.

tanmay-kulkarni commented 1 year ago

This issue also exists in dbt-redshift 1.5.6. When running `dbt debug`, I get the following error: [screenshot of the `dbt debug` error omitted]

jan-benisek commented 1 year ago

I hit the exact same issue on 1.5.5 when running `dbt compile`, just today. I'll follow the advice here and downgrade the package.

Brooke-white commented 1 year ago

Hi, if anyone experiencing this issue could enable redshift-connector's logging and provide the logs, it would be a great help to the redshift-connector team's investigation. We're working to reproduce this on our end and will cut a release as soon as a fix is identified. Thanks!
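
If it helps, here is a minimal sketch of one way to capture the driver's debug logs with Python's standard logging module (assuming the connector logs under the `redshift_connector` logger namespace):

```
import logging

# Send redshift_connector's DEBUG-level logs to a file.
logger = logging.getLogger("redshift_connector")
logger.setLevel(logging.DEBUG)

handler = logging.FileHandler("redshift_connector.log")
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)
```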

Datarogie commented 1 year ago

The issue also exists on dbt-core 1.5.2 with dbt-redshift 1.5.6, which appears to install redshift-connector==2.0.912. Confirmed that setting redshift-connector to 2.0.911, as others above have mentioned, resolves this issue.

It does seem to hang a lot longer at the initial connection after starting a dbt task, but once it starts running it works.

mikealfare commented 1 year ago

@Brooke-white Here's what we saw in our CI:

click to expand (traceback trimmed for readability; the connector's `Connection.__init__` docstring and the long parameter dumps are elided as `...`)

```
user = '***', password = '***', database = '***'
host = '***', port = ***
source_address = None, unix_sock = None, ssl = True, sslmode = 'verify-ca'
timeout = None, max_prepared_statements = 1000, tcp_keepalive = True
application_name = 'dbt.adapters.base.connections', replication = None
client_protocol_version = 2, database_metadata_current_db_only = True
credentials_provider = None, provider_name = None, web_identity_token = None
numeric_to_float = False

    def __init__(self: "Connection", user: str, password: str, database: str, ...):
        ...
        try:
            if unix_sock is None and host is not None:
>               hostport: typing.Tuple[str, int] = Connection.__get_host_address_info(host, port)

/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/core.py:610:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/core.py:403: in __get_host_address_info
    response = socket.getaddrinfo(host=host, port=port, family=socket.AF_INET)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
host = '***', port = ***
family = <AddressFamily.AF_INET: 2>, type = 0, proto = 0, flags = 0

    def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        ...
>       for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
E       OSError: Int or String expected

/opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/socket.py:955: OSError

During handling of the above exception, another exception occurred:

    @classmethod
    def retry_connection(cls, connection, connect, logger, retryable_exceptions, ...):
        ...
        try:
>           connection.handle = connect()

/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/dbt/adapters/base/connections.py:241:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/home/runner/work/dbt-redshift/dbt-redshift/dbt/adapters/redshift/connections.py:204: in connect
    c = redshift_connector.connect(
/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/__init__.py:344: in connect
    return Connection(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    def __init__(self: "Connection", ...):
        ...
        except socket.error as e:
            self._usock.close()
>           raise InterfaceError("communication error", e)
E       redshift_connector.error.InterfaceError: ('communication error', OSError('Int or String expected'))

/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/core.py:661: InterfaceError
```
Brooke-white commented 1 year ago

Thanks so much, that was extremely helpful :)

I reviewed the trace and was able to reproduce the issue. The root cause is this line in dbt: the type of `port` passed to redshift-connector is `dbt.helper_types.Port` rather than `int`. A simple fix is to change the line to apply an `int` cast to `self.credentials.port`:

            "port": int(self.credentials.port) if self.credentials.port else 5439,
Brooke-white commented 1 year ago

for those interested, here is my repro:

from dbt.helper_types import Port
import redshift_connector

with redshift_connector.connect(
    user='***',
    host='***',
    password='***',
    database='***',
    port=Port(5439),
) as conn:
    with conn.cursor() as cursor:
        cursor.execute("select 1")
        print(cursor.fetchone())

here's my verification of the fix:

from dbt.helper_types import Port
import redshift_connector

with redshift_connector.connect(
    user='***',
    host='***',
    password='***',
    database='***',
    port=int(Port(5439)),
) as conn:
    with conn.cursor() as cursor:
        cursor.execute("select 1")
        print(cursor.fetchone())

Happy to open a PR if you folks are OK with this fix.

dataders commented 1 year ago

> The root cause is this line in dbt.

If you'll humor me being overly pedantic, this isn't the "root" cause per se. The root cause would be whatever change in 2.0.912 introduced the breaking behavior; this code had been working for months.

Silly example:

If I put a 60V battery into a 20V cordless drill and it shorts the drill's motor, it would be incorrect to point to the motor as the root cause; the root cause is the battery.

Brooke-white commented 1 year ago

While passing a `dbt.helper_types.Port` to the connect method's `port` parameter worked prior to 2.0.912, it isn't something redshift-connector supports per the docs.

falonso-alo commented 1 year ago

At the end of the day, we don't really need to assign blame anywhere. It's just important that we solve the problem.

Thank you, @Brooke-white!

mikealfare commented 1 year ago

Agreed with @falonso-alo; I don't think either side (and I already don't like that connotation) should be assigning blame. This industry is toxic enough as it is, and this work is hard enough as it is. With that in mind, I did some research to understand exactly what is going on and felt like sharing that information.

The initial confusion comes from the fact that `Port` subclasses `int`; in other words, `issubclass(Port, int)` is `True`. We are in fact passing an int, we're just not passing exactly an `int`.

In version v2.0.911, this line was used to connect via host/port: https://github.com/aws/amazon-redshift-python-driver/blob/2898d86de9bb6be1e3def88e15c78c9ea767ec52/redshift_connector/core.py#L596

There is no check there to see whether `port` is a proper `int`. As long as `port` can behave like an `int`, everyone's happy.

This line was updated in v2.0.912 to: https://github.com/aws/amazon-redshift-python-driver/blob/b2dde82ec9156e2adcc801ac54c051f3cfe61e33/redshift_connector/core.py#L610-L614

with reference to this new function: https://github.com/aws/amazon-redshift-python-driver/blob/b2dde82ec9156e2adcc801ac54c051f3cfe61e33/redshift_connector/core.py#L397-L409

The new function calls the builtin `socket` module's `getaddrinfo`, which raises the `OSError('Int or String expected')` exception that we're seeing. That call must be looking for exactly an `int`. I stopped digging at that point, as `socket`'s address resolution is implemented in C, making it a bit more difficult for me to investigate. While I didn't see the check, I'm willing to assume that there is an exact-type check for `int` in there.
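
That assumption is easy to check without any of the libraries involved; a minimal sketch with a stand-in `int` subclass (not dbt's actual `Port` class):

```
import socket

class Port(int):
    """Stand-in for an int subclass like dbt.helper_types.Port."""

# A plain int is accepted and resolves normally.
socket.getaddrinfo(host="localhost", port=5439, family=socket.AF_INET)

# The same value as an int subclass trips the C-level type check.
try:
    socket.getaddrinfo(host="localhost", port=Port(5439), family=socket.AF_INET)
except OSError as e:
    print(e)  # Int or String expected
```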

Putting all of that aside, I have to imagine we're not the only group passing a subclass of `int` (or `str`, for that matter) into the `Connection` class and expecting it to work (given the argument above). It might not be the primary usage pattern, but until v2.0.912 it appeared to be a valid one. And I'm sure you can appreciate the nuance that comes with sitting on top of code that you can't easily alter (such as `socket` in this scenario). So while we aren't passing a proper `int` into `Connection`, it might be useful for those downstream (both us and other teams) to update this line:

https://github.com/aws/amazon-redshift-python-driver/blob/b2dde82ec9156e2adcc801ac54c051f3cfe61e33/redshift_connector/core.py#L403

to this:

response = socket.getaddrinfo(host=host, port=int(port), family=socket.AF_INET)

Such is life with dynamic typing in Python.

jan-benisek commented 1 year ago

Hi, it seems like the error is back. I am on dbt-core==1.5.1 and dbt-redshift==1.5.7.

06:02:48  Encountered an error while running operation: Database Error

  ('communication error', OSError('Int or String expected'))

script returned exit code 1

I see in my Docker build that redshift-connector==2.0.913 is installed. That version was released 7 hours ago.

dataders commented 1 year ago

@jan-benisek shoot! I've opened #531 to fix this, and we're working on a 1.5 patch release as we speak. I'll reply back here when the patch is out; my estimate is within the next four hours, but certainly within the next 24 hours.

Datarogie commented 1 year ago

Is it normal for a stable release not to lock in its dependencies, if that's even an option? 😅 Please excuse my lack of knowledge in app/package development; I'm just generally curious, as it'd be great to be able to trust that a stable version means the package AND its dependencies all work together, and that new versions of either won't be integrated until testing confirms they're stable. However, I can also see how this could be difficult to manage.

dataders commented 1 year ago

dbt-redshift 1.5.8 has now been released. I cannot stress enough:

  1. how sorry we are to have had you all suffer degraded reliability of the dbt-redshift adapter, and
  2. how grateful we are for your patience and assistance!
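
For those curious about @Datarogie's question above: the guardrail here is a version constraint in the adapter's dependency spec. A hedged illustration with a hypothetical package (not dbt-redshift's actual setup.py), showing how a known-bad dependency release can be excluded while still allowing later fixes:

```
# setup.py -- illustrative sketch only; "my-adapter" is a hypothetical package
from setuptools import setup

setup(
    name="my-adapter",
    version="0.1.0",
    install_requires=[
        # Exclude the connector releases known to reject int-subclass ports,
        # while continuing to accept future patch releases.
        "redshift-connector>=2.0.911,!=2.0.912,!=2.0.913",
    ],
)
```
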
jan-benisek commented 1 year ago

@dataders Thanks a lot for such a quick fix, all 🟢! And don't worry, we have all been there. Looking at your fixes, the Python driver doesn't seem like the easiest piece of software to work with.

mustafa0taru commented 5 months ago

Hi @dataders, I'm currently facing this blocker. Do I need to downgrade the dbt-redshift adapter?