Netflix / astyanax

Cassandra Java Client
Apache License 2.0

Support closing idle connections and connections that are older than some time #556

Open afilipchik opened 9 years ago

afilipchik commented 9 years ago

Since Astyanax doesn't shrink its connection pools, and given https://issues.apache.org/jira/browse/THRIFT-1457, there is a possibility of a nasty memory leak. In our apps I saw pools consuming up to 8 GB of heap while actually doing nothing.

The solution could be to automatically shrink the connection pool by closing old and idle connections. A naive implementation could be:

Connection

    void setLastUsed(long time);
    long getLastUsed();
    long getCreationTime();

ThriftConnection

    private final long creationTime = System.currentTimeMillis();
    // Start at the creation time so that a connection which was never returned
    // to the pool is not treated as idle since the epoch.
    private final AtomicLong lastUsed = new AtomicLong(creationTime);

    @Override
    public void setLastUsed(final long time) {
        lastUsed.set(time);
    }

    @Override
    public long getLastUsed() {
        return lastUsed.get();
    }

    @Override
    public long getCreationTime() {
        return this.creationTime;
    }

SimpleHostConnectionPool

public SimpleHostConnectionPool(Host host, ConnectionFactory<CL> factory, ConnectionPoolMonitor monitor,
                                    ConnectionPoolConfiguration config, Listener<CL> listener) {
....
this.availableConnections = new LinkedBlockingDeque<Connection<CL>>();
....
}

    public void expireIdleConnections() {
        // Take every idle connection out of the pool, then put back the ones that are still fresh.
        final List<Connection<CL>> connections = Lists.newArrayList();
        availableConnections.drainTo(connections);
        activeCount.addAndGet(-connections.size());

        final long now = System.currentTimeMillis();
        for (Connection<CL> connection : connections) {
            try {
                boolean expired = false;
                if (connection instanceof com.sony.snei.kamaji.common.astyanax.pool.Connection) {
                    final com.sony.snei.kamaji.common.astyanax.pool.Connection timed =
                        (com.sony.snei.kamaji.common.astyanax.pool.Connection) connection;
                    // Expired if idle for too long or older than the maximum time to live.
                    expired = now - IDLE_EXPIRE_TIME > timed.getLastUsed()
                        || now - MAX_TIME_TO_LIVE > timed.getCreationTime();
                }
                if (expired) {
                    closedConnections.incrementAndGet();
                    connection.close(); // This is usually an async operation
                } else {
                    try {
                        availableConnections.add(connection);
                        activeCount.incrementAndGet();
                    } catch (Exception e) {
                        // Queue is full. Closing
                        closedConnections.incrementAndGet();
                        connection.close(); // This is usually an async operation
                    }
                }
            } catch (Throwable t) {
                // TODO
            }
        }
    }

    public boolean returnConnection(Connection<CL> connection) {
        ....
        connection.setLastUsed(System.currentTimeMillis());
        ....
    }

AbstractHostPartitionConnectionPool

   protected final ScheduledExecutorService executor;
   public void start() {
    ...
     this.executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (HostConnectionPool<CL> hostConnectionPool : hosts.values()) {
                    if (hostConnectionPool instanceof SimpleHostConnectionPool) {
                        ((SimpleHostConnectionPool) hostConnectionPool).expireIdleConnections();
                    }
                }
            }
        }, EXPIRE_IDLE_CONNECTIONS_INTERVAL_SECONDS, EXPIRE_IDLE_CONNECTIONS_INTERVAL_SECONDS, TimeUnit.SECONDS);
    ...
  }
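
The expiry rule proposed above can be tried in isolation. The following is a minimal, self-contained sketch and not Astyanax code: `TimedConnection`, `ExpiringIdlePool`, and the injectable clock are hypothetical names introduced here so the idle and time-to-live checks can be exercised without a Cassandra cluster or real waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a pooled connection; not the Astyanax Connection interface. */
final class TimedConnection {
    final long createdAt;
    volatile long lastUsed;
    volatile boolean closed;

    TimedConnection(long now) {
        this.createdAt = now;
        this.lastUsed = now; // a fresh connection counts as "just used"
    }

    void close() {
        closed = true;
    }
}

/** Idle pool that drops connections that have been idle too long or are too old. */
final class ExpiringIdlePool {
    private final LinkedBlockingDeque<TimedConnection> available = new LinkedBlockingDeque<>();
    private final AtomicInteger closedCount = new AtomicInteger();
    private final long idleExpireMillis;
    private final long maxTimeToLiveMillis;
    private final LongSupplier clock; // assumption: injected so the sketch is testable

    ExpiringIdlePool(long idleExpireMillis, long maxTimeToLiveMillis, LongSupplier clock) {
        this.idleExpireMillis = idleExpireMillis;
        this.maxTimeToLiveMillis = maxTimeToLiveMillis;
        this.clock = clock;
    }

    TimedConnection open() {
        return new TimedConnection(clock.getAsLong());
    }

    /** Stamps the connection with the current time and makes it available again. */
    void returnConnection(TimedConnection connection) {
        connection.lastUsed = clock.getAsLong();
        available.addFirst(connection);
    }

    /** Drains the idle connections, closes the expired ones, re-adds the rest. Returns the number closed. */
    int expireIdleConnections() {
        long now = clock.getAsLong();
        List<TimedConnection> drained = new ArrayList<>();
        available.drainTo(drained);
        int closedNow = 0;
        for (TimedConnection connection : drained) {
            boolean idleTooLong = now - connection.lastUsed > idleExpireMillis;
            boolean tooOld = now - connection.createdAt > maxTimeToLiveMillis;
            if (idleTooLong || tooOld) {
                connection.close();
                closedNow++;
            } else {
                available.addLast(connection); // keep the original order
            }
        }
        closedCount.addAndGet(closedNow);
        return closedNow;
    }

    int availableCount() {
        return available.size();
    }

    int closedCount() {
        return closedCount.get();
    }
}
```

With a real clock the pool would be built as `new ExpiringIdlePool(idleMillis, ttlMillis, System::currentTimeMillis)`. As in the proposal, a connection borrowed between the drain and the re-add is simply not seen by that pass.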

tsteinmaurer commented 9 years ago

Hello Alex,

We too suffer from the fact that Astyanax doesn't close connections from time to time, especially when dealing with larger payloads over Thrift. This results in large TFramedTransport instances on the heap that never go away.

Thomas

zznate commented 9 years ago

@afilipchik Adding reaping is a non-trivial change given the addition of a thread pool to manage (making sure it plays nice with containers, stand-alone apps, etc.).

I don't think it's a bad idea, but is there a more immediate fix of noodling the specific connection error states correctly, or, conversely, validating payloads tighter, that would be easier to drop in?

Regardless, if you submit a patch (or several), I'll be happy to try them out as well, given that @opuneet is usually doing like 26 things simultaneously over there :)
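
One way to limit the thread-management concern raised above is to run the reaper on a single daemon thread that the pool owns and stops explicitly. The sketch below assumes that approach. `ConnectionReaper` is a hypothetical name, not part of Astyanax, and it uses only `java.util.concurrent` calls.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner of the background expiry thread. */
final class ConnectionReaper implements AutoCloseable {
    private final ScheduledExecutorService executor;

    ConnectionReaper(Runnable expireTask, long interval, TimeUnit unit) {
        // A daemon thread does not keep a stand-alone JVM alive on its own.
        ThreadFactory daemonFactory = runnable -> {
            Thread thread = new Thread(runnable, "connection-reaper");
            thread.setDaemon(true);
            return thread;
        };
        this.executor = Executors.newSingleThreadScheduledExecutor(daemonFactory);

        // A periodic task that throws is never run again by the executor,
        // so failures are caught here to keep later runs scheduled.
        Runnable guarded = () -> {
            try {
                expireTask.run();
            } catch (RuntimeException e) {
                // A real implementation would log this.
            }
        };
        executor.scheduleWithFixedDelay(guarded, interval, interval, unit);
    }

    /** Stops the reaper; meant to be called from the pool's own shutdown path. */
    @Override
    public void close() {
        executor.shutdownNow();
    }

    boolean isStopped() {
        return executor.isShutdown();
    }
}
```

In a container, calling `close()` when the pool shuts down matters more than the daemon flag, because an undeployed application's threads otherwise keep running.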

afilipchik commented 9 years ago

I can do a PR.

"but is there a more immediate fix of noodling the specific connection error states correctly"

That is the thing: all connections are live and happy. The number grows when load spikes and stays there forever. Another option (and probably an easier one) would be to make the RoundRobin and TokenAware pools honor maxOperationsPerConnection().

It is already in our prod and looks good :). The number of live connections went down from 2000 to 600 in one case and from 800 to 200 in a second case.
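
The per-connection operation limit mentioned above could work roughly as follows. This is an illustrative sketch, not the Astyanax pool code: `CountingConnection` and `OperationLimitedPool` are hypothetical, and only the idea of retiring a connection once it has served a configured number of operations is taken from the discussion.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical connection that counts the operations it has executed. */
final class CountingConnection {
    private int operationCount;
    private boolean closed;

    /** Runs one operation; a connection is used by one borrower at a time. */
    void execute(Runnable operation) {
        if (closed) {
            throw new IllegalStateException("connection is closed");
        }
        operation.run();
        operationCount++;
    }

    int getOperationCount() {
        return operationCount;
    }

    boolean isClosed() {
        return closed;
    }

    void close() {
        closed = true;
    }
}

/** Pool that retires a connection once it has reached the operation limit. */
final class OperationLimitedPool {
    private final Deque<CountingConnection> idle = new ArrayDeque<>();
    private final int maxOperationsPerConnection;
    private int openCount;

    OperationLimitedPool(int maxOperationsPerConnection) {
        this.maxOperationsPerConnection = maxOperationsPerConnection;
    }

    /** Reuses an idle connection if there is one, otherwise opens a new one. */
    synchronized CountingConnection borrow() {
        CountingConnection connection = idle.pollFirst();
        if (connection == null) {
            connection = new CountingConnection();
            openCount++;
        }
        return connection;
    }

    /** Returns true if the connection was closed instead of being pooled again. */
    synchronized boolean returnConnection(CountingConnection connection) {
        if (connection.getOperationCount() >= maxOperationsPerConnection) {
            connection.close();
            openCount--;
            return true;
        }
        idle.addFirst(connection);
        return false;
    }

    synchronized int getOpenCount() {
        return openCount;
    }
}
```

A limit like this recycles connections as they are used. It does not by itself close connections that sit idle after a load spike, which is what the timed expiry targets.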

nithinelias commented 6 years ago

Hi, has this issue been fixed in any version of the Astyanax client library?