cloudera / impyla

Python DB API 2.0 client for Impala and Hive (HiveServer2 protocol)
Apache License 2.0

impala.error.HiveServer2Error: Failed after retrying 3 times with SSL and broken pipe #356

Open alexjironkin opened 5 years ago

alexjironkin commented 5 years ago

We have intermittently been facing issues with:

impala.error.HiveServer2Error: Failed after retrying 3 times

Caused by:

  File "/opt/app-root/lib/python3.6/site-packages/thriftpy/transport/socket.py", line 129, in write
    self.sock.sendall(buff)
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/ssl.py", line 975, in sendall
    return socket.sendall(self, data, flags)
BrokenPipeError: [Errno 32] Broken pipe 

We are using impyla together with SQLAlchemy connection pools and have started using the pool_pre_ping option:

from flask_sqlalchemy import SQLAlchemy as _BaseSQLAlchemy

class SQLAlchemy(_BaseSQLAlchemy):
    """Custom Alchemy implementation to force to use pessimistic disconnection handling.
        See:
            * https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
            * https://github.com/pallets/flask-sqlalchemy/issues/589
    """
    def apply_pool_defaults(self, app, options):
        super().apply_pool_defaults(app, options)
        # Force pre ping on the pools. Generates one extra query however, handles stale connections.
        options["pool_pre_ping"] = True

However, this fails now and again. I have tracked the issue to:

Traceback (most recent call last):
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2262, in _wrap_pool_connect
    return fn()
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 354, in connect
    return _ConnectionFairy._checkout(self)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 782, in _checkout
    result = pool._dialect.do_ping(fairy.connection)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 517, in do_ping
    cursor = dbapi_connection.cursor()
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 125, in cursor
    session = self.service.open_session(user, configuration)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 995, in open_session
    resp = self._rpc('OpenSession', req)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 923, in _rpc
    response = self._execute(func_name, request)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 954, in _execute
    .format(self.retries))
impala.error.HiveServer2Error: Failed after retrying 3 times

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/app-root/lib/python3.6/site-packages/flask/app.py", line 1813, in full_dispatch_request
    rv = self.dispatch_request()
  File "/opt/app-root/lib/python3.6/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/opt/app-root/lib/python3.6/site-packages/flask_restplus/api.py", line 325, in wrapper
    resp = resource(*args, **kwargs)
  File "/opt/app-root/lib/python3.6/site-packages/flask/views.py", line 88, in view
    return self.dispatch_request(*args, **kwargs)
  File "/opt/app-root/lib/python3.6/site-packages/flask_restplus/resource.py", line 44, in dispatch_request
    resp = meth(*args, **kwargs)
  File "/opt/app-root/lib/python3.6/site-packages/flask_restplus/marshalling.py", line 136, in wrapper
    resp = f(*args, **kwargs)
  File "/opt/app-root/src/sleuth_rest/webservice/auth/utils.py", line 92, in wrapper
    return func(*args, **kwargs)
  File "/opt/app-root/src/sleuth_rest/webservice/trader_risk/resources.py", line 83, in get
    end_date=args['endDate'], trader_no_list=args.get('traderNoList'))
  File "/opt/app-root/src/sleuth_rest/core/references/datalake/traders.py", line 142, in trader_risk_scores
    risk_scores = result_as_dicts(risk_scores_query.all())
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3161, in all
    return list(self)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3317, in __iter__
    return self._execute_and_instances(context)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3339, in _execute_and_instances
    querycontext, self._connection_from_session, close_with_result=True
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3354, in _get_bind_args
    mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3332, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1123, in connection
    execution_options=execution_options,
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1129, in _connection_for_bind
    engine, execution_options
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 430, in _connection_for_bind
    conn = bind._contextual_connect()
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2226, in _contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2266, in _wrap_pool_connect
    e, dialect, self
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1536, in _handle_dbapi_exception_noconnection
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 383, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 128, in reraise
    raise value.with_traceback(tb)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2262, in _wrap_pool_connect
    return fn()
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 354, in connect
    return _ConnectionFairy._checkout(self)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 782, in _checkout
    result = pool._dialect.do_ping(fairy.connection)
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 517, in do_ping
    cursor = dbapi_connection.cursor()
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 125, in cursor
    session = self.service.open_session(user, configuration)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 995, in open_session
    resp = self._rpc('OpenSession', req)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 923, in _rpc
    response = self._execute(func_name, request)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 954, in _execute
    .format(self.retries))
sqlalchemy.exc.DBAPIError: (impala.error.HiveServer2Error) Failed after retrying 3 times
(Background on this error at: http://sqlalche.me/e/dbapi)

caused by 3 failures in:

Traceback (most recent call last):
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 940, in _execute
    return func(request)
  File "/opt/app-root/lib/python3.6/site-packages/thriftpy/thrift.py", line 195, in _req
    self._send(_api, **kwargs)
  File "/opt/app-root/lib/python3.6/site-packages/thriftpy/thrift.py", line 207, in _send
    self._oprot.trans.flush()
  File "/opt/app-root/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 129, in flush
    self._flushPlain(buffer)
  File "/opt/app-root/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 152, in _flushPlain
    self._trans.write(struct.pack(">I", len(buffer)) + buffer)
  File "/opt/app-root/lib/python3.6/site-packages/thriftpy/transport/socket.py", line 129, in write
    self.sock.sendall(buff)
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/ssl.py", line 975, in sendall
    return socket.sendall(self, data, flags)
BrokenPipeError: [Errno 32] Broken pipe 

Unfortunately, the do_ping() method of SQLAlchemy's default dialect only catches self.dbapi.Error:

    def do_ping(self, dbapi_connection):
        cursor = None
        try:
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute(self._dialect_specific_select_one)
            finally:
                cursor.close()
        except self.dbapi.Error as err:
            if self.is_disconnect(err, dbapi_connection, cursor):
                return False
            else:
                raise
        else:
            return True

Any thoughts on why this does not handle broken pipes? It feels like self.dbapi.Error should be catching impala.error.HiveServer2Error.
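The control flow of the do_ping() quoted above can be reproduced with a small stdlib-only simulation. FakeDbapiError, BrokenConnection, and FakeDialect are hypothetical stand-ins, not SQLAlchemy or impyla classes. The sketch assumes the driver error really is a subclass of the dialect's dbapi.Error and that is_disconnect() returns False; under those assumptions the error is caught and then re-raised rather than reported as a failed ping.

```python
class FakeDbapiError(Exception):
    """Hypothetical stand-in for the driver's base DB API Error class."""


class BrokenConnection:
    """Hypothetical connection whose cursor() always fails, like a dropped socket."""

    def cursor(self):
        raise FakeDbapiError("Failed after retrying 3 times")


class FakeDialect:
    """Mimics the try/except structure of the do_ping() shown above."""

    dbapi_error = FakeDbapiError

    def is_disconnect(self, err, connection, cursor):
        # Assumption: the dialect does not recognise any error as a disconnect.
        return False

    def do_ping(self, dbapi_connection):
        cursor = None
        try:
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("SELECT 1")
            finally:
                cursor.close()
        except self.dbapi_error as err:
            if self.is_disconnect(err, dbapi_connection, cursor):
                return False
            raise  # caught, but not classified as a disconnect, so it propagates
        return True


def ping_outcome(dialect):
    """Return the ping result, or the string 'raised' if the error propagated."""
    try:
        return dialect.do_ping(BrokenConnection())
    except FakeDbapiError:
        return "raised"


outcome = ping_outcome(FakeDialect())
```

In this simulation the exception is caught by the except clause, but because is_disconnect() says "no", the pool never gets the False result it would need to discard the connection and retry.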

Impala: 0.14.1 Python: 3.6

alexjironkin commented 5 years ago

The default dialect implements is_disconnect() to always return False, and individual dialects override it with their own driver-specific logic (see https://docs.sqlalchemy.org/en/13/core/internals.html?highlight=is_disconnect#sqlalchemy.engine.default.DefaultDialect.is_disconnect)

I think we need to do the following for Impyla in impala.sqlalchemy.ImpalaDialect:
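A minimal sketch of the kind of check such an override could perform, not necessarily what the author had in mind. The helper looks_like_disconnect and the DISCONNECT_MARKERS tuple are hypothetical; the marker strings are simply copied from the error messages quoted earlier in this thread. The commented-out wiring into a dialect is an assumption and has not been verified against impyla.

```python
# Hypothetical marker strings, taken from the error messages quoted in this thread.
DISCONNECT_MARKERS = (
    "Failed after retrying",
    "Broken pipe",
)


def looks_like_disconnect(err):
    """Return True if the error text, or that of a chained cause, suggests a dropped connection."""
    seen = set()
    while err is not None and id(err) not in seen:
        seen.add(id(err))  # guard against cycles in the exception chain
        if any(marker in str(err) for marker in DISCONNECT_MARKERS):
            return True
        err = err.__cause__ or err.__context__
    return False


# Assumed wiring (illustrative only): a dialect subclass could delegate to the helper.
#
# class ImpalaDialect(...):
#     def is_disconnect(self, e, connection, cursor):
#         return looks_like_disconnect(e)
```

Matching on message text is fragile, so a real implementation would probably prefer checking exception types where the driver exposes suitable ones.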

alexjironkin commented 5 years ago

We found more exceptions:

Traceback (most recent call last):
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
    cursor, statement, parameters, context
  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 304, in execute
    self._wait_to_finish()  # make execute synchronous
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 380, in _wait_to_finish
    raise OperationalError(resp.errorMessage)
impala.error.OperationalError: EndDataStream() to *.*.*.*:27000 failed: Network error: failed to read from TLS socket: Connection reset by peer (error 104)

  File "/opt/app-root/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 517, in do_ping
    cursor = dbapi_connection.cursor()
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 139, in cursor
    cursor.execute('USE %s' % self.default_db)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 302, in execute
    configuration=configuration)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 343, in execute_async
    self._execute_async(op)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 362, in _execute_async
    operation_fn()
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 340, in op
    async=True)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 1027, in execute
    return self._operation('ExecuteStatement', req)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 957, in _operation
    resp = self._rpc(kind, request)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 923, in _rpc
    response = self._execute(func_name, request)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 937, in _execute
    open_transport(transport)
  File "/opt/app-root/lib/python3.6/site-packages/impala/hiveserver2.py", line 971, in open_transport
    transport.open()
  File "/opt/app-root/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 61, in open
    self._trans.open()
  File "/opt/app-root/lib/python3.6/site-packages/thriftpy/transport/socket.py", line 96, in open
    self.sock.connect(addr)
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/ssl.py", line 1100, in connect
    self._real_connect(addr, False)
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/ssl.py", line 1091, in _real_connect
    self.do_handshake()
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/ssl.py", line 1068, in do_handshake
    self._sslobj.do_handshake()
AttributeError: 'NoneType' object has no attribute 'do_handshake'

bgedik commented 4 years ago

Any progress on this?

alexjironkin commented 4 years ago

Hi, sorry for the long delay. Yes, lots of progress: with SQLAlchemy pool pre-ping we managed to eliminate these errors. The issue seems to stem from the server dropping connections and being unreliable (we don't know whether this was a configuration issue or by design, as we don't manage this service).

We did implement a SELECT 1 based ping, but as I mentioned, impyla has its own ping method that we should use instead. It would be great to see this implemented in impyla.
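A SELECT 1 based ping can be sketched against any DB API 2.0 connection. This is not the code the author deployed: the function name select_one_ping is hypothetical, and sqlite3 stands in for impyla only so the example runs without a server. With impyla, the assumption is that one would pass an impyla connection and impala.error.Error instead.

```python
import sqlite3


def select_one_ping(dbapi_connection, error_type):
    """Return True if a trivial query succeeds, False if the driver raises error_type."""
    cursor = None
    try:
        cursor = dbapi_connection.cursor()
        cursor.execute("SELECT 1")
        cursor.fetchall()
        return True
    except error_type:
        return False
    finally:
        if cursor is not None:
            try:
                cursor.close()
            except error_type:
                pass  # a dead connection may also fail on close


# sqlite3 is only a stand-in driver for demonstration purposes.
live = sqlite3.connect(":memory:")
dead = sqlite3.connect(":memory:")
dead.close()  # simulate a connection that is no longer usable

live_ok = select_one_ping(live, sqlite3.Error)
dead_ok = select_one_ping(dead, sqlite3.Error)
```

Returning False instead of raising is what lets a pool discard the stale connection and open a fresh one.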

Wonz5130 commented 2 years ago

I ran into this problem and finally solved it. My Stack Overflow answer is linked below:

impala.error.HiveServer2Error: Failed after retrying 3 times