datastax / cql-proxy

A client-side CQL proxy/sidecar.
Apache License 2.0

URGENT: Multiple Requests Handle Issue #66

Open datalatics-official opened 2 years ago

datalatics-official commented 2 years ago

Hello @mpenick

I'm currently facing another issue, this one related to a timeout. Here is the error message:

Query (SELECT * FROM table_name WHERE id=?;) failed com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: host:9042 (com.datastax.driver.core.exceptions.DriverException: Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections)))

I've even increased the per-host connections. Let me share that part of the code here:

private val cluster = Cluster.builder()
  .addContactPoints(configuration.astraHostnames: _*)
  .withPort(9042)
  .withPoolingOptions(new PoolingOptions()
    .setConnectionsPerHost(HostDistance.LOCAL, 40, 10000)
    .setConnectionsPerHost(HostDistance.REMOTE, 20, 5000))
  .withSocketOptions(new SocketOptions().setReadTimeoutMillis(configuration.cassandraReadTimeout))
  .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new RoundRobinPolicy())).build)
  .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
  .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(500, 2))
  .build()

I think the driver (v2.1.10.3) is working properly, but my cql-proxy service is unable to handle hundreds of requests at the same time. Could you please tell me how many requests cql-proxy can handle at a time, and how I can change that number?

mpenick commented 2 years ago

Thanks for the report. I'll work on reproducing the issue tomorrow.

mpenick commented 2 years ago

@datalatics-official Could you please describe your workload?

Are you sending multiple requests asynchronously?

Also, those numbers for connections per host are really high. The driver coalesces requests, and having more connections can reduce the benefit of that.
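For context on why the asynchronous question matters: in the 2.1 driver, a caller that cannot borrow a connection within the pool timeout fails with the "Timeout while trying to acquire available connection" error quoted above, so an unbounded burst of `executeAsync` calls can produce it regardless of pool size. One common mitigation is to cap the number of requests in flight. The following is only a minimal sketch using the JDK alone, not code from cql-proxy or the driver: `BoundedAsync`, `submit`, `demo`, and the limit values are hypothetical names chosen for illustration, and the `CompletableFuture` stands in for the driver's `ResultSetFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: caps how many async operations are in flight at once. */
public class BoundedAsync {
    private final Semaphore permits;

    public BoundedAsync(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Blocks the caller until a permit is free, then starts the operation. */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation)
            throws InterruptedException {
        permits.acquire();
        try {
            // Release the permit when the operation completes, successfully or not.
            return operation.get().whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the operation never started
            throw e;
        }
    }

    /** Runs `total` simulated requests and returns the highest concurrency observed. */
    public static int demo(int total, int maxInFlight) throws Exception {
        BoundedAsync limiter = new BoundedAsync(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            // The simulated request sleeps briefly in place of a real query.
            futures.add(limiter.submit(() -> CompletableFuture.runAsync(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inFlight.decrementAndGet();
            })));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak in-flight: " + demo(200, 16));
    }
}
```

With a real session, the supplier passed to `submit` would wrap the asynchronous query call, and the limit would be tuned against the pool size rather than taken from this sketch.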

mpenick commented 2 years ago

I'm unable to reproduce this with driver v2.1.10.3, using the following to generate load:

package org.example;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.LatencyAwarePolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class App
{
    private static final int numThreads = 10;
    private static final MetricRegistry metrics = new MetricRegistry();
    private static final Timer requests = metrics.timer("requests");
    private static final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

    public static void main( String[] args ) throws InterruptedException {
        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();
        reporter.start(2, TimeUnit.SECONDS);

        Cluster cluster = null;
        try {
            cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .withPort(9042)
                .withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 40, 10000).setConnectionsPerHost(HostDistance.REMOTE, 20, 5000))
                .withSocketOptions(new SocketOptions().setReadTimeoutMillis(5000))
                .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new RoundRobinPolicy())).build())
                .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
                .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(500, 2))
                .build();
            Session session = cluster.connect("testks1");

            PreparedStatement preparedStatement = session.prepare("INSERT INTO testks1.test (pk) VALUES(?)").setConsistencyLevel(
                ConsistencyLevel.LOCAL_QUORUM);

            for (int i = 0; i < numThreads; ++i) {
                final long value = i;
                executor.submit(() -> {
                    while (true) {
                        try {
                            try (Context ignored = requests.time()) {
                                final int numRequests = 100;
                                List<ResultSetFuture> futures = new ArrayList<>(numRequests);
                                for (int j = 0; j < numRequests; ++j) {
                                    futures.add(
                                        session.executeAsync(preparedStatement.bind(value)));
                                }
                                Futures.successfulAsList(futures).get();
                            }
                        } catch (Exception e) {
                            System.err.println(e);
                        }
                    }
                });
            }

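            // The executor is never shut down, so this blocks and the load keeps running until the process is killed.
            while (!executor.awaitTermination(9999, TimeUnit.DAYS)) {
            }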
        } finally {
            if (cluster != null) cluster.close();
        }
    }
}

datalatics-official commented 2 years ago

@mpenick Thanks for the quick response. Let me check this in detail. I'll get back to you soon with my feedback.