mitonize / okuyama-client-java

Java client library for Okuyama KVS
Apache License 2.0
1 stars 1 forks source link

SocketStreamsの生存期間について #5

Closed otonari12 closed 8 years ago

otonari12 commented 8 years ago

1点ご確認いただきたく、連絡いたしました。

現状、SocketManager内で生成されたSocketStreamsはqueueに溜められていきますが、 queue内のSocketStreamsは、何らかのエラーが発生しない限り破棄されないため、 複数のEndpointがある際に、1つのEndpointでエラーを発生した際に、 それ以外のEndpointから作られたSocketStreamsで、queueが一杯になった場合、 SocketStreamsの破棄が行われないため、 エラーが発生していたEndpointが復帰した場合でも、そこへのアクセスが発生し難い状態になるかと思います。 その対応として、SocketStreamsを一定期間経過したときに、削除することが考えられるかと思います。

つきましては、本件の修正版を作成しましたので、取り込みの検討をいただけませんか。

※Pull requestsを出すのが適当かと思いますが、ただいまgitcommitができない環境ですので、 恐縮ですが、修正したソースをはらせてください。

otonari12 commented 8 years ago
package mitonize.datastore;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketManager {
    Logger logger = LoggerFactory.getLogger(SocketManager.class);

    /**
     * 接続先のTCPエンドポイントを保持
     */
    class Endpoint {
        public Endpoint(InetAddress address, int port) {
            this.address = address;
            this.port = port;
        }
        final InetAddress address;
        final int port;
        private boolean offline = false;
        /** for canceling on being online */
        ScheduledFuture<Object> offlineManagementFuture;

        /**
         * 指定したエンドポイントのオフライン状態を設定する。
         * @param endpoint エンドポイント
         * @param offline オフラインかどうか (true:オフライン、false:オンライン)
         */
        @SuppressWarnings("unchecked")
        synchronized void markEndpointOffline(boolean offline) {
            if (offline) {
                if (!this.offline) {
                    OfflineManagementTask task = new OfflineManagementTask(this);
                    this.offlineManagementFuture = 
                            (ScheduledFuture<Object>) offlineManagementService.scheduleWithFixedDelay(task, 0, 5, TimeUnit.SECONDS);
                    this.offline = true;
                    logger.warn("Mark offline - {}:{}", this.address.getHostName(), this.port);
                }
            } else {
                ScheduledFuture<Object> future = this.offlineManagementFuture;
                if (future != null) {
                    future.cancel(true);
                    this.offlineManagementFuture = null;
                }
                this.offline = false;
                logger.warn("Mark online  - {}:{}", this.address.getHostName(), this.port);
            }
        }

        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append(address.getHostName());
            builder.append(":");
            builder.append(port);
            if (offline) {
                builder.append(" (offline)");
            }
            return builder.toString();
        }
    }

    /**
     * 接続先のエンドポイント(ホスト名とポートの組で"hostname:port"の形式)の配列。
     * ラウンドロビンで選択されるが、接続がオフラインであることを検知するとラウンドロビン対象から外される。
     * ただし、すべての接続先がオフラインの時は1番目(添字0)の要素が返却される。
     */
    Endpoint[] endpoints;

    /**
     * 直近で新規に開いたソケットをプールに格納した時刻。初期値は0。
     */
    long timestampLatelyPooled = 0;

    /**
     * 同時に開いているソケットの最大値。
     */
    int maxCountOfCoucurrentSockets = 0;

    /**
     * オフライン状態になったエンドポイントがオンラインになったかをバックグラウンドで検査してオンライン状態にするExecutor。
     */
    final ScheduledExecutorService offlineManagementService;

    /**
     * オフライン状態からTCP接続が確立してからオンラインにするまでに待つ時間(ミリ秒)。
     * TCPポートが開いてから実際に待ち受け可能になるまで時間がかかるサーバの場合に指定する。
     * 不要な場合は0でよい。
     */
    private int delayToMarkOnlineInMillis = 3000;

    /**
     * TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)。
     */
    private int timeoutToConnectInMillis = 1000;

    /**
     * ソケットの読み取りタイムアウト時間(ミリ秒)。
     */
    private int timeoutToReadInMillis = 1000;

    /**
     * オープンしているソケット、入出力ストリームをプールするためのキュー。
     * 要求された際にキューから取り出し、使用後に返却された時にキューに戻す。
     */
    final protected ArrayBlockingQueue<SocketStreams> queue;
    final protected AtomicInteger activeSocketCount;
    final protected AtomicInteger currentEndpointIndex;

    private int maxPoolSize = 0;

    private DumpFilterStreamFactory dumpFilterStreamFactory;

    /**
     * ソケットの生存期間[ミリ秒](デフォルトは5分)
     */
    private long socketTimeToLiveInMilli = 5 * 60 * 1000;

    public SocketManager(String[] masternodes, int maxPoolSize) throws UnknownHostException {
        if (masternodes.length == 0) {
            throw new IllegalStateException("No connection endpoint setting specified.");
        }
        this.offlineManagementService = Executors.newSingleThreadScheduledExecutor();
        this.queue = new ArrayBlockingQueue<SocketStreams>(maxPoolSize);
        this.activeSocketCount = new AtomicInteger(0);
        this.currentEndpointIndex = new AtomicInteger(0);
        this.maxPoolSize = maxPoolSize;
        setEndpoints(masternodes);
    }

    private void setEndpoints(String[] nodes) throws UnknownHostException {
        this.endpoints = new Endpoint[nodes.length];
        for (int i = 0; i < nodes.length; ++i) {
            String hostname = nodes[i].split(":")[0];
            int port = Integer.parseInt(nodes[i].split(":")[1]); // May throws NumberFormatException

            if (!hostname.matches("[0-9a-zA-Z.-]+")) {
                throw new IllegalArgumentException("hostname contains illegal character. " + hostname);
            }
            if (port < 0 || port > 65535) {
                throw new IllegalArgumentException("port number must in range 0 to 65535. " + port);
            }           
            endpoints[i] = new Endpoint(InetAddress.getByName(hostname), port);
        }
    }

    @Deprecated
    public void setDumpStream(boolean b) {
        dumpFilterStreamFactory = new TextDumpFilterStreamFactory();
    }

    public boolean isDumpStream() {
        return dumpFilterStreamFactory != null;
    }

    /**
     * プールしているソケットを取り出して返却する。
     * 取り出した時にそのソケットが有効かをチェックして、無効なら新規にソケットを開く。
     * 新規のソケットは登録されているエンドポイントのうちラウンドロビンで選択され、接続に成功したものとなる。
     * 
     * @return 有効な接続先のソケット。
     * @throws IOException 有効な接続先が1つもない場合。
     */
    public SocketStreams aquire() throws IOException {
        SocketStreams socket = queue.poll();

        if(socket == null){
            return openSocket();
        }

        // ソケットに問題がある場合、そのソケットは利用せず、次のソケットを利用する
        if(!isAvailable(socket)){
            // closeSocket(socket);
            return aquire();
        }

        return socket;
    }

    /**
     * プールから取り出されたソケットを返却する。
     * 新規にオープンしたソケットのタイムスタンプを管理して、プールの上限数に達した最後のタイムスタンプよりも
     * 新しいソケットはキューに戻されない。タイムスタンプよりも古いソケットが無効だった場合はプール数が減るが、
     * 同時接続数が有効なプール数よりも増えた段階で新規に開かれ、タイムスタンプも更新される。
     * 
     * @param socket プールから取り出したソケット
     */
    public void recycle(SocketStreams socket) {
        if (socket == null)
            return;
        if (socket.timestamp > timestampLatelyPooled
                || !isAvailable(socket)
                || !queue.offer(socket)) {
            closeSocket(socket);
        }
    }

    /**
     * オフラインとマークされたエンドポイントを定期的にチェックして復帰していればオフラインとしてマークする。
     */
    class OfflineManagementTask implements Runnable {
        private Endpoint endpoint;
        public OfflineManagementTask(Endpoint endpoint) {
            this.endpoint = endpoint;
        }
        @Override
        public void run() {
            int timeout = getTimeoutToConnectInMillis();
            int delay = getDelayToMarkOnlineInMillis();
            InetSocketAddress address = new InetSocketAddress(endpoint.address, endpoint.port);
            Socket socket = new Socket();
            try {
                // 接続に成功したらオフラインマークをクリアする。
                socket.connect(address, timeout);
                Thread.sleep(delay);
                endpoint.markEndpointOffline(false);
            } catch (IOException e) {
            } catch (InterruptedException e) {
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                }
            }
        }
    }

    /**
     * 指定したインデックスのエンドポイント情報を取得する。
     * @return エンドポイント情報。
     */
    Endpoint getEndpointAt(int i) {
        return endpoints[i];
    }

    /**
     * 次のエンドポイント情報を取得する。
     * オフラインマークが付いているものはスキップする。登録されているすべてのエンドポイントにオフラインマークが付いていた場合は1番目の要素を返す。
     * @return オフラインでないエンドポイント情報。すべてオフラインの場合は1番目の要素を返す。
     */
    Endpoint nextEndpoint() {
        int count = endpoints.length;
        int index = currentEndpointIndex.getAndSet(currentEndpointIndex.incrementAndGet() % count);
        while (count-- > 0) {
            if (index >= endpoints.length)
                index = 0;

            Endpoint endpoint = endpoints[index++];
            if (endpoint == null)
                throw new IllegalStateException("No connection endpoint setting specified.");
            /** オフラインのマークが付いていないならこれを返す */
            if (!endpoint.offline) {
                return endpoint;
            }
        }
        return endpoints[0];
    }

    /**
     * 新しくソケットを開く。登録されているエンドポイントをラウンドロビンで順次選択し、接続が確立するまで繰り返す。
     * 最後に失敗した接続試行と今回取得したエンドポイントが同じであればそれ以上の試行は行わず、例外を投げる。
     * @return 開かれたソケット
     * @throws IOException 有効な接続先が1つもないとき
     */
    @SuppressWarnings("resource")
    SocketStreams openSocket() throws IOException {
        int timeoutToConnect = timeoutToConnectInMillis;
        int timeoutToRead = timeoutToReadInMillis;

        Endpoint lastAttempt = null;
        for (int i=endpoints.length; i >= 0; --i) {
            Endpoint endpoint = nextEndpoint(); // nextEndpoint() never returns null.
            if (endpoint.equals(lastAttempt)) {
                // 一連の接続試行の最後に試したエンドポイントと今回取得したエンドポイントが同じなら失敗させる。
                break;
            }
            try {
                InetSocketAddress address = new InetSocketAddress (endpoint.address, endpoint.port);
                Socket socket = new Socket();
                socket.setSoTimeout(timeoutToRead);
                socket.connect(address, timeoutToConnect);
                OutputStream os = new BufferedOutputStream(socket.getOutputStream());
                InputStream is = new BufferedInputStream(socket.getInputStream());
                if (dumpFilterStreamFactory != null) {
                    is = dumpFilterStreamFactory.wrapInputStream(is);
                    os = dumpFilterStreamFactory.wrapOutputStream(os);
                }
                SocketStreams s = new SocketStreams(socket, os, is, socketTimeToLiveInMilli);
                int c = activeSocketCount.incrementAndGet();
                // プールする上限数に収まっていれば最近のタイムスタンプを保持しておく。
                // プール上限数を超えた場合、このタイムスタンプより新しいソケットはrecycleでプールに戻されない。
                if (c <= maxPoolSize) {
                    timestampLatelyPooled = s.timestamp;
                }
                if (c > maxCountOfCoucurrentSockets) {
                    maxCountOfCoucurrentSockets = c;
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Socket opened - {}:{} count:{}", endpoint.address.getHostName(), endpoint.port, c);
                }
                if (endpoint.offline) {
                    endpoint.markEndpointOffline(false);
                }
                return s;
            } catch (UnresolvedAddressException e) {
                logger.error("Hostname cannot be resolved. {}:{} {}", endpoint.address.getHostName(), endpoint.port, e.getMessage());
                endpoint.markEndpointOffline(true);
            } catch (IOException e) {
                logger.error("Failed to open socket. {}:{} {}", endpoint.address.getHostName(), endpoint.port, e.getMessage());
                endpoint.markEndpointOffline(true);
            }
            lastAttempt = endpoint;
        }
        logger.error("No available endpoint to connect");
        throw new IOException("No available endpoint to connect");
    }

    void closeSocket(SocketStreams socket) {
        try {
            socket.getOutputStream().close();
            socket.getInputStream().close();
            socket.getSocket().close();
            int c = activeSocketCount.decrementAndGet();
            if (logger.isInfoEnabled()) {
                Socket s = socket.getSocket();
                logger.info("Socket closed - {}:{} count:{}", s.getInetAddress().getHostName(), s.getPort(), c);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private boolean isAvailable(SocketStreams streams) {
        if(streams.isExpired()){
            closeSocket(streams);
            return false;
        }

        Socket s = streams.getSocket();
        if (s.isInputShutdown() || s.isOutputShutdown() || s.isClosed()) {
            try {
                s.close();
            } catch (IOException e) {
            }
            return false;
        }
        return true;
    }

    public void destroy(SocketStreams streams) {
        if (streams == null)
            return;
        Socket s = streams.getSocket();
        logger.info("Destroy connection - {}:{}", s.getInetAddress().getHostName(), s.getPort());
        try {
            streams.getOutputStream().close();
            streams.getInputStream().close();
            streams.getSocket().close();
        } catch (IOException e) {
        }
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public int getMaxCoucurrentSockets() {
        return maxCountOfCoucurrentSockets;
    }

    public void setDumpFilterStreamFactory(
            DumpFilterStreamFactory dumpFilterStreamFactory) {
        this.dumpFilterStreamFactory = dumpFilterStreamFactory;
    }

    /**
     * ソケットの読み取りタイムアウト時間(ミリ秒)を取得する。
     * @return ソケットの読み取りタイムアウト時間(ミリ秒)
     */
    public int getTimeoutToReadInMillis() {
        return timeoutToReadInMillis;
    }

    /**
     * ソケットの読み取りタイムアウト時間(ミリ秒)を設定する(デフォルト:1000ミリ秒)
     * @param timeoutToReadInMillis ソケットの読み取りタイムアウト時間(ミリ秒)
     */
    public void setTimeoutToReadInMillis(int timeoutToReadInMillis) {
        this.timeoutToReadInMillis = timeoutToReadInMillis;
    }

    /**
     * TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)を取得する。
     * @return TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)
     */
    public int getTimeoutToConnectInMillis() {
        return timeoutToConnectInMillis;
    }

    /**
     * TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)を設定する(デフォルト:1000ミリ秒)。
     * @param timeoutToConnectInMillis TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)
     */
    public void setTimeoutToConnectInMillis(int timeoutToConnectInMillis) {
        this.timeoutToConnectInMillis = timeoutToConnectInMillis;
    }

    /**
     * オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)を取得
     * @return オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)
     */
    public int getDelayToMarkOnlineInMillis() {
        return delayToMarkOnlineInMillis;
    }

    /**
     * オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)を設定する(デフォルト:3000ミリ秒)。
     * TCPポートが開いてから実際に待ち受け可能になるまで時間がかかるサーバの場合に指定する。
     * 不要な場合は0でよい。
     * @param delayToMarkOnlineInMillis オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)
     */
    public void setDelayToMarkOnlineInMillis(int delayToMarkOnlineInMillis) {
        this.delayToMarkOnlineInMillis = delayToMarkOnlineInMillis;
    }

    /**
     * 新規に作成されてから設定値の期間経過したソケットは、そのソケットがプールから選択されるタイミングで削除され、利用されなくなります。
     * 
     * @param socketTimeToLiveInMilli ソケットの生存期間[ミリ秒](デフォルトは5分)
     */
    public void setSocketTimeToLiveInMilli(long socketTimeToLiveInMilli) {
        this.socketTimeToLiveInMilli = socketTimeToLiveInMilli;
    }

    public void shutdown() {
        this.offlineManagementService.shutdown();
    }
}
otonari12 commented 8 years ago
package mitonize.datastore;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class SocketStreams implements Comparable<SocketStreams> {
    OutputStream os;
    InputStream is;
    Socket socket;
    long timestamp;
    long expiryTime;

    public SocketStreams(Socket socket, OutputStream os, InputStream is,
            long socketTimeToLiveInMilli) {
        this.socket = socket;
        this.os = os;
        this.is = is;
        this.timestamp = System.currentTimeMillis();
        this.expiryTime = this.timestamp + socketTimeToLiveInMilli;
    }

    public OutputStream getOutputStream() {
        return os;
    }

    public InputStream getInputStream() {
        return is;
    }

    public Socket getSocket() {
        return socket;
    }

    public long getExpiryTime() {
        return expiryTime;
    }

    public void setExpiryTime(long expiryTime) {
        this.expiryTime = expiryTime;
    }

    public boolean isExpired() {
        return System.currentTimeMillis() > expiryTime;
    }

    @Override
    public int compareTo(SocketStreams other) {
        return (int) (timestamp - other.timestamp);
    }
}
otonari12 commented 8 years ago
package mitonize.datastore.okuyama;

import java.net.UnknownHostException;

import mitonize.datastore.CompressionStrategy;
import mitonize.datastore.DefaultCompressionStrategy;
import mitonize.datastore.SocketManager;
import mitonize.datastore.TextDumpFilterStreamFactory;

public class OkuyamaClientFactoryImpl implements OkuyamaClientFactory {
    private static CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = new DefaultCompressionStrategy();

    private SocketManager socketManager;

    boolean compatibilityMode = true;
    private CompressionStrategy compressionStrategy = null;
    private boolean base64key = true;
    private boolean serializeString = false;

    @Override
    public OkuyamaClient createClient() {
        OkuyamaClientImpl2 okuyamaClient;
        okuyamaClient = new OkuyamaClientImpl2(socketManager, base64key, serializeString || compatibilityMode, compressionStrategy);
        return okuyamaClient;
    }

    @Override
    public void destroy() {
        this.socketManager.shutdown();
    }

    /**
     * マスターノード、最小のソケットプールサイズを指定してファクトリクラスを生成する。
     * マスターノードの指定は、[ホスト名]:[ポート番号] の形式の文字列の配列である。
     * オリジナルのOkuyamaClientから読み出せる形式で書き出すようにする(互換モード)。
     * 
     * <p>互換モードを設定すると下のような状態で稼働する。</p>
     * <ul>
     * <li>キーを必ずbase64エンコードする</li>
     * <li>文字列を格納する場合にもJavaのシリアライズを行ってからBase64エンコードする</li>
     * <li>圧縮しない(圧縮設定すると例外を発生させる)</li>
     * </ul>
     * 
     * @param masternodes マスターノード接続先の配列
     * @param minPoolSize 最小のソケットプールサイズ
     * @throws UnknownHostException 指定したホスト名のIPアドレスが取得できない場合
     * @throws IllegalArgumentException 指定したホストの形式が不正の場合
     */
    public OkuyamaClientFactoryImpl(String[] masternodes, int minPoolSize) throws UnknownHostException {
        this(masternodes, minPoolSize, true, false, null);
    }

    /**
     * マスターノード、最小のソケットプールサイズ、互換モードを指定してファクトリクラスを生成する。
     * マスターノードの指定は、[ホスト名]:[ポート番号] の形式の文字列の配列である。
     * オリジナルのOkuyamaClientからでも読み出し可能な形式で格納するかを指定する(互換モード)。
     * 互換モードを設定すると下のような状態で稼働する。
     * <ul>
     * <li>キーを必ずbase64エンコードする</li>
     * <li>文字列を格納する場合にもJavaのシリアライズを行ってからBase64エンコードする</li>
     * <li>圧縮しない(圧縮設定すると例外を発生させる)</li>
     * </ul>
     * 
     * @param masternodes マスターノード接続先の配列
     * @param minPoolSize 最小のソケットプールサイズ
     * @param compatibilityMode オリジナルOkuyamaClient互換モード
     * @throws UnknownHostException 指定したホスト名のIPアドレスが取得できない場合
     * @throws IllegalArgumentException 指定したホストの形式が不正の場合
     */
    public OkuyamaClientFactoryImpl(String[] masternodes, int minPoolSize, boolean compatibilityMode) throws UnknownHostException {
        this(masternodes, minPoolSize, compatibilityMode, false, null);
    }

    /**
     * マスターノード、最小のソケットプールサイズ、互換モード、ストリームのダンプモードを指定してファクトリクラスを生成する。
     * マスターノードの指定は、[ホスト名]:[ポート番号] の形式の文字列の配列である。
     * オリジナルのOkuyamaClientからでも読み出し可能な形式で格納するかを指定する(互換モード)。
     * 互換モードを設定すると下のような状態で稼働する。
     * <ul>
     * <li>キーを必ずbase64エンコードする</li>
     * <li>文字列を格納する場合にもJavaのシリアライズを行ってからBase64エンコードする</li>
     * <li>圧縮しない(圧縮設定すると例外を発生させる)</li>
     * </ul>
     * 
     * @param masternodes マスターノード接続先の配列
     * @param minPoolSize 最小のソケットプールサイズ
     * @param compatibilityMode オリジナルOkuyamaClient互換モード
     * @param dumpStream 入出力のストリームをダンプする(主にデバッグ用)
     * @throws UnknownHostException 指定したホスト名のIPアドレスが取得できない場合
     * @throws IllegalArgumentException 指定したホストの形式が不正の場合
     */
    public OkuyamaClientFactoryImpl(String[] masternodes, int minPoolSize, boolean compatibilityMode, boolean dumpStream) throws UnknownHostException {
        this(masternodes, minPoolSize, compatibilityMode, dumpStream, null);
    }

    /**
     * マスターノード、最小のソケットプールサイズ、互換モード、ストリームのダンプモードを指定してファクトリクラスを生成する。
     * マスターノードの指定は、[ホスト名]:[ポート番号] の形式の文字列の配列である。
     * ストリームのダンプを指定すると標準出力にOkuyamaマスターノードへの入出力データをダンプする。
     * 圧縮戦略を指定すると、キーのパターンや値のサイズに応じで圧縮の有無、圧縮アルゴリズムの選択ができる。(参照 {@link CompressionStrategy})
     * 
     * <p>
     * 互換モードを指定すると、オリジナルのOkuyamaClientからでも読み出し可能な形式で格納するように下の設定でクライアントを生成する。
     * </p>
     * <ul>
     * <li>キーを必ずbase64エンコードする</li>
     * <li>文字列を格納する場合にもJavaのシリアライズを行ってからBase64エンコードする</li>
     * <li>圧縮しない(圧縮戦略を設定すると例外を発生させる)</li>
     * </ul>
     * 
     * @param masternodes マスターノード接続先の配列
     * @param minPoolSize 最小のソケットプールサイズ
     * @param compatibilityMode オリジナルOkuyamaClient互換モード
     * @param dumpStream 入出力のストリームをダンプする(主にデバッグ用)
     * @param compressionStrategy 圧縮戦略
     * @throws UnknownHostException 指定したホスト名のIPアドレスが取得できない場合
     * @throws IllegalArgumentException 指定したホストの形式が不正の場合、互換モードを指定しているにもかかわらず圧縮戦略を指定した場合
     */
    public OkuyamaClientFactoryImpl(String[] masternodes, int minPoolSize, boolean compatibilityMode, boolean dumpStream, CompressionStrategy compressionStrategy) throws UnknownHostException {
        socketManager = new SocketManager(masternodes, minPoolSize);
        if (dumpStream) {
            TextDumpFilterStreamFactory dumpFilterStreamFactory = new TextDumpFilterStreamFactory();
            socketManager.setDumpFilterStreamFactory(dumpFilterStreamFactory);
        }

        setCompatibilityMode(compatibilityMode);
        setCompressionStrategy(compressionStrategy);
    }

    /**
     * 互換モードが設定されているかを確認する。
     * @return 互換モードに設定されているなら true
     */
    public boolean isCompatibilityMode() {
        return compatibilityMode;
    }

    /**
     * 互換モードを設定する。trueが設定された場合は圧縮は無効化される。
     * 
     * @param compatibilityMode 互換モードに設定するなら true
     */
    public void setCompatibilityMode(boolean compatibilityMode) {
        if (compatibilityMode) {
            setCompressionMode(false);
        }
        this.compatibilityMode = compatibilityMode;
    }

    /**
     * 圧縮戦略を取得する。
     * @return 設定されている圧縮戦略。未設定ならnull
     */
    public CompressionStrategy getCompressionStrategy() {
        return compressionStrategy;
    }

    /**
     * 圧縮戦略を設定する。簡易的なものは{@link DefaultCompressionStrategy}で提供される。設定を解除する場合はnullを設定する。
     * @param compressionStrategy 圧縮戦略。解除する場合はnull
     */
    public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
        if (compatibilityMode && compressionStrategy != null) {
            throw new IllegalArgumentException("Compression is not supported in compatible mode");
        }
        this.compressionStrategy = compressionStrategy;
    }

    /**
     * 圧縮モードを取得する。
     * @return 圧縮する場合はtrue
     */
    public boolean isCompressionMode() {
        return compressionStrategy != null;
    }

    /**
     * 圧縮モードを設定する。圧縮する場合はデフォルトの圧縮戦略{@link DefaultCompressionStrategy}が利用される。
     * @param doCompress 圧縮する場合はtrue
     */
    public void setCompressionMode(boolean doCompress) {
        if (doCompress) {
            this.compressionStrategy = DEFAULT_COMPRESSION_STRATEGY;
        } else {
            this.compressionStrategy = null;
        }
    }

    /**
     * キーをBase64でエンコードするかを返す。
     * @return キーをBase64でエンコードするならtrue
     */
    public boolean isBase64key() {
        return base64key;
    }

    /**
     * キーをBase64でエンコードする設定をする。(デフォルト:true)
     * @param base64key キーをBase64でエンコードするならtrue
     */
    public void setBase64key(boolean base64key) {
        this.base64key = base64key;
    }

    /**
     * 文字列を格納する際にJavaのシリアライズをするかを返す。
     * @return 文字列を格納する際にJavaのシリアライズをするならtrue
     */
    public boolean isSerializeString() {
        return serializeString;
    }

    /**
     * 文字列を格納する際にJavaのシリアライズをするかを設定する。(デフォルト:false)
     * 互換モードが設定されている場合は強制的にシリアライズされる。
     * @param serializeString 文字列を格納する際にJavaのシリアライズをするならtrue
     */
    public void setSerializeString(boolean serializeString) {
        this.serializeString = serializeString;
    }

    /* DELEGATED METHODS */
    /**
     * 保持するソケットの最大数を取得する。
     * 一時的に同時利用ソケット数がこの上限値を超える場合があるが、同時使用数が落ち着くと、
     * この上限数分のソケットは切断されずに保持される。
     */
    public int getMaxPoolSize() {
        return socketManager.getMaxPoolSize();
    }

    /**
     * 同時に開いたソケットの最大値を取得する。
     * @return 同時に開いたソケットの最大値
     */
    public int getMaxCoucurrentSockets() {
        return socketManager.getMaxCoucurrentSockets();
    }

    /**
     * ソケットの読み取りタイムアウト時間(ミリ秒)を取得する。
     * @return ソケットの読み取りタイムアウト時間(ミリ秒)
     */
    public int getTimeoutToReadInMillis() {
        return socketManager.getTimeoutToReadInMillis();
    }

    /**
     * ソケットの読み取りタイムアウト時間(ミリ秒)を設定する(デフォルト:1000ミリ秒)
     * @param timeoutToReadInMillis ソケットの読み取りタイムアウト時間(ミリ秒)
     */
    public void setTimeoutToReadInMillis(int timeoutToReadInMillis) {
        socketManager.setTimeoutToReadInMillis(timeoutToReadInMillis);
    }

    /**
     * TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)を取得する。
     * @return TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)
     */
    public int getTimeoutToConnectInMillis() {
        return socketManager.getTimeoutToConnectInMillis();
    }

    /**
     * TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)を設定する(デフォルト:1000ミリ秒)。
     * @param timeoutToConnectInMillis TCP接続が確立するかどうか確認するときのコネクションタイムアウト時間(ミリ秒)
     */
    public void setTimeoutToConnectInMillis(int timeoutToConnectInMillis) {
        socketManager.setTimeoutToConnectInMillis(timeoutToConnectInMillis);
    }

    /**
     * オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)を取得
     * @return オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)
     */
    public int getDelayToMarkOnlineInMillis() {
        return socketManager.getDelayToMarkOnlineInMillis();
    }

    /**
     * オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)を設定する(デフォルト:3000ミリ秒)。
     * TCPポートが開いてから実際に待ち受け可能になるまで時間がかかるサーバの場合に指定する。
     * 不要な場合は0でよい。
     * @param delayToMarkOnlineInMillis オフライン状態からTCP接続が確立後にオンラインにするまでに待つ時間(ミリ秒)
     */
    public void setDelayToMarkOnlineInMillis(int delayToMarkOnlineInMillis) {
        socketManager.setDelayToMarkOnlineInMillis(delayToMarkOnlineInMillis);
    }

    public SocketManager getSocketManager() {
        return socketManager;
    }

    public void setSocketManager(SocketManager socketManager) {
        this.socketManager = socketManager;
    }

    /**
     * 新規に作成されてから設定値の期間経過したソケットは、そのソケットがプールから選択されるタイミングで削除され、利用されなくなります。
     * 
     * @param socketTimeToLiveInMilli ソケットの生存期間[ミリ秒](デフォルトは5分)
     */
    public void setSocketTimeToLiveInMilli(long socketTimeToLiveInMilli) {
        this.socketManager.setSocketTimeToLiveInMilli(socketTimeToLiveInMilli);
    }
}
otonari12 commented 8 years ago
package mitonize.datastore;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;

import junit.framework.Assert;
import mitonize.datastore.SocketManager.Endpoint;

import org.junit.Before;
import org.junit.Test;

public class SocketManagerTest {
    ServerSocket[] serverSockets;
    String[] endpoints;
    final int ENDPOINT_COUNT = 3;

    /**
     * サーバソケットを作成してオープンしたソケットのエンドポイント情報を設定する。
     * 
     * @throws Exception
     */
    @Before
    public void setUp() throws Exception {
        serverSockets = new ServerSocket[ENDPOINT_COUNT];
        for (int i = 0; i < ENDPOINT_COUNT; ++i) {
            serverSockets[i] = new ServerSocket(0);
        }
        endpoints = new String[ENDPOINT_COUNT];
        for (int i = 0; i < ENDPOINT_COUNT; ++i) {
            endpoints[i] = "localhost:" + serverSockets[i].getLocalPort();
        }
    }

    String str(Endpoint endpoint) {
        return endpoint.toString();
        // return endpoint.address.getHostName() + ":" + endpoint.port;
    }

    String str(SocketStreams streams) {
        Socket s = streams.getSocket();
        return s.getInetAddress().getHostName() + ":" + s.getPort();
    }

    @Test
    public void testNextEndpoint() throws UnknownHostException {
        SocketManager manager = new SocketManager(endpoints, 3);
        Endpoint endpoint = manager.nextEndpoint();
        assertEquals(endpoints[1], str(endpoint));
        endpoint = manager.nextEndpoint();
        assertEquals(endpoints[2], str(endpoint));
        endpoint = manager.nextEndpoint();
        assertEquals(endpoints[0], str(endpoint));
        endpoint = manager.nextEndpoint();
        assertEquals(endpoints[1], str(endpoint));
        endpoint = manager.nextEndpoint();
        assertEquals(endpoints[2], str(endpoint));
    }

    @Test
    public void testOpenSocket() throws UnknownHostException,
            InterruptedException {
        SocketManager manager = new SocketManager(endpoints, 3);
        try {
            SocketAddress backup = serverSockets[1].getLocalSocketAddress();
            serverSockets[1].close();
            SocketStreams streams;
            streams = manager.openSocket();
            assertEquals(endpoints[2], str(streams));
            streams = manager.openSocket();
            assertEquals(endpoints[0], str(streams));
            streams = manager.openSocket();
            assertEquals(endpoints[2], str(streams));

            serverSockets[1] = new ServerSocket();
            serverSockets[1].bind(backup);
            streams = manager.openSocket();
            assertEquals(endpoints[2], str(streams));
            Thread.sleep(10000);
            streams = manager.openSocket();
            assertEquals(endpoints[0], str(streams));
            streams = manager.openSocket();
            assertEquals(endpoints[1], str(streams));
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    /**
     * ソケットが時間の経過で破棄されるかテスト
     */
    @Test
    public void testSocketInvalidate() throws Exception {

        SocketManager manager = new SocketManager(endpoints, 5);
        manager.setSocketTimeToLiveInMilli(500);

        // 50msecおきに5つソケットを作成する
        SocketStreams[] pool = new SocketStreams[10];
        for (int i = 0; i < 5; i++) {
            pool[i] = manager.aquire();
            Thread.sleep(50);
        }
        // 作ったソケットをリサイクル
        for (int i = 0; i < 5; i++) {
            manager.recycle(pool[i]);
        }

        // 310msec後、最初のソケットが作られてから560msec立つのでソケット[0][1]は破棄されており、取得できない
        Thread.sleep(310);
        for (int i = 2; i < 5; i++) {
            assertEquals(String.valueOf(i), pool[i], manager.aquire());
        }
        for (int i = 0; i < 2; i++) {
            assertTrue(String.valueOf(i), pool[i].isExpired());
        }

        // 作成しているすべてのソケットを取得したので、新しいソケットができる
        pool[5] = manager.aquire();
        // 新しく追加されたものか確認
        for (int i = 0; i < 5; i++) {
            assertTrue(String.valueOf(i), !pool[i].equals(pool[5]));
        }

        // さらに50msec後、リサイクルを行う
        Thread.sleep(50);
        for (int i = 2; i < 6; i++) {
            manager.recycle(pool[i]);
        }

        // ソケット[2]は破棄されており、、取得できない
        for (int i = 3; i < 6; i++) {
            Assert.assertEquals(String.valueOf(i), pool[i], manager.aquire());
        }
        for (int i = 0; i < 3; i++) {
            assertTrue(String.valueOf(i), pool[i].isExpired());
        }

        // 作成しているすべてのソケットを取得したので、、新しいソケットができる
        pool[6] = manager.aquire();
        // 新しく追加されたものか確認
        for (int i = 0; i < 6; i++) {
            assertTrue(String.valueOf(i), !pool[i].equals(pool[6]));
        }
    }
}
mitonize commented 8 years ago

ご指摘ありがとうございます。

6 にてPRにさせていただきましたので、そちらで進行させてください。