前面3节介绍了整体设计思路,下面的章节来一起浏览下实现细节
从官方自带的例子TarsCpp/examples/QuickStartDemo/HelloServer/AsyncClient/main.cpp开始
//main.cpp
int main(int argc,char ** argv)
{
//封装了CommunicatorEpoll、AsyncProcThread的实现
Communicator comm;
try
{
/*
4.1 准备阶段,初始化了四大组件:
CommunicatorEpoll、AsyncProcThread、ServantProxy和ObjectProxy
*/
//HelloPrx 继承了ServantProxy,是一个ServantProxy
HelloPrx prx;
comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 10.120.129.226 -p 20001" , prx);
try
{
string sReq("hello world");
HelloPrxCallbackPtr cb = new HelloCallBack();
/*
4.2 具体调用环节
*/
prx->async_testHello(cb, sReq);
cout<<" sReq:"<<sReq<<endl;
}
…………
}
//Communicator.cpp
ServantProxy * Communicator::getServantProxy(const string& objectName,const string& setName)
{
//initialize初始化了CommunicatorEpoll、AsyncProcThread
Communicator::initialize();
//返回封装好的ServantProxy
return _servantProxyFactory->getServantProxy(objectName,setName);
}
//Communicator.cpp
void Communicator::initialize()
{
…………
//初始化CommunicatorEpoll
for(size_t i = 0; i < _clientThreadNum; ++i)
{
_communicatorEpoll[i] = new CommunicatorEpoll(this, i);
_communicatorEpoll[i]->start();
}
…………
}
//CommunicatorEpoll.cpp
CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
…………
{
…………
//创建异步线程
//注意每个异步线程里都有一个异步队列ReqInfoQueue
for(size_t i = 0; i < _asyncThreadNum; ++i)
{
_asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
_asyncThread[i]->start();
}
//初始化请求的事件通知
//_notify提供了一个人为控制epoll触发机制的入口
for(size_t i = 0; i < MAX_CLIENT_NOTIFYEVENT_NUM; ++i)
{
_notify[i].bValid = false;
}
…………
}
在AsyncProcThread中,有一个无锁的异步队列ReqInfoQueue(TC_LoopQueue),CommunicatorEpoll线程执行AsyncProcThread::push_back操作,AsyncProcThread线程中的run方法执行pop_front操作,但队列本身是不加锁的。只有在一生产者一消费者场景中才适用这种无锁队列。
在TC_LoopQueue中,push_back操作的是_iBegin,pop_front操作的是_iEnd,因为只有一读(push_back)一写(pop_front)两个线程,两个线程操作的不是同一个变量,因此不会有竞态关系产生。
//ObjectProxyFactory.cpp
ObjectProxy * ObjectProxyFactory::getObjectProxy(const string& sObjectProxyName,const string& setName)
{
…………
ObjectProxy * pObjectProxy = new ObjectProxy(_communicatorEpoll, sObjectProxyName,setName);
…………
return pObjectProxy;
}
如图1所示,如果一个ServantProxy有多个ObjectProxy,也就意味着它对应着多个CommunicatorEpoll1,正常情况下它会以轮询方式把多次请求放到不同的ObjectProxy中,见下面代码:
//ServantProxy.cpp
void ServantProxy::selectNetThreadInfo(ServantProxyThreadData * pSptd, ObjectProxy * & pObjProxy, ReqInfoQueue * & pReqQ)
{
…………
{
//用线程的私有数据来保存选到的seq
pObjProxy = *(_objectProxy + pSptd->_netSeq);
pReqQ = pSptd->_reqQueue[pSptd->_netSeq];
pSptd->_netSeq++;
if(pSptd->_netSeq == _objectProxyNum)
pSptd->_netSeq = 0;
}
}
}
如果是同步请求,ServantProxy所在线程会同步阻塞,异步请求会直接结束。见下面代码:
//ServantProxy.cpp
void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
{
…………
//通知CommunicatorEpoll
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
//异步调用 另一个线程delele msg 如果是异步后面不能再用msg了
if(bSync)
{
if(!msg->bCoroFlag)
{
if(!msg->bMonitorFin)
{
TC_ThreadLock::Lock lock(*(msg->pMonitor));
//等待直到网络线程通知过来
if(!msg->bMonitorFin)
{
msg->pMonitor->wait();
}
}
}
else
{
msg->sched->yield(false);
}
…………
}
}
ServantProxy把msg消息通知到CommunicatorEpoll1中,利用epoll模型完成请求和结果的处理
(1)pObjectProxy->invoke里最终调用的AdapterProxy::invoke方法来完成请求发送,下面列出主要代码节点
int AdapterProxy::invoke(ReqMessage * msg)
{
//生成requestid
//tars调用 而且 不是单向调用
if(!msg->bFromRpc)
{
msg->request.iRequestId = _objectProxy->generateId();
}
//对请求进行协议转化
_objectProxy->getProxyProtocol().requestFunc(msg->request, msg->sReqData);
//交给连接发送数据,连接连上,buffer不为空,直接发送数据成功
//不管是否发送成功,都放到_timeoutQueue队列中
if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size()) != Transceiver::eRetError)
{
bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
if(!bFlag)
{
TLOGERROR("[TARS][AdapterProxy::invoke fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",objname" <<_objectProxy->name() << ",desc" << _endpoint.desc() <<endl);
msg->eStatus = ReqMessage::REQ_EXC;
finishInvoke(msg);
}
}
else
{
TLOGINFO("[TARS][AdapterProxy::invoke push (no send) " << _objectProxy->name() << ", " << _endpoint.desc() << ",id " << msg->request.iRequestId <<endl);
//请求发送失败了
bool bFlag = _timeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout+msg->iBeginTime, false);
if(!bFlag)
{
TLOGERROR("[TARS][AdapterProxy::invoke fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << "," <<_objectProxy->name() << ", " << _endpoint.desc() <<endl);
msg->eStatus = ReqMessage::REQ_EXC;
finishInvoke(msg);
}
}
return 0;
}
(2)当有结果到达时,使用handleInputImp处理结果,handleInputImp调用了AdapterProxy::finishInvoke进行结果处理
void AdapterProxy::finishInvoke(ResponsePacket & rsp)
{
…………
else
{
//根据rsp.iRequestId从_timeoutQueue中取出对应请求的msg
//这里的队列中的发送链表中的数据可能已经在timeout的时候删除了
bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);
//找不到此请求id信息
if (!retErase)
{
if(_timeoutLogFlag)
{
TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:"<< _objectProxy->name() << ",get req-ptr NULL,may be timeout,id:" << rsp.iRequestId
<< ",desc:" << _endpoint.desc() << endl);
}
return ;
}
assert(msg->eStatus == ReqMessage::REQ_REQ);
msg->eStatus = ReqMessage::REQ_RSP;
}
//将结果放到msg中
msg->response = rsp;
finishInvoke(msg);
}
从_timeoutQueue中找到msg后,使用finishInvoke(msg)方法进行通知,finishInvoke(msg)中包含了对同步和异步的处理
void AdapterProxy::finishInvoke(ReqMessage * msg)
{
//同步调用,唤醒ServantProxy线程
if(msg->eType == ReqMessage::SYNC_CALL)
{
if(!msg->bCoroFlag)
{
assert(msg->pMonitor);
TC_ThreadLock::Lock sync(*(msg->pMonitor));
msg->pMonitor->notify();
msg->bMonitorFin = true;
}
else
{
msg->sched->put(msg->iCoroId);
}
return ;
}
//异步调用
if(msg->eType == ReqMessage::ASYNC_CALL)
{
…………
else
{
//异步回调,放入回调处理线程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
}
}
else
…………
}
return;
}
assert(false);
return;
}
(3)handleOutputImp调用的是Transceiver::doRequest(),用来解决两个问题,一是Transceiver的发送缓存里如果还有内容,将继续发送出去。二是_timeoutQueue中如果有上次没有发送成功的请求,会在这里重新尝试发送一次,如果再发送失败就放弃
int Transceiver::doRequest()
{
//buf不为空,先发生buffer的内容
if(!_sendBuffer.IsEmpty())
{
size_t length = 0;
void* data = NULL;
_sendBuffer.PeekData(data, length);
iRet = this->send(data, length, 0);
//失败,直接返回
if(iRet < 0)
{
return iRet;
}
if(iRet > 0)
{
_sendBuffer.Consume(iRet);
if (_sendBuffer.IsEmpty())
_sendBuffer.Shrink();
else
return 0;
}
}
//取adapter里面积攒的数据
_adapterProxy->doInvoke();
//object里面应该是空的
assert(_adapterProxy->getObjProxy()->timeoutQSize() == 0);
return 0;
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。