目录· Linux网络IO模型 · 文件描述符 · 阻塞IO模型 · 非阻塞IO模型 · IO复用模型 ...
目录
· Linux网络IO模型
· 文件描述符
· 阻塞IO模型
· 非阻塞IO模型
· IO复用模型
· 信号驱动IO模型
· 异步IO模型
· BIO编程
· 伪异步IO编程
· NIO编程
· Buffer和Channel
· 深入Buffer
· Selector
· AIO编程
· 四种IO编程对比及选择Netty的原因
· Netty入门
· 开发与部署
· Hello World
· 粘包/拆包问题
· 问题及其解决
· LineBasedFrameDecoder
· DelimiterBasedFrameDecoder
· FixedLengthFrameDecoder
· Java序列化问题
· 问题描述及其解决
· HTTP协议开发
· Netty HTTP
· 文件服务器
· WebSocket协议开发
· 问题及其解决
· 原理(过程)
· 开发
· Netty架构
· 逻辑架构
· 高性能
· 可靠性
· 可定制性
· 可扩展性
· 私有协议栈开发
Linux网络IO模型
文件描述符
1. Linux内核将所有外部设备视为文件来操作。
2. 对一个文件的读写操作会调用内核提供的系统命令,返回一个file descripter(fd,文件描述符)。
3. 对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符)。
阻塞IO模型
1. 最常用的IO模型。
2. 默认的IO模型。
3. 以socket接口为例说明阻塞IO模型。
非阻塞IO模型
1. 一般轮训检查内核数据是否就绪。
2. 如果内核数据未就绪,则直接返回一个EWOULDBLOCK错误。
IO复用模型
1. Linux提供select/poll,进程传递一个或多个fd给select或poll系统调用,阻塞在select操作上,这样select/poll可以帮助进程同时检测多个fd是否就绪。
2. select/poll存在支持fd数量有限、线性轮训等问题,应采用基于事件驱动方式的epoll代替(当有fd就绪时,立即回调函数)。
信号驱动IO模型
进程先系统调用sigaction执行一个非阻塞的信号处理函数,进程继续运行。当数据就绪时,为该进程生成一个SIGIO信号,通知进程调用recvfrom读取数据。
异步IO模型
1. 进程告知内核启动某个操作,并在内核完成整个操作后再通知进程。
2. 与信号驱动IO模型区别:信号驱动IO模型只通知数据就绪;异步IO模型通知操作已完成。
BIO编程
1. 有一个独立的Acceptor线程负责监听客户端连接,接收到连接后为每个客户端创建一个新的线程进行链路处理,处理完之后,通过输出流返回给客户端,线程销毁。
2. 问题:服务端线程个数与客户端并发访问数1:1关系。当客户端并发访问量越来越大时,系统会发生线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或僵死。
伪异步IO编程
1. 当新客户端接入时,将客户端Socket封装成一个Task(实现Runnable接口)投递到线程池中进行处理。
2. 好处:由于可以设置线程池队列的大小和最大线程数,所以资源占用是可控的,客户端并发数量增加不会导致资源耗尽、宕机。
3. 问题:底层通信依然采用同步阻塞模型,无法从根本上解决应答消息缓慢或网络传输较慢时,长时间阻塞线程的问题。
NIO编程
Buffer和Channel
1. BIO是面向流的,一次处理一个字节;NIO是面向块的,以块的形式处理数据。
2. BIO的java.io.*已经使用NIO重新实现过。
3. Buffer缓冲区存放着准备要写入或读出的数据。通常是一个字节数组,但也可以是其他类型的数组或不是数组。
4. Buffer类型:
a) ByteBuffer(常用)
b) CharBuffer
c) ShortBuffer
d) IntBuffer
e) LongBuffer
f) FloatBuffer
g) DoubleBuffer
5. Channel通道是双向的,可通过它读取或写入数据。所有的数据都要通过Buffer来处理,永远不会将数据直接写入Channel。
6. 写文件示例。
1 import java.io.FileOutputStream; 2 import java.io.IOException; 3 import java.io.UnsupportedEncodingException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.FileChannel; 6 import java.util.Random; 7 import java.util.UUID; 8 9 public class Test {10 11 private static byte[] getRandomData() {12 int randomLength = new Random().nextInt(100);13 StringBuilder data = new StringBuilder();14 for (int index = 0; index < randomLength; index++) {15 data.append(UUID.randomUUID().toString());16 }17 return data.toString().getBytes();18 }19 20 public static void main(String[] args) {21 FileOutputStream fileOutputStream = null;22 try {23 fileOutputStream = new FileOutputStream("D:/test.txt");24 FileChannel fileChannel = fileOutputStream.getChannel();25 ByteBuffer byteBuffer = null;26 for (int index = 0; index < 1000; index++) {27 byte[] data = getRandomData();28 if (byteBuffer == null) {29 byteBuffer = ByteBuffer.wrap(data);30 } else if (data.length > byteBuffer.capacity()) {31 if (byteBuffer.position() > 0) {32 byteBuffer.flip();33 fileChannel.write(byteBuffer);34 byteBuffer.clear();35 }36 byteBuffer = ByteBuffer.wrap(data);37 } else if (data.length > byteBuffer.remaining()) {38 byteBuffer.flip();39 fileChannel.write(byteBuffer);40 byteBuffer.clear();41 }42 43 byteBuffer.put(data);44 }45 byteBuffer.flip();46 fileChannel.write(byteBuffer);47 byteBuffer.clear();48 49 } catch (IOException e) {50 e.printStackTrace();51 } finally {52 if (fileOutputStream != null) {53 try {54 fileOutputStream.close();55 } catch (IOException e) {56 e.printStackTrace();57 }58 }59 }60 }61 62 }
View Code
7. 读文件示例。
1 import java.io.FileInputStream; 2 import java.io.IOException; 3 import java.nio.ByteBuffer; 4 import java.nio.channels.FileChannel; 5 6 public class Test { 7 8 public static void main(String[] args) { 9 FileInputStream fileInputStream = null;10 try {11 fileInputStream = new FileInputStream("D:/test.txt");12 FileChannel fileChannel = fileInputStream.getChannel();13 ByteBuffer byteBuffer = ByteBuffer.allocate(64);14 while (fileChannel.read(byteBuffer) > 0) {15 byteBuffer.flip();16 while (byteBuffer.hasRemaining()) {17 System.out.print((char) byteBuffer.get());18 }19 byteBuffer.clear();20 }21 22 } catch (IOException e) {23 e.printStackTrace();24 } finally {25 if (fileInputStream != null) {26 try {27 fileInputStream.close();28 } catch (IOException e) {29 e.printStackTrace();30 }31 }32 }33 }34 35 }
View Code
8. 复制文件示例。
1 import java.io.IOException; 2 import java.io.RandomAccessFile; 3 import java.nio.ByteBuffer; 4 import java.nio.channels.FileChannel; 5 6 public class Test { 7 8 public static void main(String[] args) { 9 RandomAccessFile sourceFile = null;10 RandomAccessFile targetFile = null;11 try {12 sourceFile = new RandomAccessFile("D:/test.txt", "r");13 targetFile = new RandomAccessFile("D:/test.txt.bak", "rw");14 FileChannel sourceFileChannel = sourceFile.getChannel();15 FileChannel targetFileChannel = targetFile.getChannel();16 ByteBuffer byteBuffer = ByteBuffer.allocate(64);17 while (sourceFileChannel.read(byteBuffer) > 0) {18 byteBuffer.flip();19 targetFileChannel.write(byteBuffer);20 byteBuffer.clear();21 }22 23 } catch (IOException e) {24 e.printStackTrace();25 }26 }27 28 }
View Code
深入Buffer
1. Buffer可以理解成数组,它通过以下3个值描述状态:
a) position:下一个元素的位置;
b) limit:可读取或写入的元素总数,position总是小于或者等于limit;
c) capacity:Buffer最大容量,limit总是小于或者等于capacity。
2. 以读、写举例说明Buffer。
a) 创建一个8字节的ByteBuffer。position=0,limit=8,capacity=8。
b) 读取3个字节。position=3,limit=8,capacity=8。
c) 读取2个字节。position=5,limit=8,capacity=8。
d) 执行flip()。position=0,limit=5,capacity=8。
e) 写入4个字节。position=4,limit=5,capacity=8。
f) 写入1个字节。position=5,limit=5,capacity=8。
g) 执行clear()。position=0,limit=8,capacity=8。
3. 创建ByteBuffer的两种方法:
a) 创建固定大小的Buffer。
ByteBuffer.allocate(capacity)
b) 将数组及其内容包装成Buffer。
byte array[] = new byte[1024];ByteBuffer buffer = ByteBuffer.wrap(array);
Selector
1. Selector即IO复用模型中的多路复用器。
2. JDK使用了epoll。
AIO编程
1. AIO也称NIO2.0,是异步IO模型。
2. JDK 7时在java.nio.channels包下新增了4个异步Channel。
a) AsynchronousSocketChannel
b) AsynchronousServerSocketChannel
c) AsynchronousFileChannel
d) AsynchronousDatagramChannel
3. 使用Future写文件:异步执行,阻塞Future.get(),直到取得结果。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.file.Path; 5 import java.nio.file.Paths; 6 import java.nio.file.StandardOpenOption; 7 import java.util.ArrayList; 8 import java.util.List; 9 import java.util.Random;10 import java.util.UUID;11 import java.util.concurrent.ExecutionException;12 import java.util.concurrent.Future;13 14 public class Test {15 16 private static byte[] getRandomData() {17 int randomLength = new Random().nextInt(100);18 StringBuilder data = new StringBuilder();19 for (int index = 0; index < randomLength; index++) {20 data.append(UUID.randomUUID().toString());21 }22 return data.append('\n').toString().getBytes();23 }24 25 public static void main (String [] args) {26 Path file = Paths.get("D:/test.txt");27 AsynchronousFileChannel asynchronousFileChannel = null;28 try {29 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE);30 List<Future<Integer>> futures = new ArrayList<>();31 for (int index = 0; index < 10; index++) {32 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData());33 Future<Integer> future = asynchronousFileChannel.write(byteBuffer, 0);34 futures.add(future);35 }36 for (Future<Integer> future : futures) {37 Integer length = null;38 try {39 length = future.get();40 } catch (InterruptedException | ExecutionException e) {41 e.printStackTrace();42 }43 System.out.println("Bytes written: " + length);44 }45 46 } catch (IOException e) {47 e.printStackTrace();48 } finally {49 if (asynchronousFileChannel != null) {50 try {51 asynchronousFileChannel.close();52 } catch (IOException e) {53 e.printStackTrace();54 }55 }56 }57 }58 }
View Code
4. 使用CompletionHandler写文件:异步执行,回调CompletionHandler。注意:示例中,由于不阻塞主线程,即异步任务是否结果主线程都会结束,有时会看不到结果,所以sleep 5秒。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.channels.CompletionHandler; 5 import java.nio.file.Path; 6 import java.nio.file.Paths; 7 import java.nio.file.StandardOpenOption; 8 import java.util.Random; 9 import java.util.UUID;10 11 public class Test {12 13 private static byte[] getRandomData() {14 int randomLength = new Random().nextInt(100);15 StringBuilder data = new StringBuilder();16 for (int index = 0; index < randomLength; index++) {17 data.append(UUID.randomUUID().toString());18 }19 return data.append('\n').toString().getBytes();20 }21 22 public static void main (String [] args) {23 Path file = Paths.get("D:/test.txt");24 AsynchronousFileChannel asynchronousFileChannel = null;25 try {26 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE);27 CompletionHandler<Integer, Object> completionHandler = new CompletionHandler<Integer, Object>() {28 @Override29 public void completed(Integer result, Object attachment) {30 System.out.println("Bytes written: " + result);31 }32 @Override33 public void failed(Throwable exc, Object attachment) {34 }35 };36 for (int index = 0; index < 10; index ++) {37 ByteBuffer byteBuffer = ByteBuffer.wrap(getRandomData());38 asynchronousFileChannel.write(byteBuffer, 0, null, completionHandler);39 }40 41 } catch (IOException e) {42 e.printStackTrace();43 } finally {44 if (asynchronousFileChannel != null) {45 try {46 asynchronousFileChannel.close();47 } catch (IOException e) {48 e.printStackTrace();49 }50 }51 }52 try {53 Thread.sleep(5000);54 } catch (InterruptedException e) {55 e.printStackTrace();56 }57 }58 }
View Code
5. 使用Future读文件:异步执行,阻塞Future.get(),直到取得结果。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.file.Path; 5 import java.nio.file.Paths; 6 import java.nio.file.StandardOpenOption; 7 import java.util.concurrent.ExecutionException; 8 import java.util.concurrent.Future; 9 10 public class Test {11 12 public static void main (String [] args) {13 Path file = Paths.get("D:/test.txt");14 AsynchronousFileChannel asynchronousFileChannel = null;15 try {16 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);17 ByteBuffer byteBuffer = ByteBuffer.allocate(64);18 int position = 0;19 int length = 0;20 do {21 Future<Integer> future = asynchronousFileChannel.read(byteBuffer, position);22 length = future.get();23 if (length > 0) {24 byteBuffer.flip();25 System.out.print(new String(byteBuffer.array()));26 byteBuffer.clear();27 }28 position += length;29 } while (length > 0);30 31 } catch (IOException e) {32 e.printStackTrace();33 } catch (InterruptedException e) {34 e.printStackTrace();35 } catch (ExecutionException e) {36 e.printStackTrace();37 } finally {38 if (asynchronousFileChannel != null) {39 try {40 asynchronousFileChannel.close();41 } catch (IOException e) {42 e.printStackTrace();43 }44 }45 }46 }47 }
View Code
6. 使用CompletionHandler读文件:异步执行,回调CompletionHandler。注意:示例中,由于不阻塞主线程,即异步任务是否结果主线程都会结束,有时会看不到结果,所以sleep 5秒。
1 import java.io.IOException; 2 import java.nio.ByteBuffer; 3 import java.nio.channels.AsynchronousFileChannel; 4 import java.nio.channels.CompletionHandler; 5 import java.nio.file.Path; 6 import java.nio.file.Paths; 7 import java.nio.file.StandardOpenOption; 8 9 public class Test {10 11 public static void main (String [] args) {12 Path file = Paths.get("D:/test.txt");13 AsynchronousFileChannel asynchronousFileChannel = null;14 try {15 asynchronousFileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);16 // 10个异步任务分别读取文件头64个字节,5秒后分别输出。17 CompletionHandler<Integer, ByteBuffer> completionHandler = new CompletionHandler<Integer, ByteBuffer>() {18 @Override19 public void completed(Integer result, ByteBuffer byteBuffer) {20 byteBuffer.flip();21 System.out.print(new String(byteBuffer.array()));22 byteBuffer.clear();23 }24 @Override25 public void failed(Throwable exc, ByteBuffer byteBuffer) {26 }27 };28 for (int index = 0; index < 10; index++) {29 ByteBuffer byteBuffer = ByteBuffer.allocate(64);30 asynchronousFileChannel.read(byteBuffer, byteBuffer.limit() * index, byteBuffer, completionHandler);31 }32 33 } catch (IOException e) {34 e.printStackTrace();35 } finally {36 if (asynchronousFileChannel != null) {37 try {38 asynchronousFileChannel.close();39 } catch (IOException e) {40 e.printStackTrace();41 }42 }43 }44 try {45 Thread.sleep(5000);46 } catch (InterruptedException e) {47 e.printStackTrace();48 }49 }50 }
View Code
四种IO编程对比及选择Netty的原因
1. 对比。
2. 选择NIO框架Netty,而不选择JDK的NIO类库的理由。
a) NIO类库和API繁杂。
b) 需另具备Java多线程编程等技能。
c) 可靠性不高,工作量和难度非常大。
d) 臭名昭著的epoll Bug导致Selector空轮训。
Netty入门
开发与部署
1. 开发环境:CLASSPATH中导入“netty-all-x.y.z.jar”即可。
2. 打包部署:由于是非Web应用,构建成jar包部署即可。
Hello World
1. 配置Maven的pom.
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha1</version></dependency>
View Code
2. 时间服务器TimeServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 10 public class TimeServer {11 12 public void bind(int port) throws Exception {13 // 服务器NIO线程组线14 EventLoopGroup bossGroup = new NioEventLoopGroup();15 EventLoopGroup workerGroup = new NioEventLoopGroup();16 try {17 ServerBootstrap serverBootstrap = new ServerBootstrap();18 serverBootstrap.group(bossGroup, workerGroup)19 .channel(NioServerSocketChannel.class)20 .option(ChannelOption.SO_BACKLOG, 1024)21 .childHandler(new ChildChannelHandler());22 // 绑定端口,同步等待成功23 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();24 // 等待服务器监听端口关闭25 channelFuture.channel().closeFuture().sync();26 } finally {27 // 优雅退出,释放线程池资源28 workerGroup.shutdownGracefully();29 bossGroup.shutdownGracefully();30 }31 }32 33 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {34 35 @Override36 protected void initChannel(SocketChannel socketChannel) throws Exception {37 socketChannel.pipeline().addLast(new TimeServerHandler());38 }39 40 }41 42 public static void main(String[] args) throws Exception {43 new TimeServer().bind(8080);44 }45 46 }
View Code
3. 时间服务器TimeServerHandler
1 import java.util.Date; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 public class TimeServerHandler extends ChannelHandlerAdapter { 9 10 @Override11 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {12 ByteBuf reqBuf = (ByteBuf) msg;13 byte[] req = new byte[reqBuf.readableBytes()];14 reqBuf.readBytes(req);15 String reqString = new String(req, "UTF-8");16 String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";17 ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());18 ctx.write(respBuf);19 }20 21 @Override22 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {23 ctx.flush();24 }25 26 @Override27 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {28 ctx.close();29 }30 31 }
View Code
4. 时间客户端TimeClient
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 10 public class TimeClient {11 12 public void connect(String host, int port) throws Exception {13 EventLoopGroup group = new NioEventLoopGroup();14 try {15 // 客户端NIO线程组16 Bootstrap bootstrap = new Bootstrap();17 bootstrap.group(group).channel(NioSocketChannel.class)18 .option(ChannelOption.TCP_NODELAY, true)19 .handler(new ChildChannelHandler());20 // 发起异步连接操作21 ChannelFuture channelFuture = bootstrap.connect(host, port).sync();22 // 等待客户端链路关闭23 channelFuture.channel().closeFuture().sync();24 25 } finally {26 // 优雅退出,释放NIO线程组27 group.shutdownGracefully();28 }29 }30 31 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {32 33 @Override34 protected void initChannel(SocketChannel socketChannel) throws Exception {35 socketChannel.pipeline().addLast(new TimeClientHandler());36 }37 38 }39 40 public static void main(String[] args) throws Exception {41 new TimeClient().connect("127.0.0.1", 8080);42 }43 44 }
View Code
5. 时间客户端TimeClientHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 6 public class TimeClientHandler extends ChannelHandlerAdapter { 7 8 private final ByteBuf reqBuf; 9 10 public TimeClientHandler() {11 byte[] req = "QUERY TIME ORDER".getBytes();12 reqBuf = Unpooled.buffer(req.length);13 reqBuf.writeBytes(req);14 }15 16 @Override17 public void channelActive(ChannelHandlerContext ctx) throws Exception {18 ctx.writeAndFlush(reqBuf);19 }20 21 @Override22 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {23 ByteBuf respBuf = (ByteBuf) msg;24 byte[] resp = new byte[respBuf.readableBytes()];25 respBuf.readBytes(resp);26 String respString = new String(resp, "UTF-8");27 System.out.println(respString);28 }29 30 @Override31 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {32 ctx.close();33 }34 35 }
View Code
粘包/拆包问题
问题及其解决
1. TCP是一个“流协议”,是没有界限的一串数据。
2. TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被TCP拆包发送,也可能封装多个
小包成大包发送。
3. 业界主流协议的解决方案归纳:
a) 消息定长。如每个报文的大小固定长度200字节,不足时空位补空格。
b) 在包尾增加回车换行符进行分割。如FTP协议。
c) 将消息分为消息头、消息体,消息头中包含消息总长度(或消息体长度)的字段。
d) 更复杂的应用层协议。
4. Netty提供了多种编码器用于解决粘包/拆包问题。
LineBasedFrameDecoder
1. 原理:遍历ByteBuf中的可读字节,发现“\n”或“\r\n”时就结束。
2. 支持携带结束符或不携带结束符两种编码方式;支持配置单行的最大长度(超过最大长度未发现换行符则抛出异常,同时忽略掉之前读到的异常码流)。
3. StringDecoder功能:将接受到的对象转成字符串,然后继续调用后面的Handler。
4. 使用LineBasedFrameDecoder优化后的时间服务器。
a) 时间服务器TimeServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder;10 import io.netty.handler.codec.string.StringDecoder;11 12 public class TimeServer {13 14 public void bind(int port) throws Exception {15 // 服务器NIO线程组线16 EventLoopGroup bossGroup = new NioEventLoopGroup();17 EventLoopGroup workerGroup = new NioEventLoopGroup();18 try {19 ServerBootstrap serverBootstrap = new ServerBootstrap();20 serverBootstrap.group(bossGroup, workerGroup)21 .channel(NioServerSocketChannel.class)22 .option(ChannelOption.SO_BACKLOG, 1024)23 .childHandler(new ChildChannelHandler());24 // 绑定端口,同步等待成功25 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();26 // 等待服务器监听端口关闭27 channelFuture.channel().closeFuture().sync();28 } finally {29 // 优雅退出,释放线程池资源30 workerGroup.shutdownGracefully();31 bossGroup.shutdownGracefully();32 }33 }34 35 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {36 37 @Override38 protected void initChannel(SocketChannel socketChannel) throws Exception {39 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));40 socketChannel.pipeline().addLast(new StringDecoder());41 socketChannel.pipeline().addLast(new TimeServerHandler());42 }43 44 }45 46 public static void main(String[] args) throws Exception {47 new TimeServer().bind(8080);48 }49 50 }
View Code
b) 时间服务器TimeServerHandler
1 import java.util.Date; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 public class TimeServerHandler extends ChannelHandlerAdapter { 9 10 @Override11 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {12 String reqString = (String) msg;13 String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";14 respString += "\n";15 ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());16 ctx.write(respBuf);17 }18 19 @Override20 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {21 ctx.flush();22 }23 24 @Override25 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {26 ctx.close();27 }28 29 }
View Code
c) 时间客户端TimeClient
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder;10 import io.netty.handler.codec.string.StringDecoder;11 12 public class TimeClient {13 14 public void connect(String host, int port) throws Exception {15 EventLoopGroup group = new NioEventLoopGroup();16 try {17 // 客户端NIO线程组18 Bootstrap bootstrap = new Bootstrap();19 bootstrap.group(group).channel(NioSocketChannel.class)20 .option(ChannelOption.TCP_NODELAY, true)21 .handler(new ChildChannelHandler());22 // 发起异步连接操作23 ChannelFuture channelFuture = bootstrap.connect(host, port).sync();24 // 等待客户端链路关闭25 channelFuture.channel().closeFuture().sync();26 27 } finally {28 // 优雅退出,释放NIO线程组29 group.shutdownGracefully();30 }31 }32 33 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {34 35 @Override36 protected void initChannel(SocketChannel socketChannel) throws Exception {37 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));38 socketChannel.pipeline().addLast(new StringDecoder());39 socketChannel.pipeline().addLast(new TimeClientHandler());40 }41 42 }43 44 public static void main(String[] args) throws Exception {45 new TimeClient().connect("127.0.0.1", 8080);46 }47 48 }
View Code
d) 时间客户端TimeClientHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 6 public class TimeClientHandler extends ChannelHandlerAdapter { 7 8 private final ByteBuf reqBuf; 9 10 public TimeClientHandler() {11 byte[] req = "QUERY TIME ORDER\n".getBytes();12 reqBuf = Unpooled.buffer(req.length);13 reqBuf.writeBytes(req);14 }15 16 @Override17 public void channelActive(ChannelHandlerContext ctx) throws Exception {18 ctx.writeAndFlush(reqBuf);19 }20 21 @Override22 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {23 String respString = (String) msg;24 System.out.println(respString);25 }26 27 @Override28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {29 ctx.close();30 }31 32 }
View Code
DelimiterBasedFrameDecoder
1. 功能:以分隔符作为码流结束标识符的消息解码。
2. 时间服务器TimeServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.buffer.ByteBuf; 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioServerSocketChannel;11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;12 import io.netty.handler.codec.string.StringDecoder;13 14 public class TimeServer {15 16 public void bind(int port) throws Exception {17 // 服务器NIO线程组线18 EventLoopGroup bossGroup = new NioEventLoopGroup();19 EventLoopGroup workerGroup = new NioEventLoopGroup();20 try {21 ServerBootstrap serverBootstrap = new ServerBootstrap();22 serverBootstrap.group(bossGroup, workerGroup)23 .channel(NioServerSocketChannel.class)24 .option(ChannelOption.SO_BACKLOG, 1024)25 .childHandler(new ChildChannelHandler());26 // 绑定端口,同步等待成功27 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();28 // 等待服务器监听端口关闭29 channelFuture.channel().closeFuture().sync();30 } finally {31 // 优雅退出,释放线程池资源32 workerGroup.shutdownGracefully();33 bossGroup.shutdownGracefully();34 }35 }36 37 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {38 39 @Override40 protected void initChannel(SocketChannel socketChannel) throws Exception {41 ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes());42 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));43 socketChannel.pipeline().addLast(new StringDecoder());44 socketChannel.pipeline().addLast(new TimeServerHandler());45 }46 47 }48 49 public static void main(String[] args) throws Exception {50 new TimeServer().bind(8080);51 }52 53 }
View Code
3. 时间服务器TimeServerHandler
1 import java.util.Date; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 public class TimeServerHandler extends ChannelHandlerAdapter { 9 10 @Override11 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {12 String reqString = (String) msg;13 String respString = "QUERY TIME ORDER".equalsIgnoreCase(reqString) ? new Date().toString() : "BAD ORDER";14 respString += "*&*";15 ByteBuf respBuf = Unpooled.copiedBuffer(respString.getBytes());16 ctx.write(respBuf);17 }18 19 @Override20 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {21 ctx.flush();22 }23 24 @Override25 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {26 ctx.close();27 }28 29 }
View Code
4. 时间客户端TimeClient
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.buffer.ByteBuf; 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioSocketChannel;11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;12 import io.netty.handler.codec.string.StringDecoder;13 14 public class TimeClient {15 16 public void connect(String host, int port) throws Exception {17 EventLoopGroup group = new NioEventLoopGroup();18 try {19 // 客户端NIO线程组20 Bootstrap bootstrap = new Bootstrap();21 bootstrap.group(group).channel(NioSocketChannel.class)22 .option(ChannelOption.TCP_NODELAY, true)23 .handler(new ChildChannelHandler());24 // 发起异步连接操作25 ChannelFuture channelFuture = bootstrap.connect(host, port).sync();26 // 等待客户端链路关闭27 channelFuture.channel().closeFuture().sync();28 29 } finally {30 // 优雅退出,释放NIO线程组31 group.shutdownGracefully();32 }33 }34 35 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {36 37 @Override38 protected void initChannel(SocketChannel socketChannel) throws Exception {39 ByteBuf delimiter = Unpooled.copiedBuffer("*&*".getBytes());40 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));41 socketChannel.pipeline().addLast(new StringDecoder());42 socketChannel.pipeline().addLast(new TimeClientHandler());43 }44 45 }46 47 public static void main(String[] args) throws Exception {48 new TimeClient().connect("127.0.0.1", 8080);49 }50 51 }
View Code
5. 时间客户端TimeClientHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 6 public class TimeClientHandler extends ChannelHandlerAdapter { 7 8 private final ByteBuf reqBuf; 9 10 public TimeClientHandler() {11 byte[] req = "QUERY TIME ORDER*&*".getBytes();12 reqBuf = Unpooled.buffer(req.length);13 reqBuf.writeBytes(req);14 }15 16 @Override17 public void channelActive(ChannelHandlerContext ctx) throws Exception {18 ctx.writeAndFlush(reqBuf);19 }20 21 @Override22 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {23 String respString = (String) msg;24 System.out.println(respString);25 }26 27 @Override28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {29 ctx.close();30 }31 32 }
View Code
FixedLengthFrameDecoder
1. 原理:无论一次接受到多少数据包,它都会按照设置的固定长度解码,如果是半包消息,则缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。
2. 回显服务器EchoServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.FixedLengthFrameDecoder;10 import io.netty.handler.codec.string.StringDecoder;11 12 public class EchoServer {13 14 public void bind(int port) throws Exception {15 // 服务器NIO线程组线16 EventLoopGroup bossGroup = new NioEventLoopGroup();17 EventLoopGroup workerGroup = new NioEventLoopGroup();18 try {19 ServerBootstrap serverBootstrap = new ServerBootstrap();20 serverBootstrap.group(bossGroup, workerGroup)21 .channel(NioServerSocketChannel.class)22 .option(ChannelOption.SO_BACKLOG, 1024)23 .childHandler(new ChildChannelHandler());24 // 绑定端口,同步等待成功25 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();26 // 等待服务器监听端口关闭27 channelFuture.channel().closeFuture().sync();28 } finally {29 // 优雅退出,释放线程池资源30 workerGroup.shutdownGracefully();31 bossGroup.shutdownGracefully();32 }33 }34 35 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {36 37 @Override38 protected void initChannel(SocketChannel socketChannel) throws Exception {39 socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));40 socketChannel.pipeline().addLast(new StringDecoder());41 socketChannel.pipeline().addLast(new EchoServerHandler());42 }43 44 }45 46 public static void main(String[] args) throws Exception {47 new EchoServer().bind(8080);48 }49 50 }
View Code
3. 回显服务器EchoServerHandler
1 import io.netty.channel.ChannelHandlerAdapter; 2 import io.netty.channel.ChannelHandlerContext; 3 4 public class EchoServerHandler extends ChannelHandlerAdapter { 5 6 @Override 7 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 8 System.out.println(msg); 9 }10 11 @Override12 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {13 ctx.close();14 }15 16 }
View Code
4. 使用telnet命令测试,当长度达到20个字符时,服务器打印。
Java序列化问题
问题描述及其解决
1. 无法跨语言。Java序列化是Java语言内部的私有协议,其他语言并不支持。
2. 序列化后的码流太大。编码后的字节数组越大,存储的时候就越占空间,存储的硬件成本就越高,网络传输时更占带宽,导致系统的吞吐量降低。
3. 序列化性能太低。编解码耗时长。
4. 解决:编解码框架,如Google Protobuf、MessagePack。此处不深入展开。
HTTP协议开发
Netty HTTP
1. 由于HTTP协议的通用性,很多异构系统间的通信交互采用HTTP协议,如非常流行的HTTP +
2. 与Web容器相比,Netty开发HTTP的优势:轻量级;安全。
3. 这里以文件服务器举例,至于HTTP +
文件服务器
1. 文件服务器HttpFileServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.EventLoopGroup; 5 import io.netty.channel.nio.NioEventLoopGroup; 6 import io.netty.channel.socket.SocketChannel; 7 import io.netty.channel.socket.nio.NioServerSocketChannel; 8 import io.netty.handler.codec.http.HttpObjectAggregator; 9 import io.netty.handler.codec.http.HttpRequestDecoder;10 import io.netty.handler.codec.http.HttpResponseEncoder;11 import io.netty.handler.stream.ChunkedWriteHandler;12 13 public class HttpFileServer {14 15 public void run(int port, String folderPath) throws Exception {16 EventLoopGroup bossGroup = new NioEventLoopGroup();17 EventLoopGroup workerGroup = new NioEventLoopGroup();18 try {19 ServerBootstrap serverBootstrap = new ServerBootstrap();20 serverBootstrap.group(bossGroup, workerGroup)21 .channel(NioServerSocketChannel.class)22 .childHandler(new ChannelInitializer<SocketChannel>() {23 24 @Override25 protected void initChannel(SocketChannel socketChannel) throws Exception {26 socketChannel.pipeline().addLast(new HttpRequestDecoder());27 socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));28 socketChannel.pipeline().addLast(new HttpResponseEncoder());29 socketChannel.pipeline().addLast(new ChunkedWriteHandler());30 socketChannel.pipeline().addLast(new HttpFileServerHandler(folderPath));31 }32 33 });34 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();35 channelFuture.channel().closeFuture().sync();36 } finally {37 workerGroup.shutdownGracefully();38 bossGroup.shutdownGracefully();39 }40 }41 42 public static void main(String[] args) throws Exception {43 int port = 8080;44 String folderPath = "E:/workspace";45 new HttpFileServer().run(port, folderPath);46 }47 48 }
View Code
2. 文件服务器HttpFileServerHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelFutureListener; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.SimpleChannelInboundHandler; 6 import io.netty.handler.codec.http.DefaultFullHttpResponse; 7 import io.netty.handler.codec.http.DefaultHttpResponse; 8 import io.netty.handler.codec.http.FullHttpRequest; 9 import io.netty.handler.codec.http.FullHttpResponse; 10 import io.netty.handler.codec.http.HttpHeaders; 11 import io.netty.handler.codec.http.HttpMethod; 12 import io.netty.handler.codec.http.HttpResponse; 13 import io.netty.handler.codec.http.HttpResponseStatus; 14 import io.netty.handler.codec.http.HttpVersion; 15 import io.netty.handler.stream.ChunkedFile; 16 import io.netty.util.CharsetUtil; 17 18 import java.io.File; 19 import java.io.FileNotFoundException; 20 import java.io.RandomAccessFile; 21 import java.net.URLDecoder; 22 23 public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { 24 25 private String folderPath; 26 27 public HttpFileServerHandler(String folderPath) { 28 this.folderPath = folderPath; 29 } 30 31 @Override 32 protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { 33 if (!req.getDecoderResult().isSuccess()) { 34 sendStatus(ctx, HttpResponseStatus.BAD_REQUEST); 35 return; 36 } 37 if (!HttpMethod.GET.equals(req.getMethod())) { 38 sendStatus(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); 39 return; 40 } 41 String uri = req.getUri(); 42 File file = getFile(uri); 43 if (file == null || file.isHidden() || !file.exists()) { 44 sendStatus(ctx, HttpResponseStatus.NOT_FOUND); 45 return; 46 } 47 try { 48 if (file.isDirectory()) { 49 listFiles(ctx, file, uri); 50 } else { 51 returnFile(ctx, req, file); 52 } 53 } catch (Exception e) { 54 sendStatus(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); 55 } 56 } 57 58 private File getFile(String uri) throws Exception { 59 uri = URLDecoder.decode(uri, "UTF-8"); 60 return new File(folderPath + uri); 61 } 62 63 private void listFiles(ChannelHandlerContext ctx, File folder, String uri) throws Exception { 64 uri = uri.endsWith("/") ? uri : uri + "/"; 65 StringBuilder html = new StringBuilder("<h1>Index of ").append(URLDecoder.decode(uri, "UTF-8")).append("</h1><hr/><pre><a href=\"").append(uri).append("../\">../</a>\n"); 66 File[] subfiles = folder.listFiles(); 67 if (subfiles != null && subfiles.length > 0) { 68 for (File subfile : subfiles) { 69 String name = subfile.getName(); 70 html.append("<a href=\"").append(uri).append(name).append("\">").append(name).append("</a>\n"); 71 } 72 } 73 html.append("</pre><hr/>"); 74 FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); 75 resp.headers().add(HttpHeaders.Names.CONTENT_TYPE, "text/html;charset=UTF-8"); 76 ByteBuf content = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8); 77 resp.content().writeBytes(content); 78 ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); 79 } 80 81 private void returnFile(ChannelHandlerContext ctx, FullHttpRequest req, File file) throws Exception { 82 83 RandomAccessFile randomAccessFile = null; 84 try { 85 randomAccessFile = new RandomAccessFile(file, "r"); 86 HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); 87 resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, randomAccessFile.length()) 88 .set(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream"); 89 if (HttpHeaders.Values.KEEP_ALIVE.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.CONNECTION))) { 90 resp.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); 91 } 92 ctx.write(resp); 93 ctx.writeAndFlush(new ChunkedFile(randomAccessFile, 0, randomAccessFile.length(), 8192)).addListener(ChannelFutureListener.CLOSE); 94 95 } catch (FileNotFoundException e) { 96 sendStatus(ctx, HttpResponseStatus.NOT_FOUND); 97 } finally { 98 if (randomAccessFile != null) { 99 randomAccessFile.close();100 }101 }102 }103 104 private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception {105 HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);106 ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);107 }108 109 }
View Code
WebSocket协议开发
问题及其解决
1. 轮训、Comet等服务器推送技术效率低下,大量消耗服务器带宽和资源。
2. WebSocket的特点:
a) 单一的TCP连接,全双工模式。
b) 对代理、防火墙和路由器透明。
c) 无头部信息、Cookie和身份验证。
d) 无安全开销。
e) 通过“ping/pong”帧保持链路激活。
f) 服务器可以主动传递消息给客户端,客户端不再轮训。
原理(过程)
1. 浏览器向服务器发起一个HTTP请求(特别的头信息,Sec-WebSocket-Key是随机的),准备建立WebSocket连接。
GET /chat HTTP/1.1Host: server.example.comUpgrade: websocketConnection: UpgradeSec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==Sec-WebSocket-Protocol: chat, superchatSec-WebSocket-Version: 13Origin: http://example.com
2. 服务器用Sec-WebSocket-Key加上魔幻字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,先SHA-1加密,再BASE-64编码,作为Sec-WebSocket-Accept返回浏览器。握手完成。
HTTP/1.1 101 Switching ProtocolsUpgrade: websocketConnection: UpgradeSec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=Sec-WebSocket-Protocol: chat
3. 服务器和浏览器可通过message方式进行通信。
4. 关闭消息带有一个状态码和一个可选的关闭原因,按协议要求发送一个Close控制帧,当对端接受到关闭控制帧指令时,主动关闭WebSocket连接。
开发
1. 服务器WebSocketServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.EventLoopGroup; 5 import io.netty.channel.nio.NioEventLoopGroup; 6 import io.netty.channel.socket.SocketChannel; 7 import io.netty.channel.socket.nio.NioServerSocketChannel; 8 import io.netty.handler.codec.http.HttpObjectAggregator; 9 import io.netty.handler.codec.http.HttpRequestDecoder;10 import io.netty.handler.codec.http.HttpResponseEncoder;11 import io.netty.handler.stream.ChunkedWriteHandler;12 13 public class WebSocketServer {14 15 public void run(int port) throws Exception {16 EventLoopGroup bossGroup = new NioEventLoopGroup();17 EventLoopGroup workerGroup = new NioEventLoopGroup();18 try {19 ServerBootstrap serverBootstrap = new ServerBootstrap();20 serverBootstrap.group(bossGroup, workerGroup)21 .channel(NioServerSocketChannel.class)22 .childHandler(new ChannelInitializer<SocketChannel>() {23 24 @Override25 protected void initChannel(SocketChannel socketChannel) throws Exception {26 socketChannel.pipeline().addLast(new HttpRequestDecoder());27 socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));28 socketChannel.pipeline().addLast(new HttpResponseEncoder());29 socketChannel.pipeline().addLast(new ChunkedWriteHandler());30 socketChannel.pipeline().addLast(new WebSocketServerHandler());31 }32 33 });34 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();35 channelFuture.channel().closeFuture().sync();36 } finally {37 workerGroup.shutdownGracefully();38 bossGroup.shutdownGracefully();39 }40 }41 42 public static void main(String[] args) throws Exception {43 int port = 8080;44 new WebSocketServer().run(port);45 }46 47 }
View Code
2. 服务器WebSocketServerHandler
1 import io.netty.channel.ChannelFutureListener; 2 import io.netty.channel.ChannelHandlerContext; 3 import io.netty.channel.SimpleChannelInboundHandler; 4 import io.netty.handler.codec.http.DefaultHttpResponse; 5 import io.netty.handler.codec.http.FullHttpRequest; 6 import io.netty.handler.codec.http.HttpHeaders; 7 import io.netty.handler.codec.http.HttpResponse; 8 import io.netty.handler.codec.http.HttpResponseStatus; 9 import io.netty.handler.codec.http.HttpVersion;10 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;11 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;12 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;13 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;14 import io.netty.handler.codec.http.websocketx.WebSocketFrame;15 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;16 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;17 18 import java.util.Date;19 20 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {21 22 private WebSocketServerHandshaker handshaker;23 24 @Override25 protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {26 // 传统HTTP27 if (msg instanceof FullHttpRequest) {28 handleHttpRequest(ctx, (FullHttpRequest) msg);29 } else if (msg instanceof WebSocketFrame) {30 handleWebSocketFrame(ctx, (WebSocketFrame) msg);31 }32 }33 34 @Override35 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {36 ctx.flush();37 }38 39 private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {40 if (!req.getDecoderResult().isSuccess()41 || !HttpHeaders.Values.WEBSOCKET.toString().equalsIgnoreCase(req.headers().get(HttpHeaders.Names.UPGRADE))) {42 sendStatus(ctx, HttpResponseStatus.BAD_REQUEST);43 return;44 }45 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/testws", null, false);46 handshaker = wsFactory.newHandshaker(req);47 if (handshaker == null) {48 WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());49 } else {50 handshaker.handshake(ctx.channel(), req);51 }52 }53 54 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {55 if (frame instanceof CloseWebSocketFrame) {56 handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());57 return;58 }59 if (frame instanceof PingWebSocketFrame) {60 ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));61 return;62 }63 if (!(frame instanceof TextWebSocketFrame)) {64 throw new UnsupportedOperationException();65 }66 String req = ((TextWebSocketFrame) frame).text();67 ctx.channel().write(new TextWebSocketFrame("欢迎" + req + ",现在时刻" + new Date()));68 }69 70 private void sendStatus(ChannelHandlerContext ctx, HttpResponseStatus status) throws Exception {71 HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);72 ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);73 }74 75 }
View Code
3. 浏览器websocketclient.html
1 <script type="text/javascript"> 2 var socket; 3 function initSocket() { 4 if (socket) return; 5 if (!window.WebSocket) window.WebSocket = window.MozWebSocket; 6 if (!window.WebSocket) { 7 alert('浏览器不支持WebSocket'); 8 return; 9 }10 socket = new WebSocket('ws://localhost:8080/testws');11 socket.onmessage = function(event) {12 alert(event.data);13 };14 socket.onopen = function(event) {15 alert('WebSocket连接建立成功');16 };17 socket.onclose = function(event) {18 alert('WebSocket连接已关闭');19 };20 }21 22 function sendMsg() {23 initSocket();24 if (socket && WebSocket && socket.readyState == WebSocket.OPEN) {25 var msg = document.getElementById('msg').value;26 socket.send(msg);27 }28 }29 </script>30 <input type="text" id="msg"/>31 <input type="button" value="Send" onclick="sendMsg()"/>
View Code
Netty架构
逻辑架构
1. Netty采用三层网络架构设计和开发。
2. Reactor通信调度层(第1层)。负责监听网络的读写和连接操作。将网络层的数据读取到内存缓存区,然后触发各种网络事件,如连接创建、连接激活、读事件、写事件等,将这些事件触发到Pipeline中,有Pipeline管理的责任链进行后续处理。
3. 责任链ChannelPipleline(第2层)。负责事件在责任链中的有序传播,同时动态地编排责任链。通常,由编解码Handler将外部协议消息转换成内部POJO对象,这样上层业务只需关心业务逻辑处理。
4. 业务逻辑编排层Service ChannelHandler(第3层)。通常有两类:存储的业务逻辑编排和其他应用层协议插件,用于特定协议相关的会话和链路管理。
5. 通常,开发者值需关系责任链和业务逻辑编排层。
高性能
Netty的高性能是如何实现的?
1. 采用异步非阻塞IO类库,基于Reactor模式实现,解决了传统同步阻塞IO模式下一个服务端无法平滑处理线性增长的客户端的问题。
2. TCP接收和发送缓冲区使用直接内存代替堆内存,避免内存复制,提升了IO读写性能。俗称“零拷贝”(Zero-Copy)。
3. 通过内存池方式循环利用ByteBuf,避免了频繁创建和销毁ByteBuf带来的性能损耗。
4. 可配置IO线程数、TCP参数等,为不同场景提供定制化的调优参数,满足不同的性能场景。
5. 采用环形数组缓冲区实现无锁化并发编程,代替传统的线程安全容器和锁。
6. 合理使用线程安全容器、原子类等,提升系统的并发处理能力。
7. 关键资源的处理使用单线程串行化方式,避免了多线程并发访问带来的锁竞争和额外的CPU资源消耗问题。
8. 通过引用计数器及时申请释放不再被引用的对象,细粒度的内存管理降低了GC频繁,减少了频繁GC带来的延时和CPU损耗。
可靠性
Netty的可靠性是如何实现的?
1. 链路有效性检测。
a) 长连接无需每次发送消息时创建链路,也无需在消息交互完成后关闭链路,因此相对短链接更高。
b) 为保证长连接有效性,需要周期性心跳检测。一旦发现问题,可以及时关闭链路,重建TCP链接。
2. 内存保护机制。
a) 通过对象引用计数器对ByteBuf等内置对象进行细粒度的内存申请和释放,对非法对象引用进行检测和保护。
b) 通过内存池方式循环利用ByteBuf,节省内存。
c) 可设置内存容量上限,包括ByteBuf、线程池线程数等。
3. 优雅停机。
a) 当系统退出时,JVM通过注册的Shutdown Hook拦截到退出信号量,然后执行退出操作,释放相关模块的资源,将缓冲区的消息处理完成或清空,将待刷新的数据持久化到磁盘或数据库,完成后再退出。
b) 需设置超时时间T,如果达到T后仍然没有退出,则通过“kill -9 pid”强杀进程。
可定制性
Netty的可定制性是如何实现的?
1. 责任链模式:ChannelPipeline基于责任链模式,便于业务逻辑的拦截、定制和扩展。
2. 基于接口开发:关键类库都提供了接口或抽象类。
3. 提供大量工厂类,重载工厂类可创建用户实现的对象。
4. 提供大量系统参数供用户设置。
可扩展性
可定义私有协议栈。
私有协议栈开发
1. 开发时编写的代码。
a) 数据结构NettyMessage;
b) 消息编解码器NettyMessageEncoder和NettyMessageDecoder;
c) 握手认证Handler LoginAuthReqHanlder和LoginAuthRespHanlder;
d) 心跳检测Handler HearBeatReqHanlder和HearBeatRespHanlder。
2. 私有协议栈细节待补充。
作者:netoxi
出处
原标题:Netty笔记——技术点汇总
关键词:net
*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们:
admin#shaoqun.com
(#换成@)。