国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼

SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼

2020-09-20 12:32阿杜同學(xué) Java教程

這篇文章主要介紹了SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

一.導(dǎo)入Netty依賴

?
1
2
3
4
5
<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.25.Final</version>
  </dependency>

二.搭建websocket服務(wù)器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
public class WebSocketServer {
 
 /**
  * 主線程池
  */
 private EventLoopGroup bossGroup;
 /**
  * 工作線程池
  */
 private EventLoopGroup workerGroup;
 /**
  * 服務(wù)器
  */
 private ServerBootstrap server;
 /**
  * 回調(diào)
  */
 private ChannelFuture future;
 
 public void start() {
  future = server.bind(9001);
  System.out.println("netty server - 啟動成功");
 }
 
 public WebSocketServer() {
  bossGroup = new NioEventLoopGroup();
  workerGroup = new NioEventLoopGroup();
 
  server = new ServerBootstrap();
  server.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new WebsocketInitializer());
 }
}

三.初始化Websocket

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
 
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
  // ------------------
  // 用于支持Http協(xié)議
  // ------------------
  // websocket基于http協(xié)議,需要有http的編解碼器
  pipeline.addLast(new HttpServerCodec());
  // 對寫大數(shù)據(jù)流的支持
  pipeline.addLast(new ChunkedWriteHandler());
  // 添加對HTTP請求和響應(yīng)的聚合器:只要使用Netty進(jìn)行Http編程都需要使用
  //設(shè)置單次請求的文件的大小
  pipeline.addLast(new HttpObjectAggregator(1024 * 64));
  //webSocket 服務(wù)器處理的協(xié)議,用于指定給客戶端連接訪問的路由 :/ws
  pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
  // 添加Netty空閑超時(shí)檢查的支持
  // 1. 讀空閑超時(shí)(超過一定的時(shí)間會發(fā)送對應(yīng)的事件消息)
  // 2. 寫空閑超時(shí)
  // 3. 讀寫空閑超時(shí)
  pipeline.addLast(new IdleStateHandler(4, 8, 12));
  //添加心跳處理
  pipeline.addLast(new HearBeatHandler());
  // 添加自定義的handler
  pipeline.addLast(new ChatHandler());
 
 }
}

四.創(chuàng)建Netty監(jiān)聽器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {
 
 @Resource
 private WebSocketServer websocketServer;
 
 @Override
 public void onApplicationEvent(ContextRefreshedEvent event) {
  if(event.getApplicationContext().getParent() == null) {
   try {
    websocketServer.start();
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
}

五.建立消息通道

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class UserChannelMap {
 /**
  * 用戶保存用戶id與通道的Map對象
  */
// private static Map<String, Channel> userChannelMap;
 
 /* static {
  userChannelMap = new HashMap<String, Channel>();
 }*/
 
 /**
  * 定義一個channel組,管理所有的channel
  * GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器,是一個單例
  */
 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
 /**
  * 存放用戶與Chanel的對應(yīng)信息,用于給指定用戶發(fā)送消息
  */
 private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();
 
 private UserChannelMap(){}
 /**
  * 添加用戶id與channel的關(guān)聯(lián)
  * @param userNum
  * @param channel
  */
 public static void put(String userNum, Channel channel) {
  userChannelMap.put(userNum, channel);
 }
 
 /**
  * 根據(jù)用戶id移除用戶id與channel的關(guān)聯(lián)
  * @param userNum
  */
 public static void remove(String userNum) {
  userChannelMap.remove(userNum);
 }
 
 /**
  * 根據(jù)通道id移除用戶與channel的關(guān)聯(lián)
  * @param channelId 通道的id
  */
 public static void removeByChannelId(String channelId) {
  if(!StringUtils.isNotBlank(channelId)) {
   return;
  }
  for (String s : userChannelMap.keySet()) {
   Channel channel = userChannelMap.get(s);
   if(channelId.equals(channel.id().asLongText())) {
    System.out.println("客戶端連接斷開,取消用戶" + s + "與通道" + channelId + "的關(guān)聯(lián)");
    userChannelMap.remove(s);
    UserService userService = SpringUtil.getBean(UserService.class);
    userService.logout(s);
    break;
   }
  }
 }
 
 /**
  * 打印所有的用戶與通道的關(guān)聯(lián)數(shù)據(jù)
  */
 public static void print() {
  for (String s : userChannelMap.keySet()) {
   System.out.println("用戶id:" + s + " 通道:" + userChannelMap.get(s).id());
  }
 }
 
 /**
  * 根據(jù)好友id獲取對應(yīng)的通道
  * @param receiverNum 接收人編號
  * @return Netty通道
  */
 public static Channel get(String receiverNum) {
  return userChannelMap.get(receiverNum);
 }
 
 /**
  * 獲取channel組
  * @return
  */
 public static ChannelGroup getChannelGroup() {
  return channelGroup;
 }
 
 /**
  * 獲取用戶channel map
  * @return
  */
 public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
  return userChannelMap;
 }
}

六.自定義消息類型

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Message {
 /**
  * 消息類型
  */
 private Integer type;
 /**
  * 聊天消息
  */
 private String message;
 /**
  * 擴(kuò)展消息字段
  */
 private Object ext;
 public Integer getType() {
  return type;
 }
 
 public void setType(Integer type) {
  this.type = type;
 }
 
 public MarketChatRecord getChatRecord() {
  return marketChatRecord;
 }
 public void setChatRecord(MarketChatRecord chatRecord) {
  this.marketChatRecord = chatRecord;
 }
 
 public Object getExt() {
  return ext;
 }
 
 public void setExt(Object ext) {
  this.ext = ext;
 }
 
 @Override
 public String toString() {
  return "Message{" +
    "type=" + type +
    ", marketChatRecord=" + marketChatRecord +
    ", ext=" + ext +
    '}';
 }
 
}

七.創(chuàng)建處理消息的handler

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
 
 
 /**
  * 用來保存所有的客戶端連接
  */
 private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
 /**
  *當(dāng)Channel中有新的事件消息會自動調(diào)用
  */
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  // 當(dāng)接收到數(shù)據(jù)后會自動調(diào)用
  // 獲取客戶端發(fā)送過來的文本消息
  Gson gson = new Gson();
  log.info("服務(wù)器收到消息:{}",msg.text());
  System.out.println("接收到消息數(shù)據(jù)為:" + msg.text());
  Message message = gson.fromJson(msg.text(), Message.class);
//根據(jù)業(yè)務(wù)要求進(jìn)行消息處理
  switch (message.getType()) {
   // 處理客戶端連接的消息
   case 0:
    // 建立用戶與通道的關(guān)聯(lián)
   // 處理客戶端發(fā)送好友消息
    break;
   case 1:
   // 處理客戶端的簽收消息
    break;
   case 2:
    // 將消息記錄設(shè)置為已讀
    break;
   case 3:
    // 接收心跳消息
    break;
   default:
    break;
  }
 
 }
 
 // 當(dāng)有新的客戶端連接服務(wù)器之后,會自動調(diào)用這個方法
 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  log.info("handlerAdded 被調(diào)用"+ctx.channel().id().asLongText());
  // 添加到channelGroup 通道組
  UserChannelMap.getChannelGroup().add(ctx.channel());
//  clients.add(ctx.channel());
 }
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  log.info("{異常:}"+cause.getMessage());
  // 刪除通道
  UserChannelMap.getChannelGroup().remove(ctx.channel());
  UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
  ctx.channel().close();
 }
 
 @Override
 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  log.info("handlerRemoved 被調(diào)用"+ctx.channel().id().asLongText());
  //刪除通道
  UserChannelMap.getChannelGroup().remove(ctx.channel());
  UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
  UserChannelMap.print();
 }
 
}

