大家都知道,Callable和DeferredResult可以用来进行异步请求处理。利用它们,我们可以异步生成返回值,在具体处理的过程中,我们直接在controller中返回相应的Callable或者DeferredResult,在这之后,servlet线程将被释放,可用于其他连接;DeferredResult另外会有线程来进行结果处理,并setResult。
网站设计制作、网站设计介绍好的网站是理念、设计和技术的结合。创新互联公司拥有的网站设计理念、多方位的设计风格、经验丰富的设计团队。提供PC端+手机端网站建设,用营销思维进行网站设计、采用先进技术开源代码、注重用户体验与SEO基础,将技术与创意整合到网站之中,以契合客户的方式做到创意性的视觉化效果。
在正式开始之前,我们先做一点准备工作,在项目中新建了一个base模块。其中包含一些提供基础支持的java类,在其他模块中可能会用到。
我们定义了一个ResponseMsg的实体类来作为我们的返回值类型:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResponseMsg{
private int code;
private String msg;
private T data;
}
非常简单,里面包含了code、msg和data三个字段,其中data为泛型类型。另外类的注解Data、NoArgsConstructor和AllArgsConstructor都是lombok提供的简化我们开发的,主要功能分别是,为我们的类生成set和get方法,生成无参构造器和生成全参构造器。使用idea进行开发的童鞋可以装一下lombok的支持插件。另外,lombok的依赖参见:
org.projectlombok
lombok-maven
1.16.16.0
pom
我们建立了一个TaskService,用来为阻塞调用和Callable调用提供实际结果处理的。代码如下:
@Service
public class TaskService {
private static final Logger log = LoggerFactory.getLogger(TaskService.class);
public ResponseMsggetResult(){
log.info("任务开始执行,持续等待中...");
try {
Thread.sleep(30000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("任务处理完成");
return new ResponseMsg(0,"操作成功","success");
}
}
可以看到,里面实际提供服务的是getResult方法,这边直接返回一个new ResponseMsg(0,“操作成功”,“success”)。但是其中又特意让它sleep了30秒,模拟一个耗时较长的请求。
平时我们用的最普遍的还是阻塞调用,通常请求的处理时间较短,在并发量较小的情况下,使用阻塞调用问题也不是很大。关注公众号:码猿技术专栏,回复关键词:1111 获取阿里内部Java性能调优手册!阻塞调用实现非常简单,我们首先新建一个模块blockingtype,里面只包含一个controller类,用来接收请求并利用TaskService来获取结果。
@RestController
public class BlockController {
private static final Logger log = LoggerFactory.getLogger(BlockController.class);
@Autowired
private TaskService taskService;
@RequestMapping(value = "/get", method = RequestMethod.GET)
public ResponseMsggetResult(){
log.info("接收请求,开始处理...");
ResponseMsgresult = taskService.getResult();
log.info("接收任务线程完成并退出");
return result;
}
}
我们请求的是getResult方法,其中调用了taskService,这个taskService我们是注入得到的。关于怎么跨模块注入的,其实也非常简单,在本模块,加入对其他模块的依赖就可以了。比如这里我们在blockingtype的pom.xml文件中加入对base模块的依赖:
com.sunny
base
1.0-SNAPSHOT
然后我们看一下实际调用效果,这里我们设置端口号为8080,启动日志如下:
2018-06-24 19:02:48.514 INFO 11207 --- [ main] com.sunny.BlockApplication : Starting BlockApplication on xdeMacBook-Pro.local with PID 11207 (/Users/zsunny/IdeaProjects/asynchronoustask/blockingtype/target/classes started by zsunny in /Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-24 19:02:48.519 INFO 11207 --- [ main] com.sunny.BlockApplication : No active profile set, falling back to default profiles: default
2018-06-24 19:02:48.762 INFO 11207 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629: startup date [Sun Jun 24 19:02:48 CST 2018]; root of context hierarchy
2018-06-24 19:02:50.756 INFO 11207 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat initialized with port(s): 8080 (http)
2018-06-24 19:02:50.778 INFO 11207 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2018-06-24 19:02:50.780 INFO 11207 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/8.5.23
2018-06-24 19:02:50.922 INFO 11207 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2018-06-24 19:02:50.922 INFO 11207 --- [ost-startStop-1] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 2200 ms
2018-06-24 19:02:51.156 INFO 11207 --- [ost-startStop-1] o.s.b.w.servlet.ServletRegistrationBean : Mapping servlet: 'dispatcherServlet' to [/]
2018-06-24 19:02:51.162 INFO 11207 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'characterEncodingFilter' to: [/*]
2018-06-24 19:02:51.163 INFO 11207 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]
2018-06-24 19:02:51.163 INFO 11207 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'httpPutFormContentFilter' to: [/*]
2018-06-24 19:02:51.163 INFO 11207 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'requestContextFilter' to: [/*]
2018-06-24 19:02:51.620 INFO 11207 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629: startup date [Sun Jun 24 19:02:48 CST 2018]; root of context hierarchy
2018-06-24 19:02:51.724 INFO 11207 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/get],methods=[GET]}" onto public com.sunny.entity.ResponseMsgcom.sunny.controller.BlockController.getResult()
2018-06-24 19:02:51.730 INFO 11207 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error]}" onto public org.springframework.http.ResponseEntity> org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-24 19:02:51.731 INFO 11207 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error],produces=[text/html]}" onto public org.springframework.web.servlet.ModelAndView org.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-24 19:02:51.780 INFO 11207 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-24 19:02:51.780 INFO 11207 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-24 19:02:51.838 INFO 11207 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-24 19:02:52.126 INFO 11207 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-06-24 19:02:52.205 INFO 11207 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2018-06-24 19:02:52.211 INFO 11207 --- [ main] com.sunny.BlockApplication : Started BlockApplication in 5.049 seconds (JVM running for 6.118)
可以看到顺利启动了,那么我们就来访问一下:
http://localhost:8080/get
等待了大概30秒左右,得到json数据:
{"code":0,"msg":"操作成功","data":"success"}
然后我们来看看控制台的日志:
2018-06-24 19:04:07.315 INFO 11207 --- [nio-8080-exec-1] com.sunny.controller.BlockController : 接收请求,开始处理...
2018-06-24 19:04:07.316 INFO 11207 --- [nio-8080-exec-1] com.sunny.service.TaskService : 任务开始执行,持续等待中...
2018-06-24 19:04:37.322 INFO 11207 --- [nio-8080-exec-1] com.sunny.service.TaskService : 任务处理完成
2018-06-24 19:04:37.322 INFO 11207 --- [nio-8080-exec-1] com.sunny.controller.BlockController : 接收任务线程完成并退出
可以看到在“ResponseMsg result = taskService.getResult();”的时候是阻塞了大约30秒钟,随后才执行它后面的打印语句“log.info(“接收任务线程完成并退出”);”。
涉及到较长时间的请求处理的话,比较好的方式是用异步调用,比如利用Callable返回结果。异步主要表现在,接收请求的servlet可以不用持续等待结果产生,而可以被释放去处理其他事情。当然,在调用者来看的话,其实还是表现在持续等待30秒。这有利于服务端提供更大的并发处理量。这里我们新建一个callabledemo模块,在这个模块中,我们一样只包含一个TaskController,另外也是需要加入base模块的依赖。只不过这里我们的返回值不是ResponseMsg类型了,而是一个Callable类型。
@RestController
public class TaskController {
private static final Logger log = LoggerFactory.getLogger(TaskController.class);
@Autowired
private TaskService taskService;
@RequestMapping(value = "/get",method = RequestMethod.GET)
public Callable> getResult(){
log.info("接收请求,开始处理...");
Callable> result = (()->{
return taskService.getResult();
});
log.info("接收任务线程完成并退出");
return result;
}
}
在里面,我们创建了一个Callable类型的变量result,并实现了它的call方法,在call方法中,我们也是调用taskService的getResult方法得到返回值并返回。下一步我们就运行一下这个模块,这里我们在模块的application.yml中设置端口号为8081:
server:
port: 8081
启动,可以看到控制台的消息:
2018-06-24 19:38:14.658 INFO 11226 --- [ main] com.sunny.CallableApplication : Starting CallableApplication on xdeMacBook-Pro.local with PID 11226 (/Users/zsunny/IdeaProjects/asynchronoustask/callabledemo/target/classes started by zsunny in /Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-24 19:38:14.672 INFO 11226 --- [ main] com.sunny.CallableApplication : No active profile set, falling back to default profiles: default
2018-06-24 19:38:14.798 INFO 11226 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629: startup date [Sun Jun 24 19:38:14 CST 2018]; root of context hierarchy
2018-06-24 19:38:16.741 INFO 11226 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat initialized with port(s): 8081 (http)
2018-06-24 19:38:16.762 INFO 11226 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2018-06-24 19:38:16.764 INFO 11226 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/8.5.23
2018-06-24 19:38:16.918 INFO 11226 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2018-06-24 19:38:16.919 INFO 11226 --- [ost-startStop-1] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 2126 ms
2018-06-24 19:38:17.144 INFO 11226 --- [ost-startStop-1] o.s.b.w.servlet.ServletRegistrationBean : Mapping servlet: 'dispatcherServlet' to [/]
2018-06-24 19:38:17.149 INFO 11226 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'characterEncodingFilter' to: [/*]
2018-06-24 19:38:17.150 INFO 11226 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]
2018-06-24 19:38:17.150 INFO 11226 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'httpPutFormContentFilter' to: [/*]
2018-06-24 19:38:17.150 INFO 11226 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean : Mapping filter: 'requestContextFilter' to: [/*]
2018-06-24 19:38:17.632 INFO 11226 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629: startup date [Sun Jun 24 19:38:14 CST 2018]; root of context hierarchy
2018-06-24 19:38:17.726 INFO 11226 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/get],methods=[GET]}" onto public java.util.concurrent.Callable> com.sunny.controller.TaskController.getResult()
2018-06-24 19:38:17.731 INFO 11226 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error]}" onto public org.springframework.http.ResponseEntity> org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-24 19:38:17.733 INFO 11226 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error],produces=[text/html]}" onto public org.springframework.web.servlet.ModelAndView org.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-24 19:38:17.777 INFO 11226 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-24 19:38:17.777 INFO 11226 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-24 19:38:17.825 INFO 11226 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-24 19:38:18.084 INFO 11226 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-06-24 19:38:18.176 INFO 11226 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8081 (http)
2018-06-24 19:38:18.183 INFO 11226 --- [ main] com.sunny.CallableApplication : Started CallableApplication in 4.538 seconds (JVM running for 5.327)
完美启动了,然后我们还是一样,访问一下:
http://localhost:8081/get
在大约等待了30秒左右,我们在浏览器上得到json数据:
{"code":0,"msg":"操作成功","data":"success"}
和阻塞调用的结果一样——当然一样啦,都是同taskService中得到的结果。然后我们看看控制台的消息:
2018-06-24 19:39:07.738 INFO 11226 --- [nio-8081-exec-1] com.sunny.controller.TaskController : 接收请求,开始处理...
2018-06-24 19:39:07.740 INFO 11226 --- [nio-8081-exec-1] com.sunny.controller.TaskController : 接收任务线程完成并退出
2018-06-24 19:39:07.753 INFO 11226 --- [ MvcAsync1] com.sunny.service.TaskService : 任务开始执行,持续等待中...
2018-06-24 19:39:37.756 INFO 11226 --- [ MvcAsync1] com.sunny.service.TaskService : 任务处理完成
很显然,这里的消息出现的顺序和阻塞模式有所不同了,这里在“接收请求,开始处理…”之后直接打印了“接收任务线程完成并退出”。而不是先出现“任务处理完成”后再出现“接收任务线程完成并退出”。这就说明,这里没有阻塞在从taskService中获得数据的地方,controller中直接执行后面的部分(这里可以做其他很多事,不仅仅是打印日志)。
前面铺垫了那么多,还是主要来说DeferredResult的;和Callable一样,DeferredResult也是为了支持异步调用。两者的主要差异,Sunny觉得主要在DeferredResult需要自己用线程来处理结果setResult,而Callable的话不需要我们来维护一个结果处理线程。总体来说,Callable的话更为简单,同样的也是因为简单,灵活性不够;相对地,DeferredResult更为复杂一些,但是又极大的灵活性。在可以用Callable的时候,直接用Callable;而遇到Callable没法解决的场景的时候,可以尝试使用DeferredResult。这里Sunny将会设计两个DeferredResult使用场景。
/**
* 任务实体类
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Task {
private int taskId;
private DeferredResult> taskResult;
@Override
public String toString() {
return "Task{" +
"taskId=" + taskId +
", taskResult" + "{respnotallow=" + taskResult.getResult() + "}" +
'}';
}
}
看起来非常简单,成员变量又taskId和taskResult,前者是int类型,后者为我们的DeferredResult类型,它的泛型类型为ResponseMsg,注意这里用到ResponseMsg,所以也需要导入base模块的依赖。另外注解之前已经说明了,不过这里再提一句,@Data注解也包含了toString的重写,但是这里为了知道具体的ResponseMsg的内容,Sunny特意手动重写。看完Task类型,我们再来看看任务队列。
@Component
public class TaskQueue {
private static final Logger log = LoggerFactory.getLogger(TaskQueue.class);
private static final int QUEUE_LENGTH = 10;
private BlockingQueuequeue = new LinkedBlockingDeque<>(QUEUE_LENGTH);
private int taskId = 0;
/**
* 加入任务
* @param deferredResult
*/
public void put(DeferredResult> deferredResult){
taskId++;
log.info("任务加入队列,id为:{}",taskId);
queue.offer(new Task(taskId,deferredResult));
}
/**
* 获取任务
* @return
* @throws InterruptedException
*/
public Task take() throws InterruptedException {
Task task = queue.poll();
log.info("获得任务:{}",task);
return task;
}
}
这里我们将它作为一个bean,之后会在其他bean中注入,这里实际的队列为成员变量queue,它是LinkedBlockingDeque类型的。还有一个成员变量为taskId,是用于自动生成任务id的,并且在加入任务的方法中实现自增,以确保每个任务的id唯一性。方法的话又put和take方法,分别用于向队列中添加任务和取出任务;其中,对queue的操作,分别用了offer和poll,这样是实现一个非阻塞的操作,并且在队列为空和队列已满的情况下不会抛出异常。另外,大家实现的时候,可以考虑使用ConcurrentLinkedQueue来高效处理并发,因为它属于无界非阻塞队列,使用过程中需要考虑可能造成的OOM问题。Sunny这里选择阻塞队列LinkedBlockingDeque,它底层使用加锁进行了同步;但是这里使用了TaskQueue进行封装,处理过程中有一些额外操作,调用时需要加锁以防发生某些意料之外的问题。
然后我们来看步骤1中的,启动一个持续从任务队列中获取任务的线程的具体实现。
@Component
public class TaskExecute {
private static final Logger log = LoggerFactory.getLogger(TaskExecute.class);
private static final Random random = new Random();
//默认随机结果的长度
private static final int DEFAULT_STR_LEN = 10;
//用于生成随机结果
private static final String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
@Autowired
private TaskQueue taskQueue;
/**
* 初始化启动
*/
@PostConstruct
public void init(){
log.info("开始持续处理任务");
new Thread(this::execute).start();
}
/**
* 持续处理
* 返回执行结果
*/
private void execute(){
while (true){
try {
//取出任务
Task task;
synchronized (taskQueue) {
task = taskQueue.take();
}
if(task != null) {
//设置返回结果
String randomStr = getRandomStr(DEFAULT_STR_LEN);
ResponseMsgresponseMsg = new ResponseMsg (0, "success", randomStr);
log.info("返回结果:{}", responseMsg);
task.getTaskResult().setResult(responseMsg);
}
int time = random.nextInt(10);
log.info("处理间隔:{}秒",time);
Thread.sleep(time*1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 获取长度为len的随机串
* @param len
* @return
*/
private String getRandomStr(int len){
int maxInd = str.length();
StringBuilder sb = new StringBuilder();
int ind;
for(int i=0;iind = random.nextInt(maxInd);
sb.append(str.charAt(ind));
}
return String.valueOf(sb);
}
}
这里,我们注入了TaskQueue,成员变量比较简单并且有注释,不再说明,主要来看方法。先看一下最后一个方法getRandomStr,很显然,这是一个获得长度为len的随机串的方法,访问限定为private,为类中其他方法服务的。然后我们看init方法,它执行的其实就是开启了一个线程并且执行execute方法,注意一下它上面的@PostContruct注解,这个注解就是在这个bean初始化的时候就执行这个方法。所以我们需要关注的实际逻辑在execute方法中。可以看到,在execute方法中,用了一个while(true)来保证线程持续运行。因为是并发环境下,考虑对taskQueue加锁,从中取出任务;如果任务不为空,获取用getRandomStr生成一个随机结果并用setResult方法进行返回。最后可以看到,利用random生成来一个[0,10)的随机数,并让线程sleep相应的秒数。这里注意一下,需要设定一个时间间隔,否则,先线程持续跑会出现CPU负载过高的情况。接下来我们就看看controller是如何处理的。
@RestController
public class TaskController {
private static final Logger log = LoggerFactory.getLogger(TaskController.class);
//超时结果
private static final ResponseMsgOUT_OF_TIME_RESULT = new ResponseMsg<>(-1,"超时","out of time");
//超时时间
private static final long OUT_OF_TIME = 3000L;
@Autowired
private TaskQueue taskQueue;
@RequestMapping(value = "/get",method = RequestMethod.GET)
public DeferredResult> getResult() {
log.info("接收请求,开始处理...");
//建立DeferredResult对象,设置超时时间,以及超时返回超时结果
DeferredResult> result = new DeferredResult<>(OUT_OF_TIME, OUT_OF_TIME_RESULT);
result.onTimeout(() -> {
log.info("调用超时");
});
result.onCompletion(() -> {
分享名称:提高系统吞吐量,DeferredResult到底有多强?
当前网址:http://www.csdahua.cn/qtweb/news33/259883.html网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网