前言

博文也好久没有更新了,这个月都忙于技术调研,比如配置中心的方案、服务端音频处理技术等,没空下来去整理相应的技术点。自在调研配置中心Apollo的时候发现一个以前不用的功能—— DeferredResult,今天介绍下这个功能点,这是Servlet3异步请求带来的新特性,Spring的实现方式。

一、背景

1.1 同步请求模型

WechatIMG14

传统的同步请求模式,所有的请求都交给tomcat线程处理,虽然tomcat会使用线程池技术提升吞吐量,但是针对一个请求是由一个tomcat线程进行处理,如果一个请求响应很慢,线程就会无法释放,一直被占用;如果出现大量响应耗时的请求,导致无可用线程处理请求,导致整个服务不可用。

1.2 Servlet3 异步请求模型

WechatIMG14

此异步并非正在的异步,该模型中,tomcat线程仅仅需要处理请求解析的过程,真正的业务处理由业务线程进行处理,这样一来tomcat可以处理更多的请求。

  1. 浏览器发起请求至Servlet容器
  2. 请求解析成HttpServletRequest分发到具体的Servlet处理,同时业务由业务线程进行处理,tomcat线程立即释放
  3. 业务线程处理结束,将执行结果转交给tomcat线程,通过HttpServletResponse将结果响应给浏览器

二、Spring DeferredResult 使用

2.1 基础代码

使用Springboot构建,引入web依赖即可。

2.2 测试代码
2.2.1 超时响应
@RestController
public class DemoController {

private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024));

@GetMapping("/test")
public DeferredResult<String> test() {
// 传入timeout时间和默认值
DeferredResult<String> result = new DeferredResult<>(2000L, "failover");

// 各种时间函数
result.onTimeout(() -> System.out.println("timeout"));
result.onCompletion(() -> System.out.println("complete"));
result.onError(throwable -> System.out.println("error"));

executor.execute(() -> {
try {
// 模拟业务处理
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return result;
}

}

访问 http://localhost:8080/test 可以发现2s后返回 failover。

2.2.2 正常响应
@GetMapping("/test")
public DeferredResult<String> test() {
DeferredResult<String> result = new DeferredResult<>(2000L, "failover");

result.onTimeout(() -> System.out.println("timeout"));
result.onCompletion(() -> System.out.println("complete"));
result.onError(throwable -> System.out.println("error"));

executor.execute(() -> {
try {
result.setResult("hello world");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return result;
}

访问 http://localhost:8080/test 可以发现返回 hello world

2.2.3 普通写法
@GetMapping("/servlet/test")
public void test(HttpServletRequest request) {
// 这个会开启异步模式
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(2000L);
executor.execute(() -> {
try {
Thread.sleep(1000);
System.out.println("demo");
asyncContext.getResponse().getWriter().println("hello world");
} catch (Exception e) {
e.printStackTrace();
} finally {
asyncContext.complete();
}
});
}

三、原理

3.1 异步过程(来自官方文档)
  • The controller returns a DeferredResult and saves it in some in-memory queue or list where it can be accessed.
  • Spring MVC calls request.startAsync().
  • Meanwhile, the DispatcherServlet and all configured filters exit the request processing thread, but the response remains open.
  • The application sets the DeferredResult from some thread, and Spring MVC dispatches the request back to the Servlet container.
  • The DispatcherServlet is invoked again, and processing resumes with the asynchronously produced return value.
3.2 核心代码

debug源码会发现,上面test方法执行结束后,会对返回值进行处理

// ServletInvocableHandlerMethod.invokeAndHandle
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {
// 获取返回值
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
setResponseStatus(webRequest);
// .....
try {
// 对返回值进行处理 DeferredResultMethodReturnValueHandler
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
}
catch (Exception ex) {
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}

利用 DeferredResultMethodReturnValueHandler 对返回值DeferredResult进行处理

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
DeferredResult<?> result;
if (returnValue instanceof DeferredResult) {
result = (DeferredResult<?>) returnValue;
}
// ....
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}

WebAsyncManager 对各种拦截器进行处理,并返回结果。

WebAsyncManager.startCallableProcessing 中会有对应 timeout、error、complete handler 拦截器。

四、参考

https://docs.spring.io/spring-framework/docs/current/reference/html/web.html

https://www.infoworld.com/article/2077995/java-concurrency-asynchronous-processing-support-in-servlet-3-0.html