dropbox / PyHive

Python interface to Hive and Presto. 🐝

Insert with multiple rows fails #250

Open alex-ber opened 6 years ago

alex-ber commented 6 years ago

I use the latest PyHive 0.1.8, thrift 0.11.0, and thrift-sasl 0.3.0. I use the latest SQLAlchemy 1.2.12. I use pandas 0.16.0 (the problem is not in pandas).

I am creating a DataFrame with 3 rows. I want to create the table if it doesn't exist and insert these rows into it.

The DB schema can be defined as follows:

CREATE DATABASE IF NOT EXISTS alx_test;

CREATE TABLE alx_test.alx_score (
    ids STRING, 
    scores_all FLOAT
);

The following code is in Python 2.7. Below is a simplified version of what I'm trying to achieve:


from sqlalchemy import create_engine
from sqlalchemy.schema import MetaData
from collections import OrderedDict
from pandas.core.frame import DataFrame

dbName = "alx_test"
tableName = 'alx_score'

import logging
logging.basicConfig(format='%(asctime)s [%(process)d]: %(message)s', level=logging.INFO)
logging.getLogger('sqlalchemy').setLevel(logging.DEBUG)

def checkMultiinsert():
    hive_engine = create_engine('hive://hive_username@remote_host:10000/default?auth=NOSASL')
    meta = MetaData(hive_engine, schema=dbName)
    meta.reflect(only=[tableName])

    ids_all = ['10', '20', '30']
    scores_all = [0.1, 0.5, 0.9]

    cols = OrderedDict()
    cols.update({'ids': ids_all})
    cols.update({'scores_all': scores_all})

    df_score = DataFrame.from_dict(cols)

    df_score.to_sql(tableName, hive_engine, 
                schema=dbName, 
                if_exists='replace',
                index=False
                )

if __name__ == "__main__":
    checkMultiinsert()

Note: I have to use the Hive user in the connection URL because of a bug

Note: I enabled SQLAlchemy logging just to get a better understanding of what is going on; you can remove it.

I get the following error:

Traceback (most recent call last):
  File "<some path>\poc\multiInsert.py", line 36, in <module>
    checkMultiinsert()
  File "<some path>\poc\multiInsert.py", line 31, in checkMultiinsert
    index=False
  File "C:\programs\Anaconda\lib\site-packages\pandas\core\generic.py", line 977, in to_sql
    dtype=dtype)
  File "C:\programs\Anaconda\lib\site-packages\pandas\io\sql.py", line 538, in to_sql
    chunksize=chunksize, dtype=dtype)
  File "C:\programs\Anaconda\lib\site-packages\pandas\io\sql.py", line 1177, in to_sql
    table.insert(chunksize)
  File "C:\programs\Anaconda\lib\site-packages\pandas\io\sql.py", line 717, in insert
    self._execute_insert(conn, keys, chunk_iter)
  File "C:\programs\Anaconda\lib\site-packages\pandas\io\sql.py", line 692, in _execute_insert
    conn.execute(self.insert_statement(), data)
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\engine\base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\sql\elements.py", line 269, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\engine\base.py", line 1060, in _execute_clauseelement
    compiled_sql, distilled_params
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\engine\base.py", line 1200, in _execute_context
    context)
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\engine\base.py", line 1413, in _handle_dbapi_exception
    exc_info
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\util\compat.py", line 265, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\engine\base.py", line 1170, in _execute_context
    context)
  File "C:\programs\Anaconda\lib\site-packages\sqlalchemy\engine\default.py", line 506, in do_executemany
    cursor.executemany(statement, parameters)
  File "C:\programs\Anaconda\lib\site-packages\pyhive\common.py", line 90, in executemany
    self._fetch_more()
  File "C:\programs\Anaconda\lib\site-packages\pyhive\hive.py", line 380, in _fetch_more
    raise ProgrammingError("No result set")
sqlalchemy.exc.ProgrammingError: (pyhive.exc.ProgrammingError) No result set [SQL: u'INSERT INTO TABLE `alx_test`.`alx_score` VALUES (%(ids)s, %(scores_all)s)'] [parameters: ({'ids': '10', 'scores_all': 0.1}, {'ids': '20', 'scores_all': 0.5}, {'ids': '30', 'scores_all': 0.9})] (Background on this error at: http://sqlalche.me/e/f405)

As you can see, the problem is in hive.py, in the Cursor._fetch_more() method.

This is its code:


    def _fetch_more(self):
        """Send another TFetchResultsReq and update state"""
        assert(self._state == self._STATE_RUNNING), "Should be running when in _fetch_more"
        assert(self._operationHandle is not None), "Should have an op handle in _fetch_more"
        if not self._operationHandle.hasResultSet:
            raise ProgrammingError("No result set")
        req = ttypes.TFetchResultsReq(
            operationHandle=self._operationHandle,
            orientation=ttypes.TFetchOrientation.FETCH_NEXT,
            maxRows=self.arraysize,
        )
        response = self._connection.client.FetchResults(req)
        _check_status(response)
        schema = self.description
        assert not response.results.rows, 'expected data in columnar format'
        columns = [_unwrap_column(col, col_schema[1]) for col, col_schema in
                   zip(response.results.columns, schema)]
        new_data = list(zip(*columns))
        self._data += new_data
        # response.hasMoreRows seems to always be False, so we instead check the number of rows
        # https://github.com/apache/hive/blob/release-1.2.1/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java#L678
        # if not response.hasMoreRows:
        if not new_data:
            self._state = self._STATE_FINISHED

The reason is the check if not self._operationHandle.hasResultSet: the condition holds, so ProgrammingError is raised. The state of self._operationHandle is the following:

TOperationHandle: TOperationHandle(hasResultSet=False, modifiedRowCount=None, operationType=0, operationId=THandleIdentifier(secret='\xac\x9a\x0f\xbf\x83\x87@\x86\xb8\x9e@np\xf8\xf6g', guid='\x81\x17=v.\x83G\xc1\x86U*+\xb4\xca\xa6\xdd'))

P.S. Inserting 1 row works fine. That is, if I have a DataFrame with 1 row, it works. This has to do with the SQLAlchemy code: there is a check whether multiparams contains only 1 parameter set, and if so another execution path is taken, one that doesn't involve a call to the cursor's _fetch_more() method.
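
For context, here is a simplified sketch of pyhive.common.Cursor.executemany() as I understand it (reconstructed, not the verbatim library code); it shows why the multi-row path ends up in _fetch_more() while a single-row insert never does:

def executemany(self, operation, seq_of_parameters):
    # Every parameter set except the last is executed and then drained:
    # the cursor keeps calling _fetch_more() until it reports FINISHED.
    # For an INSERT there is no result set, so _fetch_more() raises
    # ProgrammingError("No result set").
    for parameters in seq_of_parameters[:-1]:
        self.execute(operation, parameters)
        while self._state != self._STATE_FINISHED:
            self._fetch_more()
    # The last parameter set is executed without draining. With a 1-row
    # DataFrame, SQLAlchemy takes the plain execute() path anyway, so the
    # loop above is never entered and the insert succeeds.
    if seq_of_parameters:
        self.execute(operation, seq_of_parameters[-1])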

alex-ber commented 6 years ago

The workaround is to monkey patch pyhive.hive.Cursor's _fetch_more() method.

I've replaced

if not self._operationHandle.hasResultSet:
    raise ProgrammingError("No result set")

with

if not self._operationHandle.hasResultSet:
    if not self._operationHandle.modifiedRowCount:
        self._state = self._STATE_FINISHED
        return
    else:
        raise ProgrammingError("No result set")

def _fetch_more(self):
    """Send another TFetchResultsReq and update state"""
    from pyhive.exc import ProgrammingError

    assert(self._state == self._STATE_RUNNING), "Should be running when in _fetch_more"
    assert(self._operationHandle is not None), "Should have an op handle in _fetch_more"
    if not self._operationHandle.hasResultSet:
        # Statements like INSERT have no result set; if there is no modified
        # row count either, just mark the cursor as finished.
        if not self._operationHandle.modifiedRowCount:
            self._state = self._STATE_FINISHED
            return
        else:
            raise ProgrammingError("No result set")

    # Fall back to the original implementation for queries with a result set.
    self._old_fetch_more()

if __name__ == "__main__":
    # monkey patch: keep the original _fetch_more around and install the
    # patched version on the Cursor class
    from pyhive.hive import Cursor
    _fetch_more_fn = Cursor._fetch_more
    Cursor._fetch_more = _fetch_more
    Cursor._old_fetch_more = _fetch_more_fn

    checkMultiinsert()

I am not sure whether this is the correct fix or not, but it works in my case.

Cherishword commented 5 years ago

@alex-ber I have the same question. I get the following error:

---------------------------------------------------------------------------
ProgrammingError                          Traceback (most recent call last)
in ()
     39 from sqlalchemy import create_engine
     40 engine = create_engine('hive://root@192.168.12.67:10000/item_recommand')
---> 41 data.to_sql('user', con=engine, index=False, if_exists='append')

/usr/local/lib/python2.7/site-packages/pandas/core/generic.pyc in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype)
-> 2130     dtype=dtype)

/usr/local/lib/python2.7/site-packages/pandas/io/sql.pyc in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype)
--> 450     chunksize=chunksize, dtype=dtype)

/usr/local/lib/python2.7/site-packages/pandas/io/sql.pyc in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype)
-> 1127     table.insert(chunksize)

/usr/local/lib/python2.7/site-packages/pandas/io/sql.pyc in insert(self, chunksize)
--> 641     self._execute_insert(conn, keys, chunk_iter)

/usr/local/lib/python2.7/site-packages/pandas/io/sql.pyc in _execute_insert(self, conn, keys, data_iter)
--> 616     conn.execute(self.insert_statement(), data)

/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.pyc in execute(self, object, *multiparams, **params)
--> 948     return meth(self, multiparams, params)

/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.pyc in _execute_on_connection(self, connection, multiparams, params)
--> 269     return connection._execute_clauseelement(self, multiparams, params)

/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.pyc in _execute_clauseelement(self, elem, multiparams, params)
-> 1060     compiled_sql, distilled_params

/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.pyc in _execute_context(self, dialect, constructor, statement, parameters, *args)
-> 1200     context)

/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.pyc in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
-> 1413     exc_info

/usr/local/lib/python2.7/site-packages/sqlalchemy/util/compat.pyc in raise_from_cause(exception, exc_info)
--> 265     reraise(type(exception), exception, tb=exc_tb, cause=cause)

/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.pyc in _execute_context(self, dialect, constructor, statement, parameters, *args)
-> 1170     context)

/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/default.pyc in do_executemany(self, cursor, statement, parameters, context)
--> 506     cursor.executemany(statement, parameters)

/usr/local/lib/python2.7/site-packages/pyhive/common.pyc in executemany(self, operation, seq_of_parameters)
     88     self.execute(operation, parameters)
     89     while self._state != self._STATE_FINISHED:
---> 90     self._fetch_more()
     91     if seq_of_parameters:
     92     self.execute(operation, seq_of_parameters[-1])

/usr/local/lib/python2.7/site-packages/pyhive/hive.py in _fetch_more(self)
    378     assert(self._operationHandle is not None), "Should have an op handle in _fetch_more"
    379     if not self._operationHandle.hasResultSet:
--> 380     if not self._operationHandle.modifiedRowCount:
    381         self._state = self._STATE_FINISHED
    382         return

ProgrammingError: (pyhive.exc.ProgrammingError) No result set [SQL: u'INSERT INTO TABLE `user` VALUES (%(vipid)s, %(item)s, %(acttype)s, %(month)s, %(day)s)'] [parameters: ({'item': '95', 'acttype': 2, 'vipid': '17', 'day': '30', 'month': '4'}, {'item': '19', 'acttype': 1, 'vipid': '11', 'day': '13', 'month': '4'}, {'item': '48', 'acttype': 2, 'vipid': '6', 'day': '14', 'month': '10'}, {'item': '37', 'acttype': 2, 'vipid': '58', 'day': '9', 'month': '8'}, {'item': '14', 'acttype': 1, 'vipid': '51', 'day': '14', 'month': '4'}, {'item': '90', 'acttype': 2, 'vipid': '50', 'day': '26', 'month': '6'}, {'item': '82', 'acttype': 2, 'vipid': '46', 'day': '6', 'month': '2'}, {'item': '52', 'acttype': 2, 'vipid': '52', 'day': '3', 'month': '2'} ... displaying 10 of 10000 total bound parameter sets ... {'item': '36', 'acttype': 1, 'vipid': '54', 'day': '20', 'month': '10'}, {'item': '86', 'acttype': 2, 'vipid': '94', 'day': '28', 'month': '7'})] (Background on this error at: http://sqlalche.me/e/f405)

aa3222119 commented 5 years ago

Has this issue been fixed? @Cherishword @alex-ber

alex-ber commented 5 years ago

As far as I know, no.

nicolasesnis commented 4 years ago

This issue is still not fixed

CompileError: The 'presto' dialect with current database version settings does not support in-place multirow inserts.

alex-ber commented 4 years ago

I've found 2 workarounds:

  1. Using Fabric, upload the files to the Hive server and run a command to put them into HDFS at the location where your Hive table is defined (a sketch of this approach follows below).
  2. Put them in an S3 bucket. Amazon EMR uses this bucket for the table definition.

Note: you should compute statistics for the table after such a manipulation.

Note: if you have more than, say, 10 rows, inserting row by row will take practically forever, so that is not really an option.
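
For reference, a minimal sketch of the first workaround, under assumptions that are not from this thread: Fabric 2 is installed, the Hive server is reachable over SSH, alx_test.alx_score (from the first post) is a TEXTFILE table whose field delimiter matches the file, and all host names and paths below are illustrative.

import pandas as pd
from fabric import Connection

# Illustrative data; in practice this is the DataFrame you want to load.
df_score = pd.DataFrame({'ids': ['10', '20', '30'],
                         'scores_all': [0.1, 0.5, 0.9]})

# 1. Dump the DataFrame to a local delimited file (no header, no index),
#    matching the table's column order and field delimiter.
df_score.to_csv('/tmp/alx_score.csv', index=False, header=False)

with Connection('hive_server_host', user='some_user') as c:
    # 2. Upload the file to the Hive server.
    c.put('/tmp/alx_score.csv', remote='/tmp/alx_score.csv')
    # 3. Put it into HDFS under the table's location (path is illustrative).
    c.run('hdfs dfs -put -f /tmp/alx_score.csv '
          '/user/hive/warehouse/alx_test.db/alx_score/')

# 4. Afterwards, recompute table statistics in Hive (see the note above), e.g.:
#    ANALYZE TABLE alx_test.alx_score COMPUTE STATISTICS;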

bkyryliuk commented 4 years ago

@nicholasbern I've enabled multi-row inserts for Presto; it is on master and will be available in the next release. As for the Hive issue, we are open to contributions.
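
For anyone reading this later, a hedged sketch of what the Presto change enables, assuming a PyHive release that includes it, pandas >= 0.24 (where to_sql gained the method argument), and an illustrative coordinator and table; none of the names below come from this thread.

import pandas as pd
from sqlalchemy import create_engine

# Illustrative Presto coordinator, catalog, and schema.
presto_engine = create_engine('presto://presto_host:8080/hive/default')

df = pd.DataFrame({'ids': ['10', '20', '30'],
                   'scores_all': [0.1, 0.5, 0.9]})

# With multi-row inserts supported by the dialect, method='multi' compiles a
# single INSERT ... VALUES (...), (...), (...) statement instead of raising
# the CompileError quoted above.
df.to_sql('alx_score', presto_engine,
          if_exists='append',
          index=False,
          method='multi')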

nicolasesnis commented 4 years ago

It works fine. Thank you @bkyryliuk

wilberh commented 4 years ago

Any fix / workaround for Hive on inserting a batch of data? Other than uploading to HDFS or an S3 bucket, or monkey-patching pyhive.hive.Cursor's _fetch_more() method.

wilberh commented 4 years ago

Here's a workaround: chunking CSV files using pandas DataFrames (see the sketch after the links below).

https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-chunking
https://medium.com/towards-artificial-intelligence/efficient-pandas-using-chunksize-for-large-data-sets-c66bf3037f93
https://github.com/dropbox/PyHive/issues/55
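
If it helps, here is one way the chunking idea can be applied with PyHive directly; this is a sketch, not code from the thread. It assumes a scores.csv file with the ids/scores_all columns from the first post, the connection details from the first post, a Hive version (0.14+) that supports INSERT ... VALUES, and trusted values, since the statement is built by string formatting.

import pandas as pd
from pyhive import hive

conn = hive.connect(host='remote_host', port=10000,
                    username='hive_username', auth='NOSASL')
cursor = conn.cursor()

for chunk in pd.read_csv('scores.csv', chunksize=1000):
    # Build one multi-row VALUES list per chunk ...
    values = ', '.join(
        "('{0}', {1})".format(row.ids, row.scores_all)
        for row in chunk.itertuples(index=False)
    )
    # ... and run it as a single statement, so cursor.executemany() and
    # therefore _fetch_more() are never involved.
    cursor.execute('INSERT INTO TABLE alx_test.alx_score VALUES ' + values)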

CristhianBoujon commented 4 years ago

Hi! I'm using PyHive + pandas. I have the same problem when I run:

data.to_sql("test", con=engine, if_exists='append', index=False)

Since the problem happens when inserting multiple rows, I tried adding method='multi', which passes multiple values in a single INSERT clause:

data.to_sql("test", con=engine, if_exists='append', index=False, method='multi')

And it worked for me!
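
For completeness, a minimal end-to-end sketch of that approach, assuming the alx_test.alx_score table and connection details from the first post and pandas >= 0.24 (where the method argument was added). With method='multi', pandas compiles one INSERT ... VALUES (...), (...), (...) statement and runs it through a single execute() call, so the executemany()/_fetch_more() path that raises "No result set" is never taken.

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('hive://hive_username@remote_host:10000/default?auth=NOSASL')

df = pd.DataFrame({'ids': ['10', '20', '30'],
                   'scores_all': [0.1, 0.5, 0.9]})

# One multi-row INSERT is emitted per call, bypassing the DB-API
# executemany() path that triggers _fetch_more().
df.to_sql('alx_score', engine,
          schema='alx_test',
          if_exists='append',
          index=False,
          method='multi')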