设想下这样的情形:
基于上面情形,Transceiver引入了两个TC_Buffer,分别作为发送缓存_sendBuffer和接收缓存_recvBuffer。
当Transceiver使用sendRequest发送数据时:
上述流程见图1
关键代码如下:
int Transceiver::sendRequest(const char * pData, size_t iSize, bool forceSend)
{
//buf不为空,直接返回失败
//等buffer可写了,epoll会通知写时间
if(!_sendBuffer.IsEmpty())
{
return eRetError;
}
int iRet = this->send(pData,iSize,0);
//失败,直接返回
if(iRet < 0)
{
return eRetError;
}
//没有全部发送完,写buffer 返回成功
if(iRet < (int)iSize)
{
_sendBuffer.PushData(pData+iRet,iSize-iRet);
return eRetFull;
}
return eRetOk;
}
继续看handleOutputImp是如何处理_sendBuffer中数据的,handleOutputImp调用了Transceiver::doRequest来处理上次_sendBuffer中剩下的内容
int Transceiver::doRequest()
{
//buf不为空,先发生buffer的内容
if(!_sendBuffer.IsEmpty())
{
size_t length = 0;
void* data = NULL;
_sendBuffer.PeekData(data, length);
iRet = this->send(data, length, 0);
//失败,直接返回
if(iRet < 0)
{
return iRet;
}
//即使这次也没发送完也没关系,还会保留在_sendBuffer中,下次继续发送
if(iRet > 0)
{
_sendBuffer.Consume(iRet);
if (_sendBuffer.IsEmpty())
_sendBuffer.Shrink();
else
return 0;
}
}
刚才说到,Transceiver使用sendRequest时,如果发现_sendBuffer有内容,就把本次请求放到_timeoutQueue中。这个请求也是在上面的Transceiver::doRequest方法中被发送的,关键代码如下。
int Transceiver::doRequest()
{
//取adapter里面积攒的数据
_adapterProxy->doInvoke();
}
void AdapterProxy::doInvoke()
{
while(!_timeoutQueue->sendListEmpty())
{
ReqMessage * msg = NULL;
_timeoutQueue->getSend(msg);
int iRet = _trans->sendRequest(msg->sReqData.c_str(), msg->sReqData.size());
//发送失败 返回
if(iRet == Transceiver::eRetError)
{
TLOGINFO("[TARS][AdapterProxy::doInvoke fail,errono:" << iRet << endl);
return;
}
//请求发送成功了 处理采样
//...
//发送完成
_timeoutQueue->popSend(msg->eType == ReqMessage::ONE_WAY);
}
}
5.1.2 接收缓存_recvBuffer
_recvBuffer使用相对简单,从套接字得到的数据直接放到_recvBuffer中,然后解析_recvBuffer中的内容即可。
int TcpTransceiver::doResponse(list<ResponsePacket>& done)
{
//第一部分,接收数据放到_recvBuffer中
do
{
_recvBuffer.AssureSpace(8 * 1024);
char stackBuffer[64 * 1024];
struct iovec vecs[2];
vecs[0].iov_base = _recvBuffer.WriteAddr();
vecs[0].iov_len = _recvBuffer.WritableSize();
vecs[1].iov_base = stackBuffer;
vecs[1].iov_len = sizeof stackBuffer;
if ((iRet = this->readv(vecs, 2)) > 0)
{
if (static_cast<size_t>(iRet) <= vecs[0].iov_len)
{
_recvBuffer.Produce(iRet);
}
else
{
_recvBuffer.Produce(vecs[0].iov_len);
size_t stackBytes = static_cast<size_t>(iRet) - vecs[0].iov_len;
_recvBuffer.PushData(stackBuffer, stackBytes);
}
}
}
while (iRet>0);
//第二部分,反序列化_recvBuffer中的内容,结构化为ResponsePacket供上层应用使用
if(!_recvBuffer.IsEmpty())
{
try
{
const char* data = _recvBuffer.ReadAddr();
size_t len = _recvBuffer.ReadableSize();
size_t pos = 0;
ProxyProtocol& proto = _adapterProxy->getObjProxy()->getProxyProtocol();
if (proto.responseExFunc)
{
long id = _adapterProxy->getId();
pos = proto.responseExFunc(data, len, done, (void*)id);
}
else
{
pos = proto.responseFunc(data, len, done);
}
if(pos > 0)
{
{
_recvBuffer.Consume(pos);
if (_recvBuffer.Capacity() > 8 * 1024 * 1024)
_recvBuffer.Shrink();
}
}
}
}
return done.empty()?0:1;
}
_timeoutQueue定义在AdapterProxy中
std::unique_ptr<TC_TimeoutQueueNew<ReqMessage*>> _timeoutQueue;
引入_timeoutQueue的意义在于解耦请求发送和结果接收。对于一个线程来说,发送完请求就完成了任务,不用额外等待结果,将事情交给epoll去主动调度。
//请求放入队列中
int AdapterProxy::invoke(ReqMessage * msg)
{
bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
}
//接收到结果后,根据iRequestId找到对应的msg
void AdapterProxy::finishInvoke(ResponsePacket & rsp)
{
ReqMessage * msg = NULL;
else
{
//这里的队列中的发送链表中的数据可能已经在timeout的时候删除了
bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);
//找不到此请求id信息
if (!retErase)
{
if(_timeoutLogFlag)
{
TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:"<< _objectProxy->name() << ",get req-ptr NULL,may be timeout,id:" << rsp.iRequestId
<< ",desc:" << _endpoint.desc() << endl);
}
return ;
}
assert(msg->eStatus == ReqMessage::REQ_REQ);
msg->eStatus = ReqMessage::REQ_RSP;
}
//将结果赋给msg的response
msg->response = rsp;
finishInvoke(msg);
}
void CommunicatorEpoll::run()
{
while (!_terminate)
{
doTimeout();
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。