长轮询在配置平台的应用

1. 配置平台简介

2. 长轮询简介

传统的短轮询方式存在一个严重缺陷:程序在每次请求时都会新建一个HTTP请求,然而并不是每次都能返回所需的新数据。当同时发起的请求达到一定数目时,会对服务器造成较大负担。这时我们可以采用长轮询方式解决这个问题。

长轮询的基本思想是在每次客户端发出请求后,服务器检查上次返回的数据与此次请求时的数据之间是否有更新,如果有更新则返回新数据并结束此次连接,否则服务器“hold”住此次连接,直到有新数据时再返回相应。而这种长时间的保持连接可以通过设置一个较大的HTTP timeout实现。在服务端消息推送方面,长轮询有着广泛的应用。

3. 请求模型

3.1 同步请求模型

这是我们日常最常用同步请求模型,所有动作都交给同一个 Tomcat 线程处理,所有动作处理完成,线程才会被释放回线程池。

如果业务需要较长时间处理,那么这个 Tomcat 线程其实一直在被占用,随着请求越来越多,可用 I/O 线程越来越少,直到被耗尽。这时后续请求只能等待空闲 Tomcat 线程,这将会加长了请求执行时间,或者直接被拒绝,客户端会报connect refused异常。

如果客户端不关心返回业务结果,这时我们可以自定义线程池,将请求任务提交给线程池,然后立刻返回。

image-20220928175830815

3.2 异步请求模型

Servlet3 引入异步 Servelt 新特性,可以完美解决上面的需求。

异步 Servelt 执行请求流程:

  • 将请求信息解析为 HttpServletRequest
  • 分发到具体 Servlet 处理,将业务提交给自定义业务线程池,请求立刻返回,Tomcat 线程立刻被释放
  • 当业务线程将任务执行结束,将会将结果转交给 Tomcat 线程
  • 通过 HttpServletResponse 将响应结果返回给等待客户端

引入异步 Servelt3 整体流程如下:

image-20220928175843479

3.3 应用场景

1、增加系统吞吐量

拿Tomcat作为Servlet容器来说,无论是计算型请求还是IO型请求,都是交给Tomcat容器线程来建立连接和负责业务逻辑处理,如果将IO型请求或者RT(响应时间)比较高的请求业务逻辑处理,通过异步请求来实现,可以尽早地释放连接线程,业务逻辑交由业务线程池处理,那么连接线程池可以接收更多的请求,从而提高了系统吞吐量。

2、服务端消息推送

消息推送,对于一些服务端发生变更,需要向客户端发送消息通知的场景,可以通过异步请求来实现。

3.4 异步请求实现

1、 AsyncContext
  1. HttpServletRequest#startAsync 获取 AsyncContext 异步上下文对象
  2. 使用自定义的业务线程池处理业务逻辑
  3. 业务线程处理结束,通过 AsyncContext#complete 返回响应结果
ExecutorService executorService = Executors.newFixedThreadPool(10);

@RequestMapping("/hello")
public void hello(HttpServletRequest request) {
    AsyncContext asyncContext = request.startAsync();
    // 超时时间
    asyncContext.setTimeout(10000);
    executorService.submit(() -> {
        try {
            // 休眠 5s,模拟业务操作
            TimeUnit.SECONDS.sleep(5);
            // 输出响应结果
            asyncContext.getResponse().getWriter().println("hello world");
            log.info("异步线程处理结束");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            asyncContext.complete();
        }
    });
    log.info("servlet 线程处理结束");
}
2、 DeferredResult

Servlet3.0 提供了异步处理请求的特性,DeferredResult 是Spring基于 Servlet 3.0 对异步请求的支持实现,SpringMVC 3.2 之后引入新的类 DeferredResult,目的是对于请求提供异步处理方式,释放容器连接,支持更多的并发。或者基于它的超时机制来做一些长轮询相关的事情。

ExecutorService executorService = Executors.newFixedThreadPool(10);

