提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言 粘包问题

提示:这里可以添加本文要记录的大概内容:

今天介绍一下如何处理粘包,粘包问题是服务器收发数据常遇到的一个现象,下面我们介绍一下粘包问题是什么,当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的,如下图所示

在这里插入图片描述

当客户端发送两个Hello World!给服务器,服务器TCP接收缓冲区接收了两次,一次是Hello World!Hello, 第二次是World!


提示:以下是本篇文章正文内容,下面案例可供参考

一、粘包原因

因为TCP底层通信是面向字节流的,TCP只保证发送数据的准确性和顺序性,字节流以字节为单位,客户端每次发送N个字节给服务端,N取决于当前客户端的发送缓冲区是否有数据,比如发送缓冲区总大小为10个字节,当前有5个字节数据(上次要发送的数据比如’loveu’)未发送完,那么此时只有5个字节空闲空间,我们调用发送接口发送hello world!其实就是只能发送Hello给服务器,那么服务器一次性读取到的数据就很可能是loveuhello。而剩余的world!只能留给下一次发送,下一次服务器接收到的就是world!
如下图
在这里插入图片描述
这是最好理解的粘包问题的产生原因。还有一些其他的原因比如
1   客户端的发送频率远高于服务器的接收频率,就会导致数据在服务器的tcp接收缓冲区滞留形成粘连,比如客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!。
2   tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下tcp底层的Nagle算法。
3   再就是我们提到的最简单的情况,发送端缓冲区有上次未发送完的数据或者接收端的缓冲区里有未取出的数据导致数据粘连。

总结:

💡 为什么会出现 TCP 粘包?

  1. TCP 是面向字节流的协议,没有“包”的概念。
  2. 客户端每次 send() 实际上是把数据写入 操作系统内核的发送缓冲区。
  3. 如果缓冲区有部分上次数据还没发完(如 loveu),新的数据(如 hello world)马上又加入。
  4. 接收端就可能收到合并后的数据(如 loveuhello),这就是“粘包”。

💥 导致粘包的具体原因
● 应用层发送太快,TCP 来不及分开发
● 接收方 read 时读取了多个数据一起到缓存
● TCP 对小数据包做了优化(Nagle 算法) tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下tcp底层的Nagle算法

二、如何处理粘包

处理方法

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议
(消息id+消息长度+消息内容),如下图
在这里插入图片描述
为保证大家容易理解,我们先简化发送的格式,格式变为消息长度+消息内容的方式,之后再完善为tlv格式
简化后的结构如下图
在这里插入图片描述

三、完善消息节点MsgNode

之前我们设计过消息节点的数据结构MsgNode,这里需要完善一下

代码部分

class MsgNode
{
	friend class CSession;
public:
	MsgNode(char* msg,short max_len):_cur_len(0),_total_len(HEAD_LENGTH+max_len)
	{
		_data = new char[_total_len+1]();
		memcpy(_data, &max_len, HEAD_LENGTH);
		memcpy(_data+ HEAD_LENGTH, msg, max_len);
		_data[_total_len] = '\0';
	}
	MsgNode(short max_len) :_total_len(max_len), _cur_len(0)
	{
		_data = new char[_total_len + 1]();
	}

	~MsgNode()
	{
		delete[]_data;
	}

	void Clear()
	{
		::memset(_data, 0, _total_len);
		_cur_len = 0;
	}
private:
	short _cur_len;
	short _total_len;
	char* _data;

};

1.两个参数的构造函数做了完善,之前的构造函数通过消息首地址和长度构造节点数据,现在需要在构造节点的同时把长度信息也写入节点,该构造函数主要用来发送数据时构造发送信息的节点。
2. 一个参数的构造函数为较上次新增的,主要根据消息的长度构造消息节点,该构造函数主要是接收对端数据时构造接收节点调用的。
3.新增一个Clear函数清除消息节点的数据,主要是避免多次构造节点造成开销

细节详解

memcpy(_data, &max_len, HEAD_LENGTH);

这里为什么要用&
什么意思呢,现在解释一下
首先我们要做的格式是消息长度+消息内容
在tlv协议中,这个消息长度需要用二进制来存储

#define HEAD_LENGTH  2  //用2个字节的数据存储消息长度
memcpy(_data, &max_len, HEAD_LENGTH);

