外观
README
约 2607 字大约 9 分钟
2025-08-23
一、Netty 简介
1.1 什么是 Netty?
Netty 是一个异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络服务器和客户端。它极大地简化了网络编程,特别是 TCP 和 UDP 套接字服务器的开发,使开发者能够专注于业务逻辑而非底层网络细节。
1.2 Netty 核心特点
- 高性能:基于 NIO 的非阻塞 I/O 模型,支持高并发
- 设计优雅:责任链模式实现的 ChannelPipeline,易于扩展
- 功能强大:提供多种编解码器、SSL/TLS 支持、HTTP/2 支持等
- 可定制性:高度可定制的线程模型和内存管理
- 健壮性:经过大规模生产环境验证
- 社区活跃:拥有活跃的社区和丰富的文档
1.3 Netty 与传统 IO 对比
| 特性 | 传统 IO (BIO) | Netty (NIO) |
|---|---|---|
| 模型 | 阻塞式 I/O | 非阻塞式 I/O |
| 线程模型 | 一个连接一个线程 | 少量线程处理大量连接 |
| 资源消耗 | 高(每个连接占用一个线程) | 低(线程复用) |
| 吞吐量 | 低 | 高 |
| 编程复杂度 | 简单 | 中等(Netty 简化了 NIO 编程) |
| 适用场景 | 连接数少、数据量小 | 高并发、高性能场景 |
二、核心概念
2.1 Channel
Channel 是 Netty 的网络操作抽象,代表一个网络连接。它类似于 Java NIO 中的 Channel,但功能更强大。
常用 Channel 实现:
NioSocketChannel:非阻塞 TCP 客户端 ChannelNioServerSocketChannel:非阻塞 TCP 服务器 ChannelNioDatagramChannel:非阻塞 UDP Channel
2.2 EventLoop 和 EventLoopGroup
EventLoop:
- 一个 EventLoop 通常绑定一个线程
- 负责处理一个或多个 Channel 的所有 I/O 事件
- 实现了 Runnable 接口,包含一个无限循环
EventLoopGroup:
- 一组 EventLoop
- 通常分为两种:
BossGroup:负责接受新连接WorkerGroup:负责处理已建立连接的 I/O 事件
// 创建 EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 通常只需1个线程
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认CPU核心数*22.3 ChannelHandler 和 ChannelPipeline
ChannelHandler:
- 处理 I/O 事件或拦截 I/O 操作
- 分为两类:
ChannelInboundHandler:处理入站事件(如连接、读取等)ChannelOutboundHandler:处理出站事件(如写入、连接等)
ChannelPipeline:
- ChannelHandler 的链表结构
- 入站事件从头部流向尾部,出站事件从尾部流向头部
- 可以动态添加、删除 ChannelHandler
// 配置 ChannelPipeline
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new MyBusinessHandler());2.4 ChannelFuture
- 表示异步 I/O 操作的结果
- 非阻塞:不会阻塞当前线程
- 可以添加监听器,在操作完成时得到通知
ChannelFuture future = bootstrap.connect("localhost", 8080);
// 同步等待连接完成
future.sync();
// 添加监听器
future.addListener(future1 -> {
if (future1.isSuccess()) {
System.out.println("连接成功");
} else {
System.out.println("连接失败");
}
});三、基本使用
3.1 服务端创建
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 1. 创建 EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 2. 创建 ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 3. 绑定端口
ChannelFuture future = bootstrap.bind(8080).sync();
// 4. 等待服务关闭
future.channel().closeFuture().sync();
} finally {
// 5. 优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 业务处理器
static class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("收到客户端消息: " + msg);
ctx.writeAndFlush("服务器响应: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}3.2 客户端创建
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 1. 创建 EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
// 2. 创建 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ClientHandler());
}
});
// 3. 连接服务器
ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
// 4. 发送消息
Channel channel = future.channel();
channel.writeAndFlush("Hello Netty");
// 5. 等待连接关闭
future.channel().closeFuture().sync();
} finally {
// 6. 优雅关闭
group.shutdownGracefully();
}
}
// 业务处理器
static class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("收到服务器响应: " + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("与服务器建立连接");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("与服务器断开连接");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}四、编解码器
4.1 常用编解码器
| 编解码器 | 说明 | 使用场景 |
|---|---|---|
StringDecoder / StringEncoder | 字符串编解码 | 简单的文本协议 |
LengthFieldBasedFrameDecoder | 基于长度字段的帧解码器 | 解决粘包/半包问题 |
LineBasedFrameDecoder | 基于行的帧解码器 | 行分隔符协议 |
DelimiterBasedFrameDecoder | 基于分隔符的帧解码器 | 自定义分隔符协议 |
ObjectEncoder / ObjectDecoder | 对象编解码器 | Java 对象序列化 |
ProtobufEncoder / ProtobufDecoder | Protobuf 编解码器 | 高效二进制协议 |
HttpServerCodec | HTTP 服务器编解码器 | HTTP 服务 |
HttpClientCodec | HTTP 客户端编解码器 | HTTP 客户端 |
4.2 解决粘包/半包问题
问题描述:
- TCP 是流式协议,没有消息边界
- 可能导致多个消息粘在一起(粘包)或一个消息被拆分成多个包(半包)
解决方案:
固定长度:
pipeline.addLast(new FixedLengthFrameDecoder(1024));行分隔符:
pipeline.addLast(new LineBasedFrameDecoder(1024));自定义分隔符:
ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes()); pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));长度字段(最常用):
/** * lengthFieldOffset: 长度字段的偏移量,头部长度 * lengthFieldLength: 长度字段占的字节数 * lengthAdjustment: 长度调节值,在总长上添加此值 * initialBytesToStrip: 从解码帧中去除的字节数 */ pipeline.addLast(new LengthFieldBasedFrameDecoder( 1024, 0, 4, 0, 4));
五、心跳机制
5.1 心跳机制的重要性
- 检测连接是否存活
- 防止连接被中间设备(如防火墙)断开
- 及时发现并处理失效连接
5.2 实现心跳机制
// 服务端添加心跳处理器
pipeline.addLast(new IdleStateHandler(0, 0, 60)); // 60秒无读写活动
pipeline.addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.ALL_IDLE) {
System.out.println("检测到空闲连接,关闭连接");
ctx.close();
}
}
}
});
// 客户端添加心跳处理器
pipeline.addLast(new IdleStateHandler(30, 0, 0)); // 30秒无写活动
pipeline.addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
System.out.println("发送心跳包");
ctx.writeAndFlush(new HeartbeatRequest());
}
}
}
});六、内存管理
6.1 ByteBuf
ByteBuf 是 Netty 的核心数据结构,替代了 JDK 的 ByteBuffer。
主要特点:
- 读写指针分离(readerIndex 和 writerIndex)
- 支持堆内内存和堆外内存
- 池化内存管理,减少 GC 压力
- 自动扩容
// 创建 ByteBuf
ByteBuf buffer = Unpooled.buffer(1024);
// 写入数据
buffer.writeBytes("Hello Netty".getBytes(StandardCharsets.UTF_8));
// 读取数据
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
String message = new String(bytes, StandardCharsets.UTF_8);
// 释放资源
buffer.release();6.2 内存池配置
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);内存池类型:
PooledByteBufAllocator:池化内存分配器,性能更好UnpooledByteBufAllocator:非池化内存分配器
七、线程模型
7.1 Netty 线程模型

关键点:
- BossGroup:负责接受新连接,通常只需 1 个线程
- WorkerGroup:负责处理 I/O 事件,通常为 CPU 核心数 × 2
- 每个 Channel 绑定到一个固定的 EventLoop(线程)
- 业务处理不应阻塞 EventLoop 线程
7.2 自定义业务线程池
// 创建业务线程池
EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(10);
// 在 ChannelPipeline 中使用
pipeline.addLast(businessGroup, new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 耗时操作在业务线程池中执行
ctx.executor().execute(() -> {
try {
// 模拟耗时操作
Thread.sleep(1000);
// 处理完成后,切回 I/O 线程发送响应
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush("处理完成");
});
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
});八、性能优化
8.1 连接参数优化
bootstrap.option(ChannelOption.SO_BACKLOG, 1024) // 连接队列大小
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle算法
.childOption(ChannelOption.SO_KEEPALIVE, true) // 启用TCP心跳
.childOption(ChannelOption.SO_RCVBUF, 65536) // 接收缓冲区
.childOption(ChannelOption.SO_SNDBUF, 65536); // 发送缓冲区8.2 内存优化
使用内存池:
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)合理设置缓冲区大小:
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 1024, 65536))避免内存泄漏:
- 确保所有 ByteBuf 都被正确释放
- 使用
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED)检测泄漏
8.3 避免阻塞 I/O 线程
- 不要在 ChannelHandler 中执行耗时操作
- 耗时操作应提交到业务线程池
- 数据库操作、文件 I/O 等应异步执行
// 错误做法:阻塞 I/O 线程
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 耗时操作
Thread.sleep(1000);
ctx.writeAndFlush("response");
}
// 正确做法:使用业务线程池
private final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(10);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
businessGroup.submit(() -> {
// 耗时操作
String result = process(msg);
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(result);
});
});
}九、常见协议实现
9.1 HTTP 服务
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new HttpContentCompressor());
pipeline.addLast(new HttpHandler());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
static class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
// 创建响应
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.wrappedBuffer("Hello Netty HTTP".getBytes()));
// 设置响应头
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 发送响应
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
}9.2 WebSocket 服务
public class WebSocketServer {
public static void main(String[] args) throws InterruptedException {
// ... 与HTTP服务相同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new WebSocketHandler());
}
});
}
static class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
System.out.println("收到WebSocket消息: " + msg.text());
ctx.writeAndFlush(new TextWebSocketFrame("服务器: " + msg.text()));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("客户端连接: " + ctx.channel().id().asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
System.out.println("客户端断开: " + ctx.channel().id().asLongText());
}
}
}十、最佳实践
10.1 异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 记录异常
logger.error("处理消息时发生异常", cause);
// 判断是否是 I/O 异常
if (cause instanceof IOException) {
// 通常是连接断开,直接关闭
ctx.close();
} else {
// 其他异常,可能需要特殊处理
ctx.writeAndFlush(new ErrorFrame(cause.getMessage()))
.addListener(ChannelFutureListener.CLOSE);
}
}10.2 资源管理
正确关闭资源:
@Override public void channelInactive(ChannelHandlerContext ctx) { // 清理资源 cleanupResources(ctx.channel()); ctx.fireChannelInactive(); }防止内存泄漏:
- 确保所有 ByteBuf 都被正确释放
- 使用 try-finally 确保资源释放
ByteBuf buffer = null; try { buffer = allocator.buffer(); // 使用 buffer } finally { if (buffer != null) { buffer.release(); } }
10.3 常见问题解决
问题1:连接过多导致端口耗尽
解决方案:
- 调整系统参数:
# Linux sysctl -w net.ipv4.tcp_tw_reuse=1 sysctl -w net.ipv4.tcp_fin_timeout=30 - 客户端使用连接池,避免频繁创建连接
问题2:内存泄漏
解决方案:
- 开启泄漏检测:
System.setProperty("io.netty.leakDetection.level", "advanced"); - 确保所有 ByteBuf 都被正确释放
- 使用
ResourceLeakDetector.setLevel()设置检测级别
问题3:CPU 占用过高
解决方案:
- 检查是否有死循环或频繁的 GC
- 避免在 I/O 线程执行耗时操作
- 调整 EventLoopGroup 线程数(通常为 CPU 核心数 × 2)
