NeoZephyr / Post-It-Note

0 stars 0 forks source link

轮询 #4

Open NeoZephyr opened 2 years ago

NeoZephyr commented 2 years ago
@WebServlet(urlPatterns = "/long-polling-async", asyncSupported = true)
public class LongPollingAsyncServlet extends HttpServlet {

    private Random random = new Random();
    private final AtomicLong sequence = new AtomicLong();
    private final AtomicLong value = new AtomicLong();

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200, 50000L,
            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100));

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        final long currentSequence = sequence.incrementAndGet();
        AsyncContext asyncContext = request.startAsync();

        asyncContext.setTimeout(51000);
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {

            }

            // 超时处理,asyncContext.complete() 表示请求处理完成
            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                AsyncContext asyncContext = event.getAsyncContext();
                asyncContext.complete();
            }

            @Override
            public void onError(AsyncEvent event) throws IOException {}

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {}
        });

        executor.submit(new HandlePollingTask(currentSequence, asyncContext));
    }

    class HandlePollingTask implements Runnable {
        private AsyncContext asyncContext;
        private long sequense;

        public HandlePollingTask(long sequense, AsyncContext asyncContext) {
            this.sequense = sequense;
            this.asyncContext = asyncContext;
        }

        @Override
        public void run() {
            try {
                PrintWriter out = asyncContext.getResponse().getWriter();
                int sleepSecends = random.nextInt(100);

                try {
                    TimeUnit.SECONDS.sleep(sleepSecends);
                } catch (InterruptedException e) {
                }

                long result = value.getAndIncrement();

                out.write(Long.toString(result));

            } catch (Exception e) {
                System.out.println(sequense + "handle polling failed");
            } finally {
                asyncContext.complete();
            }
        }
    }
}
NeoZephyr commented 2 years ago

轮询:客户端定时向服务器发送Ajax请求,服务器接到请求后马上返回响应信息并关闭连接。 优点:后端程序编写比较容易。 缺点:请求中有大半是无用,浪费带宽和服务器资源。 实例:适于小型应用。

长轮询:客户端向服务器发送Ajax请求,服务器接到请求后hold住连接,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求。 优点:在无消息的情况下不会频繁的请求。 缺点:服务器hold连接会消耗资源。 实例:WebQQ、Hi网页版、Facebook IM。

NeoZephyr commented 2 years ago

https://web.stanford.edu/~ouster/cgi-bin/papers/longPoll.pdf

NeoZephyr commented 2 years ago
@Slf4j
public class ConfigClient {

    private CloseableHttpClient httpClient;
    private RequestConfig requestConfig;

    public ConfigClient() {
        this.httpClient = HttpClientBuilder.create().build();
        // httpClient 客户端超时时间要大于长轮询约定的超时时间
        this.requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
    }

    @SneakyThrows
    public void longPolling(String url, String dataId) {
        String endpoint = url + "?dataId=" + dataId;
        HttpGet request = new HttpGet(endpoint);
        CloseableHttpResponse response = httpClient.execute(request);
        switch (response.getStatusLine().getStatusCode()) {
            case 200: {
                BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
                        .getContent()));
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = rd.readLine()) != null) {
                    result.append(line);
                }
                response.close();
                String configInfo = result.toString();
                log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
                longPolling(url, dataId);
                break;
            }
            case 304: {
                log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
                longPolling(url, dataId);
                break;
            }
            default: {
                throw new RuntimeException("unExcepted HTTP status code");
            }
        }
    }

    public static void main(String[] args) {
        // httpClient 会打印很多 debug 日志,关闭掉
        Logger logger = (Logger) LoggerFactory.getLogger("org.apache.http");
        logger.setLevel(INFO);
        logger.setAdditive(false);

        ConfigClient configClient = new ConfigClient();
        configClient.longPolling("http://127.0.0.1:8080/listener", "user");
    }
}

@RestController
@Slf4j
@SpringBootApplication
public class ConfigServer {

    @Data
    private static class AsyncTask {
        // 长轮询请求的上下文,包含请求和响应体
        private AsyncContext asyncContext;
        // 超时标记
        private boolean timeout;

        public AsyncTask(AsyncContext asyncContext, boolean timeout) {
            this.asyncContext = asyncContext;
            this.timeout = timeout;
        }
    }

    // guava 提供的多值 Map,一个 key 可以对应多个 value
    private Multimap<String, AsyncTask> dataIdContext = Multimaps
            .synchronizedSetMultimap(HashMultimap.create());

    private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
            .build();
    private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);

    // 配置监听接入点
    @RequestMapping("/listener")
    public void addListener(HttpServletRequest request, HttpServletResponse response) {
        String dataId = request.getParameter("dataId");

        // 开启异步
        AsyncContext asyncContext = request.startAsync(request, response);
        AsyncTask asyncTask = new AsyncTask(asyncContext, true);

        // 维护 dataId 和异步请求上下文的关联
        dataIdContext.put(dataId, asyncTask);

        // 启动定时器,30s 后写入 304 响应
        timeoutChecker.schedule(() -> {
            if (asyncTask.isTimeout()) {
                dataIdContext.remove(dataId, asyncTask);
                response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                asyncContext.complete();
            }
        }, 30000, TimeUnit.MILLISECONDS);
    }

    // 配置发布接入点
    @RequestMapping("/publishConfig")
    @SneakyThrows
    public String publishConfig(String dataId, String configInfo) {
        log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
        Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
        for (AsyncTask asyncTask : asyncTasks) {
            asyncTask.setTimeout(false);
            HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println(configInfo);
            asyncTask.getAsyncContext().complete();
        }
        return "success";
    }

    public static void main(String[] args) {
        SpringApplication.run(ConfigServer.class, args);
    }
}