netty相关实践
1.Netty的核心组件
- Channel;
- 回调;
- Future;
- 事件和ChannelHandler;
1.1 Channel
Channel是Java NIO的一个基本构造。
它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。
目前,可以把 Channel看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。
1.2 回调
Netty 在内部使用了回调来处理事件;当一个回调被触发时,相关的事件可以被一个 interfaceChannelHandler 的实现处理。
当一个新的连接已经被建立时,
ChannelHandler 的 channelActive()回调方法将会被调用,并将打印出一条信息。
1 2 3 4 5 6 7
| public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty server rock!", CharsetUtil.UTF_8)); } }
|
1.3 Future
Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
例如: 创建一个简单的channel载体 并且连接到远程节点,并且注册一个新的ChannelFutureListener。当监听器被通知已经建立连接的时候,会检查对应的状态,如果是操作成功的,那么可做对应处理。否则抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Channel channel = ...;
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()){ ByteBuf buffer = Unpooled.copiedBuffer("Hello",Charset.defaultCharset()); ChannelFuture wf = future.channel() .writeAndFlush(buffer); .... } else { Throwable cause = future.cause(); cause.printStackTrace(); } } });
|
2.Netty简单示例(包含服务端与连接端)
项目结构:

2.1 编写Echo服务器
2.1.1 EchoServerHandler类
编写服务器处理类 我们只需要接收传入人的消息,并且做处理,此处原封返回传入端发送的信息。
- channelRead()—对于每个传入的消息都要调用;
- channelReadComplete()—通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;
- exceptionCaught()—在读取操作期间,有异常抛出时会调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package pers.brew.test.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty server rock!", CharsetUtil.UTF_8)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg){ ByteBuf in = (ByteBuf) msg; System.out.println("Server received:"+in.toString(CharsetUtil.UTF_8)); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx){ ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ cause.printStackTrace(); ctx.close(); } }
|
2.1.2 EchoServer引导服务器
在主要服务端启动口,我们需要指定开启的端口号,并且开启netty功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package pers.brew.test.netty;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
public class EchoServer {
private final int port;
public EchoServer(int port) { this.port = port; }
public static void main(String[] args) throws InterruptedException { if (args.length != 1) { System.err.println( "Usage: " + EchoServer.class.getSimpleName() + " <port>"); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); }
private void start() throws InterruptedException { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); }finally { group.shutdownGracefully().sync(); } } }
|
- EchoServerHandler 实现了业务逻辑;
- main()方法引导了服务器;引导过程中所需要的步骤如下:
- 创建一个 ServerBootstrap 的实例以引导和绑定服务器;
- 创建并分配一个 NioEventLoopGroup实例以进行事件的处理,如接受新连接以及读/写数据;
- 指定服务器绑定的本地的 InetSocketAddress;
- 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
- 调用 ServerBootstrap.bind()方法以绑定服务器。
2.2 编写Echo客户端
Echo 客户端将会:
- 连接到服务器;
- 发送一个或者多个消息;
- 对于每个消息,等待并接收从服务器发回的相同的消息;
- 关闭连接。
2.2.1 EchoClientHandler类
- channelActive()——在到服务器的连接已经建立之后将被调用;
- channelRead0()——当从服务器接收到一条消息时被调用;
- exceptionCaught()——在处理过程中引发异常时被调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package pers.brew.test.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8)); }
@Override public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception { System.out.println( "Client received: " + in.toString(CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
2.2.2 EchoClient引导客户端
此处需要指定所连接的host 与端口port
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| package pers.brew.test.client;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient { private final String host; private final int port;
public EchoClient(String host, int port) { this.host = host; this.port = port; }
public static void main(String[] args) throws InterruptedException { if(args.length != 2){ System.err.println( "Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); }
private void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new EchoClientHandler() ); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
|
2.3 启动客户端与服务器
Server端接收到:

Client端接收到:
