
Trying to use to_sql on dataframe with numpy.ndarray columns causes ValueError #29240

Open · fergu opened this issue 5 years ago

fergu commented 5 years ago

I am trying out pandas + sqlalchemy (specifically sqlite) as a means to store my data. In my use case, most columns will hold obvious types such as strings, ints, and floats, but I also want to store a few numpy ndarrays. In the example below, each 'channel' entry is a 1x3000 array of floats. For higher-dimensional arrays (at most 2D) I plan to store the shape of the data alongside the flattened array, so this is all I need.
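For context, here is a minimal sketch of that flatten-and-reconstruct plan (the variable names are just illustrative; this is not part of the repro below):

import numpy as np

# Hypothetical round trip for a 2D array: keep the flattened data
# and its shape, then rebuild the array after reading it back.
arr = np.random.random((4, 3000))
flat, shape = arr.ravel(), arr.shape   # what would be stored in the database
restored = flat.reshape(shape)         # reconstruction on read
assert np.array_equal(arr, restored)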

Code Sample (copy-pastable)

This was run in JupyterLab.

import sqlalchemy
import numpy as np
import pandas as pd

engine = sqlalchemy.create_engine('sqlite:///:memory:')
allPts = []

for i in range(30):
    name = "R" + str(i).zfill(3)
    start = np.random.rand()
    end = np.random.rand()
    data = np.random.random((3000, 4))  # each column is one 3000-sample channel
    thisDict = {'name': name, 'start': start, 'end': end,
                'channel1': data[:, 0], 'channel2': data[:, 1],
                'channel3': data[:, 2], 'channel4': data[:, 3]}
    allPts.append(pd.Series(thisDict))
asDF = pd.DataFrame(allPts)
asDF.to_sql('dftest',engine,if_exists='replace')

This produces the following traceback:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-30-e1fb02c5da64> in <module>
----> 1 asDF.to_sql('dftest',engine,if_exists='replace')

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\pandas\core\generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
   2710             chunksize=chunksize,
   2711             dtype=dtype,
-> 2712             method=method,
   2713         )
   2714 

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\pandas\io\sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
    516         chunksize=chunksize,
    517         dtype=dtype,
--> 518         method=method,
    519     )
    520 

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\pandas\io\sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
   1318         )
   1319         table.create()
-> 1320         table.insert(chunksize, method=method)
   1321         if not name.isdigit() and not name.islower():
   1322             # check for potentially case sensitivity issues (GH7815)

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\pandas\io\sql.py in insert(self, chunksize, method)
    754 
    755                 chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
--> 756                 exec_insert(conn, keys, chunk_iter)
    757 
    758     def _query_iterator(

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\pandas\io\sql.py in _execute_insert(self, conn, keys, data_iter)
    668         """
    669         data = [dict(zip(keys, row)) for row in data_iter]
--> 670         conn.execute(self.table.insert(), data)
    671 
    672     def _execute_insert_multi(self, conn, keys, data_iter):

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\engine\base.py in execute(self, object_, *multiparams, **params)
    986             raise exc.ObjectNotExecutableError(object_)
    987         else:
--> 988             return meth(self, multiparams, params)
    989 
    990     def _execute_function(self, func, multiparams, params):

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\sql\elements.py in _execute_on_connection(self, connection, multiparams, params)
    285     def _execute_on_connection(self, connection, multiparams, params):
    286         if self.supports_execution:
--> 287             return connection._execute_clauseelement(self, multiparams, params)
    288         else:
    289             raise exc.ObjectNotExecutableError(self)

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\engine\base.py in _execute_clauseelement(self, elem, multiparams, params)
   1105             distilled_params,
   1106             compiled_sql,
-> 1107             distilled_params,
   1108         )
   1109         if self._has_events or self.engine._has_events:

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1251         except BaseException as e:
   1252             self._handle_dbapi_exception(
-> 1253                 e, statement, parameters, cursor, context
   1254             )
   1255 

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1473                 util.raise_from_cause(sqlalchemy_exception, exc_info)
   1474             else:
-> 1475                 util.reraise(*exc_info)
   1476 
   1477         finally:

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
    151         if value.__traceback__ is not tb:
    152             raise value.with_traceback(tb)
--> 153         raise value
    154 
    155     def u(s):

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\engine\base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1227                 if not evt_handled:
   1228                     self.dialect.do_executemany(
-> 1229                         cursor, statement, parameters, context
   1230                     )
   1231             elif not parameters and context.no_parameters:

c:\users\(me)\documents\pyenv\dataproc\lib\site-packages\sqlalchemy\engine\default.py in do_executemany(self, cursor, statement, parameters, context)
    575 
    576     def do_executemany(self, cursor, statement, parameters, context=None):
--> 577         cursor.executemany(statement, parameters)
    578 
    579     def do_execute(self, cursor, statement, parameters, context=None):

ValueError: could not convert BLOB to buffer

Problem description

I notice that the bottom of the traceback points into sqlalchemy, though for all my googling I can't find any record of this problem (or error message) anywhere, either for sqlalchemy or pandas. I assume this means I am probably doing something wrong, but I don't see what when reading the docs. Storing a numpy array as a binary BLOB makes sense to me, so the type inference itself seems reasonable.

Expected Output

Writing to the database/no error. :)

Output of pd.show_versions()

INSTALLED VERSIONS
------------------
commit : None
python : 3.7.4.final.0
python-bits : 64
OS : Windows
OS-release : 10
machine : AMD64
processor : Intel64 Family 6 Model 158 Stepping 11, GenuineIntel
byteorder : little
LC_ALL : None
LANG : None
LOCALE : None.None

pandas : 0.25.1
numpy : 1.17.3
pytz : 2019.3
dateutil : 2.8.0
pip : 19.3
setuptools : 40.8.0
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 2.10.3
IPython : 7.8.0
pandas_datareader: None
bs4 : None
bottleneck : None
fastparquet : None
gcsfs : None
matplotlib : 3.1.1
numexpr : 2.7.0
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : None
pytables : None
s3fs : None
scipy : 1.3.1
sqlalchemy : 1.3.10
tables : 3.6.0
xarray : None
xlrd : None
xlwt : None
fergu commented 5 years ago

I found the solution in this Stack Overflow answer: http://stackoverflow.com/a/31312102/190597

Essentially, np.ndarray needs an adapter to turn the array into something sqlite can actually handle, since sqlite has no array datatype, and apparently something goes wrong when sqlite tries to buffer the raw array directly as a BLOB. So, instead, serialize the array with numpy's save functionality and store the resulting bytes.

Taking the code from that answer and prepending it to my (now working) code from above:

import sqlalchemy
import sqlite3
import numpy as np
import io

def adapt_array(arr):
    """
    http://stackoverflow.com/a/31312102/190597 (SoulNibbler)
    """
    out = io.BytesIO()
    np.save(out, arr)
    out.seek(0)
    return sqlite3.Binary(out.read())

def convert_array(blob):
    # Rebuild the np.ndarray from the bytes stored in the database
    out = io.BytesIO(blob)
    out.seek(0)
    return np.load(out)

# Converts np.ndarray to a BLOB (via np.save) when inserting
sqlite3.register_adapter(np.ndarray, adapt_array)

# Converts a column declared as "array" back to np.ndarray when selecting
sqlite3.register_converter("array", convert_array)

engine = sqlalchemy.create_engine('sqlite:///:memory:',connect_args={'detect_types':sqlite3.PARSE_DECLTYPES})

This feels like something that should maybe be default behavior in pandas' underlying communication with sqlalchemy (or maybe in sqlalchemy's communication with sqlite3)?

fergu commented 5 years ago

Just to add to my particular issue here: I am finding that reading back does not work properly if everything is done as above. I can save to the database, but when I read back I just get binary data.

I confirmed that the converter does not fire when the column is read back. The reason is that sqlalchemy appears to create the table using whatever appropriate data type is natively supported (TEXT in this case), ignoring the fact that a custom adapter was used. The only way (as far as I know) to get the converter to fire when reading from the table is to create the table yourself, i.e.:

engine = sqlalchemy.create_engine('sqlite:///:memory:',
                                  connect_args={'detect_types': sqlite3.PARSE_DECLTYPES},
                                  echo=True)
con = engine.connect()
# NOTE: the 'index' column must be included for the pandas SQLite functions to work
con.execute('create table dftest ("index" BIGINT, name TEXT, start FLOAT, "end" FLOAT, '
            'channel1 array, channel2 array, channel3 array, channel4 array)')

After this, the pandas functions can be used normally as long as the if_exists argument is set to 'append'; with 'replace' the table (and its declared types) is dropped and recreated, so the converter will never fire.
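To make that concrete, here is a minimal sketch of the append-and-read-back step, assuming the adapter/converter registrations above and the asDF frame from the original example:

# Write into the pre-declared table; 'append' keeps the declared
# column types (and thus the "array" declaration) intact.
asDF.to_sql('dftest', con, if_exists='append')

# Read back; with detect_types=sqlite3.PARSE_DECLTYPES the converter
# should fire for the columns declared as "array".
readBack = pd.read_sql('SELECT * FROM dftest', con)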

It looks like this may end up being an issue for the folks working on sqlalchemy rather than pandas, though.

fergu commented 5 years ago

Okay, another update for anyone who stumbles across this in the future with a similar problem.

It seems that this is not (easily/obviously) possible when using the sqlalchemy engine, since something gets lost on its way in/out of there. I can get a custom type's (as outlined here) process_bind_param call to fire and store appropriately, but I can't get process_result_value to fire during the read. I've tried using the @compiles decorator, which does affect the CREATE TABLE command (i.e. I get channel1 mycustomtype), but even then process_result_value never fires during pd.read_sql_table. I spent some time working my way through the code to see where things got lost, but never found exactly where they went missing or why my class never fired during a read. I did not try sqlalchemy's direct read/write functionality, though, so I can't say whether this is a pandas problem or not.

On the other hand, I have gotten this working with some tweaks to what I posted above. Basically, the code in my earlier replies works as long as an sqlite3 connection is passed in instead of an sqlalchemy engine.

I've left in the (now commented-out) parts of what I tried with sqlalchemy's machinery, in case anyone wants to chew on why it wasn't working.

import pandas as pd
# import sqlalchemy
# import sqlalchemy.types as types
# from sqlalchemy.ext.compiler import compiles
import sqlite3
import numpy as np
import io

print(pd.__version__)

#
# class arraytodb(types.TypeDecorator):
#     impl = types.Text # This is needed to indicate what "base type" we're using
#     def process_bind_param(self, value, dialect):
#         print("Binding")
#         return "a"
# #         out = io.BytesIO()
# #         np.save(out, value)
# #         out.seek(0)
# #         return sqlite3.Binary(out.read())
#
#     def process_result_value(self, value, dialect):
#         print("Process result")
#         print(value)
#         print(dialect)
#         return "b"
# #         return value
#
# @compiles(arraytodb)
# def arraycompiler(type_, compiler, **kw):
#     return "arraytodb"

def adapt_array(arr):
    """
    http://stackoverflow.com/a/31312102/190597 (SoulNibbler)
    """
    out = io.BytesIO()
    np.save(out, arr)
    out.seek(0)
    return sqlite3.Binary(out.read())

def convert_array(blob):
    # Rebuild the np.ndarray from the bytes stored in the database
    out = io.BytesIO(blob)
    out.seek(0)
    return np.load(out)

# Converts np.ndarray to a BLOB (via np.save) when inserting
sqlite3.register_adapter(np.ndarray, adapt_array)

# Converts a column declared as "arraytodb" back to np.ndarray when selecting
sqlite3.register_converter("arraytodb", convert_array)

con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
# engine = sqlalchemy.create_engine('sqlite:///:memory:', connect_args={'detect_types':sqlite3.PARSE_DECLTYPES},echo=True)

allPts = []

for i in range(30):
    name = "R" + str(i).zfill(3)
    start = np.random.rand()
    end = np.random.rand()
    data = np.random.random((3000, 4))  # each column is one 3000-sample channel
    thisDict = {'name': str(name), 'start': start, 'end': end,
                'channel1': data[:, 0], 'channel2': data[:, 1],
                'channel3': data[:, 2], 'channel4': data[:, 3]}
    allPts.append(pd.Series(thisDict))

asDF = pd.DataFrame(allPts)

asDF.to_sql('dftest', con, if_exists='replace',
            dtype={'channel1': "arraytodb", 'channel2': "arraytodb",
                   'channel3': "arraytodb", 'channel4': "arraytodb"})

print(pd.read_sql('SELECT * FROM dftest', con))
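As a quick check that the round trip restores arrays rather than raw bytes, something like the following should work under the setup above (readBack and first are just illustrative names):

# The channel columns should come back as np.ndarray objects, not bytes,
# because the converter fires for the "arraytodb" declared type.
readBack = pd.read_sql('SELECT * FROM dftest', con)
first = readBack['channel1'].iloc[0]
print(type(first), getattr(first, 'shape', None))  # expect <class 'numpy.ndarray'> (3000,)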