✅ 解释与流程:

  1. memcpy(_data, &max_len, HEAD_LENGTH);
    ○ 目的:将消息的长度写入消息包的前面作为 “消息头”
    ○ &max_len:取 max_len 的地址,即把这个整数以原始内存二进制形式写入 _data 中。
    ○ HEAD_LENGTH:通常为 2 或 4,代表写入几个字节(例如使用 short 就是 2 字节)。
    ○ ✅ 效果:在 _data 中的前 2 字节保存了消息的长度,二进制形式。
    在这里插入图片描述
    相当于我们要先把这个5写进去,告诉后面的数据长度为5,记得用二进制!!!所以加&

_data[_total_len] = ‘\0’

为什么最后要加一个结束符号呢??
如果没有 ‘\0’,字符串函数会一直读取内存,直到碰到恰好为 0 的字节,容易造成越界访问、程序崩溃或打印乱码
如果你的有效数据长度刚好覆盖了整个数组(不留多余的 ‘\0’),字符串函数找不到结束符,就会继续往后读,导致越界访问

四、完善CSession

为能够对收到的数据切包处理,需要定义一个消息接收节点,一个bool类型的变量表示头部是否解析完成,以及将处理好的头部先缓存起来的结构

在原先代码加入就行

//收到的消息结构
    std::shared_ptr<MsgNode> _recv_msg_node;
    bool _b_head_parse;//头部是否解析完成
    //收到的头部结构
    std::shared_ptr<MsgNode> _recv_head_node;

_recv_msg_node 用来存储接受的消息体信息
_recv_head_node 用来存储接收的头部信息
_b_head_parse 表示是否处理完头部信息

五、完善接收逻辑

我们需要修改HandleRead函数

代码部分

//读的回调函数
void CSession::HandleRead(const boost::system::error_code& ec,
 size_t bytes_transferred, shared_ptr<CSession>_self_shared)
{
	
	if (!ec)
	{
		int copy_len = 0;//本次回调读取了多少数据  后续需要偏移

		while (bytes_transferred > 0)//接收到了数据
		{
			if (!_b_head_parse)//先解析头部
			{
				//当前传入的数据+头部中的数据  依然没有达到2字节 那就全加入进去
				if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH)
				{
					memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, 
					_data+copy_len, bytes_transferred);
					_recv_head_node->_cur_len += bytes_transferred;
					::memset(_data, 0, max_length);
					//继续回调 等待数据传入
					_socket.async_read_some(boost::asio::buffer(_data, max_length),
						std::bind(&CSession::HandleRead, this, std::placeholders::_1,
						 std::placeholders::_2, _self_shared));
					return;
				}


				//收到的数据比头部多 可能刚好等于或者大于

				 //头部剩余未复制的长度
				int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;
				memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, 
				_data + copy_len, head_remain);

				//更新已处理的data长度和剩余未处理的长度
				copy_len += head_remain;
				_recv_head_node->_cur_len += head_remain;
				bytes_transferred -= head_remain;

				//获取头部数据进行判断
				short data_len = 0;
				memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
				cout << "data_len is" << data_len << endl;

				if (data_len > max_length)
				{
					std::cout << "invalid data length is " << data_len << endl;
					_server->ClearSession(_uuid);
					return;
				}


				_recv_msg_node = make_shared<MsgNode>(data_len);
				//数据小 可以全存入 但后续还需要存入 因为没有达到头部所显示的长度
				if (bytes_transferred < data_len)
				{
					memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len,
					 _data + copy_len, bytes_transferred);
					_recv_msg_node->_cur_len += bytes_transferred;
					::memset(_data, 0, max_length);
					_socket.async_read_some(boost::asio::buffer(_data,max_length),
						std::bind(&CSession::HandleRead, this, 
						std::placeholders::_1, std::placeholders::_2, _self_shared));
					//头部处理完成
					_b_head_parse = true;
					return;
				}

				//数据大 就只存data_len长度  和头部显示的长度一样
				memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len,
				 _data + copy_len, data_len);
				_recv_msg_node->_cur_len += data_len;
				copy_len += data_len;
				bytes_transferred -= data_len;
				_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
				cout << "receive data is " << _recv_msg_node->_data << endl;

				//此处可以调用Send发送测试
				Send(_recv_msg_node->_data, _recv_msg_node->_total_len);

				//继续轮询剩余未处理数据
				_b_head_parse = false;
				_recv_head_node->Clear();

				if (bytes_transferred == 0) //说明后面没有数据了 刚好是一个包
				{
					::memset(_data, 0, max_length);
					_socket.async_read_some(boost::asio::buffer(_data, max_length),
						std::bind(&CSession::HandleRead, 
						this, std::placeholders::_1, std::placeholders::_2, _self_shared));
					return;
				}
				continue;//说明后面还有数据  继续轮询处理 
			}


			//头部已经解析完
			int data_len = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;

			//数据小 可以全存入 但后续还需要存入 因为没有达到头部所显示的长度
			if (bytes_transferred < data_len) 
			{
				memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len,
				 _data + copy_len, bytes_transferred);
				_recv_msg_node->_cur_len += bytes_transferred;
				::memset(_data, 0, max_length);
				_socket.async_read_some(boost::asio::buffer(_data, max_length),
					std::bind(&CSession::HandleRead, this, 
					std::placeholders::_1, std::placeholders::_2, _self_shared));
				return;
			}

			//数据大 就只存data_len长度  和头部显示的长度一样
			memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len,
			 _data + copy_len, data_len);
			_recv_msg_node->_cur_len += data_len;
			bytes_transferred -= data_len;
			copy_len += data_len;
			_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
			cout << "receive data is " << _recv_msg_node->_data << endl;

			// 此处可以调用Send发送测试
				Send(_recv_msg_node->_data, _recv_msg_node->_total_len);

			//继续轮询剩余未处理数据
			_b_head_parse = false;
			_recv_head_node->Clear();
			if (bytes_transferred == 0) {
				::memset(_data, 0,max_length);
				_socket.async_read_some(boost::asio::buffer(_data, max_length),
					std::bind(&CSession::HandleRead, this,
					 std::placeholders::_1, std::placeholders::_2, _self_shared));
				return;
			}
			continue;
		}
	}
	else
	{
		std::cout << "handle read failed, error is " << ec.what() << endl;
		_server->ClearSession(_uuid);
	}
}

