Netty

前言:
    根据尚硅谷视频以及教学ppt整理
    初版 待整理

线程模型基本介绍

现存线程模型:传统I/O服务模型和Reactor模式

传统阻塞I/O服务模型

特点:

采用阻塞IO模式获取输入的数据

每个连接都需要独立的线程完成数据的输入,业务处理,数据返回

缺陷

当并发数很大,就会创建大量的线程,占用很大系统资源

若线程暂时没有数据可读,该线程会阻塞在Hander对象的read操作,导致线程资源浪费

image-20221031152502979

BIO

Blocking I/O同步阻塞I/O模式,数据的读取都阻塞在一个线程内等待完成,很明显,无法应对高并发

NIO

Non-blocking I/O 同步非阻塞型的I/O模型,提供了Channel,Seletor,Buffer等。支持面向缓冲的,基于通道的I/O操作方法,对于高负载,高并发的的应用,也有很好的支持

AIO

Asynchronous I/O 就是NIO 2,是一种异步非阻塞的IO模型,异步IO是基于时间和回调机制实现的。也就是应用操作之后会直接返回,不会阻塞在那里,当后台处理完成后,操作系统会通知相应的线程进行后续操作。目前AIO的应用还不是特别的广泛。

BIO,NIO,AIO的区别

当客户端发起请求

  • BIO:阻塞等待直到处理完成
  • NIO:通过选择器监听多个通道,非阻塞,处理完成之后就返回
  • AIO:通过回调机制实现,应用操作后直接返回,不会阻塞在那里

Netty简介

Netty是什么

  • Netty是一个基于NIO的client-server框架,使用它可以快速简单的开发网络应用程序
  • 极大简化了TCP 和 UDP的套接字编程,并且在性能和安全性仿麦呢更优
  • 支持多种协议比如SMTP,HTTP等

Netty版本说明

Netty 5出现重大bug 已经被官网废弃,目前推荐使用Netty 4.x的稳定版本

在课程中,使用的是Netty4.1.x版本

Netty的优点

  • 统一的API,支持多种传输类型,阻塞的,非阻塞的
  • 自带编解码器解决TCP粘包/拆包问题
  • 自带各种协议栈
  • 安全性可靠

Netty的核心组件

Bytebuf 字节容器

网络通信最终都是通过字节流进行传输的。ByteBuf就是Netty提供的一个字节容器,内部是一个字节数组,相比Java NIO提供的ByteBuf更加简单方便

Bootstrap和ServerBootstrap 启动引导类

Bootstrap的使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
EventLoopGroup group = new NioEventLoopGGroup();
try{
//创建客户端启动引导/辅助类:Bootstrap
Bootstrap b = new Bootstrap();
//指定线程模型
b.group(group).
.....
//尝试建立连接
ChannelFuture f = b.connect(host,port).sync();
f.channel().closeFuture().sync();
} finally{
//关闭线程组资源
group.shutdownGracefully();
}

ServerBootstrap的使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1.bossGroup用于接收连接,workerGroup 用于具体的额管理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try{
//2.创建服务端启动引导类/辅助类 :ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
b.group(bossGroup,workerGroup).
.....
// 6. 绑定端口
ChannelFuture f = b.bind(port).sync();
//等待连接关闭
f.channel().closeFuture().sync();
}finally{
//7.关闭线程组资源
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}

Bootstrap 是客户端的启动引导类,通常使用connect()方法连接到远程的主机和端口,作为一个Netty TCP协议通信中的客户端,Bootstrap也可以通过bind()方法绑定本地的一个端口,作为UDP协议通信的一个端口

ServerBootstrap 很明显是服务端的启动引导类,通常使用bind()方法绑定本地的端口,然后等待客户端的连接

Bootstrap只需要配置一个线程组 EventLoopGroup,而ServerBootstrap需要配置两个线程组,一个用于接收连接,一个用于具体的IO处理

Channel 网络操作抽象类

Netty通过Channel进行IO操作。一旦客户端成功连接服务端,就会新建一个Channel和这个客户端进行绑定

1
2
3
4
5
6
7
8
9
10
11
public Channel doConnect(InetSocketAddresss inetSocketAddress){
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((channelFutureListener) future ->{
if(future,isSuccess()){
completableFuture.complete(future.channel());
}else{
throw new IllegalStateException();
}
});
return completableFuture.get();
}

常用的有两个实现类

NioServerSocketChannel (服务端)

NioSocketChannel (客户端)

EventLoop 事件循环

EventLoop介绍

EventLoop定义了Netty的核心抽象,用于处理来了连接的生命周期中所发生的事件,通俗的讲就是它的主要作用就是负责监听网络事件处理器进行相关 I/O操作(读写)的处理

