什么Netty?
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
也就是說,Netty 是一個基于NIO的客戶、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個網(wǎng)絡(luò)應(yīng)用,例如實現(xiàn)了某種協(xié)議的客戶,服務(wù)端應(yīng)用。Netty相當(dāng)簡化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開發(fā)過程,例如,TCP和UDP的socket服務(wù)開發(fā)。
我們下面編寫四個類
1.用于接收數(shù)據(jù)的服務(wù)器端Socket
2.用于接收客戶端的消息,用于接收和反饋客戶端發(fā)出的消息類ServertHandler
3.用于發(fā)送數(shù)據(jù)的服務(wù)器端Client
4.用于發(fā)送數(shù)據(jù)和接收服務(wù)器端發(fā)出的數(shù)據(jù)處理類ClientHandler
Socket.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Server { public static void main(String[] args) throws InterruptedException { //1.第一個線程組是用于接收Client端連接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); //2.第二個線程組是用于實際的業(yè)務(wù)處理的 EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); //綁定兩個線程池 b.channel(NioServerSocketChannel. class ); //指定NIO的模式,如果是客戶端就是NioSocketChannel b.option(ChannelOption.SO_BACKLOG, 1024 ); //TCP的緩沖區(qū)設(shè)置 b.option(ChannelOption.SO_SNDBUF, 32 * 1024 ); //設(shè)置發(fā)送緩沖的大小 b.option(ChannelOption.SO_RCVBUF, 32 * 1024 ); //設(shè)置接收緩沖區(qū)大小 b.option(ChannelOption.SO_KEEPALIVE, true ); //保持連續(xù) b.childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ByteBuf buf = Unpooled.copiedBuffer( "$_" .getBytes()); //拆包粘包定義結(jié)束字符串(第一種解決方案) sc.pipeline().addLast( new DelimiterBasedFrameDecoder( 1024 ,buf)); //在管道中加入結(jié)束字符串 // sc.pipeline().addLast(new FixedLengthFrameDecoder(200));第二種定長 sc.pipeline().addLast( new StringDecoder()); //定義接收類型為字符串把ByteBuf轉(zhuǎn)成String sc.pipeline().addLast( new ServertHandler()); //在這里配置具體數(shù)據(jù)接收方法的處理 } }); ChannelFuture future = b.bind( 8765 ).sync(); //綁定端口 future.channel().closeFuture().sync(); //等待關(guān)閉(程序阻塞在這里等待客戶端請求) bossGroup.shutdownGracefully(); //關(guān)閉線程 workerGroup.shutdownGracefully(); //關(guān)閉線程 } } |
1.在上面這個Server.java中,我們都要定義兩個線程池,boss和worker,boss是用于管理連接到server端的client的連接數(shù)的線程池,而woeker是用于管理實際操作的線程池。
2.ServerBootstrap用一個ServerSocketChannelFactory 來實例化。ServerSocketChannelFactory 有兩種選擇,一種是NioServerSocketChannelFactory,一種是OioServerSocketChannelFactory。 前者使用NIO,后則使用普通的阻塞式IO。它們都需要兩個線程池實例作為參數(shù)來初始化,一個是boss線程池,一個是worker線程池。
3.然后使ServerBookstrap管理boss和worker線程池。并且設(shè)置各個緩沖區(qū)的大小。
4.這里的事件處理類經(jīng)常會被用來處理一個最近的已經(jīng)接收的Channel。ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的Channel。也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel 或者其對應(yīng)的ChannelPipeline來實現(xiàn)你的網(wǎng)絡(luò)程序。 當(dāng)你的程序變的復(fù)雜時,可能你會增加更多的處理類到pipline上,然后提取這些匿名類到最頂層的類上。
5.在使用原始的encoder、decoder的情況下,Netty發(fā)送接收數(shù)據(jù)都是按照ByteBuf的形式,其它形式都是不合法的。 而在上面這個Socket中,我使用sc.pipeline().addLast()
這個方法設(shè)置了接收為字符串類型,注意:只能設(shè)置接收為字符串類型,發(fā)送還是需要發(fā)送ByteBuf類型的數(shù)據(jù)。而且在這里我還設(shè)置了以$_為結(jié)尾的字符串就代表了本次請求字符串的結(jié)束。
6.通過b.bind
綁定端口,用于監(jiān)聽的端口號。
ServerHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class ServertHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println( "server" +body); //前面已經(jīng)定義了接收為字符串,這里直接接收字符串就可以 //服務(wù)端給客戶端的響應(yīng) String response= " hi client!$_" ; //發(fā)送的數(shù)據(jù)以定義結(jié)束的字符串結(jié)尾 ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); //發(fā)送必須還是ByteBuf類型 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } |
ServertHandler繼承自 ChannelHandlerAdapter,這個類實現(xiàn)了ChannelHandler接口,ChannelHandler提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現(xiàn)在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現(xiàn)接口方法。
1.由于我們再server端開始的時候已經(jīng)定義了接收類型為String,所以在這里我們接收到的msg直接強轉(zhuǎn)成String就可以了。同時也要定義以什么為一次請求的結(jié)尾。
Client.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class Client { public static void main(String[] args) throws InterruptedException { EventLoopGroup worker = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(worker) .channel(NioSocketChannel. class ) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ByteBuf buf = Unpooled.copiedBuffer( "$_" .getBytes()); sc.pipeline().addLast( new DelimiterBasedFrameDecoder( 1024 ,buf)); sc.pipeline().addLast( new StringDecoder()); sc.pipeline().addLast( new ClientHandler()); } }); ChannelFuture f=b.connect( "127.0.0.1" , 8765 ).sync(); f.channel().writeAndFlush(Unpooled.copiedBuffer( " hi server2$_" .getBytes())); f.channel().writeAndFlush(Unpooled.copiedBuffer( " hi server3$_" .getBytes())); f.channel().writeAndFlush(Unpooled.copiedBuffer( " hi server4$_" .getBytes())); f.channel().closeFuture().sync(); worker.shutdownGracefully(); } } |
client端和Socket端幾乎代碼相同,只是client端用的不是ServerBootstrap而是Bootstrap來管理連接。這里沒什么好說的。
ClientHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { System.out.println( "client" +msg.toString()); } finally { ReferenceCountUtil.release(msg); //釋放緩沖區(qū) } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } |
ClientHandler和ServertHandler代碼和原理也是一樣,只是在client端我們要釋放緩沖區(qū)。為什么在ServerHandler我們不需要釋放呢 ?因為在ServertHandler我們調(diào)用ctx.writeAndFlush
方法的時候,這個方法默認已經(jīng)幫我們釋放了緩沖區(qū)。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對服務(wù)器之家的支持。
原文鏈接:http://blog.csdn.net/a347911/article/details/53734255