阐述Dubbo服务提供方的解码原理

1 解码概述

在Dubbo服务消费方(客户端)和服务提供方(服务端)进行网络通信时,服务提供方会通过socket把需要发送的内容序列化为二进制流后发出。接着二进制流通过网络流向服务提供方。服务提供方接收到该请求后会解析该请求包,对接收到的数据进行反序列化后对请求进行处理。

服务提供方接收到请求后解析请求包(即Dubbo协议的内容)的过程即服务提供方的解码过程。

2 解码原理解析

2.1 解码入口

服务提供方解析请求包最终是借助于 InternalDecoder 的 decode 方法来实现的。具体实现代码如下所示。

protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

    ChannelBuffer message = new NettyBackedChannelBuffer(input);

    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

    // decode object.
    do {
        int saveReaderIndex = message.readerIndex();
        Object msg = codec.decode(channel, message);
        if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
            message.readerIndex(saveReaderIndex);
            break;
        } else {
            //is it possible to go here ?
            if (saveReaderIndex == message.readerIndex()) {
                throw new IOException("Decode without read data.");
            }
            if (msg != null) {
                out.add(msg);
            }
        }
    } while (message.readable());
}

2.2 解码实现细节

在 InternalDecoder 的 decode 方法内部再调用了具体解码实现来进行解码。如 ExchangeCodec 的 decode 方法,实现如下所示。

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // 将dubbo协议头读取到数据header
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    // 解析dubbo协议帧数据部分
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
        || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);

    // When receiving response, how to exceed the length, then directly construct a response to the client.
    // see more detail from https://github.com/apache/dubbo/issues/7021.
    Object obj = finishRespWhenOverPayload(channel, len, header);
    if (null != obj) {
        return obj;
    }

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", "Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", e.getMessage(), e);
            }
        }
    }
}

而对dubbo协议body进行解析则是通过 decodeBody 方法,实现如下所示。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(true);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        try {
            if (status == Response.OK) {
                Object data;
                if (res.isEvent()) {
                    byte[] eventPayload = CodecSupport.getPayload(is);
                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                        // heart beat response data is always null;
                        data = null;
                    } else {
                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                    }
                } else {
                    data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));
                }
                res.setResult(data);
            } else {
                res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
            }
        } catch (Throwable t) {
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // decode request.
        Request req;
        try {
            Object data;
            if ((flag & FLAG_EVENT) != 0) {
                byte[] eventPayload = CodecSupport.getPayload(is);
                if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                    // heart beat response data is always null;
                    req = new HeartBeatRequest(id);
                    ((HeartBeatRequest) req).setProto(proto);
                    data = null;
                } else {
                    req = new Request(id);
                    data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                }
                req.setEvent(true);
            } else {
                req = new Request(id);
                data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
            }
            req.setData(data);
        } catch (Throwable t) {
            // bad request
            req = new Request(id);
            req.setBroken(true);
            req.setData(t);
        }
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        return req;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/594448.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

详解基于 RAG 的 txt2sql 全过程

前文 本文使用通义千问大模型和 ChromaDB 向量数据库来实现一个完整的 text2sql 的项目&#xff0c;并基于实际的业务进行效果的展示。 准备 在进行项目之前需要准备下面主要的内容&#xff1a; python 环境通义千问 qwen-max 模型的 api-keyChromaDB 向量数据库acge_text_…

一款 NodeJS 版本管理工具 NVM (Windows)

一、简介 Node Version Manager&#xff08;NVM&#xff09;是一种用于管理多个 NodeJS 版本的工具。在日常工作中&#xff0c;我们可能同时在进行多个不同的项目开发&#xff0c;每个项目的需求不同&#xff0c;依赖与不同版本的NodeJS 运行环境。这种情况下&#xff0c;维护…

数据处理学习笔记9

一些其他的函数 “Resize”和“Reshape”的区别主要在于它们对数组元素数量和形状的处理方式不同&#xff0c;以下是详细介绍&#xff1a; “Resize”通常会改变数组的元素数量&#xff0c;在放大数组形状时会用0补全新增的元素&#xff0c;而在缩小数组形状时会丢弃多余的元素…

一款AI工作流项目:phidatahq/phidata

一款AI工作流项目&#xff1a;phidatahq/phidata 构建和测试功能强大的 AI 工作流程。该项目提供了一个工作流平台,可以结合大型语言模型(LLM)和各种工具,扩展模型的实用性和应用范围。[1][4][5] 开发各种 AI 助手应用,如客服聊天机器人、数据分析工具、研究助手等。phidata 提…

Golang | Leetcode Golang题解之第72题编辑距离

题目&#xff1a; 题解&#xff1a; func minDistance(word1 string, word2 string) int {m, n : len(word1), len(word2)dp : make([][]int, m1)for i : range dp {dp[i] make([]int, n1)}for i : 0; i < m1; i {dp[i][0] i // word1[i] 变成 word2[0], 删掉 word1[i], …

LabVIEW波浪发电平台浮筒取能效率数据采集系统

LabVIEW波浪发电平台浮筒取能效率数据采集系统 随着化石能源的逐渐减少以及能源价格的上升&#xff0c;寻找可替代的、可再生的、清洁的能源成为了世界各国的共识。波浪能作为一种重要的海洋能源&#xff0c;因其巨大的潜力和清洁性&#xff0c;近年来受到了广泛关注。开发了一…

32 OpenCV Harris角点检测

