zceolrj / Community

a demo for nodejs and angularjs
0 stars 0 forks source link

tomcat #6

Open zceolrj opened 6 years ago

zceolrj commented 6 years ago

NioEndpoint

NIO tailored thread pool, providing the following services:

  1. Socket acceptor thread
  2. Socket poller thread
  3. Worker thread pool
/**
 * NIO endpoint (start-up excerpt): launches the poller threads first and the
 * acceptor threads afterwards.
 */
class NioEndpoint extends AbstractJsseEndpoint<NioChannel, SocketChannel> {
    Poller[] pollers;// in NioEndpoint
    // The type variable U of AbstractEndpoint is not in scope here; the extends
    // clause above binds it to SocketChannel, so the concrete type is spelled out.
    List<Acceptor<SocketChannel>> acceptors;// in AbstractEndpoint

    /**
     * Starts one daemon thread per poller, named
     * {@code getName() + "-ClientPoller-" + i} and running at
     * {@code threadPriority}, then starts the acceptor threads.
     */
    public void startInternal() {
        // ...... omit some code

        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0;i<pollers.length;i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        // Start acceptor thread
        startAcceptorThreads();
    }
}
/**
 * Base endpoint (acceptor start-up excerpt); keeps the list of acceptors it
 * has started.
 */
class AbstractEndpoint<S, U> {// S=NioChannel, U=SocketChannel
    List<Acceptor<U>> acceptors;

    /**
     * Creates {@code getAcceptorThreadCount()} acceptors, records each one in
     * {@link #acceptors}, and starts a dedicated thread for it. Threads are
     * named {@code getName() + "-Acceptor" + index} and take their priority
     * and daemon flag from the endpoint's getters.
     */
    protected final void startAcceptorThreads() {
        final int acceptorCount = getAcceptorThreadCount();
        acceptors = new ArrayList<>(acceptorCount);

        int index = 0;
        while (index < acceptorCount) {
            Acceptor<U> worker = new Acceptor<>(this);
            final String name = getName() + "-Acceptor" + index;
            worker.setThreadName(name);
            acceptors.add(worker);

            // One thread per acceptor, sharing the acceptor's name.
            Thread runner = new Thread(worker, name);
            runner.setPriority(getAcceptorThreadPriority());
            runner.setDaemon(getDaemon());
            runner.start();

            index++;
        }
    }
}
/**
 * Runnable that accepts incoming connections for an endpoint and passes each
 * accepted connection back to that endpoint for set-up.
 */
class Acceptor<U> implements Runnable {// U=SocketChannel
    // Endpoint that supplies connections and receives them for configuration.
    private final AbstractEndpoint<?, U> endpoint;

    /**
     * Loops for as long as {@code endpoint.isRunning()} is true: obtains a
     * connection from {@code endpoint.serverSocketAccept()} and passes it to
     * {@code endpoint.setSocketOptions(...)}. Anything thrown inside one
     * iteration is caught so the loop continues.
     */
    public void run() {
        // ...... omit some code

        while(endpoint.isRunning()) {
            try {
                // ...... omit some code
                // NOTE(review): presumably blocks until a client connects - confirm.
                U socketChannel = endpoint.serverSocketAccept();

                // ...... omit some code
                // The boolean result of setSocketOptions is not inspected here.
                endpoint.setSocketOptions(socketChannel);
            }
            catch(Throwable t) {
                // ...... omit some code
            }
        }
    }
}
/**
 * NIO endpoint (connection hand-off excerpt): configures accepted
 * connections, registers them with a poller, and lets the poller dispatch
 * ready keys for processing.
 */
class NioEndpoint {
    /**
     * Switches a newly accepted connection to non-blocking mode, applies the
     * endpoint's socket properties, wraps it in a (possibly recycled)
     * {@code NioChannel} and registers that channel with a poller.
     *
     * @param socketChannel the accepted connection
     * @return {@code true} if the channel was configured and registered,
     *         {@code false} if anything was thrown along the way
     */
    protected boolean setSocketOptions(SocketChannel socketChannel) {
        // Process the connection
        try {
            // disable blocking
            socketChannel.configureBlocking(false);
            Socket socket = socketChannel.socket();
            socketProperties.setProperties(socket);

            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                // ...... omit some code
                channel = new NioChannel(socketChannel, bufhandler);
            }
            else {
                // Recycled channel: rebind it to the new SocketChannel, the same
                // object the constructor branch receives (not the java.net.Socket view).
                channel.setIOChannel(socketChannel);
                channel.reset();
            }

            getPoller0().register(channel);
            return true;
        }
        catch (Throwable t) {
            // Error reporting is elided in this excerpt; the failure is surfaced
            // to the caller through the return value instead of being dropped.
            return false;
        }
    }

    /**
     * Poller: queues registration events for new channels and, in its run
     * loop, dispatches every selected key for processing.
     */
    class Poller implements Runnable {
        private Selector selector;

        // Events waiting to be handled by this poller.
        private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

        /**
         * Registers a newly created channel with the poller.
         *
         * Links the channel and a fresh {@code NioSocketWrapper} to this
         * poller, then queues an {@code OP_REGISTER} event, reusing an event
         * from {@code eventCache} when one is available.
         *
         * @param channel the channel to register
         */
        public void register(final NioChannel channel) {
            channel.setPoller(this);
            NioSocketWrapper nioSocketWrapper = new NioSocketWrapper(channel, NioEndpoint.this);
            channel.setSocketWrapper(nioSocketWrapper);

            nioSocketWrapper.setPoller(this);
            // ...... omit some code
            PollerEvent pollerEvent = eventCache.pop();
            if (pollerEvent == null) {
                pollerEvent = new PollerEvent(channel, nioSocketWrapper, OP_REGISTER);
            }
            else {
                pollerEvent.reset(channel, nioSocketWrapper, OP_REGISTER);
            }

            addEvent(pollerEvent);
        }

        /**
         * Appends an event to this poller's queue.
         *
         * @param pollerEvent the event to enqueue
         */
        private void addEvent(PollerEvent pollerEvent) {
            events.offer(pollerEvent);// add to queue
            // ...... omit some code
        }

        /**
         * Poller loop: iterates over the selector's selected keys, removes
         * each key from the selected set and passes it, together with its
         * attached {@code NioSocketWrapper}, to {@link #processKey}.
         */
        public void run() {
            while(true) {
                // ... omit some code
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

                // Walk through the collection of ready keys and dispatch any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)selectionKey.attachment();

                    // Remove before processing so the key is not seen again on the next pass.
                    iterator.remove();
                    processKey(selectionKey, attachment);
                }
                // ... omit
            }
            // ... omit
        }

        /**
         * Dispatches a ready key by calling {@code processSocket} with
         * {@code SocketEvent.OPEN_READ} and {@code dispatch = true}.
         *
         * @param selectionKey     the ready key
         * @param nioSocketWrapper the wrapper attached to the key
         */
        protected void processKey(SelectionKey selectionKey, NioSocketWrapper nioSocketWrapper) {
            // ... omit
            processSocket(nioSocketWrapper, SocketEvent.OPEN_READ, true);
            // ... omit
        }
    }
}
/**
 * Base endpoint (socket dispatch excerpt). The type parameters match the
 * earlier declaration of this class: S=NioChannel, U=SocketChannel.
 */
class AbstractEndpoint<S, U> {
    /**
     * Obtains a socket processor for the given wrapper and event, reusing one
     * from {@code processorCache} when available and creating one otherwise,
     * then runs it.
     *
     * @param socketWrapper the socket wrapper to process
     * @param event         the socket event to process
     * @param dispatch      when {@code true} and an executor is configured, the
     *                      processor is submitted to that executor; otherwise
     *                      it runs on the calling thread
     * @return {@code true} once the processor has been submitted or run
     *         (failure paths belong to the elided code)
     */
    public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
        // ... omit
        // The processor is parameterised by the endpoint's socket type S.
        SocketProcessorBase<S> socketProcessor = processorCache.pop();
        if (socketProcessor == null) {
            socketProcessor = createSocketProcessor(socketWrapper, event);
        }
        else {
            socketProcessor.reset(socketWrapper, event);
        }

        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(socketProcessor);
        }
        else {
            socketProcessor.run();
        }
        // ... omit
        return true;
    }
}
/**
 * NIO endpoint (socket processor excerpt): supplies the processor objects
 * that are run for each socket event.
 */
