在Spring整合websocket整合應用示例(上)文章中,我們已經實現了websocket,但還有一個核心的業務實現類沒有實現,這里我們就實現這個業務核心類,因為老夫參與的這個系統使用websocket發送消息,所以其實現就是如何發送消息了。
7. NewsListenerImpl的實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
package cn.bridgeli.websocket; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.lagou.common.base.util.date.DateUtil; import com.lagou.platform.news.api.enumeration.PlatNewsCategoryType; import com.lagou.platform.news.web.dao.ext.model.PlatNewsVo; import com.lagou.platform.news.web.dao.ext.model.SearchCondition; import com.lagou.platform.news.web.quartz.impl.TimingJob; import com.lagou.platform.news.web.service.PlatNewsService; import org.apache.commons.lang.StringUtils; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; import java.io.IOException; import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Description : 站內消息監聽器實現 * @Date : 16-3-7 */ @Component public class NewsListenerImpl implements NewsListener{ private static final Logger logger = LoggerFactory.getLogger(NewsListenerImpl. class ); Gson gson = new GsonBuilder().setDateFormat( "yyyy-MM-dd HH:mm:ss" ).create(); //線程池 private ExecutorService executorService = Executors.newCachedThreadPool(); //任務調度 private SchedulerFactory sf = new StdSchedulerFactory(); @Autowired private PlatNewsService platNewsService; @Override public void afterPersist(PlatNewsVo platNewsVo) { logger.info( "監聽到有新消息添加。。。" ); logger.info( "新消息為:" +gson.toJson(platNewsVo)); //啟動線程 if ( null != platNewsVo && !StringUtils.isBlank(platNewsVo.getCurrentoperatoremail())){ //如果是定時消息 if (platNewsVo.getNewsType() == PlatNewsCategoryType.TIMING_TIME.getCategoryId()){ startTimingTask(platNewsVo); //定時推送 } else { //立即推送 executorService.execute( new AfterConnectionEstablishedTask(platNewsVo.getCurrentoperatoremail())); } } } @Override public void afterConnectionEstablished(String email) { logger.info( "建立websocket連接后推送新消息。。。" ); if (!StringUtils.isBlank(email)){ executorService.execute( new AfterConnectionEstablishedTask(email)); } } /** * @Description : 如果新添加了定時消息,啟動定時消息任務 * @param platNewsVo */ private void startTimingTask(PlatNewsVo platNewsVo){ logger.info( "開始定時推送消息任務。。。" ); Date timingTime = platNewsVo.getTimingTime(); if ( null == timingTime){ logger.info( "定時消息時間為null。" ); return ; } logger.info( "定時推送任務時間為:" +DateUtil.date2String(timingTime)); JobDetail jobDetail= JobBuilder.newJob(TimingJob. class ) .withIdentity(platNewsVo.getCurrentoperatoremail()+ "定時消息" +platNewsVo.getId(), "站內消息" ) .build(); //傳遞參數 jobDetail.getJobDataMap().put( "platNewsService" ,platNewsService); jobDetail.getJobDataMap().put( "userEmail" ,platNewsVo.getCurrentoperatoremail()); Trigger trigger= TriggerBuilder .newTrigger() .withIdentity( "定時消息觸發" +platNewsVo.getId(), "站內消息" ) .startAt(timingTime) .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds( 0 ) //時間間隔 .withRepeatCount( 0 ) //重復次數 ) .build(); //啟動定時任務 try { Scheduler sched = sf.getScheduler(); sched.scheduleJob(jobDetail,trigger); if (!sched.isShutdown()){ sched.start(); } } catch (SchedulerException e) { logger.info(e.toString()); } logger.info( "完成開啟定時推送消息任務。。。" ); } /** * @Description : 建立websocket鏈接后的推送線程 */ class AfterConnectionEstablishedTask implements Runnable{ String email ; public AfterConnectionEstablishedTask(String email){ this .email = email; } @Override public void run() { logger.info( "開始推送消息給用戶:" +email+ "。。。" ); if (!StringUtils.isBlank(email)){ SearchCondition searchCondition = new SearchCondition(); searchCondition.setOperatorEmail(email); JSONArray jsonArray = new JSONArray(); for (PlatNewsCategoryType type : PlatNewsCategoryType.values()){ searchCondition.setTypeId(type.getCategoryId()); int count = platNewsService.countPlatNewsByExample(searchCondition); JSONObject object = new JSONObject(); object.put( "name" ,type.name()); object.put( "description" ,type.getDescription()); object.put( "count" ,count); jsonArray.add(object); } if ( null != jsonArray && jsonArray.size()> 0 ){ UserSocketVo userSocketVo = WSSessionLocalCache.get(email); TextMessage reMessage = new TextMessage(gson.toJson(jsonArray)); try { if ( null != userSocketVo){ //推送消息 userSocketVo.getWebSocketSession().sendMessage(reMessage); //更新推送時間 userSocketVo.setLastSendTime(DateUtil.getNowDate()); logger.info( "完成推送新消息給用戶:" +userSocketVo.getUserEmail()+ "。。。" ); } } catch (IOException e) { logger.error(e.toString()); logger.info( "站內消息推送失敗。。。" +e.toString()); } } } logger.info( "結束推送消息給" +email+ "。。。" ); } } } |
這個類就是websocket的核心業務的實現,其具體肯定和業務相關,由于業務的不同,實現肯定不同,因為老夫參與的系統是發送消息,所以里面最核心的一句就是:
1
|
userSocketVo.getWebSocketSession().sendMessage(reMessage); |
通過WebSocketSession的sendMessage方法把我們的消息發送出去。另外,這主要是后端的實現,至于前端的實現,因為老夫是后端程序猿比較關注后端,所以前端就不多做介紹了,大家可以自己去網上查資料。最后需要說明的是,老夫之前搜一些學習資料的時候,發現老夫該同事的寫法和有一篇文章幾乎一樣,我想該同事應該是參考了這篇文章,所以列在下面,算作參考資料。