Нетти Протобуфы не разбираются после нескольких чтений

#java #protocol-buffers #netty

Вопрос:

У меня возникли некоторые проблемы с реализацией динамических декодеров protobuf после чтения определенной текстовой строки.

 protected void initChannel(Channel ch) {
    pipeline.addLast("string-decoder", STRING_DECODER); //parses the xml into string data.
    pipeline.addLast("xml-decoder", new XmlFrameDecoder(Integer.MAX_VALUE)); //parses incoming xml data
    pipeline.addLast("check-protobuf-handler", new SimpleChannelInboundHandler<String> {
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, String string) {
             if (string.equals("<go></go>") {
                  pipeline.remove("string-decoder");
                  pipeline.addBefore("xml-decoder", "proto-header-decoder", new ProtoHeaderDecoder());
                  pipeline.addBefore("xml-decoder", "proto-decoder", new ProtobufDecoder(CustomMessage.CustomMessage.getDefaultInstance()));
                  pipeline.addBefore("xml-decoder", "proto-event-decoder", new ProtoToXmlDecoder());
             }
         }
    );
}
 

Структура протобуфа представляет собой магический байт (0xbf), за которым следует длина, а затем полезная нагрузка.
Я создал пользовательский декодер, который проверяет наличие этого волшебного байта и игнорирует данные, если они не найдены. Как только это произойдет, он считывает его из буфера, а затем передает длину полезную нагрузку родительскому методу в ProtobufVarint32FrameDecoder.

 public class ProtoHeaderDecoder extends ProtobufVarint32FrameDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.debug(in.toString(StandardCharsets.UTF_8));
        in.markReaderIndex();
        short magicByte = in.readUnsignedByte();
        if (magicByte == 0xbf) {
            super.decode(ctx, in, out);
        } else {
            log.warn("Could not read magic byte. Resetting head");
            in.resetReaderIndex();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Error parsing varint.", cause);
    }
 

Первоначально это работает, и, похоже, анализ выполняется правильно в течение примерно 10 секунд или около того, затем начинается сбой проверки magicByte = 0xbf. Есть ли что-то, что я делаю неправильно со своей стороны? Может ли процесс написания текста повлиять на это (если да, то я могу добавить и это).
Если я проанализирую протобуф в строку, данные будут похожи (большая их часть нечитабельна, но я могу кое-что разобрать)