ClickHouse / clickhouse-java

ClickHouse Java Clients & JDBC Driver

Please clarify the logic of error handling in InputBasedPreparedStatement.executeAny #1406

Open malikas05 opened 1 year ago

malikas05 commented 1 year ago

Hi. We don't completely understand the logic behind the error handling in InputBasedPreparedStatement.executeAny(...). Our application attempts to persist a batch of statements into a ReplicatedSummingMergeTree table. The method returns an array of long values designating the status of each executed statement: 1 for success, -3 for EXECUTE_FAILED. We rely on those returned values in our error-handling/retry logic, and recently we have encountered multiple scenarios in which we failed to retry properly because of misleading returned values. The reason I call them misleading is the catch block in InputBasedPreparedStatement.executeAny(...). Here is the code of the method, extracted from the official ClickHouse JDBC connector:

Code example

    @Override
    protected long[] executeAny(boolean asBatch) throws SQLException {
        ensureOpen();
        boolean continueOnError = false;
        if (asBatch) {
            if (counter < 1) {
                return ClickHouseValues.EMPTY_LONG_ARRAY;
            }
            continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError();
        } else {
            try {
                if (counter != 0) {
                    throw SqlExceptionUtils.undeterminedExecutionError();
                }
                addBatch();
            } catch (SQLException e) {
                clearBatch();
                throw e;
            }
        }

        long[] results = new long[counter];
        long rows = 0;
        try {
            stream.close();
            rows = executeInsert(getRequest().getStatements(false).get(0), stream.getInputStream());
            if (asBatch && getResultSet() != null) {
                throw SqlExceptionUtils.queryInBatchError(results);
            }

            if (counter == 1) {
                results[0] = rows;
            } else {
                // FIXME grpc and tcp by default can provides accurate result
                Arrays.fill(results, 1);
            }
        } catch (Exception e) {
            if (!asBatch) {
                throw SqlExceptionUtils.handle(e);
            }

            // just a wild guess...
            if (rows < 1) {
                results[0] = EXECUTE_FAILED;
            } else {
                if (rows >= counter) {
                    rows = counter;
                }
                for (int i = 0, len = (int) rows - 1; i < len; i++) {
                    results[i] = 1;
                }
                results[(int) rows] = EXECUTE_FAILED;
            }

            if (!continueOnError) {
                throw SqlExceptionUtils.batchUpdateError(e, results);
            }
            log.error("Failed to execute batch insert of %d records", counter + 1, e);
        } finally {
            clearBatch();
        }

        return results;
    }

In the catch block, all result elements are marked with 1 except for the last one, which is marked with -3. This signifies that the statements marked with 1 succeeded, whereas in reality they didn't -- I have verified and reproduced this myself by confirming that none of the corresponding data was stored on the server.

Could you please advise why it is implemented this way? Was it done on purpose and are we simply misusing the response? How should we retry the batch in failure scenarios? Should we perhaps retry and persist the whole batch again, rather than filtering successful and failed statements based on the results array returned from the JDBC client? Hope to receive some clarification on this.
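For reference, here is a minimal sketch of the kind of retry logic we rely on; the class, SQL and row shapes are simplified illustrations rather than our actual code, and it assumes each entry of the returned array corresponds to exactly one added batch:

    // Hypothetical sketch of retry logic that trusts the per-statement results array.
    // Names (insertSql, rows, etc.) are illustrative, not from our actual application.
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;

    public class BatchRetryExample {
        static List<Object[]> executeAndCollectFailures(Connection conn, String insertSql,
                                                        List<Object[]> rows) throws Exception {
            List<Object[]> failed = new ArrayList<>();
            try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
                for (Object[] row : rows) {
                    for (int i = 0; i < row.length; i++) {
                        ps.setObject(i + 1, row[i]);
                    }
                    ps.addBatch();
                }
                // executeLargeBatch() returns long[]; executeBatch() works the same way
                // with int[] if the large variant is not available.
                long[] results = ps.executeLargeBatch();
                // The assumption here is that results[i] reflects the fate of rows.get(i),
                // i.e. only entries equal to Statement.EXECUTE_FAILED need to be retried.
                for (int i = 0; i < results.length; i++) {
                    if (results[i] == Statement.EXECUTE_FAILED) {
                        failed.add(rows.get(i));
                    }
                }
            }
            return failed;
        }
    }

As described above, this assumption breaks down when the whole insert fails on the server but most entries still come back as 1.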

Environment

ClickHouse server

zhicwu commented 1 year ago

Hi @malikas05, what was the exception you got? As you may have noticed, regardless of how many batches are added through the JDBC API, it is in fact a single HTTP request that is sent to ClickHouse. Because of that implementation, if there is an error at the end, even if ClickHouse handles all the inserts synchronously, it is still hard for the client to tell which exact batch failed, and of course it is also inaccurate to mark all batches as failed, especially when the affected row count is greater than zero. On the other hand, the affected row count returned from ClickHouse is different from a traditional RDBMS: if you insert rows into a distributed table, you'll see the number amplified. I don't currently have a good way to handle this, so I only mark the last batch as failed, hoping that at least this will be noticed.

If you need a reliable approach for inserting, perhaps you can try async insert (#820) or transactions (#920).
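For example, a rough sketch of a transactional batch insert over plain JDBC; the URL and table below are just placeholders, and transaction support (still experimental) has to be enabled on both the server and the driver, so treat this as an outline rather than exact usage:

    // Rough sketch of a transactional batch insert over plain JDBC.
    // The endpoint and table name are placeholders for illustration only.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class TransactionalInsertSketch {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:clickhouse://localhost:8123/default"; // placeholder endpoint
            try (Connection conn = DriverManager.getConnection(url)) {
                conn.setAutoCommit(false); // requires transaction support to be enabled
                try (PreparedStatement ps = conn.prepareStatement(
                        "insert into events(id, value) values(?, ?)")) { // placeholder table
                    for (int i = 0; i < 1000; i++) {
                        ps.setInt(1, i);
                        ps.setLong(2, i * 10L);
                        ps.addBatch();
                    }
                    ps.executeBatch();
                    conn.commit();   // all rows become visible together
                } catch (Exception e) {
                    conn.rollback(); // nothing from this batch should remain
                    throw e;
                }
            }
        }
    }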

malikas05 commented 1 year ago

Hi @zhicwu. Thanks for your reply.

The underlying exception that the client has thrown is as follows:

java.sql.BatchUpdateException: Code: 242, e.displayText() = DB::Exception: Table is in readonly mode (zookeeper path: /clickhouse/tables/*-2/*)
 (version 21.8.14.5 (official build))

This is just one random exception that caused the batch to fail; previously we encountered several others that were the culprit of failed batches as well.

You are referring to multiple batches being sent as a single HTTP call. The method that I shared above wraps a single batch, I believe, and operates on the set of statements included within that batch. Could you please double-check and confirm whether I am correct?

You recommended using async inserts or transactions. Could you please provide more information or share official documentation about how they work and how to use them with the JDBC client?

Also, do you have any suggestions for how to properly retry in the scenarios that I shared above? Is it safe to retry the whole batch again?

zhicwu commented 1 year ago

The method that I shared above wraps a single batch, I believe, and operates on the set of statements included within that batch. Could you please double-check and confirm whether I am correct?

This method is used by both executeBatch() and executeUpdate(), so it handles multiple batches as well as a single one. The main issue is that the affected row count returned from ClickHouse is unreliable (e.g. ClickHouse/ClickHouse#18237), which makes it difficult to determine which specific batch, or which rows within a batch, failed.

Could you please provide more information or share official documentation about how they work and how to use them with the JDBC client?

Async inserts can help achieve eventual consistency, which may work well for small writes. However, high-volume inserts may encounter issues like ClickHouse/ClickHouse#47623. In my opinion, transactions, although still experimental, are likely easier to start with when using the JDBC API. By default transactions use auto-commit, but you can manually commit or roll back as needed. Compared to async inserts, transactions provide more consistency guarantees out of the box.
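For illustration, async insert can be requested per statement through a SETTINGS clause on the INSERT; the endpoint, table and values below are placeholders, and depending on the driver version you may prefer to apply the settings at the session or connection level instead:

    // Sketch: letting the server buffer small writes via async_insert.
    // The endpoint, table and values are placeholders for illustration only.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AsyncInsertSketch {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:clickhouse://localhost:8123/default"; // placeholder endpoint
            try (Connection conn = DriverManager.getConnection(url);
                 Statement stmt = conn.createStatement()) {
                // async_insert buffers rows on the server; wait_for_async_insert = 1 makes
                // the call wait for the flush so that errors still surface to the caller.
                stmt.executeUpdate(
                    "insert into events(id, value) "
                        + "settings async_insert = 1, wait_for_async_insert = 1 "
                        + "values (1, 42)");
            }
        }
    }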

Do you have any suggestions for how to properly retry in the scenarios that I shared above? Is it safe to retry the whole batch again?

I don't have the full context on your table design, batch size, insert frequency, and data characteristics. If you would prefer an alternative approach for idempotent inserts, you could consider using a staging table (truncate, insert, then INSERT INTO ... SELECT), or maybe lightweight deletes.
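A rough sketch of the staging-table idea, with made-up table names and no error handling:

    // Staging-table sketch for idempotent loads. Table names (events, events_staging)
    // are placeholders; error handling and retries are omitted for brevity.
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class StagingTableSketch {
        static void loadBatch(Connection conn, int[][] batch) throws Exception {
            try (Statement stmt = conn.createStatement()) {
                // 1. Start from a clean staging table so a failed attempt can simply be redone.
                stmt.execute("truncate table events_staging");
            }
            try (PreparedStatement ps = conn.prepareStatement(
                    "insert into events_staging(id, value) values(?, ?)")) {
                // 2. The normal batch insert targets the staging table.
                for (int[] row : batch) {
                    ps.setInt(1, row[0]);
                    ps.setInt(2, row[1]);
                    ps.addBatch();
                }
                ps.executeBatch();
            }
            try (Statement stmt = conn.createStatement()) {
                // 3. Move the complete batch into the real table in a single statement.
                stmt.execute("insert into events select * from events_staging");
            }
        }
    }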

malikas05 commented 1 year ago

@zhicwu, thanks so much for your detailed answer.

We persist a set of analytical events into tables which inherit ReplicatedSummingMergeTree engine. The batches are pretty large, from 10K to 100K records with the frequency of writes between 3 and 10 seconds. I have learnt that replicated tables have a special feature called "insert deduplication", so the same data blocks can be safely retried without introducing any duplicate data. I have verified this locally, and it indeed works. However, in our design we front our local tables with Distributed tables, so our client application(writer) is pointing to them. As it transpired, data block deduplication doesn't work for Distributed table which seems to be a huge downside. Do you know if there is a way to enable it for Distributed tables as well? Otherwise, do you have other approaches in mind? We can't really change the design in terms of removing Distributed tables because 1. we are already in production, 2. we rely on Distributed tables as proxy that distributes the traffic evenly between our shards/replicas.

malikas05 commented 1 year ago

Update: I just tried to test deduplication locally again. It looks like the behavior is not deterministic for Distributed tables, and I suspect this is expected. I was generating the same inserts over and over again, and deduplication worked only case by case, i.e., seemingly at random. If I understand this correctly, this is due to the distributed nature of the queries (inserts in our case). If the same insert is directed to the same replica node, deduplication is properly applied, since the checksums are stored in ZooKeeper for each replica separately.

Do you know if there is another way to enable deduplication for Distributed tables that point to ReplicatedSummingMergeTree tables?

Thanks in advance!

zhicwu commented 1 year ago

Thank you for sharing the use case and your findings. It seems like you are trying to convince yourself to use a distributed table, even though you know that inserting into a local table is better :) If there is no single point of failure in your design, I hope you didn't mix the usage of DNS-based load balancing and distributed tables, because you only need one - in my opinion it should be the former. Apart from that, in case you want better control over deduplication, you may want to use insert_deduplication_token by upgrading ClickHouse to 22.2+.
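For example, something along these lines; the table, token and values are placeholders, and the token has to be supplied by your application (e.g. derived from a batch identifier) so that a retried batch reuses the same value:

    // Sketch: pinning a deduplication token to each batch so that a retried insert of
    // the same batch is dropped by the server. Table, token and values are placeholders.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class DedupTokenSketch {
        static void insertBatch(Connection conn, String batchId, String valuesSql) throws Exception {
            try (Statement stmt = conn.createStatement()) {
                // Reusing the same batchId on retry lets ClickHouse 22.2+ deduplicate the block.
                // (A real implementation should not build SQL by string concatenation.)
                stmt.executeUpdate(
                    "insert into events(id, value) "
                        + "settings insert_deduplication_token = '" + batchId + "' "
                        + "values " + valuesSql);
            }
        }

        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:clickhouse://localhost:8123/default")) { // placeholder endpoint
                insertBatch(conn, "batch-000123", "(1, 42), (2, 43)");
                // A retry with the identical token and data should be a no-op on the server.
                insertBatch(conn, "batch-000123", "(1, 42), (2, 43)");
            }
        }
    }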

To summarize:

Regarding the Java library, I will need to add more tests for SRV DNS resolving and enhance the failover and retry options to support insert.

malikas05 commented 1 year ago

@zhicwu, thanks for your suggestions. Just to clarify, by alluding to DNS load balancing, you meant some external implementation, not something that exists in ClickHouse, correct? Ultimately, I suspect that regardless of which load-balancing implementation we rely on, deduplication won't work deterministically, because there is no guarantee that on subsequent retries of batch inserts the same server will handle the request and thus properly deduplicate the data blocks. Please let me know if my point is correct.

We can look into using insert_deduplication_token; however, wouldn't we face the same problem with different servers handling the retries? Do you have an example of using that feature with the JDBC client, by any chance?

Regarding the Java library, I will need to add more tests for SRV DNS resolving and enhance the failover and retry options to support insert.

I think it would be super critical to support deduplication for Distributed tables in the long run. I am not sure how feasible it is, but it's probably something to consider.

zhicwu commented 1 year ago

Just to clarify, by alluding to DNS load balancing, you meant some external implementation, not something that exists in ClickHouse, correct?

Yes.

Ultimately, I suspect that regardless of which load-balancing implementation we rely on, deduplication won't work deterministically, because there is no guarantee that on subsequent retries of batch inserts the same server will handle the request and thus properly deduplicate the data blocks. Please let me know if my point is correct.

It's true that the DNS server may return a different IP address each time due to load balancing. However, instead of relying on the DNS server or cache to provide the same IP, you should resolve the address once per client request. Use the resolved IP to connect to a specific ClickHouse server, insert data into a local table (with replication enabled), and retry the insert against the same IP if it fails. On the next client request, resolve the DNS again to get a potentially different IP address. If you need consistent names across environments (PROD/UAT/DEV/LOCAL) or more control in a multi-datacenter deployment, consider using a service like Consul.
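A rough sketch of that flow; the host name, table and retry policy are placeholders:

    // Sketch of "resolve once, then stick to that IP for the request and its retries".
    // Host name, database, table and the retry policy are placeholders.
    import java.net.InetAddress;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ResolveOncePerRequest {
        public static void main(String[] args) throws Exception {
            // Resolve the service name once for this client request; the DNS/LB layer may
            // hand out a different address next time, which is fine.
            InetAddress[] candidates = InetAddress.getAllByName("clickhouse.internal.example");
            String ip = candidates[0].getHostAddress();
            String url = "jdbc:clickhouse://" + ip + ":8123/default";

            int attempts = 0;
            while (true) {
                try (Connection conn = DriverManager.getConnection(url);
                     Statement stmt = conn.createStatement()) {
                    // Insert into the *local* replicated table on this specific server, so a
                    // retry of the same block against the same replica can be deduplicated.
                    stmt.executeUpdate("insert into events_local(id, value) values (1, 42)");
                    break;
                } catch (Exception e) {
                    if (++attempts >= 3) {
                        throw e; // give up after a few tries against the same IP
                    }
                }
            }
        }
    }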

Apart from that, you probably don't want to use a connection pool, because it doesn't help in this case.

malikas05 commented 1 year ago

Thanks for your help and answers to all my questions, @zhicwu!