@RequestMapping("/hello_v1")
public DeferredResult<String> hello_v1() {
    // 设置超时时间
    DeferredResult<String> deferredResult = new DeferredResult<>(7000L);
    // 异步线程处理结束,将会执行该回调方法
    deferredResult.onCompletion(() -> {
        log.info("异步线程处理结束");
    });
    // 如果异步线程执行时间超过设置超时时间,将会执行该回调方法
    deferredResult.onTimeout(() -> {
        log.info("异步线程超时");
        // 设置返回结果
        deferredResult.setErrorResult("timeout error");
    });
    deferredResult.onError(throwable -> {
        log.error("异常" throwable);
        // 设置返回结果
        deferredResult.setErrorResult("other error");
    });
    executorService.submit(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
            deferredResult.setResult("hello_v1");
            // 设置返回结果
        } catch (Exception e) {
            e.printStackTrace();
            // 若异步方法内部异常
            deferredResult.setErrorResult("error");
        }
    });
    log.info("servlet 线程处理结束");
    return deferredResult;

}

4. 长轮询的简易实现

服务端

@Slf4j
@RestController
public class LongPollingController {

    //模拟配置表
    private HashMap<String, HashMap<String, Item>> configs = new HashMap<>();

    //长轮询超时时间
    private static final long TIMEOUT = 30 * 1000;//30 seconds
  
    //key:配置ID,value:DeferredResult
    private Multimap<String, DeferredResult<ResponseEntity<Item>>>
            deferredResults = Multimaps.synchronizedSetMultimap(HashMultimap.create());
  
    //长轮询超时默认返回
    private static final ResponseEntity<String>
            NOT_MODIFIED_RESPONSE = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);

    /**
     * 初始化配置内容
     */
    @PostConstruct
    public void init() {
        HashMap<String, Item> config = new HashMap<>();
        config.put("key1", new Item("key1", "value1"));
        config.put("key2", new Item("key2", "value2"));
        config.put("key3", new Item("key3", "value3"));

        configs.put("1", config);
    }

    /**
     * 更新并发布配置内容
     *
     * @param configId
     * @param key
     * @param value
     */
    @RequestMapping("/updateAndPublish/{configId}/{key}/{value}")
    public void updateAndPublish(@PathVariable String configId, @PathVariable String key, @PathVariable String value) {
        HashMap<String, Item> config = configs.get(configId);
        if (config == null || !config.containsKey(key)) {
            return;
        }
        Item item = new Item(key, value);
        config.put(key, item);

        if (!deferredResults.containsKey(configId)) {
            return;
        }
        Collection<DeferredResult<ResponseEntity<Item>>> results = deferredResults.get(configId);
        Iterator<DeferredResult<ResponseEntity<Item>>> iterator = results.iterator();
        while (iterator.hasNext()) {
            DeferredResult deferredResult = iterator.next();
            deferredResult.setResult(new ResponseEntity<>(item, HttpStatus.OK));
        }
    }

    /**
     * 长轮询接口
     * @param configId
     * @return
     */
    @GetMapping("/longPolling/{configId}")
    public DeferredResult<ResponseEntity<Item>> longPolling(@PathVariable String configId) {
        DeferredResult<ResponseEntity<Item>> deferredResult = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE);
        deferredResult.onTimeout(() -> {
            log.info("长轮询超时");
        });
        deferredResult.onCompletion(() -> {
            log.info("调用完成, 返回内容:{} ", deferredResult.getResult());
            deferredResults.remove(configId, deferredResult);
        });
        deferredResults.put(configId, deferredResult);
        return deferredResult;
    }
}

客户端

    public void pull() throws InterruptedException {
        OkHttpClient client = new OkHttpClient().newBuilder().readTimeout(35, TimeUnit.SECONDS).build();
        Request request = new Request.Builder().url("http://localhost:8081/longPolling/1").get().build();
        Response response;
        for (; ; ) {
            try {
                response = client.newCall(request).execute();
                if (response.code() == 304) {
                    log.info("长轮询超时,重新发起请求");
                } else if (response.code() == 200) {
                    log.info("收到通知,内容:{}", response.body().string());
                }
            } catch (IOException e) {
                log.info("调用异常,即将重试,{}",e.getMessage());
                TimeUnit.SECONDS.sleep(5);
            }
        }
    }

5. 详细设计

5.1 集群问题

5.1.1 问题描述

  • ConfigClient:长轮询客户端,拉取配置。
  • ConfigService:长轮询服务端,提供拉取配置,发布配置接口。
  • AdminClient:管理后台客户端,发布配置。

​ 由于配置服务需要集群部署,ConfigClient 和 AdminClient 在调用时可能负载均衡到不同的 ConfigService,而 DeferredResult 只保存在服务器本地,AdminClient发布配置通知不到客户端。

