netty入门

简介

netty是一个异步的,基于事件驱动的网络应用框架,用于快速开发可维护,高性能的网络服务器客户端

netty在网络应用框架中的地位就好比spring在javaEE开发中的地位

以下框架都是用了netty,因为它们都有网络通信需求

  • Cassandra-nosql数据库
  • spark
  • hadoop
  • rocketMQ
  • ElasticSearch
  • gRPC
  • Dubbo
  • Spring5.0 flux api完全放弃了tomcat,是用netty作为服务器端
  • zookeeper

优势

自己使用NIO开发工作量大,bug多,要自己解决tcp传输问题,如黏包半包

epoll空轮询导致100%,对API进行增强,使之更易用

Netty vs Mina

mina由apche维护,3.x版本可能有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速

demo

服务端

package netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 9:56
 */
public class HelloServer {
    public static void main(String[] args) {
        //启动器,负责组装netty组件
        new ServerBootstrap()
                //加入事件组 包含了线程和选择器
                .group(new NioEventLoopGroup())
                //选择服务器的ServerSocketChannel实现 NIO
                .channel(NioServerSocketChannel.class)
                //boss 负责处理连接 worker(child)负责处理读写  决定了worker 能执行哪些操作
                .childHandler(
                        //channel 和客户端进行数据读写的通道 Initializer 初始化器,负责添加别的handler
                        new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());//将byteBuf转为字符串
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){   //自定义handler
                            @Override//读事件
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //打印转换好的字符串
                                System.out.println(msg);
//                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                    //监听端口
                }).bind(8077);
    }
}

客户端

package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 10:13
 */
public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        //启动类
        new Bootstrap()
                //添加eventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel事件
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override//在连接建立以后被调用,初始化
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost",8077))
                .sync()//同步阻塞方法,知道连接建立后才会继续执行
                .channel()//代表的是连接对象
                //向访问器发送数据
                .writeAndFlush("hello,world!");
    }
}

提示

把channel理解为数据通道

把msg理解为在流动的数据,最开始输入是bytebuf但经过pipline的加工,会变成其他类型的对象,最后输出又变成byteBuf

把handler理解为数据的处理工具

  • 由多个工序,合在一起就是pipeline,pipline负责发布事件,传播给每个handler,handler对自己感兴趣的事件进行处理
  • handler分inbound和outbound俩类(入站,出站)

把eventLoop理解为处理数据的工人

  • 工人可以管理多个channel的io操作,一旦工人负责了某个channel就要负责到底(绑定)
  • 工人既可以执行io操作,也可以进行任务处理,每位工人都有任务队列,队列里可以对方多个channel待处理任务,任务分为普通,定时任务
  • 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每到工序指定不同的工人

组件

EventLoop

事件循环对象

eventLoop本质是一个单线程执行器(同时维护了一个selector),里面有run方法处理Channel上源源不断的IO事件

继承关系

  • 一条线是JUC下ScheduledExecutorService,因此包含了线程池中所有的方法
  • 另一条线是继承自netty自己的orderedEventExecutor

定时任务

package netty;

import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.NettyRuntime;

import java.util.concurrent.TimeUnit;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 11:09
 */
public class TestEventGroup {
    public static void main(String[] args) {
        //创建时间循环组
        //这个类可以处理io事件,普通任务,定时任务
        NioEventLoopGroup group = new NioEventLoopGroup(2);//默认是电脑核心数*2
//        DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup();//普通任务 定时任务
        //获取下一个事件循环对象
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());

        //执行普通任务
//        group.next().execute(()->{
//            try {
//                Thread.sleep(1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            System.out.println("ok");
//        });

        //执行定时任务
        group.next().scheduleAtFixedRate(()->{
            System.out.println("ok");
        },0,1, TimeUnit.SECONDS);
        //从0秒后开始执行,每秒执行一次



    }
}

io任务

服务器端

package netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 13:28
 */
public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                //boss 和w worker
                //boss只负责 serverSocketChannel 的 accept事件 worker只负责SocketChannel上的读写
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override               //bytebuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.print(Thread.currentThread());//只会用同一个线程处理当前请求
                                System.out.println(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8077);
    }
}

