netty相关实践

1.Netty的核心组件

  • Channel;
  • 回调;
  • Future;
  • 事件和ChannelHandler;

1.1 Channel

Channel是Java NIO的一个基本构造。

它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。

目前,可以把 Channel看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。

1.2 回调

Netty 在内部使用了回调来处理事件;当一个回调被触发时,相关的事件可以被一个 interfaceChannelHandler 的实现处理。

当一个新的连接已经被建立时,
ChannelHandler 的 channelActive()回调方法将会被调用,并将打印出一条信息。

1
2
3
4
5
6
7
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//当被通知Channel是活跃的时候,发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty server rock!", CharsetUtil.UTF_8));
}
}

1.3 Future

Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。

例如: 创建一个简单的channel载体 并且连接到远程节点,并且注册一个新的ChannelFutureListener。当监听器被通知已经建立连接的时候,会检查对应的状态,如果是操作成功的,那么可做对应处理。否则抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Channel channel = ...;
// Does not block
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()){
ByteBuf buffer = Unpooled.copiedBuffer("Hello",Charset.defaultCharset());
ChannelFuture wf = future.channel()
.writeAndFlush(buffer);
....
} else {
//错误连接可进行自定义
Throwable cause = future.cause();
cause.printStackTrace();
}
}
});

2.Netty简单示例(包含服务端与连接端)

项目结构:
TSNT3R.png

2.1 编写Echo服务器

2.1.1 EchoServerHandler类

编写服务器处理类 我们只需要接收传入人的消息,并且做处理,此处原封返回传入端发送的信息。

  • channelRead()—对于每个传入的消息都要调用;
  • channelReadComplete()—通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;
  • exceptionCaught()—在读取操作期间,有异常抛出时会调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package pers.brew.test.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
* @author :zhuanglei
* @description:echo服务器
* @date :2021/12/15 14:40
*/
//@Sharable 标示一个ChannelHandler 可以被多个 Channel 安全地共享
// extends 用来定义响应入站事件的方法 继承ChannelInboundHandlerAdapter
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//当被通知Channel是活跃的时候,发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty server rock!", CharsetUtil.UTF_8));
}

// channelRead()—对于每个传入的消息都要调用;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received:"+in.toString(CharsetUtil.UTF_8));
//将接收到的消息写给发送者,而不冲刷出站消息
ctx.write(in);
}
// channelReadComplete()—通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
//将未决消息冲刷到远程节点,并且关闭该 Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
// exceptionCaught()—在读取操作期间,有异常抛出时会调用。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
//打印异常
cause.printStackTrace();
//关闭该channel
ctx.close();
}
}

2.1.2 EchoServer引导服务器

在主要服务端启动口,我们需要指定开启的端口号,并且开启netty功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package pers.brew.test.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
* @author :zhuanglei
* @description:echo引导服务类
* @date :2021/12/15 14:56
*/
public class EchoServer {

//端口号
private final int port;

public EchoServer(int port) {
this.port = port;
}

public static void main(String[] args) throws InterruptedException {
if (args.length != 1) {
System.err.println(
"Usage: " + EchoServer.class.getSimpleName() +
" <port>");
}
//设置端口值(如果端口参数的格式不正确,则抛出一个NumberFormatException)
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}

private void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
//创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建ServerBootStrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class) //指定所试用的NIO传输Channel
.localAddress(new InetSocketAddress(port)) //使用指定的端口设置套接字地址
//添加一个EchoServerHandler(自定)到子Channel的ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//EchoServerHandler 被标注为@Shareable 所以我们可以总是使用同样的实例
ch.pipeline().addLast(serverHandler);
}
});
//异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成
ChannelFuture f = b.bind().sync();
//获取Channel的CloseFuture,并且阻塞当前线程指到它完成
f.channel().closeFuture().sync();
}finally {
//关闭EventLoopGroup释放所有资源
group.shutdownGracefully().sync();
}
}
}
  • EchoServerHandler 实现了业务逻辑;
  • main()方法引导了服务器;引导过程中所需要的步骤如下:
  1. 创建一个 ServerBootstrap 的实例以引导和绑定服务器;
  2. 创建并分配一个 NioEventLoopGroup实例以进行事件的处理,如接受新连接以及读/写数据;
  3. 指定服务器绑定的本地的 InetSocketAddress;
  4. 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
  5. 调用 ServerBootstrap.bind()方法以绑定服务器。

2.2 编写Echo客户端

Echo 客户端将会:

  1. 连接到服务器;
  2. 发送一个或者多个消息;
  3. 对于每个消息,等待并接收从服务器发回的相同的消息;
  4. 关闭连接。
2.2.1 EchoClientHandler类
  • channelActive()——在到服务器的连接已经建立之后将被调用;
  • channelRead0()——当从服务器接收到一条消息时被调用;
  • exceptionCaught()——在处理过程中引发异常时被调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package pers.brew.test.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
* @author :zhuanglei
* @description:echo客户端处理
* @date :2021/12/15 15:54
*/
//标记该类的实例可以被多个 Channel 共享
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//当被通知Channel是活跃的时候,发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
}

@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
//记录已接收消息的转储
System.out.println(
"Client received: " + in.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

2.2.2 EchoClient引导客户端

此处需要指定所连接的host 与端口port

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package pers.brew.test.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
* @author :zhuanglei
* @description:echo客户端引导
* @date :2021/12/15 16:00
*/
public class EchoClient {
private final String host;
private final int port;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public static void main(String[] args) throws InterruptedException {
if(args.length != 2){
System.err.println(
"Usage: " + EchoClient.class.getSimpleName() +
" <host> <port>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}

private void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
//创建Bootstrap
Bootstrap b = new Bootstrap();
//指定EventLoopGroup 以处理客户端事件;需要适用于NIO的实现
b.group(group)
//适用NIO传输的Channel类型
.channel(NioSocketChannel.class)
//设置服务器的InetSocketAddress
.remoteAddress(new InetSocketAddress(host,port))
//在创建Channel时,向 ChannelPipeline中添加一个 EchoClientHandler 实例
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new EchoClientHandler()
);
}
});
//连接到远程节点,阻塞等待直到连接完成
ChannelFuture f = b.connect().sync();
//阻塞,直到Channel关闭
f.channel().closeFuture().sync();
}finally {
//关闭线程池并且释放所有的资源
group.shutdownGracefully();
}
}
}

2.3 启动客户端与服务器

Server端接收到:
TSdjET.png

Client端接收到:
TSwSC4.png