文章目录 cornerHarris 算子示例 角点检测 cornerHarris 算子 void cv::cornerHarris ( InputArray src,OutputArray dst,int blockSize,int ksize,double K,int borderType BORDER_DEFAULT) src:待检测Harris角点的输入图像&#xff0c;图像必须是CV 8U或者CV 32F的单通道…

玩comfyui踩过的坑之使用ComfyUI_Custom_NODES_ALEKPET翻译组件问题

环境&#xff1a; 秋叶安装包&#xff0c;安装ComfyUI_Custom_NODES_ALEKPET组件或者直接下载网盘中的包&#xff0c;直接解压包到comfyui根目录/custom_nodes/&#xff0c;重启后&#xff0c;按指导文件操作。 注意&#xff1a;网盘指导包中有配置好的流程json文件&#xff0…

【源码】 频裂变加群推广强制分享引流源码

视频裂变加群推广强制分享引流源码&#xff0c;用户达到观看次数后需要分享给好友或者群,好友必须点击推广链接后才会增加观看次数。 引导用户转发QV分享,达到快速裂变引流的效果&#xff01; 视频裂变推广程序&#xff0c;强制分享链接&#xff0c;引导用户转发&#xff0c;…

prometheus搭建

1.prometheus下载 下载地址:Download | Prometheus 请下载LTS稳定版本 本次prometheus搭建使用prometheus-2.37.1.linux-amd64.tar.gz版本 2.上传prometheus-2.37.1.linux-amd64.tar.gz至服务器/opt目录 CentOS7.9 使用命令rz -byE上传 3.解压缩prometheus-2.37.1.linux…

VscodeC/C++环境配置

引言 vscode是一款非常好用的编辑器&#xff0c;集成了大量的插件&#xff0c;具有很高的自由度&#xff0c;因此广受大家的喜爱。但是他本身是不带编译器的&#xff0c;因此如果要使用vscode来编译C/C程序的话&#xff0c;我们需要额外安装编译器并且为vscode配上环境。 编译…

Docker 入门与实践:从零开始构建容器化应用环境

Docker 一、docker常用命令docker ps 格式化输出Linux设置命令别名 二、数据卷相关命令挂载到默认目录&#xff08;/var/lib/docker&#xff09;挂载到本地目录 三、自定义镜像Dockerfile构建镜像的命令 四、网络自定义网络 五、DockerCompose相关命令 一、docker常用命令 dock…

Python | Leetcode Python题解之第71题简化路径

题目&#xff1a; 题解&#xff1a; class Solution:def simplifyPath(self, path: str) -> str:names path.split("/")stack list()for name in names:if name "..":if stack:stack.pop()elif name and name ! ".":stack.append(name)re…

ThreeJS:光线投射与3D场景交互

光线投射Raycaster 光线投射详细介绍可参考&#xff1a;https://en.wikipedia.org/wiki/Ray_casting&#xff0c; ThreeJS中&#xff0c;提供了Raycaster类&#xff0c;用于进行鼠标拾取&#xff0c;即&#xff1a;当三维场景中鼠标移动时&#xff0c;利用光线投射&#xff0c;…

点亮一个LED

新建项目 #include <REGX52.H>void main() {P2 0xFE;while(1){} }调整字体大小 在编译之前要勾选一个东西,不然scp读取不了 去stc-isp中烧录进51单片机 两个地方要勾选,一个是单片机型号,一个是串口号,我的单片机型号不是江科大视频里面那个型号,所以不能按视频里面来选…

【数据结构(邓俊辉)学习笔记】列表04——排序器

文章目录 0. 统一入口1. 选择排序1.1 构思1.2 实例1.3 实现1.4 复杂度 2. 插入排序2.1 构思2.2 实例2.3 实现2.4 复杂度分析2.5 性能分析 3. 归并排序3.1 二路归并算法3.1.1 二路归并算法原理3.1.2 二路归并算法实现3.1.3 归并时间 3.2 分治策略3.2.1 实现3.2.2 排序时间 4. 总…

学习笔记:【QC】Android Q - IMS 模块

一、IMS init 流程图 二、IMS turnon 流程图 三、分析说明 1、nv702870 不创建ims apn pdp 2、nv702811 nv702811的时候才创建ims pdp&#xff1a; ims pdp 由ims库发起&#xff0c;高通没有开放这部分代码&#xff1a; 10-10 11:45:53.027 943 943 E Diag_Lib: [IMS_D…

只用语音能训练出AI大模型吗?就像训练会说话但不识字的人一样

AI语音对话技术通常是基于语音识别和自然语言处理&#xff08;NLP&#xff09;的。在这个过程中&#xff0c;语音首先被识别成文字&#xff0c;然后NLP技术对这些文字进行处理&#xff0c;生成回应。然而&#xff0c;我们是否可以直接训练一个“文盲”大模型&#xff0c;即只用…

45. UE5 RPG 增加角色受击反馈

在前面的文章中&#xff0c;我们实现了对敌人的属性的初始化&#xff0c;现在敌人也拥有的自己的属性值&#xff0c;技能击中敌人后&#xff0c;也能够实现血量的减少。 现在还需要的就是在技能击中敌人后&#xff0c;需要敌人进行一些击中反馈&#xff0c;比如敌人被技能击中后…

深度学习中的注意力机制二(Pytorch 16)

一 Bahdanau 注意力 通过设计一个 基于两个循环神经网络的编码器‐解码器架构&#xff0c;用于序列到序列学习。具体来说&#xff0c;循环神经网络编码器将长度可变的序列转换为固定形状的上下文变量&#xff0c;然后循环神经网络 解码器根据生成的词元和上下文变量按词元生成…
最新文章