客户端

package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 10:13
 */
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //启动类
        Channel channel = new Bootstrap()
                //添加eventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel事件
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override//在连接建立以后被调用,初始化
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8077))
                .sync()//同步阻塞方法,知道连接建立后才会继续执行
                .channel();//代表的是连接对象
        //如果断点发送的话,要右键断点选择thread不然会阻塞nio的线程,导致无法正常发送数据
        channel.writeAndFlush("qweqwe123asdqwezcQASDQWE");
        channel.writeAndFlush("1111111111");

    }
}

一旦建立连接,该连接的所有请求都会让一个eventloop进行处理

进一步细分指责,把handler的执行权交给不同的group

package netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 13:28
 */
public class EventLoopServer {
    public static void main(String[] args) {
        //细分2 创建一个独立的eventGroup
        DefaultEventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
        //boss 和w worker
                //boss只负责 serverSocketChannel 的 accept事件 worker只负责SocketChannel上的读写
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
                            @Override               //bytebuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.print(Thread.currentThread());//只会用同一个线程处理当前请求
                                System.out.println(buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg);//消息传给下一个handler
                            }
                        }).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
                            @Override               //bytebuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.print(Thread.currentThread());//只会用同一个线程处理当前请求
                                System.out.println(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8077);
    }
}

根据指责不同去进一步的分工.可以让单独的group处理对应的代码

源码

如果俩个handler绑定的是同一个线程,那么就直接调用,否则把要调用的代码封装成一个任务对象,由下一个handler的线程来调用

Channel

  • calse()关闭channel
  • claseFuture()处理channel的关闭

    • sync方法是同步等待channel关闭
    • addListener方法是异步等待channel关闭
  • pipeline()方法添加处理器
  • write()方法将数据写入
  • writeAndFlush()方法将数据写入并刷出

channelfuture

在客户端demo中.connect是一个异步非阻塞方法

异步处理

package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 10:13
 */
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //带有future,promise的类型都是和异步方法配套使用的,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                //添加eventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel事件
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override//在连接建立以后被调用,初始化
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                //异步非阻塞方法 main发起了调用,真正执行connect是nio线程
                .connect(new InetSocketAddress("localhost", 8077));

//同步阻塞方法,直到连接建立后才会继续执行,如果不执行这个方法
//        ,获取到的channel就是还没准备好的channel,无法发送数据
///        channelFuture.sync();
        //代表的是连接对象



        channelFuture.addListener(new ChannelFutureListener() {
            @Override//在nio线程连接建立之后,会调用这个方法
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = future.channel();
                channel.writeAndFlush("hi");
                channel.writeAndFlush("hi hi");
            }
        });

    }
}

处理关闭,实现发送q实现关闭连接

package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import javax.sound.sampled.Line;
import java.net.InetSocketAddress;
import java.util.Scanner;

/**
 * @Author: suiyi
 * @Date: 2022/9/2 10:13
 */
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override//在连接建立以后被调用,初始化
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8077));
        Channel channel = channelFuture.sync().channel();
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String line = scanner.nextLine();
                if ("q".equals(line)){
                   channel.close();//close 也是异步操作
                   break;
               }
                channel.writeAndFlush(line);
            }
        },"input").start();

        ChannelFuture closeFuture = channel.closeFuture();

        //获取closedFuture对象,它也有俩种方式 1就是同步等待
//        System.out.println("正在等待关闭");
//        closeFuture.sync();//同步处理,等待关闭后才执行
//        System.out.println("处理关闭之后的操作");


        //异步回调,可简化为lambda
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("处理关闭之后的操作");
                group.shutdownGracefully();//结束进程
            }
        });
    }
}

future&promise

在异步处理时,经常用到这俩个接口

首先要说明的是netty的future与jdk的future同名,但是是俩个接口,netty的future继承自jdk的future,而promise又对netty Future接口进行了拓展

  • jdk future只能同步等待任务结束(或许成功,或许失败),才能得到结果
  • netty future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都要等任务结束
  • netty prowmise 不仅有netty future的功能,而且脱离了任务独立存在,只作为俩个线程间传递结果的容器

