Lời nói đầu

在上一篇随笔中,我们探讨了如何实现一套自定义通信协议,其中涉及到的粘包和拆包处理最初是完全自定义实现的,后来则改为了继承 ByteToMessageDecoder 来简化处理.




无缓存的情况 。

  • 反复从ByteBuf中提取完整的消息
  • 剩余的残缺消息写入缓存(会进行数据拷贝)

有缓存的情况 。

  • 将新收到的数据接入缓存
  • 反复从缓存中提取完整消息
  • 释放缓存内读取过的数据(会进行数据移动,导致拷贝)
công cộng lớp học EchoServerHandler mở rộng ChannelInboundHandlerAdapter {
    riêng tư tĩnh cuối cùng số nguyên HEADER_LENGTH = 4; //消息头部长度
    riêng tư ByteBuf buffer = Unpooled.buffer(1024); //缓存残缺消息

    @Ghi đè
    công cộng vô hiệu channelRead(ChannelHandlerContext ctx, Object msg) ném Exception { ByteBuf income = (ByteBuf) msg;

        nếu như(buffer.readableBytes() > 0) {
            buffer.ensureWritable(income.readableBytes()); income.readBytes(buffer, income.readableBytes()); readMsgFromBuffer(buffer);

            nếu như(buffer.readableBytes() > 0) {
                System.out.println("缓存剩余字节:"+buffer.readableBytes()); buffer.discardReadBytes(); } khác { //刚刚好,则清空数据
                buffer.clear(); } } khác { readMsgFromBuffer(income);

            nếu như (income.readableBytes() >0) { System.out.println("剩余字节:"+income.readableBytes()); income.readBytes(buffer, income.readableBytes()); } } }

    riêng tư vô hiệu readMsgFromBuffer(ByteBuf byteBuf) {
        trong khi(byteBuf.readableBytes() >= HEADER_LENGTH) { byteBuf.markReaderIndex(); //由于可能读不到完整的消息,所以读之前先标记索引位置,方便重置
            byte[] headerBytes = mới byte[4]; byteBuf.readBytes(headerBytes);
            số nguyên type = headerBytes[0] & 0xFF;
            số nguyên bodyLength = ((headerBytes[1] & 0xFF) << 16) |
                    ((headerBytes[2] & 0xFF) << 8) |
                    (headerBytes[3] & 0xFF);

            nếu như (byteBuf.readableBytes() < bodyLength) { byteBuf.resetReaderIndex(); //重置读索引到当前消息头位置
                phá vỡ; }

            // 完整消息体已经接收,处理消息
            byte[] body = mới byte[bodyLength]; byteBuf.readBytes(body);
            //System.out.println("type:"+type+"||length:"+bodyLength+"||body:"+new String(body, CharsetUtil.UTF_8));
            nếu như(type == 1) {
                thử { HelloRequest request = HelloRequest.parseFrom(body); System.out.println("收到消息:"+request.toString()); } nắm lấy (Exception e) { System.out.println("解析失败:"+mới String(body, CharsetUtil.UTF_8)); } } khác { System.out.println("消息类型未知:"+type); } } } ....




công cộng lớp học MessageDecoder mở rộng ByteToMessageDecoder {
    riêng tư tĩnh cuối cùng số nguyên HEADER_LENGTH = 4; //消息头部长度

    @Ghi đè
    được bảo vệ vô hiệu decode(ChannelHandlerContext ctx, ByteBuf in, List out) ném Ngoại lệ {
        // 检查是否足够的字节来读取一个消息头
        trong khi (in.readableBytes() >= HEADER_LENGTH) { in.markReaderIndex(); // 标记当前读取位置,便于重置

            // 读取消息头部
            byte[] headerBytes = mới byte[4]; in.readBytes(headerBytes);

            // 获取类型
            số nguyên type = headerBytes[0] & 0xFF;
            // 获取消息体长度
            số nguyên bodyLength = ((headerBytes[1] & 0xFF) << 16) |
                    ((headerBytes[2] & 0xFF) << 8) |
                    (headerBytes[3] & 0xFF);

            // 检查缓冲区中的数据是否足够读取整个消息体
            nếu như (in.readableBytes() < bodyLength) { in.resetReaderIndex(); // 重置读指针,等待更多数据
                phá vỡ; }

            // 读取消息体
            byte[] body = mới byte[bodyLength]; in.readBytes(body);

            // 处理消息
            thử { Object msg = vô giá trị;
                nếu như(type == 1) { msg = HelloRequest.parseFrom(body); } khác nếu như(type == 2) { msg = HelloResponse.parseFrom(body); } khác { System.out.println("未知消息:"+mới String(body, CharsetUtil.UTF_8)); }
                nếu như(Objects.nonNull(msg)) { out.add(msg); } } nắm lấy (Exception e) { System.out.println("解析失败: " + mới String(body, CharsetUtil.UTF_8)); } } } }



    riêng tư ByteBuf cumulation;
    riêng tư Cumulator cumulator = MERGE_CUMULATOR;
    riêng tư số nguyên discardAfterReads = 16;
    riêng tư số nguyên numReads;

Luồng xử lý

1.新到数据存放到缓冲区(使用累加器Cumulator进行数据合并) 。

2.循环调用子类的decode方法,读取消息存入List,直到数据不足 。

3.遍历List,依次传递给下一个处理器 。







整体思路跟自定义实现差不多,不过它多考虑了两种情况 。

  • 数据被共享:共享数据会被其他使用者影响,需排除影响
  • 数据只读:只读空间无法被写入,而缓冲区是需要写入新数据的
    công cộng tĩnh cuối cùng Cumulator MERGE_CUMULATOR = mới Cumulator() {
        @Ghi đè
        công cộng ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            thử {
                cuối cùng ByteBuf buffer;
                nếu như (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                    // Expand cumulation (by replace it) when either there is not more room in the buffer
                    // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                    // duplicate().retain() or if its read-only.
                    // See:
                    // - https://github.com/netty/netty/issues/2327
                    // - https://github.com/netty/netty/issues/1764
                    buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } khác { buffer = cumulation; }
                trở lại buffer; } Cuối cùng {
                // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
                // for whatever release (for example because of OutOfMemoryError)
                in.release(); } } };


 上面的处理,新到数据与缓存的合并是通过数据拷贝。而下面这种方式,则是使用组合(数据没有移动,只是提供一个整合后的视图) 。

  công cộng tĩnh cuối cùng Cumulator COMPOSITE_CUMULATOR = mới Cumulator() { @Override
        công cộng ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer;
            thử {
                nếu như (cumulation.refCnt() > 1) {
                    // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
                    // user use slice().retain() or duplicate().retain().
                    // See:
                    // - https://github.com/netty/netty/issues/2327
                    // - https://github.com/netty/netty/issues/1764
                    buffer = expandCumulation(alloc, cumulation, in.readableBytes()); buffer.writeBytes(in); } khác { CompositeByteBuf composite;
                    nếu như (cumulation trường hợp của CompositeByteBuf) {
                        composite = (CompositeByteBuf) cumulation; } khác { composite = alloc.compositeBuffer(Integer.MAX_VALUE);
                        composite.addComponent(ĐÚNG VẬY, cumulation); }
                    composite.addComponent(ĐÚNG VẬY, in); in = vô giá trị; buffer = composite; }
                trở lại buffer; } Cuối cùng {
                nếu như (in != vô giá trị) {
                    // We must release if the ownership was not transferred as otherwise it may produce a leak if
                    // writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
                    in.release(); } } } };




