@koxudaxi Maybe we could forward the `continueAfterTimeout` parameter from `session.execute()` to pydataapi's `execute()`?
I think `continueAfterTimeout` is always true. `Cursor.execute()` calls `DataAPI.execute()`, which takes `continue_after_timeout` as an argument, and that argument defaults to `True`. Did you think the value was `False`? Or did you want to set it to `False`?
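For example, `DataAPI.execute()` can be called directly with the flag passed explicitly (a minimal sketch; the ARNs are dummies, and `continue_after_timeout` is shown even though `True` is already the default):

```python
from pydataapi import DataAPI

# Minimal sketch: call DataAPI.execute() directly, passing
# continue_after_timeout explicitly (True is already the default).
with DataAPI(
    resource_arn='arn:aws:rds:us-east-1:123456789012:cluster:dummy',
    secret_arn='arn:aws:secretsmanager:us-east-1:123456789012:secret:dummy',
    database='test',
) as data_api:
    data_api.execute('select version()', continue_after_timeout=True)
```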
If it is always `True`, then that works for me!
@koxudaxi The problem is that when I make many requests using pydataapi, I see the following error:

```
(botocore.errorfactory.StatementTimeoutException) An error occurred (StatementTimeoutException) when calling the BeginTransaction operation: Request timed out
[SQL: select version()]: StatementError
Traceback (most recent call last):
File "/var/task/app.py", line 47, in post_to_db
ServerlessInsert().go(session, Variant, variant)
File "/var/task/chalicelib/queries.py", line 3, in go
session.execute(mapper.__table__.insert(), mappings)
File "/var/task/sqlalchemy/orm/session.py", line 1268, in execute
return self._connection_for_bind(bind, close_with_result=True).execute(
File "/var/task/sqlalchemy/orm/session.py", line 1130, in _connection_for_bind
engine, execution_options
File "/var/task/sqlalchemy/orm/session.py", line 431, in _connection_for_bind
conn = bind._contextual_connect()
File "/var/task/sqlalchemy/engine/base.py", line 2229, in _contextual_connect
self._wrap_pool_connect(self.pool.connect, None),
File "/var/task/sqlalchemy/engine/base.py", line 2265, in _wrap_pool_connect
return fn()
File "/var/task/sqlalchemy/pool/base.py", line 363, in connect
return _ConnectionFairy._checkout(self)
File "/var/task/sqlalchemy/pool/base.py", line 760, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/var/task/sqlalchemy/pool/base.py", line 492, in checkout
rec = pool._do_get()
File "/var/task/sqlalchemy/pool/impl.py", line 139, in _do_get
self._dec_overflow()
File "/var/task/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/var/task/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/var/task/sqlalchemy/pool/impl.py", line 136, in _do_get
return self._create_connection()
File "/var/task/sqlalchemy/pool/base.py", line 308, in _create_connection
return _ConnectionRecord(self)
File "/var/task/sqlalchemy/pool/base.py", line 437, in __init__
self.__connect(first_connect_check=True)
File "/var/task/sqlalchemy/pool/base.py", line 649, in __connect
).exec_once(self.connection, self)
File "/var/task/sqlalchemy/event/attr.py", line 287, in exec_once
self(*args, **kw)
File "/var/task/sqlalchemy/event/attr.py", line 297, in __call__
fn(*args, **kw)
File "/var/task/sqlalchemy/util/langhelpers.py", line 1481, in go
return once_fn(*arg, **kw)
File "/var/task/sqlalchemy/engine/strategies.py", line 199, in first_connect
dialect.initialize(c)
File "/var/task/sqlalchemy/dialects/postgresql/base.py", line 2414, in initialize
super(PGDialect, self).initialize(connection)
File "/var/task/sqlalchemy/engine/default.py", line 291, in initialize
connection
File "/var/task/sqlalchemy/dialects/postgresql/base.py", line 2655, in _get_server_version_info
v = connection.execute("select version()").scalar()
File "/var/task/sqlalchemy/engine/base.py", line 982, in execute
return self._execute_text(object_, multiparams, params)
File "/var/task/sqlalchemy/engine/base.py", line 1155, in _execute_text
parameters,
File "/var/task/sqlalchemy/engine/base.py", line 1182, in _execute_context
e, util.text_type(statement), parameters, None, None
File "/var/task/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/var/task/sqlalchemy/util/compat.py", line 398, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/var/task/sqlalchemy/util/compat.py", line 152, in reraise
raise value.with_traceback(tb)
File "/var/task/sqlalchemy/engine/base.py", line 1179, in _execute_context
context = constructor(dialect, self, conn, *args)
File "/var/task/sqlalchemy/engine/default.py", line 909, in _init_statement
self.cursor = self.create_cursor()
File "/var/task/sqlalchemy/engine/default.py", line 1042, in create_cursor
return self._dbapi_connection.cursor()
File "/var/task/pydataapi/dbapi.py", line 74, in cursor
self._data_api.begin()
File "/var/task/pydataapi/pydataapi.py", line 405, in begin
response: Dict[str, str] = self.client.begin_transaction(**options.build())
File "/var/task/botocore/client.py", line 357, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/var/task/botocore/client.py", line 661, in _make_api_call
raise error_class(parsed_response, operation_name)
sqlalchemy.exc.StatementError: (botocore.errorfactory.StatementTimeoutException) An error occurred (StatementTimeoutException) when calling the BeginTransaction operation: Request timed out
[SQL: select version()]
```
This issue happens when I try to use SQLAlchemy core (not the ORM) like this:

```python
session.execute(ModelClass.__table__.insert(), [{"attribute": "value"}, {"attribute": "value"}])
```
> The problem is that when I make many requests using pydataapi, I see the following error:
Could you please tell me how many requests you sent to the database server? I could not find a limit on `BeginTransaction` documented on the official page.

Also, if you can use `DataAPI` directly, then you may be able to get a hint:
```python
with DataAPI(...) as data_api:
    data_api.batch_execute(ModelClass.__table__.insert(), [{"attribute": "value"}, {"attribute": "value"}])
```
@koxudaxi I make maybe ~60k concurrent requests to the DB. It is a lot, but I would think the DB could scale...

After the DB scales, I still see timeouts, but I also start to see other errors like this:

```
Traceback (most recent call last):
File "/var/task/app.py", line 44, in post_to_db
ServerlessInsert().go(session, annotation)
File "/var/lang/lib/python3.6/contextlib.py", line 88, in __exit__
next(self.gen)
File "/var/task/chalicelib/session.py", line 24, in session_scope
session.commit()
File "/var/task/sqlalchemy/orm/session.py", line 1027, in commit
self.transaction.commit()
File "/var/task/sqlalchemy/orm/session.py", line 494, in commit
self._prepare_impl()
File "/var/task/sqlalchemy/orm/session.py", line 473, in _prepare_impl
self.session.flush()
File "/var/task/sqlalchemy/orm/session.py", line 2459, in flush
self._flush(objects)
File "/var/task/sqlalchemy/orm/session.py", line 2597, in _flush
transaction.rollback(_capture_exception=True)
File "/var/task/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/var/task/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/var/task/sqlalchemy/orm/session.py", line 2557, in _flush
flush_context.execute()
File "/var/task/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/var/task/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/var/task/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/var/task/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
statement, params
File "/var/task/sqlalchemy/engine/base.py", line 988, in execute
return meth(self, multiparams, params)
File "/var/task/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/var/task/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
distilled_params,
File "/var/task/sqlalchemy/engine/base.py", line 1182, in _execute_context
e, util.text_type(statement), parameters, None, None
File "/var/task/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/var/task/sqlalchemy/util/compat.py", line 398, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/var/task/sqlalchemy/util/compat.py", line 152, in reraise
raise value.with_traceback(tb)
File "/var/task/sqlalchemy/engine/base.py", line 1179, in _execute_context
context = constructor(dialect, self, conn, *args)
File "/var/task/sqlalchemy/engine/default.py", line 688, in _init_compiled
self.cursor = self.create_cursor()
File "/var/task/sqlalchemy/engine/default.py", line 1042, in create_cursor
return self._dbapi_connection.cursor()
File "/var/task/sqlalchemy/pool/base.py", line 965, in cursor
return self.connection.cursor(*args, **kwargs)
File "/var/task/pydataapi/dbapi.py", line 74, in cursor
self._data_api.begin()
File "/var/task/pydataapi/pydataapi.py", line 405, in begin
response: Dict[str, str] = self.client.begin_transaction(**options.build())
File "/var/task/botocore/client.py", line 357, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/var/task/botocore/client.py", line 661, in _make_api_call
raise error_class(parsed_response, operation_name)
sqlalchemy.exc.StatementError: (botocore.exceptions.ClientError) An error occurred (ThrottlingException) when calling the BeginTransaction operation (reached max retries: 4): The rate exceeds the limit
```
Can we increase the max retries? Do you think I will get better results with `DataAPI`?
@Rubyj Thank you for the details. I can think of two possible causes: a bug in pydataapi, or a limitation of the AWS service.
> Can we increase the max retries?

The retry count is controlled by botocore. It may be too difficult to change the count.
> Do you think I will get better results with `DataAPI`?

If the cause is a bug in pydataapi, we can find it more easily with `DataAPI`, because SQLAlchemy's logic is complex and may confuse us.
Also, I sent an email to AWS to ask about the error. If the cause is a limitation of the AWS service, they will help us resolve it.

And I'm reading the SQLAlchemy source code alongside your stack trace :)
Thank you! I believe the number of retries can be increased like this: https://github.com/boto/botocore/issues/882#issuecomment-338846339

```python
import boto3
from botocore.config import Config

config = Config(
    retries=dict(
        max_attempts=10
    )
)

ec2 = boto3.client('ec2', config=config)
```
You may already know the way, but I'll write this example just to be sure:

```python
import boto3
from sqlalchemy.engine import create_engine

client = boto3.client('rds-data')

engine = create_engine(
    'mysql+pydataapi://',  # or 'postgresql+pydataapi://'
    connect_args={
        'resource_arn': 'arn:aws:rds:us-east-1:123456789012:cluster:dummy',
        'secret_arn': 'arn:aws:secretsmanager:us-east-1:123456789012:secret:dummy',
        'database': 'test',
        'client': client,
    },
)
```
Ah, so I can pass the `client` to `connect_args`? Good to know.
Yes, you can do it :)
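For example, putting the two snippets above together, a client configured with a higher retry count can be handed to the engine (a sketch combining the earlier examples; the ARNs are dummies):

```python
import boto3
from botocore.config import Config
from sqlalchemy.engine import create_engine

# Sketch: configure the rds-data client with more retries, then pass it
# to the pydataapi engine through connect_args.
config = Config(retries=dict(max_attempts=10))
client = boto3.client('rds-data', config=config)

engine = create_engine(
    'postgresql+pydataapi://',
    connect_args={
        'resource_arn': 'arn:aws:rds:us-east-1:123456789012:cluster:dummy',
        'secret_arn': 'arn:aws:secretsmanager:us-east-1:123456789012:secret:dummy',
        'database': 'test',
        'client': client,
    },
)
```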
The problem seems to be that the Data API limits the number of concurrent connections to 300. This is a limitation of AWS. I think this issue can be closed.
@Rubyj Thank you for the report. Where did you learn about the limitation? I could not find it documented.
@koxudaxi From AWS support:
> Thank you for the continued patience.
>
> Upon discussing the issue with our internal team, they confirmed that the Data API currently caps the number of simultaneous connections to the database at 300, and throttles user requests above 1000 requests per second. They were further able to link the initial connection error [botocore.errorfactory.StatementTimeoutException: An error occurred (StatementTimeoutException) when calling the BeginTransaction operation: Connection is not available, request timed out] with this limitation.
>
> The internal engineering team is working on removing the cap on the maximum number of simultaneous connections to Aurora Serverless databases, and on increasing the throttle limits for requests per second. Unfortunately, I do not have an ETA for when the changes will be implemented. I would recommend keeping an eye on the following links for first-hand information when the fix is implemented [1][2][3].
>
> Having said this, I have further communicated the expected number of requests per second for your use case to the internal team in my feature request.
>
> As per recent communication from the team, the changes to remove the cap on simultaneous connections to Aurora Serverless databases and to increase the throttle limits for requests per second will be deployed shortly.
>
> Please accept my sincere apologies for the inconvenience caused by this limitation.
>
> Please let me know if you have any further questions; I will be more than happy to help you.
>
> [1] Amazon Blog - https://aws.amazon.com/blogs/aws/
> [2] Amazon Forum - https://forums.aws.amazon.com/index.jspa
> [3] Amazon Aurora FAQs - https://aws.amazon.com/rds/aurora/faqs/
>
> Best regards,
> Saurabh R., Amazon Web Services
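Until those limits are lifted, one client-side mitigation is to retry throttled calls with exponential backoff. A hedged sketch, independent of pydataapi; `run_insert` below is a hypothetical callable that issues one statement:

```python
import random
import time

from botocore.exceptions import ClientError

def call_with_backoff(fn, max_attempts=8):
    """Retry fn() when the Data API throttles, backing off exponentially."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except ClientError as exc:
            # Re-raise anything that is not the throttling error seen above.
            if exc.response['Error']['Code'] != 'ThrottlingException':
                raise
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff with full jitter, capped at 30 seconds.
            time.sleep(min(2 ** attempt, 30) * random.random())

# Usage (run_insert is a hypothetical callable issuing one statement):
# call_with_backoff(run_insert)
```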
That's very clear. Thank you very much.

I'll close this issue :)
Is there a way to use the Data API's `continueAfterTimeout` feature (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds-data.html#RDSDataService.Client.execute_statement)? Even though I am using SQLAlchemy's `session.execute(query)`, is there a way to enable `continueAfterTimeout`?

(The original post was created by @Rubyj.)
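One hedged idea, based on the tracebacks above (pydataapi's DBAPI connection holds its `DataAPI` instance in a private `_data_api` attribute): reach the underlying `DataAPI` through SQLAlchemy's raw connection and call `execute()` with `continue_after_timeout` yourself. This leans on a private attribute and assumes `engine` was created as in the earlier example, so it is only a sketch:

```python
# Sketch only: _data_api is a private attribute of pydataapi's DBAPI
# connection (visible in the tracebacks above), so this may break
# between versions. 'select pg_sleep(60)' stands in for any statement
# that might exceed the Data API timeout.
raw = engine.raw_connection()
try:
    data_api = raw.connection._data_api  # pydataapi's DataAPI instance
    data_api.execute('select pg_sleep(60)', continue_after_timeout=True)
    raw.commit()
finally:
    raw.close()
```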