jdk自带future

package netty.nio2;

import java.util.concurrent.*;

import static java.util.concurrent.Executors.*;

/**
 * @Author: suiyi
 * @Date: 2022/9/5 10:16
 */
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //线程池
        ExecutorService service= newFixedThreadPool(2);
        //提交任务
        Future<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(3000);
                return 50;
            }
        });
        //自行车通过future来获取结果
        System.out.println(future.get());
    }
}

netty的Future

package netty.nio2;

import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
 * @Author: suiyi
 * @Date: 2022/9/5 13:10
 */
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(3000);
                return 70;
            }
        });
//        System.out.println(future.get());
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(future.getNow());
            }
        });
        System.out.println("执行到结尾");
    }
}

netty promise

package netty.nio2;

import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;

import java.util.concurrent.ExecutionException;

/**
 * @Author: suiyi
 * @Date: 2022/9/5 13:22
 */
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //准备event
        EventLoop eventLoop = new NioEventLoopGroup().next();
        //可以主动创建promise,结果容器
        DefaultPromise<Integer> promise = new DefaultPromise<Integer>(eventLoop);
        new Thread(()->{
            try {
//                int i=1/0;
                promise.setSuccess(80);
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
                promise.setFailure(e);
            }

        }).start();
        //在这里如果有异常的话,主线程会拿到这个异常
        System.out.println(promise.get());
    }
}

Handler&pipline

channelHandler用来处理Channel上的各种事件,分为入站,出站俩种,所有的channelHAndler被连成一串,就是pipeline

  • 出站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对回写的结果进行加工

channel是一个产品的加工车间,pipline是车间中的流水线,ChannelHandler就是流水线上的各个工序,而后面要讲的ByteBuf是原材料先经过一道道入站工序,再经过一道道出站工序最终变成产品

package netty.nio2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @Author: suiyi
 * @Date: 2022/9/5 14:03
 */
public class TestPipeline {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //通过channel拿到pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加处理器 head-> h1->h2->h3->tail 底层是双向列表
                        //入栈和出站顺序是反过来的
                        pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(1);
                                super.channelRead(ctx, msg);//把数据传递给下个handler,如果不调用,调用链会断开
                            }
                        });
                        pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(2);
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(3);
                                super.channelRead(ctx, msg);
                                //分配了一个bytebuffer对象,写入server这个字符串的字节数组 
                                // 只有执行这个操作才会触发出站处理器
                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                                //如果这里是ctx.write的话,就是从当前位置往前找
                            }
                        });
                        pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){

                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(4);
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){

                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(5);
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                }).bind(7088);
    }
}

输出结果

1
2
3
5
4

可以看到ChannelInboundHandlerAdapter是按照addLast的顺序执行的,而ChannelOutboundHandlerAdapter是按照addLast的逆序执行的.Channelpipeline实现是一个ChannelHandlerContext(包装了ChannelHandler)组成的双向列表

EmbeddedChannel

netty提供的测试工具类

package netty.nio2;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @Author: suiyi
 * @Date: 2022/9/5 16:22
 */
public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println(1);
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println(2);
                super.channelRead(ctx, msg);
            }
        };
        //模拟入栈操作,writeInbound,模拟出站操作可以用writeOutbound
        EmbeddedChannel channel = new EmbeddedChannel(h1,h2);
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));

    }
}

ByteBuf

是对字节数据的封装

ByteBufallocator.Default.buffer(10)

创建一个默认的ByteBuf,初始容量是10

package netty.nio2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

/**
 * @Author: suiyi
 * @Date: 2022/9/6 9:16
 */
public class TestByteBuf {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        System.out.println(buf);
        StringBuilder sb = new StringBuilder();
        for (int i=0;i<300;i++){
            sb.append("a");
        }
        ByteBuf byteBuf = buf.writeBytes(sb.toString().getBytes());
        System.out.println(byteBuf);

    }
}

直接内存vs堆内存

可以用下面的代码来创建池化基于堆的ByteBuf

ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();