八.處理心跳

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
 
 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if(evt instanceof IdleStateEvent) {
   IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
 
   if(idleStateEvent.state() == IdleState.READER_IDLE) {
    System.out.println("讀空閑事件觸發(fā)...");
   }
   else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
    System.out.println("寫空閑事件觸發(fā)...");
   }
   else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
    System.out.println("---------------");
    System.out.println("讀寫空閑事件觸發(fā)");
    System.out.println("關(guān)閉通道資源");
    ctx.channel().close();
   }
  }
 }
}

搭建完成后調(diào)用測試

1.頁面訪問http://localhost:9001/ws
 2.端口號9001和訪問路徑ws都是我們在上邊配置的,然后傳入我們自定義的消息message類型。
3.大概流程:消息發(fā)送 :用戶1先連接通道,然后發(fā)送消息給用戶2,用戶2若是在線直接可以發(fā)送給用戶,若沒在線可以將消息暫存在redis或者通道里,用戶2鏈接通道的話,兩者可以直接通訊。
消息推送 :用戶1連接通道,根據(jù)通道id查詢要推送的人是否在線,或者推送給所有人,這里我只推送給指定的人。

到此這篇關(guān)于SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Netty WebSocket消息發(fā)送內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!

原文鏈接:https://blog.csdn.net/qq_35142561/article/details/108664780

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91精品久久久久久久久久久久久久久 | 国产青青草| 国产午夜精品一区二区三区 | 色99在线 | 亚洲成人午夜电影 | 欧美精品在线视频 | 午夜免费剧场 | 免费的一级毛片 | 国产人久久人人人人爽 | 欧美日本韩国一区二区三区 | 亚洲国产成人精品女人久久久 | 亚洲三区在线观看 | 国产剧情一区二区 | 亚洲一区二区在线 | 色狠狠综合天天综合综合 | 国产精品福利视频 | 免费观看污视频 | 免费成人在线视频网站 | 亚洲欧美久久 | 久久久精品一区二区三区 | 日韩专区视频 | 黄色毛片在线看 | 好吊妞国产欧美日韩免费观看视频 | 在线观看免费视频国产 | 亚洲综合精品 | 国产精品一区二区在线观看 | 午夜免费小视频 | 久久久国产视频 | 欧美日韩成人在线观看 | 国产噜噜噜噜噜久久久久久久久 | 久久av网 | 免费h在线观看 | 国产不卡免费视频 | 欧美精品一区二区三区一线天视频 | 久久精品电影 | 依人九九宗合九九九 | 瑟瑟视频在线观看 | 国产黄色免费网站 | 国产精品不卡一区二区三区 | 精品国产99| 青青草精品|