Netty 系列 ———— (五) 自定义编码解码器


Netty 自定义编码解码器

Netty 已经内置了很多常用的编码器和解码器给我们,但是有的时候出于业务场景的需要,我们需要来自定义消息传输协议(protocol)。这个时候就需要我们来自定义编码器和解码器来解决 socket 编程导致的拆包和沾包。

首先我们设计一个消息协议,前面的4个字节为传输消息的长度,后面的字节为具体的消息

例如在 socket 中要传输 hello world 这个消息,通过该协议传输的就是: 11hello world
长度消息
11hello world

11代表了 hello world 这个消息的字节长度,消息内容就是我们传递的消息,真正传输的是字节数组,在这儿为了方便理解写成了字符串

Netty 中自定义编码器需要继承 MessageToByteEncoder<T> 类,这个类也继承了ChannelOutboundHandlerAdapter,这表示自定义的编码器会在消息出站的时候被调用。

具体的编码器实现:

public class CustomEncoder extends MessageToByteEncoder<CustomProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
        // 1.写入长度
        out.writeInt(msg.getLength());
        // 2.写入消息字节
        out.writeBytes(msg.getMessage());
    }
}
  1. 通过调用 ByteBuf 的 writeInt 方法将这个消息的长度写入到了 socket 中
  2. 然后再调用 writeBytes 方法,将具体的消息写入了 socket 中
  3. 同时我们需要将自定义的编码器加入到 Netty 的 pipeline 中(这一步看源码)。
长度消息
0, 0, 0, 11104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100

上面的内容就是消息真正在网络中传输的内容

自定义解码器就是要将传输的 Byte 转换为对应的对象,实现自定义解码器需要继承 ByteToMessageDecoder 类,表示类解码器是在入站的时候被调用

编码器和解码器通常是逆反的流程,编码器负责对数据做编码处理,而解码器负责将数据处理为对象

具体的解码器实现:

/**
 * 自定义解码器
 *
 * @author g5niusx
 */
public class CustomDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        // 读取消息长度
        int length = in.readInt();
        // 如果可以读的数据小于协议中的数据,需要重置一下读的下标,重新读取
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[length];
        // 将读取的字节填充到空的数组中
        in.readBytes(bytes);
        // 将传输的字节数组转换为字符串传递给下一个入站类处理
        out.add(new String(bytes, UTF_8));
    }
}
  1. 在编码器中我们先写入了传递消息的长度(INT类型),所以在解码器中,先读取了INT类型的长度,这样读取出来的就是我们的消息长度
  2. 然后通过 readableBytes 方法来确定已经接受的字节长度和我们实际需要的字节长度是否一直,如果不一致,通过调用 resetReaderIndex 来重置了 ByteBuf 的读位置
  3. 然后来重新读取消息。最后将接受到的字节数组转换为字符串传递给了下一个入站处理类
具体的 Handler 实现:
@ChannelHandler.Sharable
@Slf4j
public class CustomClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        CustomProtocol customProtocol = new CustomProtocol("hello world");
        ctx.writeAndFlush(customProtocol);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("客户端接收到: {}", msg);
    }
}
  1. 上面代码中的 ctx.writeAndFlush(customProtocol); 写入的 CustomProtocol 对象刚好是我们编码器的范型
  2. 上面代码中的 SimpleChannelInboundHandler<String> 继承的范型类型其实就是我们的解码器 out.add(new String(bytes, UTF_8)); 写出的对象范型

总结

通过上面的自定义编码器和解码器就可以实现消息的自定义协议,同时可以在自定义的编码器和解码器之间可以再插入其他的编码器或者解码器来让我们的程序更加的优雅

源码地址: netty-demo-custom

  #netty 

« Netty 系列 ———— (四) 心跳检测 Mysql 对大表执行扩列和加索引 »
blog comments powered by Disqus