Joselguti opened this issue 1 year ago
@Joselguti, the Python Spanner client, on top of which the SQLAlchemy dialect is built, has a system of auto-retry for aborted transactions (see for details: https://github.com/googleapis/python-spanner-sqlalchemy#automatic-transactions-retry).
So what's happening in your case is a too-long transaction plus some kind of data change happening in parallel. The solution for this sort of problem is always the same: reduce the size of the transaction.
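For illustration, "reduce the size of the transaction" with the SQLAlchemy dialect can look something like this (a rough, untested sketch; the chunk size and the engine/table names are arbitrary):

```python
# Split one huge insert into several small transactions.
# `engine` and `table` are assumed to be an existing SQLAlchemy
# engine and Table; 500 rows per commit is just an example.
def insert_in_chunks(engine, table, rows, chunk=500):
    for start in range(0, len(rows), chunk):
        # every engine.begin() block is its own short transaction,
        # so an aborted transaction is cheap to retry
        with engine.begin() as connection:
            connection.execute(table.insert(), rows[start:start + chunk])
```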
@IlyaFaer Thank you,
A follow-up question: I have noticed that in general it takes a long time (in comparison to other DBs) to insert data into Spanner, and looking at the logs/insights page in Spanner, both insert and read throughput are only on the order of kB.
(Usually read is higher than insert.)
I am using the examples in this library to insert data into Spanner, keeping it around 40,000 mutations per cycle when the inserted DataFrames are bigger.
Is there another way, or something I am doing wrong that can be fixed, to boost insert speed? (Other than giving it more processing units.)
Thank you.
@Joselguti, hm-m. Let's say sqlalchemy-spanner is not intended for such huge operations. The pure Python Spanner client, which the sqlalchemy-spanner dialect uses under the hood, has a mutations mechanism that gives a significant speed boost: https://cloud.google.com/spanner/docs/modify-mutation-api
SQLAlchemy doesn't provide a way to plug the python-spanner mutations mechanism into SQLAlchemy, but I still think there is a workaround. When you get a connection with sqlalchemy-spanner, it keeps a reference to the original DB-API connection, which has an executemany() method.
So, preliminarily, I see two workarounds to try:
```python
with engine.begin() as connection:
    connection.connection.executemany(sql, values)
```
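Here `sql` is a plain parameterized INSERT and `values` a list of parameter tuples, something like (untested sketch; the table and columns are only for illustration, the DB-API layer uses %s-style placeholders):

```python
# Hypothetical table and columns, only to show the shape of the arguments.
sql = "INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (%s, %s, %s)"
values = [
    (1, "Marc", "Richards"),
    (2, "Catalina", "Smith"),
]

with engine.begin() as connection:
    # connection.connection is the underlying Spanner DB-API connection
    connection.connection.executemany(sql, values)
```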
Or even deeper, to the very basic Spanner API:
```python
with engine.begin() as connection:
    with connection.connection._database.batch() as batch:
        batch.insert(
            table="Singers",
            columns=("SingerId", "FirstName", "LastName"),
            values=[
                (1, "Marc", "Richards"),
                (2, "Catalina", "Smith"),
                (3, "Alice", "Trentor"),
                (4, "Lea", "Martin"),
                (5, "David", "Lomond"),
            ],
        )
```
This approach uses the mutations mechanism I mentioned at the top of this comment.
Here is the code of the connection.connection object:
https://github.com/googleapis/python-spanner/blob/9ebbf78df75fb7ee4d58ea36847fb19b1194afb3/google/cloud/spanner_dbapi/connection.py#L58
I see you have some uneasy cases, so I'd recommend studying this class, because sqlalchemy-spanner is just a wrapper around it. We override the things that SQLAlchemy allows us to override. It has limitations in some places, but you can still drop down to the basic functionality and use standard Spanner features for complex cases.
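For example, a plain read through the underlying client could look like this (untested sketch, the query is only an illustration):

```python
with engine.connect() as connection:
    # the underlying python-spanner Database object
    database = connection.connection._database
    # a read-only snapshot executed with the plain Spanner client
    with database.snapshot() as snapshot:
        for row in snapshot.execute_sql("SELECT SingerId, FirstName FROM Singers"):
            print(row)
```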
@IlyaFaer
Hello,
I have been working through some of the issues we have discussed in this post. Everything was working as intended, however there is a new situation I'm having trouble dealing with.
Currently I'm running several connections and data uploads (to different tables in Spanner) in parallel; to achieve this I am using Python's threading module. However, when I have around 3-4 or more uploads running at the same time, I sometimes get the error shown after the sketch below.
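My setup looks roughly like this (a simplified sketch; the table names, DataFrames and the upload function are placeholders, not my exact code):

```python
import threading

from sqlalchemy import MetaData, Table

def upload(table_name, frame):
    # each thread writes its own DataFrame to its own table,
    # through its own connection checked out from the shared engine
    records = frame.to_dict("records")
    table = Table(table_name, MetaData(bind=spanner_engine), autoload=True)
    with spanner_engine.begin() as connection:
        connection.execute(table.insert(), records)

# one thread per (table, DataFrame) pair, so no two threads touch the same table
threads = [
    threading.Thread(target=upload, args=(name, frame))
    for name, frame in dataframes.items()
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

And here is the full error: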
```
Traceback (most recent call last):
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\api_core\grpc_helpers.py", line 72, in error_remapped_callable
return callable_(*args, **kwargs)
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\grpc\_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\grpc\_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.ABORTED
details = "Transaction aborted. Database schema probably changed during transaction, retry may succeed."
debug_error_string = "{"created":"@1686679459.367000000","description":"Error received from peer ipv4:IP:443","file":"src/core/lib/surface/call.cc","file_line":967,"grpc_message":"Transaction aborted. Database schema probably changed during transaction, retry may succeed.","grpc_status":10}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\cloud\spanner_dbapi\cursor.py", line 335, in executemany
status, res = transaction.batch_update(statements)
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\cloud\spanner_v1\transaction.py", line 377, in batch_update
response = api.execute_batch_dml(request=request, metadata=metadata)
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\cloud\spanner_v1\services\spanner\client.py", line 1267, in execute_batch_dml
response = rpc(
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\api_core\gapic_v1\method.py", line 113, in __call__
return wrapped_func(*args, **kwargs)
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\api_core\retry.py", line 349, in retry_wrapped_func
return retry_target(
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\api_core\retry.py", line 191, in retry_target
return target()
File "C:\Users\genexus\AppData\Local\Programs\Python\Python39\lib\site-packages\google\api_core\grpc_helpers.py", line 74, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.Aborted: 409 Transaction aborted. Database schema probably changed during transaction, retry may succeed. [retry_delay {
nanos: 11933644
}
]
```
I don't understand why it says that the database schema has changed; the relations I have between thread, connection, data, and upload are 1 to 1, meaning I have no parallel uploads to the same table in Spanner.
To respect Spanner's limit of 40,000 mutations per commit I am using the following code:
```python
import math

from sqlalchemy import MetaData, Table

# df, holding_name and spanner_engine are defined earlier in my code
data = df.to_dict("records")
table_df = Table(holding_name, MetaData(bind=spanner_engine), autoload=True)

# max rows per commit so that rows * columns stays under the 40,000 mutation limit
max_rows = math.floor(40000 / len(data[0].keys()))
total_cycles = max(math.ceil(len(data) / max_rows), 1)

# split the records into chunks of at most max_rows rows
chunks = [data[i * max_rows:(i + 1) * max_rows] for i in range(total_cycles)]

# insert every chunk in its own transaction
for chunk in chunks:
    with spanner_engine.begin() as connection:
        connection.execute(table_df.insert(), chunk)
```
Hello,
I am trying to read data from a Spanner table that currently has 134,964 rows and 288 columns.
I have been using the example posted in this GitHub repo under "Read" and modifying the query to read fewer rows in each cycle.
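Each cycle looks roughly like this (simplified; the chunk size and table name are only illustrative, not my exact code):

```python
# read the table in slices instead of one huge query
chunk_size = 10000
offset = 0
rows = []
while True:
    with spanner_engine.connect() as connection:
        batch = connection.execute(
            f"SELECT * FROM holding_table LIMIT {chunk_size} OFFSET {offset}"
        ).fetchall()
    if not batch:
        break
    rows.extend(batch)
    offset += chunk_size
```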
The first issue is that it takes a good couple of minutes (probably 3+) to execute these transactions, and the second is that they keep failing.
Here is the error: