aerospike / aerospike-client-java

Aerospike Java Client Library

Fix inconsistent connection idle check causing java.io.EOFException #150

Closed jacinpoz closed 4 years ago

jacinpoz commented 4 years ago

After a lot of research into why java.io.EOFException is thrown in certain scenarios in our codebase (in a consistent and reproducible manner), and reading plenty of Aerospike forum threads with no real fix for this problem, I ended up adding lots of logs to the client to debug and fix the problem. This issue should theoretically never happen, as by default the client will close a connection after 55 seconds of inactivity (maxSocketIdle) and the server will close a connection after 60 seconds of inactivity (proto-fd-idle-ms).

It turns out that the client's "lastUsed" timestamp is only updated when the connection is put back in the pool, which does not reflect the last real socket activity in several scenarios (such as query operations).

SyncCommand.java does the following when sending a command and receiving its reply:

        try {
            // Set command buffer.
            writeBuffer();
            // Check if total timeout needs to be changed in send buffer.
            if (totalTimeout != policy.totalTimeout) {
                // Reset timeout in send buffer (destined for server) and socket.
                Buffer.intToBytes(totalTimeout, dataBuffer, 22);
            }

            // Send command.
            conn.write(dataBuffer, dataOffset);
            commandSentCounter++;

            // Parse results.
            parseResult(conn);

            // Put connection back in pool.
            node.putConnection(conn);

            // Command has completed successfully.  Exit method.
            return;
        }

node.putConnection(conn); will update the lastUsed time, but parseResult(conn); is what is actually reading the reply from the server. In our failing case scenario involving a query operation, parseResult(conn) is implemented by MultiCommand.java:

    @Override
    protected final void parseResult(Connection conn) throws IOException {
        // Read socket into receive buffer one record at a time.  Do not read entire receive size
        // because the thread local receive buffer would be too big.  Also, scan callbacks can nest
        // further database commands which contend with the receive buffer.
        bis = new BufferedInputStream(conn.getInputStream());
        boolean status = true;

        while (status) {
            // Read header.
            state = Command.STATE_READ_HEADER;
            dataOffset = 0;
            readBytes(8);

            long size = Buffer.bytesToLong(dataBuffer, 0);
            receiveSize = ((int) (size & 0xFFFFFFFFFFFFL));

            if (receiveSize > 0) {
                status = parseGroup();
            }
        }
    }

Following parseGroup(), we can see that it parses the keys and rows coming from the server:

    /**
     * Parse all records in the group.
     */
    private final boolean parseGroup() throws IOException {
        //Parse each message response and add it to the result array
        state = Command.STATE_READ_DETAIL;
        dataOffset = 0;

        while (dataOffset < receiveSize) {
            readBytes(MSG_REMAINING_HEADER_SIZE);
            resultCode = dataBuffer[5] & 0xFF;

            // The only valid server return codes are "ok" and "not found".
            // If other return codes are received, then abort the batch.
            if (resultCode != 0) {
                if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR || resultCode == ResultCode.FILTERED_OUT) {
                    if (stopOnNotFound) {
                        return false;
                    }
                }
                else {
                    throw new AerospikeException(resultCode);
                }
            }

            byte info3 = dataBuffer[3];

            // If this is the end marker of the response, do not proceed further
            if ((info3 & Command.INFO3_LAST) == Command.INFO3_LAST) {
                return false;
            }

            generation = Buffer.bytesToInt(dataBuffer, 6);
            expiration = Buffer.bytesToInt(dataBuffer, 10);
            batchIndex = Buffer.bytesToInt(dataBuffer, 14);
            fieldCount = Buffer.bytesToShort(dataBuffer, 18);
            opCount = Buffer.bytesToShort(dataBuffer, 20);

            Key key = parseKey(fieldCount);
            parseRow(key);
        }
        return true;
    }

Checking the parseRow(Key key) implementation in QueryRecordCommand.java, we can see that each parsed record is added to the "RecordSet" result:

    @Override
    protected void parseRow(Key key) throws IOException {
        Record record = parseRecord();

        if (! valid) {
            throw new AerospikeException.QueryTerminated();
        }

        if (! recordSet.put(new KeyRecord(key, record))) {
            stop();
            throw new AerospikeException.QueryTerminated();
        }
    }

RecordSet.java contains a BlockingQueue<KeyRecord> queue field with a limited capacity, and its code comments make clear that the "put" operation blocks once that capacity has been reached:

    /**
     * Put a record on the queue.
     */
    protected final boolean put(KeyRecord record) {
        if (! valid) {
            return false;
        }

        try {
            // This put will block if queue capacity is reached.
            queue.put(record);
            return true;
        }
        catch (InterruptedException ie) {
            if (Log.debugEnabled()) {
                Log.debug("RecordSet " + executor.statement.taskId + " put interrupted");
            }

            // Valid may have changed.  Check again.
            if (valid) {
                abort();
            }
            return false;
        }
    }
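
The blocking behaviour of a bounded queue can be reproduced in isolation. The following is a minimal sketch using only java.util.concurrent; the class and method names are hypothetical and not part of the client. It uses the timed offer instead of put so that the demo cannot hang: where offer gives up after the timeout, put would keep waiting until a consumer frees a slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; it only illustrates bounded-queue back-pressure. */
final class BoundedQueueDemo {

    /**
     * Tries to enqueue one element, waiting at most timeoutMs for free space.
     * Returns false when the queue stayed full for the whole timeout,
     * which is the situation in which put() would still be blocked.
     */
    static boolean canEnqueueWithin(BlockingQueue<String> queue, long timeoutMs) {
        try {
            return queue.offer("record", timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ie) {
            // Restore the interrupt flag and report failure.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        System.out.println(canEnqueueWithin(queue, 50)); // true: one free slot
        System.out.println(canEnqueueWithin(queue, 50)); // false: queue is full

        queue.poll(); // a consumer removes one record

        System.out.println(canEnqueueWithin(queue, 50)); // true: slot is free again
    }
}
```

In the query case the producer is the thread reading from the socket, so while the application is slow to drain the queue, that thread sits in put and the command does not reach its end.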

Our problem is that the application has several thousand records to consume from this RecordSet, and for each one it triggers other database operations and calculations that take a significant amount of time. As a result, the server has finished sending all the records to the client, and its "proto-fd-idle-ms" timer has already started, well before we have finished consuming the RecordSet. The client, however, only updates "lastUsed" in Connection.java once every RecordSet put operation has completed without blocking.

The end result is that by the time the client updates the "lastUsed" value in Connection.java, 30 seconds have already passed on the server side since its "proto-fd-idle-ms" timer started for this connection. The next time the connection is used (around 35-40 seconds later), the client still considers it valid, but the server has already closed it, and the client throws a java.io.EOFException.
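
The timeline can be checked with plain arithmetic. Below is a minimal sketch, assuming the default values quoted earlier (55 seconds on the client, 60 seconds on the server) and the 30 second and 35 second delays from our scenario. The class and its methods are hypothetical; they only model the two idle clocks and are not taken from the client or the server, and the exact comparison each side performs is an assumption.

```java
/** Hypothetical model of the client and server idle clocks (times in seconds). */
final class IdleClockModel {
    static final long CLIENT_MAX_SOCKET_IDLE_SEC = 55; // maxSocketIdle default quoted above
    static final long SERVER_PROTO_FD_IDLE_SEC = 60;   // proto-fd-idle-ms default quoted above

    /** Assumed client-side check: usable while the idle time is within maxSocketIdle. */
    static boolean clientThinksValid(long lastUsedSec, long nowSec) {
        return nowSec - lastUsedSec <= CLIENT_MAX_SOCKET_IDLE_SEC;
    }

    /** Assumed server-side rule: closed once there was no traffic for the idle limit. */
    static boolean serverHasClosed(long lastTrafficSec, long nowSec) {
        return nowSec - lastTrafficSec >= SERVER_PROTO_FD_IDLE_SEC;
    }

    public static void main(String[] args) {
        long serverLastTraffic = 0; // server sends the last record
        long clientLastUsed = 30;   // client returns the connection to the pool 30 s later
        long nextUse = 65;          // connection is reused 35 s after that

        // Client sees 35 s of idle time, server sees 65 s.
        System.out.println(clientThinksValid(clientLastUsed, nextUse)); // true
        System.out.println(serverHasClosed(serverLastTraffic, nextUse)); // true
    }
}
```

Both checks return true at the moment of reuse, which is exactly the window in which the client writes to a socket the server has already closed.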

If the client updates "lastUsed" only once it has finished parsing the whole server result, the server's idle counter will always start before the client's, so it is not difficult to imagine further scenarios in which this could happen (network failures, CPU contention on the client, etc.). The main purpose of setting the client's "maxSocketIdle" lower than the server's value is to prevent this problem in the first place, but the current implementation cannot guarantee it.

If we mark the connection as last used just before the "write" operation happens, we can guarantee that the client expires the connection before the server does, which avoids the problem. This might expire more connections than strictly necessary, but application developers would no longer have to add try/catch blocks in all sorts of places to handle this error, and there would be no surprise exceptions in production code (as happened to us). When this error occurs a new connection has to be established anyway, so keeping the stale connection gains nothing.
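
To illustrate the idea, here is a minimal sketch of stamping the connection before the write. It is not the actual patch: TrackedConnection, its injected clock, and the use of seconds as the time unit are all assumptions made so that the example is self-contained and testable.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the client's connection class; names are illustrative. */
final class TrackedConnection {
    private final LongSupplier clockSec;  // injected clock, in seconds, for testability
    private final long maxSocketIdleSec;
    private long lastUsedSec;

    TrackedConnection(LongSupplier clockSec, long maxSocketIdleSec) {
        this.clockSec = clockSec;
        this.maxSocketIdleSec = maxSocketIdleSec;
        this.lastUsedSec = clockSec.getAsLong();
    }

    /** Stamp lastUsed BEFORE sending, so the client's idle clock never starts after the server's. */
    void write(byte[] buffer, int length) {
        lastUsedSec = clockSec.getAsLong();
        // ... the real socket write would happen here ...
    }

    /** The connection is reusable only while its idle time is within maxSocketIdle. */
    boolean isValid() {
        return clockSec.getAsLong() - lastUsedSec <= maxSocketIdleSec;
    }

    public static void main(String[] args) {
        final long[] now = {0};
        TrackedConnection conn = new TrackedConnection(() -> now[0], 55);

        now[0] = 10;
        conn.write(new byte[0], 0);         // stamped at t = 10, before the server sees the request

        now[0] = 65;
        System.out.println(conn.isValid()); // true: idle for exactly 55 s
        now[0] = 66;
        System.out.println(conn.isValid()); // false: the client expires it
    }
}
```

Because the stamp is taken before the request reaches the server, the server's last activity on the socket is never earlier than the client's timestamp, so with a 55 second client limit and a 60 second server limit the client gives up on the connection first.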

I have run all the tests successfully, so my changes are not breaking any of them.

Please let me know if you want me to improve the solution for this problem.

Kind Regards, Jose Ignacio Acin Pozo

BrianNichols commented 4 years ago

Java client 4.4.7 has been released. It updates the connection's lastUsed just after the command's last socket read instead of at command end.

https://www.aerospike.com/download/client/java/4.4.7/