流程图

1   copy_len记录的是已经处理过数据的长度,因为存在一次接收多个包的情况,所以copy_len用来做已经处理的数据长度的。
2   首先判断_b_head_parse是否为false,如果为false则说明头部未处理,先判断接收的数据是否小于头部, 如果小于头部大小则将接收到的数据放入_recv_head_node节点保存,然后继续调用读取函数监听对端发送数据。否则进入步骤3.
3   如果收到的数据比头部多,可能是多个逻辑包,所以要做切包处理。根据之前保留在_recv_head_node的长度,计算出剩余未取出的头部长度,然后取出剩余的头部长度保存在_recv_head_node节点,然后通过memcpy方式从节点拷贝出数据写入short类型的data_len里,进而获取消息的长度。接下来继续处理包体,也就是消息体,判断接收到的数据未处理部分的长度和总共要接收的数据长度大小,如果小于总共要接受的长度,说明消息体没接收完,则将未处理部分先写入_recv_msg_node里,并且继续监听读事件。否则说明消息体接收完全,进入步骤4
4   将消息体数据接收到_recv_msg_node中,接受完全后返回给对端。当然存在多个逻辑包粘连,此时要判断bytes_transferred是否小于等于0,如果是说明只有一个逻辑包,我们处理完了,继续监听读事件,就直接返回即可。否则说明有多个数据包粘连,就继续执行上述操作。

5   因为存在_b_head_parse为true,也就是包头接收并处理完的情况,但是包体未接受完,再次触发HandleRead,此时要继续处理上次未接受完的消息体,大体逻辑和3,4一样。
以上就是处理粘包的过程,我们绘制流程图更明了一些
在这里插入图片描述

六、客户端修改

客户端的发送也要遵循先发送数据2个字节的数据长度,再发送数据消息的结构
接收时也是先接收两个字节数据获取数据长度,再根据长度接收消息

