hortonworks / streamline

StreamLine - Streaming Analytics
Apache License 2.0

Review the transaction semantics in deploy topology #1137

Open arunmahadevan opened 6 years ago

arunmahadevan commented 6 years ago

During topology deployment, which we run in a separate thread, we keep doing beginTransaction and commit here and here. I suspect this may not even produce the expected result. In the case of failures we roll back the txn, and the last saved state here is lost.

In the case of deployment failures, I think we need to

  1. wrap the entire deploy-topology operation in a single txn,
  2. return the failure state back to deploy,
  3. roll back (to clean up any partial data in the DB), and
  4. start another txn, set the failure status and commit so that the state is saved.
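
As a rough illustration of that ordering, here is a minimal sketch built from the TransactionManager and TopologyContext calls that appear later in this thread; the exact method and constant names are assumptions, not the actual implementation.

public Void deployTopology(Topology topology, String asUser) throws Exception {
    // 1. run the entire deployment inside one txn
    transactionManager.beginTransaction(TransactionIsolation.DEFAULT);
    try {
        TopologyContext ctx = getTopologyContext(topology, asUser);
        while (ctx.getState() != TopologyStates.TOPOLOGY_STATE_DEPLOYED) {
            ctx.deploy();
        }
        transactionManager.commitTransaction();
    } catch (Exception e) {
        // 3. roll back to clean up any partial data in the DB
        transactionManager.rollbackTransaction();
        // 4. record the failure status in a fresh txn so that state is saved
        transactionManager.beginTransaction(TransactionIsolation.DEFAULT);
        try {
            TopologyContext failedCtx = getTopologyContext(topology, asUser);
            failedCtx.setState(TopologyStates.TOPOLOGY_STATE_DEPLOYMENT_FAILED);
            transactionManager.commitTransaction();
        } catch (Exception inner) {
            transactionManager.rollbackTransaction();
        }
        // 2. surface the failure back to the caller of deploy
        throw e;
    }
    return null;
}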
arunmahadevan commented 6 years ago

@raju-saravanan @HeartSaVioR, please review

HeartSaVioR commented 6 years ago

@arunmahadevan Your proposal makes sense for a single session, but we need to handle multiple sessions: while the deployment state is changing, other sessions should be able to see it. (While we are deploying, the UI requests the deploy status from another request, so it is effectively multiple sessions. Another case: session A is deploying a topology while session B opens the edit page of the same topology; session B should see the status of the deploying topology.)

I think having a transaction for each state change looks correct, but let's make sure we don't need to roll back as long as we can mark DEPLOYMENT_FAILED correctly.

TBH I don't see an actual benefit to applying a transaction to deployment, because we only write the deploy state to the DB while deploying, and the failure state always overwrites the partial data in the DB. (Please correct me if I'm missing something.) If we fail to update DEPLOYMENT_FAILED, there's no way to represent the correct state in the DB (leaving the state is wrong, but rolling back the state is also wrong).

raju-saravanan commented 6 years ago

@arunmahadevan @HeartSaVioR: One way I can think of to resolve this issue would be to throw a custom exception from the catch block of the TopologyState's deploy() after the context state has been set to TOPOLOGY_STATE_DEPLOYMENT_FAILED, like below:

 public static final TopologyState TOPOLOGY_STATE_CLUSTER_ARTIFACTS_SETUP = new TopologyState() {
        @Override
        public void deploy(TopologyContext context) throws Exception {
            try {
                context.setCurrentAction("Setting up extra jars");
                Topology topology = context.getTopology();
                TopologyActions topologyActions = context.getTopologyActions();
                String mavenArtifacts = context.getTopologyActionsService().setUpExtraJars(topology, topologyActions);
                context.setMavenArtifacts(mavenArtifacts);
                context.setState(TOPOLOGY_STATE_EXTRA_JARS_SETUP);
                context.setCurrentAction("Extra jars set up");
            } catch (Exception ex) {
                LOG.error("Error while setting up extra jars", ex);
                context.setState(TOPOLOGY_STATE_DEPLOYMENT_FAILED);
                context.setCurrentAction("Extra jars setup failed due to: " + ex);
                // tell the caller to commit the failed state instead of rolling it back
                throw new IgnoreTransactionRollbackException(ex);
            }
        }
    };

and catching this exception at the point where we roll back: if the caught exception is the custom exception, we can commit the changes; otherwise we roll back, like below:

 public Void deployTopology(Topology topology, String asUser) throws Exception {
        TopologyContext ctx;
        try {
            transactionManager.beginTransaction(TransactionIsolation.DEFAULT);
            ctx = getTopologyContext(topology, asUser);
            transactionManager.commitTransaction();
        } catch (Exception e) {
            transactionManager.rollbackTransaction();
            throw e;
        }
        LOG.debug("Deploying topology {}", topology);
        while (ctx.getState() != TopologyStates.TOPOLOGY_STATE_DEPLOYED) {
            try {
                transactionManager.beginTransaction(TransactionIsolation.DEFAULT);
                LOG.debug("Current state {}", ctx.getStateName());
                ctx.deploy();
                transactionManager.commitTransaction();
            } catch (IgnoreTransactionRollbackException e) {
                // the DEPLOYMENT_FAILED state was set deliberately; commit it
                transactionManager.commitTransaction();
                // rethrow so the deploy loop terminates with the failure
                throw e;
            } catch (Exception e) {
                transactionManager.rollbackTransaction();
                throw e;
            }
        }
        return null;
    }

This approach would work if ctx is only changed by deploy() of TopologyState; if there are other changes to ctx outside this function between beginTransaction() and commitTransaction(), those would be committed too, which may not be the desired behaviour. With this approach you would also have to annotate the REST API endpoint with @UnitOfWork(transactional = false) in the resource.
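
The custom exception itself is not shown in this thread; a minimal sketch of what it could look like, assuming an unchecked wrapper so it can pass through deploy() unhindered:

// Hypothetical definition: marks an exception whose surrounding transaction
// should be committed (to persist the failed state) rather than rolled back.
public class IgnoreTransactionRollbackException extends RuntimeException {
    public IgnoreTransactionRollbackException(Throwable cause) {
        super(cause);
    }
}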

satishd commented 6 years ago

Having a state like deployment_initialized would be helpful in preventing deploys from other sessions when the topology is already in the deployment_initialized state.

Ideally, user code should take care of begin/commit/rollback of transactions, without the default thread-local transactions, whenever it needs to handle complex scenarios. Multiple units of work may be happening within a single REST call, and they may want to commit/rollback based on their own logic. StorageManager/TransactionManager should give a way to create a transaction and to commit/rollback that transaction respectively. One such scenario is below. We may want to enhance the StorageManager/TransactionManager APIs in future to support these use cases.

Txn txn = TM.createTransaction();
// part 1
storageManager.update(txn, ...);
if (condition) {
    txn.commit();
} else {
    txn.rollback();
    // subpart: a second, independent transaction
    Txn txn2 = TM.createTransaction();
    try {
        storageManager.insert(txn2, ...);
        // more code here..
        txn2.commit();
    } catch (Exception ex) {
        txn2.rollback();
    }
}
// subsequent parts
arunmahadevan commented 6 years ago

Having a state like deployment_initialized would be helpful in preventing deploys from other sessions when the topology is already in the deployment_initialized state.

Yes, I think we can add such a state plus some kind of optimistic locking to prevent concurrent deployments. Before invoking the parallel thread to handle deployment here, the main thread should acquire a lock by querying the deployment state and inserting or updating an entry in the state table with the 'next' version number. Whoever gets the lock goes ahead with forking the deployment thread.

For avoiding the state rollback in case of failures, IgnoreTransactionRollbackException looks like a reasonable option for now.

@raju-saravanan can you make the above two changes?

arunmahadevan commented 6 years ago

@HeartSaVioR, can you take a look if you get time?

HeartSaVioR commented 6 years ago

@arunmahadevan @satishd I feel this is not going to be trivial with an optimistic locking approach.

  1. We need the lock to be visible across two transactions, which means the manipulation of the lock has to be committed (please correct me if I'm wrong) and is not rolled back automatically. Rolling back the version on failure would have to be done manually.

  2. Based on 1, if session A deploys a topology with version v2 (the current topology version being v1) and Streamline crashes, the version will not be rolled back and the lock table will still mark v2, so no one can deploy the current topology v1 unless a manual operation is done.

I guess the same applies to updating/querying the deploy state.

What I can think of is using SELECT ... FOR UPDATE to lock the row (a pessimistic lock), though I'm not sure it works consistently across the supported databases (Oracle, PostgreSQL, MySQL). Also, this assumes the relevant row exists, so we would have to insert one when the topology is created, which brings a migration issue as well.
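
For illustration, a minimal JDBC sketch of that pessimistic route; the topology_state/topology_id names follow this thread, but the schema and helper shape are assumptions.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.sql.DataSource;

// Sketch: lock one topology's state row for the duration of a transaction.
// Assumes the row already exists (see the migration concern above).
String readStateLocked(DataSource dataSource, long topologyId) throws Exception {
    try (Connection conn = dataSource.getConnection()) {
        conn.setAutoCommit(false);
        try {
            String state = null;
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT state FROM topology_state WHERE topology_id = ? FOR UPDATE")) {
                ps.setLong(1, topologyId);
                try (ResultSet rs = ps.executeQuery()) {
                    // other transactions selecting this row FOR UPDATE now block
                    // until we commit or roll back
                    if (rs.next()) {
                        state = rs.getString("state");
                    }
                }
            }
            // ... update the state here while the row is still locked ...
            conn.commit();
            return state;
        } catch (Exception e) {
            conn.rollback();
            throw e;
        }
    }
}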

HeartSaVioR commented 6 years ago

Even an optimistic locking approach with transactions may behave like pessimistic locking for a given row (via a shared lock), depending on the transaction isolation level and how it is implemented in the DB. But again, I'm not an expert on this, so it's not clear to me how it would work across the supported databases.

HeartSaVioR commented 6 years ago

We may be able to handle a stale lock by introducing a lock timeout (comparing timestamps), but it would still block deployment for some period, and even ensuring that only one session picks up the stale lock and takes it over may itself need a pessimistic lock.

HeartSaVioR commented 6 years ago

For avoiding the state rollback in case of failures, IgnoreTransactionRollbackException looks like a reasonable option for now.

This is a separate issue and relatively easy to address. I'll file an issue independently and address it for now.

HeartSaVioR commented 6 years ago

#1146 for addressing the failure case.

arunmahadevan commented 6 years ago

Yes, makes sense to address #1146 first.

I feel this is not going to be trivial with an optimistic locking approach.

I may be missing something but this was the idea:

  1. Main thread queries topology_state with the topology_id (id1) and state = initial.
  2. Then:
     2.1. If no record exists, insert a new record with topology_id = id1, state = deployment_started, version = 0.
     2.2. Else update the record and set state = deployment_started, version = version_from_step_1 + 1 where version = version_from_step_1.
  3. If 2.1 or 2.2 fails, exit the deployment (some other thread started the deployment).
  4. The forked thread continues to deploy the topology.

Crashes need to be handled via timeouts. We can discuss further and take this up based on priorities.
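
A sketch of steps 1-2 as compare-and-swap statements over the version column (JDBC; the exact schema and the 'initial' state question discussed below are open, so the names here are assumptions):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sketch: returns true if this thread won the right to fork the deploy thread.
// Each statement is a single-statement CAS, so autocommit is enough here.
boolean tryAcquireDeployment(Connection conn, long topologyId) throws Exception {
    conn.setAutoCommit(true);
    Long version = null;
    try (PreparedStatement ps = conn.prepareStatement(
            "SELECT version FROM topology_state WHERE topology_id = ?")) {
        ps.setLong(1, topologyId);
        try (ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                version = rs.getLong("version");
            }
        }
    }
    if (version == null) {
        // step 2.1: no record yet; a unique key on topology_id lets only one
        // concurrent caller succeed
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO topology_state (topology_id, state, version) "
                        + "VALUES (?, 'deployment_started', 0)")) {
            ps.setLong(1, topologyId);
            ps.executeUpdate();
            return true;
        } catch (java.sql.SQLIntegrityConstraintViolationException dup) {
            return false; // another session inserted first
        }
    }
    // step 2.2: CAS on the version column; 0 rows updated means we lost the race
    try (PreparedStatement ps = conn.prepareStatement(
            "UPDATE topology_state SET state = 'deployment_started', version = ? "
                    + "WHERE topology_id = ? AND version = ?")) {
        ps.setLong(1, version + 1);
        ps.setLong(2, topologyId);
        ps.setLong(3, version);
        return ps.executeUpdate() == 1;
    }
}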

HeartSaVioR commented 6 years ago

Could you elaborate on it in detail from the point of view of transactions (considering transaction isolation levels)? I'm not an expert here, so it's not clear to me how it runs safely with multiple transactions.

HeartSaVioR commented 6 years ago

To be clearer: I can see how your idea works, but we can't assume state = 'initial', because the state can vary after taking some actions. That means someone would have to update the state to 'initial' when starting deployment, which might still not be safe.

Maybe we could have an explicit lock table (@julianhyde gave the idea) while applying your idea; both optimistic and pessimistic locking look possible with an explicit lock table. Acquiring and releasing the lock look like they require separate transactions, but I might be wrong here.

satishd commented 6 years ago

It should work fine if we use the SERIALIZABLE isolation level; let me know if I am missing anything here.

satishd commented 6 years ago

topology_state needs a version column added, and the current constraint of topologyId as PK should be changed to (topologyId, version) as a UK. This will not allow dirty updates when updating the topology state. The version is used in a CAS-like approach for updates.

hmcl commented 6 years ago

You may have already run into this article, but just in case: https://stackoverflow.com/questions/2495628/mysql-transactions-across-multiple-threads

arunmahadevan commented 6 years ago

Maybe we could have an explicit lock table (@julianhyde gave the idea) while applying your idea; both optimistic and pessimistic locking look possible with an explicit lock table. Acquiring and releasing the lock look like they require separate transactions, but I might be wrong here.

Not sure if I am overlooking something, but I think we need either a lock table (e.g. SELECT * FROM lock_table FOR UPDATE) or optimistic locking with version IDs; not both. Also, I don't think we need a transaction just to acquire the lock. These would be independent SQL statements (a query plus an insert/update in the optimistic case) with auto-commit set to true, outside the context of a txn. Once the lock is acquired, the master (the one that acquired the lock) initiates the topology deployment, which executes within a txn.

HeartSaVioR commented 6 years ago

1. Before addressing this, we should recognize that allowing edit mode for multiple sessions is already unstable; I filed #768 for it. Concurrent topology deployment is just one part of that issue. I can't see any ideal way to resolve it without disallowing simultaneous edits. So even if we address this issue by applying some DBMS technique, #768 is still valid, and if we can apply a global lock, it should be applied on entering/exiting edit mode. (Releasing the lock on exit would be really tricky, and we may want session management with session timeouts. Active-active H/A matters here: it may require session sharing.)

2. This has to take multiple databases into account, and we should accept that we can't test it well; we will just hit the issue when it doesn't work. That's why I want to take the safest approach, even if it requires a lock or something heavier.

3. As I commented earlier, the state is not 'initial' when starting deployment. Topology state covers not only deployment but also suspending, resuming, killing, etc. So we either update the state to 'initial' when starting deployment (where I'm also concerned about a race condition), or we query against the 'not-in-progress-of-deployment' states. If those are kept separate from the 'in-progress-of-deployment' states, we can use them in the query.

4. I'm not clear on what version exactly means. I was sure it was a generated value when @arunmahadevan described it, but having it as a UK confuses me. Let's make version clear: it is a generated value, not the topology version, right?

5. It would be better to provide the full queries (with new tables if necessary) and an explanation, instead of enumerating available techniques. We already know there are two kinds of lock we could apply, so there's no need to explain them again in a loop; I'm just not clear on how we make this robust with either technique across the supported DBMSs.

6. If we only had one Streamline instance, we could just apply in-process concurrency techniques (at least a lock) to ensure this more easily (not easy, just easier than now), but active-active H/A really matters. To be honest, I don't think it is easy to make Streamline robust in an active-active H/A setup. Point 1 may be a good example: multiple requests within one server have a similar issue, but there we can try to handle it in-process.

julianhyde commented 6 years ago

Is optimistic locking a requirement?

Optimistic locking can improve concurrency, but is more complex to implement than pessimistic locking.

From what I can tell, pessimistic locking is sufficient. Algorithm something like this:

HeartSaVioR commented 6 years ago

Pessimistic locking with a lock timeout in case of a crash: I commented on this earlier, and I have some experience with it, though my use case was a bit different (grab and mark one row atomically, where the table has N rows and more than N processes try to grab a row; I implemented this with SELECT FOR UPDATE, which hides the row from other transactions once one transaction selects it). I also had exactly the same use case (an external lock table), but it was against the MyISAM storage engine.

@julianhyde Let me be clear on this: the sequence may look like this (with an explicit lock table):

  1. Start a tx.
  2. Query: SELECT timestamp, owner FROM topology_deploy_lock WHERE topology_id = <id> AND (status = 'unlocked' OR (timestamp < current time - lock timeout));
     (The current time will be provided by System.currentTimeMillis() instead of relying on a DB time function, to keep the query easily compatible with multiple DBMSs.)
  3. Depending on whether the row exists:
     3.a. If the row doesn't exist:
          3.a.1. Query: INSERT INTO topology_deploy_lock (topology_id, status, owner, timestamp) VALUES (<id>, 'locked', <owner: maybe a generated UUID per request>, <timestamp>);
          3.a.2. If the query succeeds, it grabs the lock; if it fails with a PK constraint violation, it doesn't grab the lock (give up the deployment).
     3.b. If the row exists:
          3.b.1. Query: UPDATE topology_deploy_lock SET status = 'locked', owner = '<owner: maybe a generated UUID per request>', timestamp = '<timestamp>' WHERE topology_id = '<id>' AND status = 'unlocked';
          3.b.2. If the query returns 1, it grabs the lock; if it returns 0, it doesn't grab the lock (give up the deployment).
  4. Commit the transaction (or set autocommit to true for transactions dealing with the lock table) -- not sure which would work best.
  5. After the deployment (succeeded or not), query: UPDATE topology_deploy_lock SET status = 'unlocked', owner = '' WHERE topology_id = '<id>';

Please correct me if I'm missing something, and confirm that it would work with Oracle.

We may also not need the status column in the explicit lock table if we check against owner: if owner is empty it is unlocked, and if owner is not empty it is locked.
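
Pulled together as code, the sequence above might look like the following JDBC sketch. It folds the stale-lock timeout check from step 2 into the conditional UPDATE, uses autocommit for the lock statements (one of the options in step 4), and the timeout constant is illustrative.

import java.sql.Connection;
import java.sql.PreparedStatement;

static final long LOCK_TIMEOUT_MS = 60_000L; // illustrative stale-lock timeout

// Steps 2-4: grab the lock, or return false to give up the deployment.
boolean tryLock(Connection conn, long topologyId, String owner) throws Exception {
    conn.setAutoCommit(true);
    long now = System.currentTimeMillis(); // app-side time, per the note above
    // 3.b: take over an 'unlocked' (or timed-out) row with a conditional UPDATE
    try (PreparedStatement ps = conn.prepareStatement(
            "UPDATE topology_deploy_lock SET status = 'locked', owner = ?, timestamp = ? "
                    + "WHERE topology_id = ? AND (status = 'unlocked' OR timestamp < ?)")) {
        ps.setString(1, owner);
        ps.setLong(2, now);
        ps.setLong(3, topologyId);
        ps.setLong(4, now - LOCK_TIMEOUT_MS);
        if (ps.executeUpdate() == 1) {
            return true;
        }
    }
    // 3.a: no row yet; the PK on topology_id lets only one inserter win
    try (PreparedStatement ps = conn.prepareStatement(
            "INSERT INTO topology_deploy_lock (topology_id, status, owner, timestamp) "
                    + "VALUES (?, 'locked', ?, ?)")) {
        ps.setLong(1, topologyId);
        ps.setString(2, owner);
        ps.setLong(3, now);
        ps.executeUpdate();
        return true;
    } catch (java.sql.SQLIntegrityConstraintViolationException dup) {
        return false; // the row exists and is currently locked by someone else
    }
}

// Step 5: release the lock after deployment, whether it succeeded or not.
void unlock(Connection conn, long topologyId) throws Exception {
    conn.setAutoCommit(true);
    try (PreparedStatement ps = conn.prepareStatement(
            "UPDATE topology_deploy_lock SET status = 'unlocked', owner = '' "
                    + "WHERE topology_id = ?")) {
        ps.setLong(1, topologyId);
        ps.executeUpdate();
    }
}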

HeartSaVioR commented 6 years ago

Maybe adding lock_owner and timestamp columns to topology_state would work without an external lock table, but we should keep in mind that topology_state is not only used for deployment, so rows can be updated both with and without locking, which may cause unexpected behavior.

And even while one transaction holds the row lock and writes, reading the row (the current state, i.e. what the locking transaction is writing now) from other transactions should still be possible.

julianhyde commented 6 years ago

Regarding using the topology_state table: I lean slightly toward adding a new table, topology_lock. Then it would be safe to clean it up by deleting all rows.