前言
在工作中是否會遇到這樣的情況:使用websocket客戶端連接服務端的時候,因為網絡波動或服務端斷連,導致客戶端被動斷開連接。為了解決這個問題,需要對被動斷開連接的情況進行捕獲,並重新創建連接。這篇文章主要是提供可以直接使用的斷線重連websocket客戶端代碼。
Maven依賴
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
<dependency>
  <groupId>cn.hutool</groupId>
  <artifactId>hutool-all</artifactId>
  <version>5.5.2</version>
</dependency>
<dependency>
  <groupId>org.java-websocket</groupId>
  <artifactId>Java-WebSocket</artifactId>
  <version>1.5.1</version>
</dependency>
代碼
不廢話,上代碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
package ai.guiji.csdn.ws.client; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.java_websocket.WebSocket; import org.java_websocket.client.WebSocketClient; import org.java_websocket.framing.Framedata; import org.java_websocket.handshake.ServerHandshake; import javax.net.ssl.*; import java.net.Socket; import java.net.URI; import java.nio.ByteBuffer; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @Author huyi @Date 2021/10/15 20:03 @Description: 重連websocket客戶端 */ @Slf4j public class ReConnectWebSocketClient { /** 字符串消息回調 */ private Consumer<String> msgStr; /** 字節流消息回調 */ private Consumer<ByteBuffer> msgByte; /** 異常回調 */ private Consumer<Exception> error; /** 連接標識 */ private String key; /** ws服務端連接 */ private URI serverUri; /** 嘗試重連標識 */ private AtomicBoolean tryReconnect; /** 需要ping標識 */ private AtomicBoolean needPing; /** websocket連接實體 */ private WebSocketClient webSocketClient; /** 重連次數 */ private AtomicInteger reConnectTimes; /** 連接結束標識 */ private AtomicBoolean end; /** 連接后初始發送報文,這里也可以不需要,如果服務端主動斷開連接,重連后可以繼續推送報文的話。 */ private String initReConnectReq; /** 結束回調 */ private Consumer<String> endConsumer; public ReConnectWebSocketClient( URI serverUri, String key, Consumer<String> msgStr, Consumer<ByteBuffer> msgByte, Consumer<Exception> error) { this .msgStr = msgStr; this .msgByte = msgByte; this .error = error; this .key = key; this .serverUri = serverUri; this .tryReconnect = new AtomicBoolean( false ); this .needPing = new AtomicBoolean( true ); this .reConnectTimes = new AtomicInteger( 0 ); this .end = new AtomicBoolean( false ); this .endConsumer = this ::close; init(); } /** 初始化連接 */ public void init() { // 創建連接 createWebSocketClient(); // ping線程 circlePing(); } private 
void needReconnect() throws Exception { ThreadUtil.sleep( 10 , TimeUnit.SECONDS); int cul = reConnectTimes.incrementAndGet(); if (cul > 3 ) { close( "real stop" ); throw new Exception( "服務端斷連,3次重連均失敗" ); } log.warn( "[{}]第[{}]次斷開重連" , key, cul); if (tryReconnect.get()) { log.error( "[{}]第[{}]次斷開重連結果 -> 連接正在重連,本次重連請求放棄" , key, cul); needReconnect(); return ; } try { tryReconnect.set( true ); if (webSocketClient.isOpen()) { log.warn( "[{}]第[{}]次斷開重連,關閉舊連接" , key, cul); webSocketClient.closeConnection( 2 , "reconnect stop" ); } webSocketClient = null ; createWebSocketClient(); connect(); if (StrUtil.hasBlank(initReConnectReq)) { send(initReConnectReq); } } catch (Exception exception) { log.error( "[{}]第[{}]次斷開重連結果 -> 連接正在重連,重連異常:[{}]" , key, cul, exception.getMessage()); needReconnect(); } finally { tryReconnect.set( false ); } } private void createWebSocketClient() { webSocketClient = new WebSocketClient(serverUri) { @Override public void onOpen(ServerHandshake serverHandshake) { log.info( "[{}]ReConnectWebSocketClient [onOpen]連接成功{}" , key, getRemoteSocketAddress()); tryReconnect.set( false ); } @Override public void onMessage(String text) { log.info( "[{}]ReConnectWebSocketClient [onMessage]接收到服務端數據:text={}" , key, text); msgStr.accept(text); } @Override public void onMessage(ByteBuffer bytes) { log.info( "[{}]ReConnectWebSocketClient [onMessage]接收到服務端數據:bytes={}" , key, bytes); msgByte.accept(bytes); } @Override public void onWebsocketPong(WebSocket conn, Framedata f) { log.info( "[{}]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode={}" , key, f.getOpcode()); } @Override public void onClose( int i, String s, boolean b) { log.info( "[{}]ReConnectWebSocketClient [onClose]關閉,s={},b={}" , key, s, b); if (StrUtil.hasBlank(s) || s.contains( "https" )) { if (end.get()) { return ; } try { needReconnect(); } catch (Exception exception) { endConsumer.accept( "reconnect error" ); error.accept(exception); } } } @Override public void onError(Exception e) { log.info( 
"[{}]ReConnectWebSocketClient [onError]異常,e={}" , key, e); endConsumer.accept( "error close" ); error.accept(e); } }; if (serverUri.toString().contains( "wss://" )) { trustAllHosts(webSocketClient); } } public void circlePing() { new Thread( () -> { while (needPing.get()) { if (webSocketClient.isOpen()) { webSocketClient.sendPing(); } ThreadUtil.sleep( 5 , TimeUnit.SECONDS); } log.warn( "[{}]Ping循環關閉" , key); }) .start(); } /** * 連接 * * @throws Exception 異常 */ public void connect() throws Exception { webSocketClient.connectBlocking( 10 , TimeUnit.SECONDS); } /** * 發送 * * @param msg 消息 * @throws Exception 異常 */ public void send(String msg) throws Exception { this .initReConnectReq = msg; if (webSocketClient.isOpen()) { webSocketClient.send(msg); } } /** * 關閉 * * @param msg 關閉消息 */ public void close(String msg) { needPing.set( false ); end.set( true ); if (webSocketClient != null ) { webSocketClient.closeConnection( 3 , msg); } } /** * 忽略證書 * * @param client */ public void trustAllHosts(WebSocketClient client) { TrustManager[] trustAllCerts = new TrustManager[] { new X509ExtendedTrustManager() { @Override public void checkClientTrusted( X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {} @Override public void checkServerTrusted( X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {} @Override public void checkClientTrusted( X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {} @Override public void checkServerTrusted( X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {} @Override public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} @Override public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} @Override public X509Certificate[] getAcceptedIssuers() { return null ; } } }; try { SSLContext ssl = 
SSLContext.getInstance( "SSL" ); ssl.init( null , trustAllCerts, new java.security.SecureRandom()); SSLSocketFactory socketFactory = ssl.getSocketFactory(); client.setSocketFactory(socketFactory); } catch (Exception e) { log.error( "ReConnectWebSocketClient trustAllHosts 異常,e={0}" , e); } } } |
代碼說明:
1、重連次數可以配置(示例代碼中固定為3次,可按需修改)。
2、增加異步pingpong線程,一旦結束連接會自動關閉。
3、對字符串、字節流、異常都有回調措施。
測試代碼方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public static void main(String[] args) throws Exception { ReConnectWebSocketClient client = new ReConnectWebSocketClient( new URI(String.format( "wss://192.168.1.77:24009" )), "test" , // 字符串消息處理 msg -> { // todo 字符串消息處理 System.out.println( "字符串消息:" + msg); }, null , // 異常回調 error -> { // todo 字符串消息處理 System.out.println( "異常:" + error.getMessage()); }); client.connect(); client.send( "haha" ); } |
驗證結果
16:08:54.468 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:08:54.475 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務端數據:text=connect success from tcp4:192.168.6.63:11018!
字符串消息:connect success from tcp4:192.168.6.63:11018!
16:08:56.080 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關閉,s=,b=true
16:09:06.097 [WebSocketConnectReadThread-12] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[1]次斷開重連
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務端數據:text=connect success from tcp4:192.168.6.63:11038!
字符串消息:connect success from tcp4:192.168.6.63:11038!
16:09:09.369 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:14.370 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:19.371 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:24.379 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:29.382 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:34.398 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:39.402 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:44.404 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:49.415 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:54.429 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:09:59.437 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:10:04.449 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:10:06.154 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:10:09.455 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:10:14.462 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:10:19.468 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務端數據:opcode=PONG
16:10:19.644 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關閉,s=,b=true
16:10:29.654 [WebSocketConnectReadThread-16] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[2]次斷開重連
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onError]異常,e={}
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:673)
at org.java_websocket.client.WebSocketClient.run(WebSocketClient.java:461)
at java.lang.Thread.run(Thread.java:748)
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關閉,s=error close,b=false
異常:Connection refused: connect
16:10:34.473 [Thread-0] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]Ping循環關閉
這裡我採用的是手動關閉服務端的方式,來觸發客戶端被動斷連的情況。重連兩次,第二次因服務端還未啟動導致異常觸發。
到此,這篇關於java WebSocket客戶端斷線重連的實現方法的文章就介紹到這了。更多相關java WebSocket客戶端斷線重連的內容,請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持服務器之家!
原文鏈接:https://blog.csdn.net/zhiweihongyan1/article/details/120956762