#include<boost/asio.hpp>
#include <iostream>
const int MAX_LENGTH = 1024;//表示发送和接收最大长度为1024字节
const int HEAD_LENGTH = 2;
int main()
{
    while (1)
    {
        try
        {

            //创建上下文服务
            boost::asio::io_context ios;
            //创建endpoint
            boost::asio::ip::tcp::endpoint
                remote_ep(boost::asio::ip::make_address("127.0.0.1"), 10086);
            //127.0.0.1是本机的回路地址 因为客户端和服务端在同一地址
            //服务器就是本机
            boost::asio::ip::tcp::socket sock(ios, remote_ep.protocol());
            boost::system::error_code error = boost::asio::error::host_not_found;
            sock.connect(remote_ep, error);
            if (error)
            {
                std::cout << "connect failed,code is:" << error.value() <<
                    "error message is" << error.message() << std::endl;
                return 0;
            }

            std::cout << "请输入需要发送的信息:";
            char request[MAX_LENGTH];
            std::cin.getline(request, MAX_LENGTH);
            size_t request_length = strlen(request);//获取实际要发送的数据长度
            //发送数据
            char send_data[MAX_LENGTH] = { 0 };
            memcpy(send_data, &request_length, 2);
            memcpy(send_data + 2, request, request_length);
            boost::asio::write(sock, boost::asio::buffer(send_data, request_length+2));
            
            /*
            char reply[MAX_LENGTH];//存储接收数据
            //用read读
            size_t reply_length = 0;
            boost::system::error_code ec;
            reply_length = sock.read_some(boost::asio::buffer(reply, MAX_LENGTH), ec);
            if (ec)
            {
                std::cerr << "Read failed: " << ec.message() << std::endl;
                return 0;
            }
            std::cout<<"长度" <<reply_length<<std::endl;
            std::cout << "回复:";
            std::cout.write(reply, reply_length);
            std::cout << "\n";
           */
            
            char reply_head[HEAD_LENGTH];
            size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
            short msglen = 0;
            memcpy(&msglen, reply_head, HEAD_LENGTH);
            char msg[MAX_LENGTH] = { 0 };
            size_t  msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

            std::cout << "Reply is: ";
            std::cout.write(msg, msglen) ;
            std::cout << "Reply len is " << msglen;
            std::cout << "\n";
            

        }
        catch (std::exception& e)
        {
            std::cerr << "Exception is: " << e.what() << std::endl;
        }
    }
    
    return 0;
}

服务器启动后,启动客户端,然后客户端发送Hello World,服务器收到后打印如下
在这里插入图片描述

七、测试粘包

为了测试粘包,需要制造粘包产生的现象,可以让客户端发送的频率高一些,服务器接收的频率低一些,这样造成前后端收发数据不一致导致多个数据包在服务器tcp缓冲区滞留产生粘包现象
测试粘包之前,在服务器的CSession类里添加打印二进制数据的函数,便于查看缓冲区的数据

void CSession::PrintRecvData(char* data, int length) {
    stringstream ss;
    string result = "0x";
    for (int i = 0; i < length; i++) {
        string hexstr;
        ss << hex << std::setw(2) << std::setfill('0') << int(data[i]) << endl;
        ss >> hexstr;
        result += hexstr;
    }
    std::cout << "receive raw data is : " << result << endl;;
}

然后将这个函数放到HandleRead里,每次收到数据就调用这个函数打印接收到的最原始的数据,然后睡眠2秒再进行收发操作,用来延迟接收对端数据制造粘包,之后的逻辑不变

void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
    if (!error) {
        PrintRecvData(_data, bytes_transferred);
        std::chrono::milliseconds dura(2000);
        std::this_thread::sleep_for(dura);
    }
}

修改客户端逻辑,实现收发分离

int main()
{
    try {
        //创建上下文服务
        boost::asio::io_context   ioc;
        //构造endpoint
        tcp::endpoint  remote_ep(address::from_string("127.0.0.1"), 10086);
        tcp::socket  sock(ioc);
        boost::system::error_code   error = boost::asio::error::host_not_found; ;
        sock.connect(remote_ep, error);
        if (error) {
            cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
            return 0;
        }
        thread send_thread([&sock] {
            for (;;) {
                this_thread::sleep_for(std::chrono::milliseconds(2));
                const char* request = "hello world!";
                size_t request_length = strlen(request);
                char send_data[MAX_LENGTH] = { 0 };
                memcpy(send_data, &request_length, 2);
                memcpy(send_data + 2, request, request_length);
                boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));
            }
            });
        thread recv_thread([&sock] {
            for (;;) {
                this_thread::sleep_for(std::chrono::milliseconds(2));
                cout << "begin to receive..." << endl;
                char reply_head[HEAD_LENGTH];
                size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
                short msglen = 0;
                memcpy(&msglen, reply_head, HEAD_LENGTH);
                char msg[MAX_LENGTH] = { 0 };
                size_t  msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));
                std::cout << "Reply is: ";
                std::cout.write(msg, msglen) << endl;
                std::cout << "Reply len is " << msglen;
                std::cout << "\n";
            }
            });
        send_thread.join();
        recv_thread.join();
    }
    catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
    }
    return 0;
}

再次启动服务器和客户端,看到粘包现象了,我们的服务器也能稳定切割数据包并返回正确的消息给客户端。
可以看到服务器收到了大量数据,然后准确切割返回给了客户端。如下图

在这里插入图片描述

总结

该服务虽然实现了粘包处理,但是服务器仍存在不足,比如当客户端和服务器处于不同平台时收发数据会出现异常,根本原因是未处理大小端模式的问题

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