EventLoop和channel的关系

channel为Netty网络操作抽象类,Event Loop负责处理注册到其上的Channel的I/O操作,两者配合进行I/O操作

EventLoopGroup和EventLoop的关系

EventLoopGroup包含多个EventLoop管理着所有的EventLoop的生命周期

EventLoop处理的I/O事件都将在它专有的Thread上被处理,即ThreadEventLoop属于1:1的关系,从而保证线程安全

image-20221203162809780

ChannelHander (消息处理器) 和 ChannelPipeline (ChannelHander 对象链表)

ChannelHandler是消息的具体处理器,主要负责处理客户端/服务端接收和发送的数据。

当channel被创建的时候,会自动地分配到它专属的ChannelPipelineChannelPipelineChannelHander的链,一个pipeline上可以有多个ChannelHandler

image-20221203164046760

ChannelFuture 操作执行结果

Netty中所有的I/O操作都是异步的,我们不能立刻得到操作是否执行成功,不过可以通过ChannelFuture接口的addListener方法注册一个ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果

1
2
3
4
5
6
7
ChannelFuture f = b.connect(host,port).addListener(future -> {
if(future.isSuccess()){
System.out.println("连接成功!");
}else{
System.err.println("连接失败!");
}
}).sync();

还可以通过ChannelFuturechannel()方法获取连接相关联的Channel

1
Channel channel = f.channel();

还可以通过ChannelFuture接口的sync()方法让异步的操作变成同步的

1
ChannelFuture f = b.bind(port).sync();

NioEventLoopGroup默认的构造函数实际会起的线程数为CPU核心数*2

Reactor 线程模型

Reactor是一种经典的线程模型,它基于事件驱动,特别适合处理海量的I/O事件

别名

  • 反应器模式
  • 分发者模式(Dispatcher)
  • 通知者模式(notifier)

image-20221031153540687

Reactor 模式 通过一个或多个输入同时传递给服务处理器的模式

服务器端程序处理传入的多个请求,并将他们同步分派到相应的处理线程

Reator模式使用IO复用监听事件,收到事件后,分发给某个线程

原先有多个Hander阻塞,现在只用一个ServiceHandler连接

Reactor模式分类

单线程Reactor

image-20221101195028780

单线程Reactor的优点是对系统资源的消耗特别小,但是,没办法支撑大量请求的应用场景并且处理请求的时间可能非常慢,所以一般实际项目中不会使用单线程Reactor

多线程Reactor

image-20221101193557851

一个线程负责接收请求,一组NIO请求处理IO操作

大部分场景下多线程Reactor模型没有啥问题,但是在一些并发数连接比较多(百万级别)的情况下,一个线程负责接收客户端就存在性能问题了

主从Reactor多线程

image-20221101193327320

一组NIO线程负责接收请求,一组NIO线程处理IO操作

更通俗的理解

单Reactor单线程 前台接待员和服务员是同一个人,全程为顾客服务

单Reactor多线程 一个前台接待员,多个服务员,接待员只负责接待

主从Reactor多线程,多个前台接待员,多个服务生

Netty线程模型

大部分的网络框架都是基于Reactor模式设计开发的,在Netty主要靠NioEventLoopGroup线程池来实现具体的线程模型。

实现服务端的时候,一般会初始化两个线程组

  • bossGroup : 接收连接
  • workerGroup: 负责具体的处理,交由对应的Hander处理

单线程模型

一个线程执行处理所有的accept,read,decode,process,encode,send事件,无法应对性能要求比较高的场景

如:

1
2
3
4
5
6
//1.eventGroup既用于处理客户端连接,又负责具体的处理
EventLoopGroup eventGroup = new NioEventLoopGroup(1);
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(eventGroup,eventGroup)
//....

多线程模型

一个Acceptor线程只负责监听客户端的连接,一个NIO线程池负责具体的处理accept,read,decode,process,encode,send事件,但是和上面说的一样,可以处理连接量不太大的情况,但是连接量巨大的时候(百万级别)就可能出现性能瓶颈

can can 代码:

1
2
3
4
5
6
7
8
9
10
// 1.bossGroup 用于接收连接,workerGroup用于具体的处理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定线程模型
b.group(bossGroup,workerGroup)
//.....
}

主从多线程模型

从一个主线程NIO线程池中选择一个线程作为Acceptor线程,绑定监听端口,接受客户端连接的连接,其他线程负责后续的接入认证等工作。连接完成后,SubNIO线程池负责具体处理I/O读写

1
2
3
4
5
6
7
8
9
10
//1.bossGroup 用于接收连接,workerGroup用于具体的处理
EventLoopGroup bossGroup = new NioEventLoopGroup();//这里没有1
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定线程模型
b.group(bossGroup,workerGroup)
//.....
}