class NioEndpoint {
    /**
     * Creates a new {@link SocketProcessor} for the given wrapper and event.
     *
     * @param socketWrapper the socket wrapper to process
     * @param event         the socket event to process
     * @return a new processor bound to the wrapper and event
     */
    protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        return new SocketProcessor(socketWrapper, event);
    }

    /**
     * Processor whose work is to pass its socket wrapper and event to the
     * endpoint's handler.
     */
    protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
        public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }

        /**
         * Calls {@code getHandler().process(socketWrapper, event)}.
         * NOTE(review): socketWrapper and event are presumably fields
         * inherited from SocketProcessorBase - confirm.
         */
        protected void doRun() {
            // ... omit
            getHandler().process(socketWrapper, event);
            // ... omit
        }
    }
}
/**
 * Protocol base class (connection handler excerpt).
 */
class AbstractProtocol {
    /**
     * Handler that maps a socket to its protocol processor and lets that
     * processor handle the event.
     */
    protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {

        /**
         * Looks up the processor associated with the wrapper's socket,
         * creating and registering a new one when none exists yet, and
         * delegates the event to it.
         *
         * @param wrapper the socket wrapper to process
         * @param status  the socket event being processed
         * @return the socket state reported by the processor
         */
        public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
            S socket = wrapper.getSocket();
            Processor processor = connections.get(socket);
            if (processor == null) {
                processor = getProtocol().createProcessor();
                register(processor);
            }
            // Hand the socket to the processor and report its resulting state;
            // without this the method has no return value and the processor
            // obtained above is never used.
            return processor.process(wrapper, status);
        }
    }
}

/**
 * HTTP/1.1 protocol base: supplies {@code Http11Processor} instances.
 */
class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
    /**
     * Builds a new HTTP/1.1 processor bound to this protocol and its adapter.
     *
     * @return the newly created processor
     */
    protected Processor createProcessor() {
        return new Http11Processor(this, adapter);
    }
}
/**
 * HTTP/1.1 processor (excerpt): forwards the request/response pair to the
 * adapter.
 */
class Http11Processor extends AbstractProcessor {
    /**
     * Services the connection by calling
     * {@code getAdapter().service(request, response)}. Everything around
     * that call, including the returned state, is elided in this excerpt.
     *
     * @param socketWrapper the socket wrapper being serviced
     */
    public SocketState service(SocketWrapperBase<?> socketWrapper) {
        // ... omit
        // NOTE(review): request and response are presumably fields of the processor - confirm.
        getAdapter().service(request, response);
        // ... omit
    }
}

AbstractProcessor extends AbstractProcessorLight {
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) {
        // ... omit
        service(socketWrapper);
        // ... omit
    }
}
/**
 * Adapter between the protocol processor and the container: creates the
 * connector-level request/response pair and invokes the container pipeline.
 */
class CoyoteAdapter {
    private final Connector connector;

    /**
     * Creates a request and a response through the connector, links them to
     * each other, and invokes the first element of the container's pipeline
     * with the pair.
     *
     * @param req the coyote-level request (not read in this excerpt)
     * @param res the coyote-level response (not read in this excerpt)
     */
    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) {
        final Request containerRequest = connector.createRequest();
        final Response containerResponse = connector.createResponse();

        // Cross-link the pair so each side can reach the other.
        containerRequest.setResponse(containerResponse);
        containerResponse.setRequest(containerRequest);

        // Enter the container through the head of its pipeline.
        connector.getService()
                .getContainer()
                .getPipeline()
                .getFirst()
                .invoke(containerRequest, containerResponse);
    }
}

(1) Acceptor为监听线程:Acceptor.run方法中调用 SocketChannel socketChannel = endpoint.serverSocketAccept(); 该调用会阻塞,本质上调用了ServerSocketChannel.accept();
(2) Acceptor在run方法中调用 endpoint.setSocketOptions(socketChannel); 将接收到的SocketChannel注册到Poller池中的一个Poller;
(3) Poller的run方法中将socketChannel包装成SocketProcessor;
(4) SocketProcessor调用getHandler获取对应的ConnectionHandler;
(5) ConnectionHandler将socket交由Http11Processor处理,解析http的header和body;
(6) Http11Processor调用service()将包装好的request和response传给CoyoteAdapter;
(7) CoyoteAdapter会调用invoke()把request和response传给Container。

Container中依次调用各个Valve,每个Valve的作用如下: