Netty学习笔记

netty解决的问题

粘包、拆包

原因

TCP是个流协议,所谓流,就是没有界限的一串数据。大家可以想想合理的流水,,是连成一片没有分界线的。TCP底层并不了解上层业务数据的具体含义,它会根据实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆封成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包的问题

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
(1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
(2)服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
(3)服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
(4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第5种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

问题产生的原因有三个,分别如下

  • 应用程序write写入的字节大小大于套接口发送缓冲区大小
  • 进行MSS大小的TCP分段
  • 以太网帧的payload大于MTU进行IP分片

解决策略

只能通过上层应用协议栈来解决,根据业界主流的协议方案,可以归纳如下。

  1. 消息定长,如果每个报文的大小固定长度200字节,如果不够,空位补空格
  2. 在包尾增加回车换行符进行分割,列如ftp协议
  3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  4. 消息分为消息头和消息体,消息头中间包含消息总长度(或则消息长度)的字段通常涉及思路为消息头第一个字段用int32来表示消息总长度。

netty对上面四种应用做了统一的抽象,提供了4种解码器来解决对应问题,使用起来非常方便。有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。

IO编程

我们简化下场景:客户端每隔两秒发送一个带有时间戳的"hello world"给服务端,服务端收到之后打印。

为了方便演示,下面例子中,服务端和客户端各一个类,把这两个类拷贝到你的IDE中,先后运行 IOServer.javaIOClient.java可看到效果。

下面是传统的IO编程中服务端实现

IOServer.java

复制代码/**
 * @author 闪电侠
 */
public class IOServer {
    public static void main(String[] args) throws Exception {

        ServerSocket serverSocket = new ServerSocket(8000);

        // (1) 接收新连接线程
        new Thread(() -> {
            while (true) {
                try {
                    // (1) 阻塞方法获取新的连接
                    Socket socket = serverSocket.accept();

                    // (2) 每一个新的连接都创建一个线程,负责读取数据
                    new Thread(() -> {
                        try {
                            byte[] data = new byte[1024];
                            InputStream inputStream = socket.getInputStream();
                            while (true) {
                                int len;
                                // (3) 按字节流方式读取数据
                                while ((len = inputStream.read(data)) != -1) {
                                    System.out.println(new String(data, 0, len));
                                }
                            }
                        } catch (IOException e) {
                        }
                    }).start();

                } catch (IOException e) {
                }

            }
        }).start();
    }
}

server端首先创建了一个serverSocket来监听8000端口,然后创建一个线程,线程里面不断调用阻塞方法 serversocket.accept();获取新的连接,见(1),当获取到新的连接之后,给每条连接创建一个新的线程,这个线程负责从该连接中读取数据,见(2),然后读取数据是以字节流的方式,见(3)。

下面是传统的IO编程中客户端实现

IOClient.java

复制代码/**
 * @author 闪电侠
 */
public class IOClient {

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                Socket socket = new Socket("127.0.0.1", 8000);
                while (true) {
                    try {
                        socket.getOutputStream().write((new Date() + ": hello world").getBytes());
                        socket.getOutputStream().flush();
                        Thread.sleep(2000);
                    } catch (Exception e) {
                    }
                }
            } catch (IOException e) {
            }
        }).start();
    }
}

客户端的代码相对简单,连接上服务端8000端口之后,每隔2秒,我们向服务端写一个带有时间戳的 "hello world"。

IO编程模型在客户端较少的情况下运行良好,但是对于客户端比较多的业务来说,单机服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了,我们来分析一下原因。

上面的demo,从服务端代码中我们可以看到,在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环,那么1w个连接对应1w个线程,继而1w个while死循环,这就带来如下几个问题:

  1. 线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
  2. 线程切换效率低下:单机cpu核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
  3. 除了以上两个问题,IO编程中,我们看到数据读写是以字节流为单位,效率不高。

为了解决这三个问题,JDK在1.4之后提出了NIO。

NIO编程

关于NIO相关的文章网上也有很多,这里不打算详细深入分析,下面简单描述一下NIO是如何解决以上三个问题的。

线程资源受限

NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,那么他是怎么做到的?我们用一幅图来对比一下IO与NIO

如上图所示,IO模型中,一个连接来了,会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读,大多数情况下,1w个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为读不出啥数据。

而在NIO模型中,他把这么多while死循环变成一个死循环,这个死循环由一个线程控制,那么他又是如何做到一个线程,一个while死循环就能监测1w个连接是否有数据可读的呢? 这就是NIO模型中selector的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到selector上,然后,通过检查这个selector,就可以批量监测出有数据可读的连接,进而读取数据,下面我再举个非常简单的生活中的例子说明IO与NIO的区别。

在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有100个小朋友,有两种方案可以解决小朋友上厕所的问题:

  1. 每个小朋友配一个老师。每个老师隔段时间询问小朋友是否要上厕所,如果要上,就领他去厕所,100个小朋友就需要100个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程。
  2. 所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所的小朋友批量领到厕所,这就是NIO模型,所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到一个线程,然后批量轮询。

这就是NIO模型解决线程资源受限的方案,实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于IO模型中一个线程管理一条连接,消耗的线程资源大幅减少

线程切换效率低下

由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高

IO读写以字节为单位

NIO解决这个问题的方式是数据读写不再以字节为单位,而是以字节块为单位。IO模型中,每次都是从操作系统底层一个字节一个字节地读取数据,而NIO维护一个缓冲区,每次可以从这个缓冲区里面读取一块的数据, 这就好比一盘美味的豆子放在你面前,你用筷子一个个夹(每次一个),肯定不如要勺子挖着吃(每次一批)效率来得高。

简单讲完了JDK NIO的解决方案之后,我们接下来使用NIO的方案替换掉IO的方案,我们先来看看,如果用JDK原生的NIO来实现服务端,该怎么做

前方高能预警:以下代码可能会让你感觉极度不适,如有不适,请跳过

NIOServer.java

复制代码/**
 * @author 闪电侠
 */
public class NIOServer {
    public static void main(String[] args) throws IOException {
        Selector serverSelector = Selector.open();
        Selector clientSelector = Selector.open();

        new Thread(() -> {
            try {
                // 对应IO编程中服务端启动
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                listenerChannel.socket().bind(new InetSocketAddress(8000));
                listenerChannel.configureBlocking(false);
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

                while (true) {
                    // 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
                    if (serverSelector.select(1) > 0) {
                        Set<SelectionKey> set = serverSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();

                            if (key.isAcceptable()) {
                                try {
                                    // (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
                                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                    clientChannel.configureBlocking(false);
                                    clientChannel.register(clientSelector, SelectionKey.OP_READ);
                                } finally {
                                    keyIterator.remove();
                                }
                            }

                        }
                    }
                }
            } catch (IOException ignored) {
            }

        }).start();


        new Thread(() -> {
            try {
                while (true) {
                    // (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
                    if (clientSelector.select(1) > 0) {
                        Set<SelectionKey> set = clientSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();

                            if (key.isReadable()) {
                                try {
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    // (3) 读取数据以块为单位批量读取
                                    clientChannel.read(byteBuffer);
                                    byteBuffer.flip();
                                    System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
                                            .toString());
                                } finally {
                                    keyIterator.remove();
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }

                        }
                    }
                }
            } catch (IOException ignored) {
            }
        }).start();


    }
}

相信大部分没有接触过NIO的同学应该会直接跳过代码来到这一行:原来使用JDK原生NIO的API实现一个简单的服务端通信程序是如此复杂!

复杂得我都没耐心解释这一坨代码的执行逻辑(开个玩笑),我们还是先对照NIO来解释一下几个核心思路

  1. NIO模型中通常会有两个线程,每个线程绑定一个轮询器selector,在我们这个例子中serverSelector负责轮询是否有新的连接,clientSelector负责轮询连接是否有数据可读
  2. 服务端监测到新的连接之后,不再创建一个新的线程,而是直接将新连接绑定到clientSelector上,这样就不用IO模型中1w个while循环在死等,参见(1)
  3. clientSelector被一个while死循环包裹着,如果在某一时刻有多条连接有数据可读,那么通过 clientSelector.select(1)方法可以轮询出来,进而批量处理,参见(2)
  4. 数据的读写以内存块为单位,参见(3)

其他的细节部分,我不愿意多讲,因为实在是太复杂,你也不用对代码的细节深究到底。总之,强烈不建议直接基于JDK原生NIO来进行网络开发,下面是我总结的原因

1、JDK的NIO编程需要了解很多的概念,编程复杂,对NIO入门非常不友好,编程模型不友好,ByteBuffer的api简直反人类
2、对NIO编程来说,一个比较合适的线程模型能充分发挥它的优势,而JDK没有给你实现,你需要自己实现,就连简单的自定义协议拆包都要你自己实现
3、JDK的NIO底层由epoll实现,该实现饱受诟病的空轮训bug会导致cpu飙升100%
4、项目庞大之后,自行实现的NIO很容易出现各类bug,维护成本较高,上面这一坨代码我都不能保证没有bug

正因为如此,我客户端代码都懒得写给你看了==!,你可以直接使用IOClient.javaNIOServer.java通信

JDK的NIO犹如带刺的玫瑰,虽然美好,让人向往,但是使用不当会让你抓耳挠腮,痛不欲生,正因为如此,Netty横空出世!

Netty编程

那么Netty到底是何方神圣?
用一句简单的话来说就是:Netty封装了JDK的NIO,让你用得更爽,你不用再写一大堆复杂的代码了。
用官方正式的话来说就是:Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。

下面是我总结的使用Netty不使用JDK原生NIO的原因

  1. 使用JDK自带的NIO需要了解太多的概念,编程复杂,一不小心bug横飞
  2. Netty底层IO模型随意切换,而这一切只需要做微小的改动,改改参数,Netty可以直接从NIO模型变身为IO模型
  3. Netty自带的拆包解包,异常检测等机制让你从NIO的繁重细节中脱离出来,让你只需要关心业务逻辑
  4. Netty解决了JDK的很多包括空轮询在内的bug
  5. Netty底层对线程,selector做了很多细小的优化,精心设计的reactor线程模型做到非常高效的并发处理
  6. 自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
  7. Netty社区活跃,遇到问题随时邮件列表或者issue
  8. Netty已经历各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大

看不懂没有关系,这些我们在后续的课程中我们都可以学到,接下来我们用Netty的版本来重新实现一下本文开篇的功能吧

首先,引入Maven依赖

复制代码    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.6.Final</version>
    </dependency>

然后,下面是服务端实现部分

NettyServer.java

复制代码/**
 * @author 闪电侠
 */
public class NettyServer {
    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        NioEventLoopGroup boos = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        serverBootstrap
                .group(boos, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.Pipeline().addLast(new StringDecoder());
                        ch.Pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                System.out.println(msg);
                            }
                        });
                    }
                })
                .bind(8000);
    }
}

这么一小段代码就实现了我们前面NIO编程中的所有的功能,包括服务端启动,接受新连接,打印客户端传来的数据,怎么样,是不是比JDK原生的NIO编程优雅许多?

初学Netty的时候,由于大部分人对NIO编程缺乏经验,因此,将Netty里面的概念与IO模型结合起来可能更好理解

1.boos对应,IOServer.java中的接受新连接线程,主要负责创建新连接
2.worker对应 IOClient.java中的负责读取数据的线程,主要用于读取数据以及业务逻辑处理

然后剩下的逻辑我在后面的系列文章中会详细分析,你可以先把这段代码拷贝到你的IDE里面,然后运行main函数

然后下面是客户端NIO的实现部分

NettyClient.java

复制代码/**
 * @author 闪电侠
 */
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();

        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.Pipeline().addLast(new StringEncoder());
                    }
                });

        Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();

        while (true) {
            channel.writeAndFlush(new Date() + ": hello world!");
            Thread.sleep(2000);
        }
    }
}

在客户端程序中,group对应了我们IOClient.java中main函数起的线程,剩下的逻辑我在后面的文章中会详细分析,现在你要做的事情就是把这段代码拷贝到你的IDE里面,然后运行main函数,最后回到NettyIOServer.java的控制台,你会看到效果。

使用Netty之后是不是觉得整个世界都美好了,一方面Netty对NIO封装得如此完美,写出来的代码非常优雅,另外一方面,使用Netty之后,网络通信这块的性能问题几乎不用操心。

如果你工作中需要接触到网络编程,Netty必将是你的最佳选择,在后续的文章中,闪电侠将基于一个简单的消息推送系统带领你从0到1系统学习Netty各方面的知识,并且每一篇文章都会有一个视频课程对应,每一篇文章对应源码我也会放到github上。

首先看到,我们创建了两个NioEventLoopGroup,这两个对象可以看做是传统IO编程模型的两大线程组,boosGroup表示监听端口,创建新连接的线程组,workerGroup表示处理每一条连接的数据读写的线程组,不理解的同学可以看一下上一篇博文《跟闪电侠学Netty》开篇:Netty是什么?。用生活中的例子来讲就是,一个工厂要运作,必然要有一个老板负责从外面接活,然后有很多员工,负责具体干活,老板就是boosGroup,员工们就是workerGroupboosGroup接收完连接,扔给workerGroup去处理。

接下来 我们创建了一个引导类 ServerBootstrap,这个类将引导我们进行服务端的启动工作,直接new出来开搞。

我们通过.group(boosGroup, workerGroup)给引导类配置两大线程,这个引导类的线程模型也就定型了。

然后,我们指定我们服务端的IO模型为NIO,我们通过.channel(NioServerSocketChannel.class)来指定IO模型,当然,这里也有其他的选择,如果你想指定IO模型为BIO,那么这里配置上OioServerSocketChannel.class类型即可,当然通常我们也不会这么做,因为Netty的优势就在于NIO。

接着,我们调用childHandler()方法,给这个引导类创建一个ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑,不理解没关系,在后面我们会详细分析。ChannelInitializer这个类中,我们注意到有一个泛型参数NioSocketChannel,这个类呢,就是Netty对NIO类型的连接的抽象,而我们前面NioServerSocketChannel也是对NIO类型的连接的抽象,NioServerSocketChannelNioSocketChannel的概念可以和BIO编程模型中的ServerSocket以及Socket两个概念对应上

我们的最小化参数配置到这里就完成了,我们总结一下就是,要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO模型、连接读写处理逻辑,有了这三者,之后在调用bind(8000),我们就可以在本地绑定一个8000端口启动起来,以上这段代码读者可以直接拷贝到你的IDE中运行。

链接:https://juejin.cn/post/6844903621813862408

Netty基本概念

ByteBuf

网络数据的基本单位是字节。JAVANIO提供了ByteBuffer作为它的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。

Netty 的 ByteBuffer 替代品是 ByteBuf,一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。

优点

  • 通过内置的复合缓冲区实现了透明的零拷贝。
  • 容量可以按需增长
  • 在度和谐这俩种模式之间切换不需要调用ByteBuffer的flip()方法
  • 读和写使用了不同的索引
  • 支持引用计数
  • 支持池化。
  • 所有的网络通信都会涉及到字节序列的移动

结构

从结构上来说,ByteBuf是由一串字节数组 构成。数组中每个字节用来存放信息。

ByteBuf提供了俩个索引,一个用于读取数据,一个用于写入数据。这俩个索引通过在字节数组中移动,来定位需要读或者写的位置。

当从 ByteBuf 读取时,它的 readerIndex(读索引)将会根据读取的字节数递增。

同样,当写 ByteBuf 时,它的 writerIndex 也会根据写入的字节数进行递增。

ByteBuf有几个重要属性:

  • capacity:容量;
  • 0:缓冲区开始位置;
  • readIndex:下一个读位置;
  • writeIndex:下一个写位置;

已经读取的区域:[0,readerindex)

可读取的区域:[readerindex,writerIndex)

可写的区域: [writerIndex,capacity)

ByteBuf维护了俩个不同的索引,名称以read或者write开头的ByteBuf方法,将会推进对应的索引,而名称set或者get开头的不会。

可以指定ByteBuf的最大容量。试图移动写索引(即writerdex)超过这个值将会触发异常。(默认的限制是Integer.MAX_VALUE)

如果readerIndex超过了writerIndex的时候,Netty会抛出IndexOutOf-BoundsException异常

ByteBuf都是基于字节序列的,类似于一个字节数组。在AbstractByteBuf里面定义了下面5个变量:

//源码
int readerIndex; //读索引
int writerIndex; //写索引
private int markedReaderIndex;//标记读索引
private int markedWriterIndex;//标记写索引
private int maxCapacity;//缓冲区的最大容量

使用模式

堆缓冲区

最常用的ByteBuf模式是将数据存储在JVM的堆空间中。这种模式被称为支撑数组(backing array),它能在没有使用池化的情况下提供快速的分配和释放。可以由hasArray()来判断检查ButeBuf是否有数组支撑。如果不是,则这是一个直接缓冲区。

直接缓冲区

直接缓冲区是另一种ByteBuf模式

直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都比较昂贵

复合缓冲区

复合缓冲区 CompositeByteBuf,它为多个 ByteBuf 提供一个聚合视图。比如 HTTP 协议, 分为消息头和消息体,这两部分可能由应用程序的不同模块产生,各有各的 ByteBuf,将会在消息被发送的时候组装为一个 ByteBuf,此时可以将这两个 ByteBuf 聚合为一个 CompositeByteBuf,然后使用统一和通用的 ByteBuf API 来操作。

分配

ByteBufAllocator 接口

ByteBufAllocator 分配我们所描述过的任意类型的 ByteBuf 实例。

名称描述
buffer()返回一个基于堆或者直接内存存储的 ByteBuf
heapBuffer()返回一个基于堆内存存储的 ByteBuf
directBuffer()返回一个基于直接内存存储的 ByteBuf
compositeBuffer()返回一个可以通过添加最大到指定数目的基于堆的或者直接
ioBuffer()返回一个用于套接字的 I/O 操作的 ByteBuf,当所运行的环境具有 sun.misc.Unsafe 支持时,返回基于直接内存存储的 ByteBuf,

可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到 ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。

//从channel 获取ByteBufAllocator的引用
Channel channel = ctx.channel();
ByteBufAllocator alloc = channel.alloc();

//从ChannelHandlerContext获取一个ByteBufAllocator的引用
ByteBufAllocator alloc1 = ctx.alloc();

Netty提供了俩种ByyteBufAllocator实现PooledByteBufAllocator 和 Unpooled-ByteBufAllocator。

前者池化了ByteBuf的实例以提高性能最大限度地减少内存碎片。后者实现不池化ByteBuf实例,并且它被调用时都会返回一个新的实例。

netty4.1默认使用了PooledByteBufAlloactor

Unpooled 缓冲区

netty提供了一个简单的称为Unpooled的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例

buffer() 返回一个未池化的基于堆内存存储的 ByteBuf

directBuffer()返回一个未池化的基于直接内存存储的 ByteBuf

wrappedBuffer() 返回一个包装了给定数据的 ByteBuf

copiedBuffer() 返回一个复制了给定数据的 ByteBuf

Unpooled 类还可用于 ByteBuf 同样可用于那些并不需要 Netty 的其他组件的非网络项目。

Unpooled.copiedBuffer("Hello,Netty",CharsetUtil.UTF_8)

使用

访问读写操作

如同普通的java字节数组中一样,ByteBuf的索引是从0开始的:第1个字节的索引是0,最后一个字节的索引总是capcity()-1。使用哪些需要一个索引值参数(随机访问,即是数组下标)的方法来访问数据,既不会改变readerIndex,也不会改writerIndex。如果有需要,也可以通过调用ReadIndex(index)或者writeIndex(index)来手动移动这俩者。

顺序访问通过索引访问。有俩种类别的读、写操作:

get()和set()操作从给定的索引开始,并且保持索引不变;get+数据字长(bool.byte,int,short,long,bytes)

reader()和write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整

更多操作

isReadable() 如果至少有一个字节可供读取,则返回 true

isWritable() 如果至少有一个字节可被写入,则返回 true

readableBytes() 返回可被读取的字节数

writableBytes() 返回可被写入的字节数

capacity() 返回 ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直到达到

maxCapacity()

maxCapacity() 返回 ByteBuf 可以容纳的最大字节数

hasArray() 如果 ByteBuf 由一个字节数组支撑,则返回 true

array() 如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个

UnsupportedOperationException 异常


可丢弃字节

可丢弃的分段包含了已经被读过的字节。通过调用discardReadBytes()方法,可以丢弃它们并回收空间。这个分段大小为0,存储在readIndex中,会随着read操作的执行而增加(get*操作不会移动readIndex)

缓冲区上调用 discardReadBytes()方法后,可丢弃字节分段中的空间已经变为可写的了。 频繁地调用 discardReadBytes()方法以确保可写分段的最大化,但是请注意,这将极有可能会导致内存复制,因为可读字节必须被移动到缓冲区的开始位置。建议只在有真正需要的时候 才这样做,例如,当内存非常宝贵的时候。

可读字节

ByteBuf的可读字节分段存储了实际数据。新分配的,包装的或者复制的缓冲区默认的readerIndex值为0。

可写字节

可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的 writerIndex 的默认值为 0。任何名称以 write 开头的操作都将从当前的 writerIndex 处开始写数据,并将它增加已经写入的字节数。

查找操作

在 ByteBuf 中有多种可以用来确定指定值的索引的方法。最简单的是使用 indexOf()方法。 较复杂的查找可以通过调用 forEachByte()。

派生缓冲区

派生缓冲区为 ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:

duplicate();

slice();

slice(int, int);

Unpooled.unmodifiableBuffer(…);

order(ByteOrder);

readSlice(int)。

每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引。其内部存储和 JDK 的 ByteBuffer 一样也是共享的。

ByteBuf复制如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int,int)方法。不同于派生缓冲区,由这个调用所返回的ByteBuf拥有独立的数据副本。

引用计数

引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。Netty 在第 4 版中为 ByteBuf 引入了引用计数技术, interface ReferenceCounted。

工具类

ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。因为这个 API 是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。

这些静态方法中最有价值的可能就是 hexdump()方法,它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用,例如,出于调试的目的记录 ByteBuf 的内容。

十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还可以很容易地转换回实际的字节表示。

另一个有用的方法是 boolean equals(ByteBuf, ByteBuf),它被用来判断两个 ByteBuf 实例的相等性。

资源释放

当某个 ChannelInboundHandler 的实现重写 channelRead()方法时,它要负责显式地释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法

ReferenceCountUtil.release()

Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用

SimpleChannelInboundHandler,SimpleChannelInboundHandler 会自动释放资源。

1、对于入站请求,Netty 的 EventLoop 在处理 Channel 的读操作时进行分配 ByteBuf,对于这类 ByteBuf,需要我们自行进行释放,有三种方式,或者使用

SimpleChannelInboundHandler,或者在重写 channelRead()方法使用

//SimpleChannelInboundHandler
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;

    try {
        if (this.acceptInboundMessage(msg)) {
            this.channelRead0(ctx, msg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
       // 释放资源
        if (this.autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }

    }

}

ReferenceCountUtil.release()或者使用 ctx.fireChannelRead 继续向后传递;

2、对于出站请求,不管 ByteBuf 是否由我们的业务创建的,当调用了 write 或者 writeAndFlush 方法后,Netty 会自动替我们释放,不需要我们业务代码自行释放。

  public static void main(String[] args) {
        // 创建byteBuf对象,该对象内部包含一个字节数组byte[10]
        ByteBuf byteBuf = Unpooled.buffer(1);
        System.out.println("byteBuf=" + byteBuf);

        for (int i = 0; i < 8; i++) {
            byteBuf.writeByte(i);
        }
        System.out.println("byteBuf=" + byteBuf);

        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.getByte(i));
        }
        System.out.println("byteBuf=" + byteBuf);

        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.readByte());
        }
        System.out.println("byteBuf=" + byteBuf);


        //用Unpooled工具类创建ByteBuf
        ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,world!", CharsetUtil.UTF_8);
        //使用相关的方法
        if (byteBuf2.hasArray()) {
            byte[] content = byteBuf2.array();
            //将 content 转成字符串
            System.out.println(new String(content, CharsetUtil.UTF_8));
            System.out.println("byteBuf2=" + byteBuf2);

            System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104

            int len = byteBuf2.readableBytes(); //可读的字节数  12
            System.out.println("len=" + len);

            //使用for取出各个字节
            for (int i = 0; i < len; i++) {
                System.out.println((char) byteBuf2.getByte(i));
            }

            //范围读取
            System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
            System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
        }
    }

从结果可以看出get时候readerindex不会移动,read时候readerindex会移动

Last modification:December 1, 2023
如果觉得我的文章对你有用,请随意赞赏