Purpose
With this change, if a transaction.correlation.id is provided, CUD functions that share the same transaction.correlation.id use the same connection object when interacting with the database. The connection object is not closed until a commit or rollback query is explicitly performed via a CUD function. This is useful when performing transactions with commit and rollback.
CUD functions without a transaction.correlation.id (the default, which matches the existing implementation) use their own connection object, which is closed at the end of the operation.
Example
from InsertStream#rdbms:cud("SAMPLE_DB", "INSERT INTO Names(name) VALUES (?);", name, "t1")
select name
insert into ignoreStream;

from CommitStream#rdbms:cud("SAMPLE_DB", "COMMIT", "t1")
select *
insert into ignoreStream2;

from RollbackStream#rdbms:cud("SAMPLE_DB", "ROLLBACK", "t1")
select *
insert into ignoreStream3;
t1 is the transaction.correlation.id.
Assume the following series of events arriving at InsertStream:
{"name": "A"}
{"name": "B"}
A and B are not committed to the Names table immediately.
If an event then arrives at CommitStream, A and B are committed, because the CommitStream query performs a COMMIT.
If an event arrives at RollbackStream instead, A and B are rolled back, because the RollbackStream query performs a ROLLBACK.
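The timeline above can be pictured with a toy in-memory model. This is only an illustration of the intended semantics, not Siddhi or JDBC code: the class ToyNamesTable and its methods are hypothetical names invented for this sketch, and the real behaviour is provided by the database transaction on the shared connection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the example (hypothetical class, not part of the extension).
class ToyNamesTable {
    // Rows that are committed and therefore visible in the "Names" table.
    private final List<String> committed = new ArrayList<>();
    // Uncommitted rows, grouped by transaction.correlation.id.
    private final Map<String, List<String>> pending = new HashMap<>();

    // Models an event on InsertStream: the row is staged, not committed.
    void insert(String correlationId, String name) {
        pending.computeIfAbsent(correlationId, id -> new ArrayList<>()).add(name);
    }

    // Models an event on CommitStream: staged rows become visible.
    void commit(String correlationId) {
        List<String> rows = pending.remove(correlationId);
        if (rows != null) {
            committed.addAll(rows);
        }
    }

    // Models an event on RollbackStream: staged rows are discarded.
    void rollback(String correlationId) {
        pending.remove(correlationId);
    }

    // Returns a copy of the committed rows.
    List<String> committedRows() {
        return new ArrayList<>(committed);
    }
}
```

In this model, inserting A and B under "t1" leaves committedRows() empty until commit("t1") is called, while rollback("t1") drops both rows.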
Note
When using transaction.correlation.id, the developer must make sure that a commit or rollback is performed via a CUD operation after all the events that are supposed to be committed or rolled back have been added to the batch.
Approach
In the existing implementation, even if autocommit is disabled at the datasource level, we call conn.commit() manually [1].
Now, when a transactionCorrelationId is given to a CUD function, we no longer do this (enableCudOperationAutocommit becomes false when a transactionCorrelationId is provided).
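if (stmt != null) {
    int[] numRecords = stmt.executeBatch();
    // Commit here only when the CUD operation itself is responsible for committing,
    // i.e. no transactionCorrelationId was provided.
    if (!conn.getAutoCommit() && enableCudOperationAutocommit) {
        conn.commit();
        shouldCleanupConnection = true;
    }
}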
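The commit decision in the excerpt above can be isolated as a small predicate. The following is a minimal sketch, not the code in CUDStreamProcessor: the class CudCommitPolicy and its method names are hypothetical, and it assumes that an absent transactionCorrelationId is represented as null.

```java
// Hypothetical helper that mirrors the condition described in this section.
final class CudCommitPolicy {
    private CudCommitPolicy() {
    }

    // Assumption: "no transaction.correlation.id provided" is modelled as null.
    // The CUD operation commits on its own only in that case.
    static boolean enableCudOperationAutocommit(String transactionCorrelationId) {
        return transactionCorrelationId == null;
    }

    // Same shape as the excerpt: commit manually only when the connection is not
    // in auto-commit mode and the CUD operation is responsible for committing.
    static boolean shouldCommitAfterBatch(boolean connectionAutoCommit,
                                          String transactionCorrelationId) {
        return !connectionAutoCommit
                && enableCudOperationAutocommit(transactionCorrelationId);
    }
}
```

With a correlation id such as "t1", shouldCommitAfterBatch returns false, so the batch stays uncommitted until an explicit COMMIT or ROLLBACK query is sent with the same id.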
We now have a map, correlatedConnections [2], which stores the created connection objects keyed by transaction.correlation.id.
If a transactionCorrelationId is given, then when a connection is needed [3] we check whether the correlatedConnections map already holds an unclosed connection for that transactionCorrelationId. If so, we use it. Otherwise, we create a new connection and store it in the correlatedConnections map.
This connection is used to execute the prepared statement with the provided query.
When the provided query is a commit or rollback, the connection is closed after the query is executed and then removed from the correlatedConnections map.
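The lookup-or-create and close-on-commit steps described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual implementation: StubConnection is a hypothetical stand-in for java.sql.Connection so the sketch runs without a database, CorrelatedConnectionRegistry and its methods are invented names, and the check for a commit or rollback query (case-insensitive match with an optional trailing semicolon) is an assumption about how such a query might be recognised.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for java.sql.Connection (only the methods used here).
class StubConnection {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical registry modelled on the correlatedConnections map.
class CorrelatedConnectionRegistry {
    private final Map<String, StubConnection> correlatedConnections = new HashMap<>();
    private final Supplier<StubConnection> connectionFactory;

    CorrelatedConnectionRegistry(Supplier<StubConnection> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    // Reuses the open connection stored for the id, or creates and stores a new one.
    // Without an id (null, by assumption) every call gets its own connection.
    synchronized StubConnection acquire(String transactionCorrelationId) {
        if (transactionCorrelationId == null) {
            return connectionFactory.get();
        }
        StubConnection existing = correlatedConnections.get(transactionCorrelationId);
        if (existing != null && !existing.isClosed()) {
            return existing;
        }
        StubConnection created = connectionFactory.get();
        correlatedConnections.put(transactionCorrelationId, created);
        return created;
    }

    // Called after the query has been executed on the connection.
    synchronized void release(String transactionCorrelationId, String query,
                              StubConnection connection) {
        if (transactionCorrelationId == null) {
            connection.close(); // uncorrelated: closed at the end of the operation
        } else if (isCommitOrRollback(query)) {
            connection.close(); // the transaction has ended
            correlatedConnections.remove(transactionCorrelationId);
        }
        // Otherwise the connection stays open for later queries with the same id.
    }

    // Assumption: a query counts as commit/rollback if it is exactly that keyword.
    static boolean isCommitOrRollback(String query) {
        String normalized = query.trim().toUpperCase(Locale.ROOT);
        if (normalized.endsWith(";")) {
            normalized = normalized.substring(0, normalized.length() - 1).trim();
        }
        return normalized.equals("COMMIT") || normalized.equals("ROLLBACK");
    }

    synchronized int size() {
        return correlatedConnections.size();
    }
}
```

The methods are synchronized so that the check-then-store step is atomic when several queries with the same id run concurrently; the real code may handle concurrency differently.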
[1] https://github.com/siddhi-io/siddhi-store-rdbms/blob/master/component/src/main/java/io/siddhi/extension/execution/rdbms/CUDStreamProcessor.java#L236
[2] https://github.com/siddhi-io/siddhi-store-rdbms/blob/7fb285902872dfbfd19216a39f619531ce5852a4/component/src/main/java/io/siddhi/extension/execution/rdbms/CUDStreamProcessor.java#L200
[3] https://github.com/siddhi-io/siddhi-store-rdbms/blob/7fb285902872dfbfd19216a39f619531ce5852a4/component/src/main/java/io/siddhi/extension/execution/rdbms/CUDStreamProcessor.java#L385