而这里做了优化,累积16次读取后,才会进行释放。(channelReadComplete的时候也会触发) 。

这样做的好处,就是可以减少数据拷贝的次数。(discard操作会把已读数据清空,重置读索引,然后把剩余数据往前挪) 。

    @Ghi đè
    công cộng vô hiệu channelRead(ChannelHandlerContext ctx, Object msg) ném Ngoại lệ {
        nếu như (msg trường hợp của ByteBuf) { CodecOutputList out = CodecOutputList.newInstance();
            thử { ByteBuf data = (ByteBuf) msg; first = cumulation == vô giá trị;
                nếu như (first) { cumulation = data; } khác {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); }
                callDecode(ctx, cumulation, out); } nắm lấy (DecoderException e) {
                ném e; } nắm lấy (Ngoại lệ e) {
                ném mới DecoderException(e); } Cuối cùng {
                nếu như (cumulation != vô giá trị && !cumulation.isReadable()) {
                    numReads = 0; cumulation.release(); cumulation = vô giá trị; } khác nếu như (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0; discardSomeReadBytes(); }

                số nguyên kích thước = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle(); } } khác {
            ctx.fireChannelRead(msg); } }



    được bảo vệ vô hiệu callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) {
        thử {
            trong khi (in.isReadable()) {
                số nguyên outSize = out.size();
                nếu như (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    nếu như (ctx.isRemoved()) {
                        phá vỡ; } outSize = 0; }

                số nguyên oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                // See https://github.com/netty/netty/issues/1664
                nếu như (ctx.isRemoved()) {
                    phá vỡ; }

                nếu như (outSize == out.size()) {
                    nếu như (oldInputLength == in.readableBytes()) {
                        phá vỡ; } khác { //数据有被动过,但还没解析出数据,继续执行
                        Tiếp tục; } }
                nếu như (oldInputLength == in.readableBytes()) {
                    ném mới DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); }
                nếu như (isSingleDecode()) {
                    phá vỡ; } } } nắm lấy (DecoderException e) {
            ném e; } nắm lấy (Exception cause) {
            ném mới DecoderException(cause); } }

Tóm tắt

核心内容并无太大差异,但 Netty 提供的抽象类在实现上考虑了更多细节,并经过社区的不断演进,功能变得更加稳定和完善.

因此,推荐继承 ByteToMessageDecoder 来实现解码.

最后此篇关于【源码】ByteToMessageDecoder对比自定义实现的文章就讲到这里了,如果你想了解更多关于【源码】ByteToMessageDecoder对比自定义实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