image-20221008153606506

5.1.2 解决

1、负载均衡

ConfigClient和AdminClient使用相同的负载均衡策略,通过对配置名进行Hash取余,使它们根据配置名定位到相同的服务器。

image-20221008160039083

2、广播

AdminClient发布配置时,将这个消息通知到每一个服务器,每个服务器收到消息后,判断当前没有监听这个配置,没有就不进行处理。至于通知的形式,可以采取MQ或者扫描MySQL表。

image-20221008155338252

5.2 消息丢失与重复

5.2.1 问题描述

1、消息丢失
  • 发布的时候其中一台客户端网络闪断,导致接受不到更新通知,那么这个客户端将更新不了,除非在网络正常的时候再次发布。
  • 长轮询需要客户端发起请求,在收到响应和发起下一次请求之间有一瞬间的时间间隙,如果在这个瞬间刚好发布,那么将通知不到客户端。

image-20220929172642836

2、消息重复
  • 在3.1集群问题中,如果采取了广播的形式,无论是MQ或者扫描数据库表都有可能会有消息的延迟,导致不同服务器会收到的通知的时间有先后。

image-20220929172650681

5.2.2 NotificationId机制

  1. 请求远端服务,带上自己的配置ID以及notification信息

  2. 服务端针对传过来的每一个configId和对应的notificationId,检查notificationId是否是最新的

  3. 如果都是最新的,则保持住请求60秒,如果60秒内没有配置变化,则返回HttpStatus 304。如果60秒内有配置变化,则返回对应configId的最新notificationId, HttpStatus 200。

  4. 如果传过来的notification信息中发现有notificationId比服务端老,则直接返回对应configId的最新notificationId, HttpStatus 200。

  5. 客户端拿到服务端返回后,判断返回的HttpStatus

  6. 如果返回的HttpStatus是304,说明配置没有变化,重新执行第1步

  7. 如果返回的HttpStauts是200,说明配置有变化,针对变化的configId重新去服务端拉取配置。

    image-20220929172628099

6.QA

6.1 为什么采用长轮询,而不是基于TCP实现的长连接?

1、长轮询基于 HTTP 协议,有更高的通用性和接入的简便性,考虑到客户端可能有来自各种各样的语言或者平台,而且 HTTP 对客户端来说接入成本相对较低。

2、长连接需要对 socket 做保活,有两种方式:

保活方案 介绍 优点 缺点
传输层 TCP KeepAlive TCP 连接闲置一段时间后,通过发送数据包(ack包)等待回复确认。几次都没有回复的话,认为断开。 使用简单,可以利用 TCP 协议提供的检活。 1、KeepAlive 的目的是探测连接是否存在,无法检测能不能发送数据,比如服务器由于负载过大到处无法响应请求,应用层的的原因导致数据无法传输,但是连接还是正常。 2、如果TCP连接的一端断网或者断电,应用层并不知晓,继续发送数据,这个数据包的优先级是高于 KeepAlive 的数据包,因此这个 KeepAlive 包是无法发送出去的,只有在长时间的重传失败后,我们才能判断连接断开,这段长时间,应用及其容易产生业务逻辑BUG。
应用层 HeartBeat 客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线。 自己实现检测机制,有更高的灵活性。 缺点就是要应用层自己实现,自己利用 socket 编程实现。

所以我们需要自己在应用层实现心跳机制,更好的方式是接入 im 系统,但是作为一个基础平台性的服务,应该尽量减少依赖。

3、为了能够支持尽可能多的客户端连接数,需要服务端采用比较合理的 IO 处理模型(Reactor模型),需要 Netty 编程,处理起来相对麻烦。而长轮询是异步请求,跟 NIO 有异曲同工之处,理论上性能表现相差不大。

4、长轮询其实是推拉结合,如果 IM 消息丢失,那么客户端将长时间无法更新,直到下次发布消息到来。长轮询通过NotificationId机制可以防止消息丢失,消息丢失时,下次长轮询请求通过NotificationId对比,将直接通知客户端。

6.2 单机能支持多少个客户端接入

理论上取决于服务器内存的大小和系统对最大连接数的限制。在本机测试时,连接数到 3000 会 OOM,因为 JVM 默认最大堆为 512m,调为 1.5G 后,可以支撑8k 连接(Jmeter线程限制单机4k)。