apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

PIP-255: Support for Idempotent Commit and Abort. #19744

Open thetumbled opened 1 year ago

thetumbled commented 1 year ago

Motivation

Analysis

The main challenge in making the commit and abort operations idempotent is that the broker removes txnMeta from txnMetaMap as soon as the transaction is committed or aborted successfully. A transaction can receive multiple commit/abort requests, triggered either by the client or by the broker itself when the transaction times out. A later request then fails with a TransactionNotFound exception, because the broker no longer holds the txnMeta for that transaction.
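The failure mode described above can be made concrete with a minimal Java sketch. It models a coordinator that drops a transaction's metadata as soon as the transaction reaches a terminal state. `NonIdempotentTcSketch`, its `txnMetaMap` of plain `long` IDs, and `TransactionNotFoundException` are hypothetical simplifications, not Pulsar's classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of a TC's txnMetaMap; not Pulsar code.
public class NonIdempotentTcSketch {
    enum TxnStatus { OPEN, COMMITTED, ABORTED }

    static class TransactionNotFoundException extends RuntimeException {
        TransactionNotFoundException(long txnId) {
            super("Transaction not found: " + txnId);
        }
    }

    private final Map<Long, TxnStatus> txnMetaMap = new HashMap<>();

    void newTxn(long txnId) {
        txnMetaMap.put(txnId, TxnStatus.OPEN);
    }

    // Ends the transaction and removes its metadata immediately, so nothing is
    // left to answer a later commit/abort request for the same transaction.
    TxnStatus endTxn(long txnId, TxnStatus target) {
        if (txnMetaMap.remove(txnId) == null) {
            throw new TransactionNotFoundException(txnId);
        }
        return target;
    }

    public static void main(String[] args) {
        NonIdempotentTcSketch tc = new NonIdempotentTcSketch();
        tc.newTxn(1L);
        // First request, e.g. the TC aborting txn 1 because it timed out.
        System.out.println(tc.endTxn(1L, TxnStatus.ABORTED));
        try {
            // Second request for the same txn, e.g. the client's commit arriving later.
            tc.endTxn(1L, TxnStatus.COMMITTED);
        } catch (TransactionNotFoundException e) {
            System.out.println("second request failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints `ABORTED` for the first request. The second request then fails with `TransactionNotFoundException`, even though the transaction has a well-defined final state.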

Goal

API Changes

```protobuf
message CommandEndTxn {
    required uint64 request_id = 1;
    optional uint64 txnid_least_bits = 2 [default = 0];
    optional uint64 txnid_most_bits = 3 [default = 0];
    optional TxnAction txn_action = 4;
    optional string client_name = 5;
}

message TransactionMetadataEntry {
    ...
    optional string clientName = 13;
}

enum ServerError {
    ...
    TransactionPreserverClosed = 26; // Transaction metadata preserver is closed
}
```


- Client configuration change

```java
@ApiModelProperty(
        name = "clientName",
        value = "Client name that is used to save transaction metadata."
)
private String clientName;
```


- Broker configuration change

```java
@FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Max number of txnMeta of terminated transactions to persist in each TC. "
        + "If the number of terminated transactions is greater than this value, the oldest terminated transaction will be "
        + "removed from the cache and persisted in the store. "
        + "The default value is 0, which disables persistence of terminated transactions."
)
private int TransactionMetaPersistCount = 0;

@FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Time in hours to persist the transaction metadata in TransactionMetadataPreserver."
)
private long TransactionMetaPersistTimeInHour = 72;

@FieldContext(
        category = CATEGORY_TRANSACTION,
        doc = "Interval in seconds to check the expired transaction metadata in TransactionMetadataPreserver."
)
private long TransactionMetaExpireCheckIntervalInSecond = 300;
```


## Implementation
Design document (in Chinese):
https://blog.csdn.net/m0_43406494/article/details/130350491

- Each `TransactionMetadataPreserver` maintains a `Map<ClientName, List>` that holds the status of the latest `TransactionMetaPersistCount` transactions for each client. Before a transaction ends, its metadata is appended to this list and written to a compacted topic (`persistent://pulsar/system/__terminated_txn_state_${TCid}`). The key of the compacted topic is `ClientName`, so old transaction metadata is compacted away automatically.
- During TC recovery, the maintained transaction status is restored from the compacted topic. Because compaction keeps only the latest value per client, this should not take long.
- To prevent the transaction metadata of `clientName`s that are no longer used from being retained in memory forever, a timer task clears expired transaction metadata from memory. Metadata expires after `TransactionMetaPersistTimeInHour` hours, and the task runs every `TransactionMetaExpireCheckIntervalInSecond` seconds.
- The topic `__terminated_txn_state_${TCid}` is created through the automatic topic creation mechanism. As a result, the number of partitions, and whether it is partitioned at all, are determined by the corresponding settings in `broker.conf`. `persistent://pulsar/system/__terminated_txn_state_${TCid}` is a system topic, and compaction is forcibly enabled for it by default.
- The entire logical chain is as follows: **`OPEN status -> writing the aborting log entry to tc log -> updating status to Aborting in memory -> append metadata to TransactionMetadataPreserver -> sending endTransaction command to TB and TP -> flush metadata to __terminated_txn_state_${TCid} -> Write aborted log entry to tc log -> Update status to Aborted in memory`**.
  Every step in this chain is idempotent, so the chain as a whole is idempotent.
- Why do we split the operation of preserving terminated transaction metadata into `append` and `flush`?
    - To compress the messages sent to `__terminated_txn_state_${TCid}`.
    - For example, take the sequence `append txn1 -> append txn2 -> flush txn1 -> flush txn2 (no need to flush)`. The flush of txn1 already writes both `txn1` and `txn2`, so the flush of txn2 can be skipped. If we did not split the operation, we would send `txn1, txn2` to `__terminated_txn_state_${TCid}` twice, which is unnecessary.
    - In my experiment, the compression ratio was about 1/4 to 2/3, so splitting the operation is worthwhile.
- `flush metadata to __terminated_txn_state_${TCid}` must happen before `Write aborted log entry to tc log`. If the order were reversed, transaction metadata could be lost. Suppose the broker shuts down after `Write aborted log entry to tc log` but before `flush metadata to __terminated_txn_state_${TCid}`. The TC log then already marks the transaction as aborted, so recovery does not replay the chain, and the metadata is never persisted in `__terminated_txn_state_${TCid}`.
- Because `flush metadata to __terminated_txn_state_${TCid}` happens before `Update status to Aborted in memory`, the transaction written to `__terminated_txn_state_${TCid}` is in `Aborting` state. `TransactionMetadataPreserver` should therefore treat `Aborting` as equivalent to `Aborted`.
- Because the `append` and `flush` methods are executed in the single-threaded pool of `MLTransactionMetadataStore`, the implementation of `TransactionMetadataPreserver` does not require concurrency control.
- After the command sent to TB/TP has executed successfully, we remove the partition metadata from `TransactionImpl` to save memory, since that metadata takes up most of the memory.
- Memory could be reduced further by retaining only the metadata of aborted transactions and not of committed ones. If the transaction metadata cannot be found in memory, the transaction would be assumed committed by default. However, this may hide a risk of data loss, so **it is not adopted for now**. Worse, users may not be aware of the actual guarantees Pulsar provides and may complain that Pulsar did not warn them when a loss occurred, which is unacceptable. Technically, though, this optimization could greatly relieve memory pressure on the broker, as most transactions are committed.
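The bullets above describe a per-client, bounded structure that is updated by `append` and persisted by `flush`. Below is a minimal Java sketch of that idea under simplifying assumptions. The compacted topic is replaced by an in-memory map (`compactedTopic`) that keeps only the latest value per key. Time is passed in explicitly rather than coming from a real timer. `PreserverSketch`, `TxnMeta`, `append`, `flush`, `lookup`, and `removeExpired` are hypothetical names, not the actual implementation. Treating `Committing` as `Committed` is an assumption made by symmetry; the text above only states the rule for `Aborting`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical in-memory model of TransactionMetadataPreserver; not Pulsar code.
public class PreserverSketch {
    enum TxnStatus { COMMITTING, COMMITTED, ABORTING, ABORTED }

    record TxnMeta(long txnId, TxnStatus status, long terminatedAtMillis) {}

    private final int persistCount;     // role of TransactionMetaPersistCount
    private final long retentionMillis; // role of TransactionMetaPersistTimeInHour
    private final Map<String, Deque<TxnMeta>> perClient = new HashMap<>();
    private final Set<String> dirtyClients = new HashSet<>();
    // Stand-in for __terminated_txn_state_${TCid}: compaction keeps the latest value per key.
    final Map<String, List<TxnMeta>> compactedTopic = new HashMap<>();
    int topicWrites = 0;

    PreserverSketch(int persistCount, long retentionMillis) {
        this.persistCount = persistCount;
        this.retentionMillis = retentionMillis;
    }

    // "append": update memory only, keeping the latest persistCount entries per client.
    void append(String clientName, TxnMeta meta) {
        Deque<TxnMeta> list = perClient.computeIfAbsent(clientName, k -> new ArrayDeque<>());
        list.addLast(meta);
        while (list.size() > persistCount) {
            list.removeFirst();
        }
        dirtyClients.add(clientName);
    }

    // "flush": write the client's whole list under key clientName, but only if
    // something was appended since the last flush for that client.
    void flush(String clientName) {
        if (!dirtyClients.remove(clientName)) {
            return;
        }
        compactedTopic.put(clientName,
                new ArrayList<>(perClient.getOrDefault(clientName, new ArrayDeque<>())));
        topicWrites++;
    }

    // Aborting is reported as Aborted (Committing as Committed is assumed by symmetry).
    Optional<TxnStatus> lookup(String clientName, long txnId) {
        for (TxnMeta m : perClient.getOrDefault(clientName, new ArrayDeque<>())) {
            if (m.txnId() == txnId) {
                return Optional.of(switch (m.status()) {
                    case ABORTING, ABORTED -> TxnStatus.ABORTED;
                    case COMMITTING, COMMITTED -> TxnStatus.COMMITTED;
                });
            }
        }
        return Optional.empty();
    }

    // Body of the periodic expiry task: drop entries older than the retention time.
    void removeExpired(long nowMillis) {
        for (Deque<TxnMeta> list : perClient.values()) {
            list.removeIf(m -> nowMillis - m.terminatedAtMillis() >= retentionMillis);
        }
        perClient.values().removeIf(Deque::isEmpty);
    }

    public static void main(String[] args) {
        PreserverSketch p = new PreserverSketch(2, 72L * 3600 * 1000);
        p.append("client-a", new TxnMeta(1, TxnStatus.ABORTING, 0));   // append txn1
        p.append("client-a", new TxnMeta(2, TxnStatus.COMMITTING, 0)); // append txn2
        p.flush("client-a"); // flush txn1: writes [txn1, txn2]
        p.flush("client-a"); // flush txn2: nothing new, skipped
        System.out.println(p.topicWrites);           // 1
        System.out.println(p.lookup("client-a", 1)); // Optional[ABORTED]
    }
}
```

In a broker, `removeExpired` would be scheduled every `TransactionMetaExpireCheckIntervalInSecond` seconds, for example with `ScheduledExecutorService.scheduleAtFixedRate`. Since everything runs on one thread, as the text notes, the sketch uses plain `HashMap`/`HashSet` without locking.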

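The ordering argument for the abort chain can be checked with a small crash simulation. This is a hypothetical Java model, not Pulsar code, and it makes several simplifying assumptions:

- It tracks a single transaction.
- The TC log is reduced to its last entry, and `__terminated_txn_state_${TCid}` is reduced to a boolean.
- TB/TP handling is not modelled.
- Recovery re-runs the whole chain when the last TC log entry is `Aborting`, and drops the transaction when it is `Aborted`.

```java
import java.util.List;

// Hypothetical single-transaction simulation of the abort chain; not Pulsar code.
public class AbortChainSketch {
    enum Step { LOG_ABORTING, MEM_ABORTING, APPEND, END_TB_TP, FLUSH, LOG_ABORTED, MEM_ABORTED }

    // Proposed order: flush to __terminated_txn_state before the Aborted TC log entry.
    static final List<Step> PROPOSED = List.of(Step.values());
    // Rejected order: the Aborted TC log entry is written before the flush.
    static final List<Step> REVERSED = List.of(Step.LOG_ABORTING, Step.MEM_ABORTING,
            Step.APPEND, Step.END_TB_TP, Step.LOG_ABORTED, Step.FLUSH, Step.MEM_ABORTED);

    // Durable state (survives a crash).
    String lastTcLogEntry = "OPEN";
    boolean flushedToTopic = false;
    // In-memory state (lost on crash).
    String memStatus = "OPEN";
    boolean appended = false;

    // Each step is idempotent: applying it twice gives the same state as applying it once.
    void apply(Step step) {
        switch (step) {
            case LOG_ABORTING -> lastTcLogEntry = "ABORTING";
            case MEM_ABORTING -> memStatus = "ABORTING";
            case APPEND -> appended = true;
            case END_TB_TP -> { } // sending endTxn to TB/TP; not modelled here
            case FLUSH -> flushedToTopic |= appended;
            case LOG_ABORTED -> lastTcLogEntry = "ABORTED";
            case MEM_ABORTED -> memStatus = "ABORTED";
        }
    }

    // Runs the first crashAfter steps, crashes, recovers, and reports whether a
    // late commit/abort request for this txn can still be answered.
    static boolean answerableAfterCrash(List<Step> order, int crashAfter) {
        AbortChainSketch s = new AbortChainSketch();
        for (int i = 0; i < crashAfter; i++) {
            s.apply(order.get(i));
        }
        s.memStatus = "UNKNOWN"; // crash: in-memory state is lost
        s.appended = false;
        switch (s.lastTcLogEntry) {
            case "OPEN":
                return true; // still open in the TC; a later request starts the chain
            case "ABORTING":
                order.forEach(s::apply); // recovery re-runs the chain; safe because it is idempotent
                return s.flushedToTopic;
            default:
                return s.flushedToTopic; // "ABORTED": TC drops txnMeta; only the preserved copy is left
        }
    }

    public static void main(String[] args) {
        for (int k = 0; k <= Step.values().length; k++) {
            System.out.println("crash after " + k + " steps: proposed="
                    + answerableAfterCrash(PROPOSED, k) + ", reversed=" + answerableAfterCrash(REVERSED, k));
        }
    }
}
```

With the proposed order, every crash point remains answerable. The reversed order loses the transaction exactly when the crash falls between the Aborted log entry and the flush (after 5 steps).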
## Rejected Alternatives

### ~~Solution 1: persist txnMeta for `transactionTimeout` time.~~
The straightforward solution is to persist the txnMeta for `transactionTimeout`. Every transaction has a `transactionTimeout` property configured by the client. Once it elapses, a transaction still in `OPEN` status is set to `TIME_OUT`, and the TC tries to abort it actively.
Suppose the TC persists txnMeta for `transactionTimeout`; in other words, the TC does not remove txnMeta from txnMetaMap as soon as the transaction is committed or aborted. The TC can then answer subsequent commit/abort requests by looking up the transaction status in txnMetaMap, which would resolve the problem.
Let us analyze Solution 1 in more depth.
![img](https://user-images.githubusercontent.com/52550727/233896559-a0589959-3978-465d-af4a-b22bd32eddf5.png)

The client starts transaction `txn1` at T0 and may issue commit/abort requests at T1, T2, or T3. T1 is well before `transactionTimeout`, T2 is close to `transactionTimeout`, and T3 is after `transactionTimeout`.

- If the client tries to commit/abort for the first time at T3, it cannot issue the request, because the transaction has already timed out.
- If the client tries to commit/abort for the first time at T1, it receives the correct response and sets the transaction status to `COMMITTED` or `ABORTED`. The client then discards any subsequent commit/abort request, because it knows that `txn1` is already `COMMITTED` or `ABORTED`.
- If the client tries to commit/abort for the first time at T2, it can issue the commit request `req1` to the TC, because the transaction has not timed out yet. By the time the request reaches the TC, however, `txn1` may have timed out, since T2 is very close to `transactionTimeout`. The TC then tries to abort `txn1` actively (call this `req2`), so two commit/abort processes, `req1` and `req2`, run concurrently.
  - If `req1` is faster than `req2`, the client commits successfully.
  - If `req2` is faster than `req1`, the TC aborts `txn1` and immediately removes its txnMeta from `txnMetaMap`, so the subsequent request `req1` fails with `TransactionNotFound`. Although the client receives an exception, transactional consistency is still guaranteed: `txn1` is aborted in the TC, and the client considers the commit of `txn1` failed.
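The `req2`-wins race can be replayed with a minimal Java sketch. It compares the current behaviour (drop txnMeta immediately) with Solution 1 (keep terminated txnMeta). `RetentionRaceSketch` and its string results are hypothetical simplifications, and expiry after `transactionTimeout` is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of Solution 1 (retain terminated txnMeta); not Pulsar code.
public class RetentionRaceSketch {
    enum Status { OPEN, COMMITTED, ABORTED }

    private final Map<Long, Status> txnMetaMap = new HashMap<>();
    private final boolean retainTerminated;

    RetentionRaceSketch(boolean retainTerminated) {
        this.retainTerminated = retainTerminated;
    }

    void open(long txnId) {
        txnMetaMap.put(txnId, Status.OPEN);
    }

    // Returns the transaction's final status as seen by the caller,
    // or "TransactionNotFound" if the TC no longer knows the txn.
    String end(long txnId, Status target) {
        Status current = txnMetaMap.get(txnId);
        if (current == null) {
            return "TransactionNotFound";
        }
        if (current != Status.OPEN) {
            return current.name(); // already terminated: report the real outcome
        }
        if (retainTerminated) {
            txnMetaMap.put(txnId, target); // Solution 1: keep it (until transactionTimeout)
        } else {
            txnMetaMap.remove(txnId);      // current behaviour: drop immediately
        }
        return target.name();
    }

    public static void main(String[] args) {
        for (boolean retain : new boolean[] {false, true}) {
            RetentionRaceSketch tc = new RetentionRaceSketch(retain);
            tc.open(1L);
            String req2 = tc.end(1L, Status.ABORTED);   // TC's timeout abort wins the race
            String req1 = tc.end(1L, Status.COMMITTED); // client's commit arrives second
            System.out.println("retain=" + retain + ": req2=" + req2 + ", req1=" + req1);
        }
    }
}
```

Without retention, `req1` gets `TransactionNotFound`. With retention, it learns that `txn1` was `ABORTED`. This is the behaviour Solution 1 aims for, at the cost discussed below.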

#### Disadvantages
- Transaction metadata is persisted through TC logs. If the metadata is not deleted as soon as the transaction completes, deletion of all the TC log entries for that transaction is delayed for a long time. If the transaction timeout is set to 1h, the TC log entries produced within that hour must be scanned during TC recovery. This greatly lengthens TC recovery and causes a long period of unavailability.
- If transaction concurrency suddenly increases while the transaction timeout stays the same, the memory occupied by transaction metadata may suddenly increase as well. This can cause frequent GC or even OOM errors, which is uncontrollable. As transaction concurrency keeps growing over time, these memory issues become increasingly prominent.
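Both disadvantages can be estimated roughly with Little's law. Assume transactions end at an average rate $\lambda$, and each terminated txnMeta (with its TC log entries) is kept for `transactionTimeout` $T$. Then about $N$ entries are retained at any moment. The rate and timeout below are hypothetical example values, not measurements.

```latex
N \approx \lambda \cdot T
\qquad\text{e.g.}\qquad
\lambda = 1000\ \tfrac{\text{txn}}{\text{s}},\; T = 3600\ \text{s}
\;\Rightarrow\; N \approx 3.6 \times 10^{6}
```

Doubling $\lambda$ with $T$ unchanged doubles $N$. This is why a sudden rise in concurrency translates directly into memory pressure and into more TC log entries to scan on recovery.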

Although Solution 1 was implemented first, it is not a good solution, and we did not adopt it in the end.
If you are interested, see the following PR:
https://github.com/apache/pulsar/pull/19662

BewareMyPower commented 1 year ago

The PIP number duplicates the one used in https://github.com/apache/pulsar/issues/19705

congbobo184 commented 1 year ago

I am not sure this PIP solves the problem; consider the following process:

  1. The client sends a commit request to the TC.
  2. The TC changes txn1 to Committing status, and then the broker goes down.
  3. The client checks the status of txn1 in a loop.
  4. When txn1 times out, the client considers the commit of this transaction failed.
  5. The broker restarts and changes txn1 to Committed status.

In this situation, we still cannot guarantee that messages will not be sent repeatedly.

thetumbled commented 1 year ago

> I am not sure this PIP solves the problem; consider the following process:
>
> 1. The client sends a commit request to the TC.
> 2. The TC changes txn1 to Committing status, and then the broker goes down.
> 3. The client checks the status of txn1 in a loop.
> 4. When txn1 times out, the client considers the commit of this transaction failed.
> 5. The broker restarts and changes txn1 to Committed status.
>
> In this situation, we still cannot guarantee that messages will not be sent repeatedly.

  1. After the client commits txn1, txn1 will not be set to TIME_OUT. A transaction can be set to TIME_OUT only while it is in OPEN state (see `org.apache.pulsar.client.impl.transaction.TransactionImpl#run`).

  2. There may be situations similar to your concern. For example, if the broker takes too long to restart and is unavailable for a long time, the client throws the following exception:

    2023-03-08T18:33:12,415+0800 [pulsar-client-internal-17-1] ERROR org.apache.pulsar.testclient.PerformanceProducer - Commit transaction:(7,206446) failed with exception :
    java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: Could not get response from transaction meta store within given timeout.

    By default, the client waits at most 30s for the response to a commit request. If the broker is unavailable for longer than that, the client throws the exception above. In this situation, the client cannot know the actual state of the transaction, so it should retry the commit, which effectively queries the state of txn1. I have implemented such logic in PerformanceProducer: https://github.com/apache/pulsar/pull/19781

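    ```java
    if (arguments.isEnableTxnTest) {
        int retryTimes = arguments.retryTimes;
        // The commit timed out, so its outcome is unknown: retry it, which
        // effectively asks the TC for the transaction's state.
        if (isTimeOutException(exception)) {
            for (int i = 0; i < retryTimes; i++) {
                try {
                    transaction.commit().get();
                    handleTxnOnCommitted(transaction);
                    break;
                } catch (Exception e) {
                    // A non-timeout error is a definite answer: stop retrying.
                    if (!isTimeOutException(e)) {
                        break;
                    }
                }
            }
        }
        // Still not COMMITTED: treat the txn as failed. Judging by its arguments,
        // handleTxnOnAborted saves the txn's messages for resending and returns their count.
        if (transaction.getState() != Transaction.State.COMMITTED) {
            totalSent.add(-handleTxnOnAborted(transaction, messageFormatter,
                    arguments.baseDirToSaveResendTxnData));
        }
    }
    ```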

    In most cases the broker is unavailable only briefly, so a retry policy is effective, and users can control the maximum number of retries. I wonder whether the retry logic should be implemented in `TransactionImpl` instead of by users themselves.
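Point 1 above rests on a simple rule: the timeout can only move a transaction out of `OPEN`. Below is a minimal Java sketch of that rule. It uses a compare-and-set so that the timeout timer and a concurrent commit cannot both win. `TxnTimeoutStateSketch` and its methods are hypothetical; they mirror the behaviour described for `TransactionImpl#run`, not its code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical client-side state holder; not Pulsar's TransactionImpl.
public class TxnTimeoutStateSketch {
    enum State { OPEN, COMMITTING, ABORTING, COMMITTED, ABORTED, TIME_OUT }

    private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

    // Invoked by the timeout timer task: succeeds only if the txn is still OPEN.
    boolean onTimeout() {
        return state.compareAndSet(State.OPEN, State.TIME_OUT);
    }

    // Invoked when the user commits: succeeds only if the txn is still OPEN,
    // so a txn that has started committing can no longer time out.
    boolean beginCommit() {
        return state.compareAndSet(State.OPEN, State.COMMITTING);
    }

    State state() {
        return state.get();
    }
}
```

Whichever of `beginCommit` and `onTimeout` runs first wins atomically, and the other returns `false`.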

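The PerformanceProducer retry snippet depends on its surrounding code. Below is a self-contained Java sketch of the same loop: retry only while the commit keeps timing out, and stop on success or on any other error. `CommitRetrySketch`, `commitWithRetry`, and `isTimeout` are hypothetical names. The JDK's `java.util.concurrent.TimeoutException` stands in for `PulsarClientException.TimeoutException`, and the commit call is passed in as a `Supplier`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical, self-contained version of the commit retry loop; not Pulsar code.
public class CommitRetrySketch {
    // Walks the cause chain (e.g. ExecutionException -> cause) looking for a timeout.
    static boolean isTimeout(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof TimeoutException) {
                return true;
            }
        }
        return false;
    }

    // Calls commit once, then retries up to retryTimes more times while the failure
    // is a timeout (outcome unknown). Returns true only on a confirmed commit.
    static boolean commitWithRetry(Supplier<CompletableFuture<Void>> commit, int retryTimes) {
        for (int attempt = 0; attempt <= retryTimes; attempt++) {
            try {
                commit.get().get();
                return true;
            } catch (ExecutionException e) {
                if (!isTimeout(e)) {
                    return false; // definite failure: caller should resend in a new txn
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // still timing out: outcome unknown, treated as not committed
    }
}
```

One caveat follows from the Analysis section. Without idempotent commit, a retry after a commit that succeeded but whose response was lost can fail with `TransactionNotFound`. This sketch would then report "not committed" and trigger an unnecessary resend, which is exactly the gap this PIP targets.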
congbobo184 commented 1 year ago
> There may be situations similar to your concern. For example, if the broker takes too long to restart and is unavailable for a long time, the client throws the following exception:
> 2023-03-08T18:33:12,415+0800 [pulsar-client-internal-17-1] ERROR org.apache.pulsar.testclient.PerformanceProducer - Commit transaction:(7,206446) failed with exception :
> java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: Could not get response from transaction meta store within given timeout.

If the client goes down and then restarts, it will also send repeated messages, right?

> In most cases the broker is unavailable only briefly, so a retry policy is effective, and users can control the maximum number of retries. I wonder whether the retry logic should be implemented in `TransactionImpl` instead of by users themselves.

At this stage of Pulsar transactions, I think it is better to leave this for users to implement, because it is best not to add this extra config until we have fully designed it.

thetumbled commented 1 year ago

> If the client goes down and then restarts, it will also send repeated messages, right?

When the client shuts down, it should persist its unfinished transactions to a local snapshot. After restarting, it should reconstruct the `TransactionImpl` objects from the snapshot and commit them again; otherwise, message duplication will occur.
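The snapshot idea above can be sketched as follows. Before shutting down, the client writes the IDs of transactions whose commit outcome is still unknown to a local file, and reads them back after restarting. This is a hypothetical minimal Java sketch: `UnfinishedTxnSnapshot`, its `TxnId` record, and the one-`mostSigBits:leastSigBits`-per-line file format are all assumptions. Rebuilding `TransactionImpl` objects from these IDs and committing them again depends on client internals, so it is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical local snapshot of unfinished transaction IDs; not a Pulsar API.
public class UnfinishedTxnSnapshot {
    record TxnId(long mostSigBits, long leastSigBits) {}

    // Writes one "mostSigBits:leastSigBits" line per unfinished transaction.
    static void save(Path file, List<TxnId> unfinished) {
        List<String> lines = new ArrayList<>();
        for (TxnId id : unfinished) {
            lines.add(id.mostSigBits() + ":" + id.leastSigBits());
        }
        try {
            Files.write(file, lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the IDs back after a restart; a missing file means nothing is pending.
    static List<TxnId> load(Path file) {
        if (!Files.exists(file)) {
            return List.of();
        }
        try {
            List<TxnId> ids = new ArrayList<>();
            for (String line : Files.readAllLines(file)) {
                if (line.isBlank()) {
                    continue;
                }
                String[] parts = line.split(":");
                ids.add(new TxnId(Long.parseLong(parts[0]), Long.parseLong(parts[1])));
            }
            return ids;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A production version would probably write to a temporary file and then move it into place, so that a crash during `save` cannot leave a half-written snapshot.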

congbobo184 commented 1 year ago

> When the client shuts down, it should persist its unfinished transactions to a local snapshot. After restarting, it should reconstruct the `TransactionImpl` objects from the snapshot and commit them again; otherwise, message duplication will occur.

I have a question: if you open a txn and then the client goes down, the txn can only end by timing out, right?

thetumbled commented 1 year ago

> I have a question: if you open a txn and then the client goes down, the txn can only end by timing out, right?

congbobo184 commented 1 year ago

> I have a question: if you open a txn and then the client goes down, the txn can only end by timing out, right?

- If a txn is opened and the client shuts down before committing it, we can simply let the txn time out, which is consistent.

I am investigating how to solve the problem of zombie transactions.

- However, suppose this txn has been committed in the broker, but the client did not receive the response and update the txn state before shutting down. In that case, we should reconstruct the `TransactionImpl` object and query the txn state (that is, commit again) to learn the correct state of the txn. We need to know whether the transaction was committed successfully in the broker. If the commit failed, the client should resend the messages of the failed txn in a new txn; if it succeeded, the client does not need to resend them.

I think a transaction should have a retention time. A timeout is just a timeout; it cannot express whether the transaction needs to be retained, because some users do not need to check the transaction status again. So it is better to add metadata such as a retention time to the transaction.

If the retention time is 0, the transaction can be removed from memory and storage directly when it times out or ends. Otherwise, when the txn ends, we should use the retention time to decide when the txn can be removed from memory and storage.
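The retention proposal comes down to a per-transaction check. Below is a minimal Java sketch. It assumes the retention time is stored in milliseconds alongside the transaction metadata and compared against the time the transaction ended. `TxnRetentionSketch` and `canRemove` are hypothetical names.

```java
// Hypothetical helper for the proposed per-transaction retention time; not Pulsar code.
public class TxnRetentionSketch {
    /**
     * Decides whether the metadata of an ended or timed-out transaction may be
     * removed from memory and storage.
     *
     * @param retentionMillis retention time carried in the transaction's metadata
     * @param endedAtMillis   time at which the transaction ended or timed out
     * @param nowMillis       current time
     */
    static boolean canRemove(long retentionMillis, long endedAtMillis, long nowMillis) {
        if (retentionMillis <= 0) {
            return true; // retention 0: remove directly once the txn ends or times out
        }
        return nowMillis - endedAtMillis >= retentionMillis;
    }

    public static void main(String[] args) {
        System.out.println(canRemove(0, 1_000, 1_000));       // true
        System.out.println(canRemove(60_000, 1_000, 30_000)); // false: still retained
        System.out.println(canRemove(60_000, 1_000, 61_000)); // true: retention elapsed
    }
}
```

Users who never re-check transaction status would leave the retention time at 0 and pay no memory cost. Only clients that need idempotent retries would opt in.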

thetumbled commented 1 year ago

> I am investigating how to solve the problem of zombie transactions.

What is the problem of zombie transactions?

> I think a transaction should have a retention time. A timeout is just a timeout; it cannot express whether the transaction needs to be retained, because some users do not need to check the transaction status again. So it is better to add metadata such as a retention time to the transaction. If the retention time is 0, the transaction can be removed from memory and storage directly when it times out or ends. Otherwise, when the txn ends, we should use the retention time to decide when the txn can be removed from memory and storage.

congbobo184 commented 1 year ago

@thetumbled Hi, I discussed this with penghui. He said that `TopicTransactionBuffer` already records aborted transactions. Retaining transactions in the TC would also increase the TC's memory usage in proportion to the retention time. So maybe we should continue the discussion until the solution is solid.

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days; marked with the Stale label.
