kmansei / chatapp

0 stars 0 forks source link

ExecutorService使用時の最大接続数に関する考察 #2

Open kmansei opened 1 year ago

kmansei commented 1 year ago

Executor Serviceを使用時の実装コードは下記の通り eaaa1f4

サーバー

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ClientHandler implements Runnable {
    private Socket socket;
    private PrintWriter writer;

    ClientHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        try (InputStream in = socket.getInputStream();
                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                PrintWriter tempWriter = new PrintWriter(socket.getOutputStream(), true);
                Socket tempSocket = socket) {

            writer = tempWriter;

            String message;
            // クライアントからの投稿を待ち受ける
            while ((message = reader.readLine()) != null) {
                // 接続している全てのクライアントにブロードキャスト
                broadcast(message);
                // 受け取ったメッセージはサーバー側でも表示
                System.out.println(message);
            }

        } catch (Exception e) {
            System.out.println(e);
        } finally {
            // クライアントが切断したらハンドラを削除
            Server.handlers.remove(this);
        }
    }

    void broadcast(String message) {
        for (var handler : Server.handlers) {
            handler.writer.println(message);
        }
    }
}

// チャットサーバー
public class Server {

    // スレッドセーフなハッシュセット
    public static Set<ClientHandler> handlers = ConcurrentHashMap.newKeySet();

    // スレッドプール
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        try (ServerSocket serverSocket = new ServerSocket(1234)) {

            System.out.println("チャットサーバー起動");

            // クライアントからの接続を確認するたびにハンドラを生成、スレッドプールに渡す
            while (true) {
                var socket = serverSocket.accept();
                var clientHandler = new ClientHandler(socket);
                handlers.add(clientHandler);
                executorService.execute(clientHandler);
            }

        } catch (Exception e) {
            System.out.println(e);
        } finally {
            System.out.println("チャットサーバー停止");
        }
    }
}

クライアント


import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;
import java.util.concurrent.Executors;

// サーバーからのブロードキャストを待ち受けるスレッド
class ServerListener implements Runnable {
    private Socket socket;

    ServerListener(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        try (PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

            // サーバーからのメッセージを待ち受ける
            String message;
            while ((message = reader.readLine()) != null) {
                System.out.println(message);
            }

        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

// サーバーに接続してチャットを行う
public class Client {
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 1234;

    public static void main(String[] args) {
        try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT);
                OutputStream out = socket.getOutputStream();
                PrintWriter writer = new PrintWriter(out, true);
                Scanner scanner = new Scanner(System.in)) {

            // ユーザー名の代わりにプロセスIDを使用
            var PID = ProcessHandle.current().pid();
            System.out.println("チャットサーバーと接続(PID: " + PID + ")");

            // 単一のスレッドプール
            var executorService = Executors.newSingleThreadExecutor();

            // サーバーからのブロードキャストを待ち受けるスレッドを起動
            executorService.execute(new ServerListener(socket));

            String input;
            // 入力を待ち受ける
            while (true) {

                // 空行が入力されたら終了
                input = scanner.nextLine();
                if (input == null || input.equals("")) {
                    break;
                }

                // サーバーにチャット送信
                var message = PID + ": " + input;
                writer.println(message);
            }
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            System.out.println("チャットサーバーとの接続を停止");
        }
    }
}```
kmansei commented 1 year ago

サーバーのメインスレッドは

while (true) {
   var socket = serverSocket.accept();
   var clientHandler = new ClientHandler(socket);
   handlers.add(clientHandler);
   executorService.execute(clientHandler);
}

serverSocket.accept()の部分でクライアントの接続が来るまでブロッキングされています。

サーバーが生成している各スレッドは

// クライアントからの投稿を待ち受ける
while ((message = reader.readLine()) != null) {
    broadcast(message);
    System.out.println(message);
}

reader.readLine()でユーザーから入力が来るまでブロックされています。

kmansei commented 1 year ago

クライアントのメインスレッドは

while (true) {
     input = scanner.nextLine();
     :
     :
}

scanner.nextLine()でブロックされています。

クライアントが生成しているスレッドは

while ((message = reader.readLine()) != null) {
     System.out.println(message);
}

reader.readLine()の部分でブロックされています。

kmansei commented 1 year ago

よって伝統的なwhile(true)のループとは異なり、ユーザーの入力待ちの間はCPUリソースは消費されないと考えます。 しかしながらも、クライアントが退室するまでclienthanderのスレッドは解放されないため、 Executors.newCachedThreadPool(); を使用しているもののクライアントから新たな接続があるたびにClientHandlerのスレッドを生成していると思われます。

実際に上記を Executors.newFixedThreadPool(1); と置き換えたところ、ある一つのclienthandlerが上記のスレッドプールを占有してしまい、他のクライアントからの入力を受け取ることができませんでした。

よってeaaa1f4では、クライアントが接続するたびにサーバーはスレッドを生成しているため、初期実装と余り接続できる数は変わらないと考えます。