下面代码是池化基于直接内存的ByteBuf

ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer();
  • 直接内存创建和销毁的代价高昂,但读写性能高(少一次内存复制),适合配合池化功能一起使用
  • 直接内存对GC压力小,因为这部分内存不收JVM垃圾回收的管理,但也要注意及时主动释放

netty默认情况下,都是使用直接内存作为byteBuf的内存

池化vs非池化

池化的概念类似于数据库连接池

池化最大的意义在于重用ByteBuf,优点如下

  • 没有池化,则每次创建新的ByteBuf实例,这个操作内存代价昂贵,就算是堆内存,也会增加GC压力
  • 有了池化,啧可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提高效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

4.1以后,非android平台默认启用池化,android平台默认非池化

4.1以前池化不成熟,默认是非池化实现

组成

ByteBuf有容量capacity,和最大容量 max capcity,

  1. 写指针到容量的位置是可写部分
  2. 读指针到写指针的部分是可读部分
  3. 读过的数据是废弃部分
  4. 容量到最大容量之间是可扩容部分

相比于ByteBuffer,他把读写切换为了俩个指针不用来回的切换,还可以支持扩容

内存回收

由于Netty中有堆外内存ByteBuf实现,堆外内存最好是手动来释放,而不是等待GC回收

  • UnpooledHeapByteBuf使用的是JVM内存,只需等待GC回收即可
  • UnpooledDirectByteBuf使用的是直接内存,需要特殊方法来回收内存
  • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty这里采用了引用计数法来控制回收的内存,每个ByteBuf都实现了ReferenceCounted接口

  • 每个ByteBuf对象的初始技术为1
  • 调用release方法计数减1,如果技术为0,ByteBuf内存被回收
  • 调用retain方法计数+1,表示调用者没调用完之前,其他Handler调用了release也不会造成回收
  • 当计数为0时,基层内存会被回收(也看是回到内存池),这时即使ByteBuf对象还在,其各个方法均无法正常使用

因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在Fianlly中relase了就失去了传递性(当然如果在这个ChannelHandler内这个ByteBuf已经完成了它的使命,那么便无需传递)

基本规则是,谁是最终使用者,谁负责进行relase

slice

零拷贝的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后ByteBuf并没有发生内存复制.还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针

package netty.nio2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

/**
 * @Author: suiyi
 * @Date: 2022/9/6 11:13
 */
public class TestSlice {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
        buf.writeBytes(new byte[]{'a','b','c','d','e','f','g'});
        System.out.println(buf);
        //切片过程中没有发生数据复制,用的还是原来的内存
        ByteBuf f1 = buf.slice(0, 4);//切片方法执行时,会对最大容量做一个限制
        ByteBuf f2 = buf.slice(4, 6);
        System.out.println(f1);
        System.out.println(f2);
        //切片后的内容中不能在写入新的数据

        f1.retain();
        //释放原有byteBuf内存会影响之前的内存,使用f1.retain();可以添加计数器,防止被回收
        //所以调用slice方法时最好对切片后的对象调用一次retain方法
        buf.release();



    }
}

duplicate

零拷贝的体现之一,就好比截取了原始ByteBuf中所有内容,并且没有maxCapcity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的

copy

会将内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关

compositeBuffer

package netty.nio2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

/**
 * @Author: suiyi
 * @Date: 2022/9/6 13:01
 */
public class TestCompositeByteBuf {
    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
        buf1.writeBytes(new byte[]{1,2,3,4,5});

        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
        buf2.writeBytes(new byte[]{6,7,8,9,10});

//        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
//        buffer.writeBytes(buf1).writeBytes(buf2); //传统方式

        CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
        buffer.addComponents(true,buf1,buf2);//true代表自动增长写指针
        System.out.println(buffer);

    }
}

误解

很多人有误区,认为只有在netty,nio这样的多路复用IO模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上

java Socket是全双工的,在任意时刻,线路A到B和B到A的双向信号传输,即便是阻塞IO,读写也是可以同时进行的,只要分别采用读线程和写线程即可

Last modification:October 7, 2023
如果觉得我的文章对你有用,请随意赞赏