Netty服务端和客户端的启动过程

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 1.bossGroup 用于接收连接,workerGroup用于具体的处理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定线程模型
b.group(bossGroup,workerGroup)
// 4.指定IO模型
.channel(NioSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/**
* 自定义序列化编解码器
*/
ChannelPipeline p = ch.pipeline();
//5.可以自定义客户端消息的业务处理逻辑
p.addLast(new HelloServerHandler());
}
});
// 6.绑定端口,调用sync方法阻塞直到绑定完成
ChannelFuture f = b.bind(port).sync();
// 7.阻塞等待直到服务器Channel关闭
f.channel().closeFuture().sync();
}finally{
//8.关闭线程组资源
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 1.创建一个NioEventLoopGroup对象实例
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
//2.创建服务端启动引导/辅助类:Bootstrap
Bootstrap b = new Bootstrap();
//3.指定线程组
b.group(bossGroup,workerGroup)
// 4.指定IO模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {

ChannelPipeline p = ch.pipeline();
//5.可以自定义客户端消息的业务处理逻辑
p.addLast(new HelloServerHandler(message));
}
});
// 6.尝试建立连接
ChannelFuture f = b.connect(host,port).sync();
// 7.等待连接关闭
f.channel().closeFuture().sync();
}finally{
//8.关闭线程组资源
group.shutdownGracefully();
}

这里的connect方法返回的是一个ChannelFuture对象

1
2
3
4
5
6
7
8
9
10
11
public ChannelFuture connect(SocketAddress remoteAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
this.validate();
return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
}

public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
this.validate();
return this.doResolveAndConnect(remoteAddress, localAddress);
}

因为他是异步的,也可以像上面ChannelFuture那儿说的,通过addListener方法增加一个监听器打印消息,看是否连接成功

TCP 粘包/拆包

TCP粘包/拆包就是基于TCP发送数据的时候,出现了多个字符串“粘”在一起或者一个字符串被“拆”开的情况

解决方案

  1. 使用Netty自带的解码器
  • lineBasedFrameDecoder:发送数据包的时候,每个数据包之间以换行符作为分隔,lineBaseFrameDecoder就依次遍历ByteBuf中的可读字节,判断是否有换行符,然后截取

  • DelimiterBasedFrameDecoder :可以自定义分隔符解码器,lineBasedFrameDecoder实际上是一种特殊的DelimiterBasedFrameDecoder解码器

  • FixedLengthFrameDecoder:固定长度解码器

  • LengthFieldBaseFrameDecoder:基于长度字段的解码器

  1. 自定义序列化编解码器

在Java中自带有Serializable接口来实现序列化,但是性能,安全等并不理想。

所以一般使用Protostuff,Hessian2,json,Kryo等等

Netty长连接,心跳机制

长连接和短连接

短连接就是server端与client端连接之后,读写完成后就关闭掉,如果下一次再要互相发送消息,就要重新连接,优点就是管理和实现都比较简单,缺点即每次读写都建立连接会带来大量的消耗

长连接就是client和server双方建立连接之后,即使client和server完成一次读写,他们之间的连接不会主动关闭,后续读写继续使用这个连接。长连接可以省去较多的TCP建立和关闭的操作,降低对网络的依赖,节约时间。

心跳机制

在保持长连接的过程中,可能会出现网络异常出现,那么怎么发现对方已经掉线呢,答案就是心跳机制

心跳机制就是client与server之间没有数据交互的时候,客户端或服务器就会发送一个特殊的数据包给对方,接收方收到数据后,也会发送一个特殊的数据报文回应对方,这样,就知道了对象仍然在线,确保TCP连接的有效性

Netty层面实现的话,核心类是IdleStateHander

Netty的零拷贝

Zero-copy 即计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于网络传输文件时节省CPU周期和内存带宽

在OS层面上的零拷贝通常用于避免在用户态与内存态之间来回拷贝数据,在Netty层面,主要体现咋子对于数据操作的优化

Netty中

  • 使用Netty提供的CompositeByteBuf类,可以合并多个ByteBuf为一个逻辑上的ByteBuf,避免多个ByteBuf之间的拷贝
  • ByteBuf支持slice操作,因此可以将ByteBuf分解为多个共享同一个存储区域的ByteBuf避免内存拷贝
  • 通过FileRegion包装的FileChannel.tranferTo实现问文件传输,可以直接将文件缓冲区的数据传输到目标Channel,避免了传统的通过循环write方式导致的内存拷贝问题

Netty
http://example.com/2022/10/19/Netty/
作者
Mercury
发布于
2022年10月19日
许可协议