使用@Async異步調用方法
Async簡介
異步方法調用使用場景:處理日志、發送郵件、短信......
spring中提供了@Async來實現異步方法。
@Async修飾類,則該類所有方法都是異步的,@Async修飾方法,則該方法是異步的。
被修飾的方法在被調用時,會在一個新的線程中執行。
Spring中通過在方法上設置@Async注解,可使得方法被異步調用。也就是該方法會在調用時立即返回,而這個方法的實際執行交
給Spring的TaskExecutor去完成
1. 如果此時線程池中的數量小于corePoolSize,即使線程池中的線程都處于空閑狀態,也要創建新的線程來處理被添加的任務。
2. 如果此時線程池中的數量等于corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3. 如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量小于maxPoolSize,建新的線程來處理被添加的任務。
4. 如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maxPoolSize,那么通過handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5. 當線程池中的線程數量大于corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
本次記錄Async使用場景
需要調用其他服務,并且主線程需要繼續完成當前線程任務
第一步:需要去做事的類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@Component @EnableScheduling public class VideoStatusUpdateServiceImpl implements VideoStatusUpdateService { @Resource private VaCaseVideoExtMapper vaCaseVideoExtMapper; //每隔五秒 @Scheduled (cron = "*/5 * * * * ? " ) @Override public void videoStatusUpdate() throws IOException { //得到一個集合 List<VaCaseVideo> list = vaCaseVideoExtMapper.selectAllVideoes(); //遍歷集合去創建異步線程,去做一些其他事情 for (VaCaseVideo vo : list) { dealTask(vo); } } @Async ( "asyncServiceExecutor" ) public void dealTask(VaCaseVideo vo) throws IOException { System.out.print( "這里在做某件事情" ) } } |
第二步:啟動類上加上注解@EnableAsync,開啟異步
1
2
3
4
5
6
7
8
|
@SpringBootApplication @EnableAsync @EnableCaching public class StartApp { public static void main(String[] args) { SpringApplication.run(StartApp. class , args); } } |
第三步:配置Executor(此步驟可有可無,若不配值則會使用默認值),配置自定義Executor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Configuration public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig. class ); @Bean public Executor asyncServiceExecutor() { logger.info( "start asyncServiceExecutor" ); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數 executor.setCorePoolSize( 5 ); //配置最大線程數 executor.setMaxPoolSize( 60 ); executor.setKeepAliveSeconds( 180 ); //配置隊列大小 executor.setQueueCapacity( 60 ); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix( "async-service-" ); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor; } } |
第四步:啟動項目,會每隔五秒打印需要做的事情
異步請求與異步調用的區別
兩者的使用場景不同,異步請求用來解決并發請求對服務器造成的壓力,從而提高對請求的吞吐量;而異步調用是用來做一些非主線流程且不需要實時計算和響應的任務,比如同步日志到kafka中做日志分析等。
異步請求是會一直等待response相應的,需要返回結果給客戶端的;而異步調用我們往往會馬上返回給客戶端響應,完成這次整個的請求,至于異步調用的任務后臺自己慢慢跑就行,客戶端不會關心。
異步請求的實現
方式一:Servlet方式實現異步請求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
@RequestMapping (value = "/email/servletReq" , method = GET) public void servletReq (HttpServletRequest request, HttpServletResponse response) { AsyncContext asyncContext = request.startAsync(); //設置監聽器:可設置其開始、完成、異常、超時等事件的回調處理 asyncContext.addListener( new AsyncListener() { @Override public void onTimeout(AsyncEvent event) throws IOException { System.out.println( "超時了..." ); //做一些超時后的相關操作... } @Override public void onStartAsync(AsyncEvent event) throws IOException { System.out.println( "線程開始" ); } @Override public void onError(AsyncEvent event) throws IOException { System.out.println( "發生錯誤:" +event.getThrowable()); } @Override public void onComplete(AsyncEvent event) throws IOException { System.out.println( "執行完成" ); //這里可以做一些清理資源的操作... } }); //設置超時時間 asyncContext.setTimeout( 20000 ); asyncContext.start( new Runnable() { @Override public void run() { try { Thread.sleep( 10000 ); System.out.println( "內部線程:" + Thread.currentThread().getName()); asyncContext.getResponse().setCharacterEncoding( "utf-8" ); asyncContext.getResponse().setContentType( "text/html;charset=UTF-8" ); asyncContext.getResponse().getWriter().println( "這是異步的請求返回" ); } catch (Exception e) { System.out.println( "異常:" +e); } //異步請求完成通知 //此時整個請求才完成 asyncContext.complete(); } }); //此時之類 request的線程連接已經釋放了 System.out.println( "主線程:" + Thread.currentThread().getName()); } |
方式二:使用很簡單,直接返回的參數包裹一層callable即可,可以繼承WebMvcConfigurerAdapter類來設置默認線程池和超時處理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@RequestMapping (value = "/email/callableReq" , method = GET) @ResponseBody public Callable<String> callableReq () { System.out.println( "外部線程:" + Thread.currentThread().getName()); return new Callable<String>() { @Override public String call() throws Exception { Thread.sleep( 10000 ); System.out.println( "內部線程:" + Thread.currentThread().getName()); return "callable!" ; } }; } @Configuration public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter { @Resource private ThreadPoolTaskExecutor myThreadPoolTaskExecutor; @Override public void configureAsyncSupport( final AsyncSupportConfigurer configurer) { //處理 callable超時 configurer.setDefaultTimeout( 60 * 1000 ); configurer.setTaskExecutor(myThreadPoolTaskExecutor); configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor()); } @Bean public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() { return new TimeoutCallableProcessingInterceptor(); } } |
方式三:和方式二差不多,在Callable外包一層,給WebAsyncTask設置一個超時回調,即可實現超時處理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@RequestMapping (value = "/email/webAsyncReq" , method = GET) @ResponseBody public WebAsyncTask<String> webAsyncReq () { System.out.println( "外部線程:" + Thread.currentThread().getName()); Callable<String> result = () -> { System.out.println( "內部線程開始:" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep( 4 ); } catch (Exception e) { // TODO: handle exception } logger.info( "副線程返回" ); System.out.println( "內部線程返回:" + Thread.currentThread().getName()); return "success" ; }; WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result); wat.onTimeout( new Callable<String>() { @Override public String call() throws Exception { // TODO Auto-generated method stub return "超時" ; } }); return wat; } |
方式四:DeferredResult可以處理一些相對復雜一些的業務邏輯,最主要還是可以在另一個線程里面進行業務處理及返回,即可在兩個完全不相干的線程間的通信。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
@RequestMapping (value = "/email/deferredResultReq" , method = GET) @ResponseBody public DeferredResult<String> deferredResultReq () { System.out.println( "外部線程:" + Thread.currentThread().getName()); //設置超時時間 DeferredResult<String> result = new DeferredResult<String>( 60 *1000L); //處理超時事件 采用委托機制 result.onTimeout( new Runnable() { @Override public void run() { System.out.println( "DeferredResult超時" ); result.setResult( "超時了!" ); } }); result.onCompletion( new Runnable() { @Override public void run() { //完成后 System.out.println( "調用完成" ); } }); myThreadPoolTaskExecutor.execute( new Runnable() { @Override public void run() { //處理業務邏輯 System.out.println( "內部線程:" + Thread.currentThread().getName()); //返回結果 result.setResult( "DeferredResult!!" ); } }); return result; } |
SpringBoot中異步調用的使用
1、介紹
異步請求的處理。除了異步請求,一般上我們用的比較多的應該是異步調用。通常在開發過程中,會遇到一個方法是和實際業務無關的,沒有緊密性的。比如記錄日志信息等業務。這個時候正常就是啟一個新線程去做一些業務處理,讓主線程異步的執行其他業務。
2、使用方式(基于spring下)
需要在啟動類加入@EnableAsync使異步調用@Async注解生效
在需要異步執行的方法上加入此注解即可@Async("threadPool"),threadPool為自定義線程池
代碼略。。。就倆標簽,自己試一把就可以了
3、注意事項
在默認情況下,未設置TaskExecutor時,默認是使用SimpleAsyncTaskExecutor這個線程池,但此線程不是真正意義上的線程池,因為線程不重用,每次調用都會創建一個新的線程。可通過控制臺日志輸出可以看出,每次輸出線程名都是遞增的。所以最好我們來自定義一個線程池。
調用的異步方法,不能為同一個類的方法(包括同一個類的內部類),簡單來說,因為Spring在啟動掃描時會為其創建一個代理類,而同類調用時,還是調用本身的代理類的,所以和平常調用是一樣的。
其他的注解如@Cache等也是一樣的道理,說白了,就是Spring的代理機制造成的。所以在開發中,最好把異步服務單獨抽出一個類來管理。下面會重點講述。
4、什么情況下會導致@Async異步方法會失效?
- a.調用同一個類下注有@Async異步方法:在spring中像@Async和@Transactional、cache等注解本質使用的是動態代理,其實Spring容器在初始化的時候Spring容器會將含有AOP注解的類對象“替換”為代理對象(簡單這么理解),那么注解失效的原因就很明顯了,就是因為調用方法的是對象本身而不是代理對象,因為沒有經過Spring容器,那么解決方法也會沿著這個思路來解決。
- b.調用的是靜態(static )方法
- c.調用(private)私有化方法
5、解決4中問題1的方式(其它2,3兩個問題自己注意下就可以了)
將要異步執行的方法單獨抽取成一個類,原理就是當你把執行異步的方法單獨抽取成一個類的時候,這個類肯定是被Spring管理的,其他Spring組件需要調用的時候肯定會注入進去,這時候實際上注入進去的就是代理類了。
其實我們的注入對象都是從Spring容器中給當前Spring組件進行成員變量的賦值,由于某些類使用了AOP注解,那么實際上在Spring容器中實際存在的是它的代理對象。那么我們就可以通過上下文獲取自己的代理對象調用異步方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@Controller @RequestMapping ( "/app" ) public class EmailController { //獲取ApplicationContext對象方式有多種,這種最簡單,其它的大家自行了解一下 @Autowired private ApplicationContext applicationContext; @RequestMapping (value = "/email/asyncCall" , method = GET) @ResponseBody public Map<String, Object> asyncCall () { Map<String, Object> resMap = new HashMap<String, Object>(); try { //這樣調用同類下的異步方法是不起作用的 //this.testAsyncTask(); //通過上下文獲取自己的代理對象調用異步方法 EmailController emailController = (EmailController)applicationContext.getBean(EmailController. class ); emailController.testAsyncTask(); resMap.put( "code" , 200 ); } catch (Exception e) { resMap.put( "code" , 400 ); logger.error( "error!" ,e); } return resMap; } //注意一定是public,且是非static方法 @Async public void testAsyncTask() throws InterruptedException { Thread.sleep( 10000 ); System.out.println( "異步任務執行完成!" ); } } |
6、開啟cglib代理,手動獲取Spring代理類,從而調用同類下的異步方法。
首先,在啟動類上加上@EnableAspectJAutoProxy(exposeProxy = true)注解。
代碼實現,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
@Service @Transactional (value = "transactionManager" , readOnly = false , propagation = Propagation.REQUIRED, rollbackFor = Throwable. class ) public class EmailService { @Autowired private ApplicationContext applicationContext; @Async public void testSyncTask() throws InterruptedException { Thread.sleep( 10000 ); System.out.println( "異步任務執行完成!" ); } public void asyncCallTwo() throws InterruptedException { //this.testSyncTask(); // EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class); // emailService.testSyncTask(); boolean isAop = AopUtils.isAopProxy(EmailController. class ); //是否是代理對象; boolean isCglib = AopUtils.isCglibProxy(EmailController. class ); //是否是CGLIB方式的代理對象; boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController. class ); //是否是JDK動態代理方式的代理對象; //以下才是重點!!! EmailService emailService = (EmailService)applicationContext.getBean(EmailService. class ); EmailService proxy = (EmailService) AopContext.currentProxy(); System.out.println(emailService == proxy ? true : false ); proxy.testSyncTask(); System.out.println( "end!!!" ); } } |
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/qq_34178998/article/details/95939425