导语:对于服务端来说,异步处理相比同步处理在性能上可能会有成倍的提高,本篇就对taf的异步处理进行一个简单的分析。描述客户端进行异步调用之后,taf的都进行了哪些的处理。
本文主要是项目在架构升级异步的时候,时不时出现死锁现象,因此想了解一下异步处理的时候线程执行逻辑是什么,最后看完了,发现和 taf 无关。
本文介绍基于 taf 协议的异步调用,rpc 和 http 的也基本类似。异步调用是通过 async_XXX 接口,调用时需要传递一个异步回调对象。通过定义 jce 协议,taf 自带工具是可以自动生成相应的异步调用的接口实现。代码如下:
void AppTafProxy::async_doRequest(AppTafPrxCallbackPtr callback,const std::string ¶m,const map<string, string>& context)
{
taf::JceOutputStream<taf::BufferWriter> _os;
_os.write(param, 1);
std::map<string, string> _mStatus;
taf_invoke_async(taf::JCENORMAL,"doRequest", _os.getByteBuffer(), context, _mStatus, callback);
}
在该方法中,通过调用父类 ServantProxy 的 taf_invoke_async 方法进入异步处理。在 taf_invoke_async 中初始化请求的消息体 ReqMessage,设定请求类型为异步、设定异步回调的对象 cb(在异步处理结束之后,调用该 cb 的方法),初始化其他相应必要信息,然后调用 invoke。无论是同步还是异步调用,最后都是会调用 invoke 方法。
在 invoke 方法中会根据同步、异步还是协程进行不同的处理。对于异步调用,还会根据是否开启协程来进行不同的处理,这里分析的是不开启协程的情况下的异步调用逻辑。在 invoke 方法中把请求都会添加到 epoll 的监听事件中,设置相应的 FDinfo。代码如下:
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_iReqQNo, pReqQ); //这里的 pReqQ 就是包含了上一个方法中初始化的 ReqMessage
void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{
//因为初次发起该请求,所以 iSeq 位置肯定是不合法,进入 else 生成一个新的元素
if(_notify[iSeq].bValid)
{
_ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
}
else
{
_notify[iSeq].stFDInfo.iType = FDInfo::ET_C_NOTIFY;
_notify[iSeq].stFDInfo.p=(void*)msgQueue;
_notify[iSeq].stFDInfo.fd = _notify[iSeq].eventFd;
_notify[iSeq].stFDInfo.iSeq = iSeq;
_notify[iSeq].notify.createSocket();
_notify[iSeq].bValid=true;
_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
}
}
加入 epoll 监听事件之后,接下来的逻辑就出现了同步异步的区别:对于同步调用会有一个 ReqMonitor 一直等待直到调用返回;而对于异步调用则会在添加监听之后直接返回。无论是同步还是异步调用都是在 epoll 轮询,发现事件 ready 调用方法处理。下面就是删除了不必要代码之后轮询执行逻辑,如下:
void CommunicatorEpoll::run()
{
TLOGDEBUG("CommunicatorEpoll::run id:"<<syscall(SYS_gettid)<<endl);
ServantProxyThreadData * pSptd = ServantProxyThreadData::getData();
pSptd->_netThreadSeq = (int)_netThreadSeq;
while (!_terminate)
{
//考虑到检测超时等的情况 这里默认就 wait100ms 吧
int num = _ep.wait(_iWaitTimeout);
for (int i = 0; i < num; i)
{
const epoll_event& ev = _ep.get(i);
uint64_t data = ev.data.u64;
if(data == 0) continue; //data 非指针, 退出循环
handle((FDInfo*)data, ev.events);
}
doTimeout();
doStat();
}
}
void CommunicatorEpoll::handle(FDInfo * pFDInfo, uint32_t events)
{
if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)
{
ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p;
ReqMessage * msg = NULL;
while(pInfoQueue->pop_front(msg))
{
msg->pObjectProxy->invoke(msg);
}
}
else
{
//handle events:EPOLLIN EPOLLOUT EPOLLRDHUP EPOLLERR
Transceiver *pTransceiver = (Transceiver*)pFDInfo->p;
if (events & EPOLLIN)
{
handleInputImp(pTransceiver);
}
}
}
事件 ready 之后的调用 handle 方法。在 handle 方法中会根据具体的事件类型执行不同的逻辑。根据上面 invoke 方法中添加事件代码,当发起异步调用的时候设定的 FDInfo 类型,进入 ET_C_NOTIFY
分支。调用逻辑是通过 pObjectProxy->invoke 中调用 pAdapterProxy->invoke,最后的逻辑执行是在 AdapterProxy 中。
在 AdapterProxy 中 invoke,首先检查当前队列大小,如果大于最大值,则调用 finishInvoke 方法后直接返回;否则把数据交给网络传输_pTrans 发送数据。无论是否发送成功,都会把请求加入_pTimeoutQueue 队列等待处理。此时你调用的接口刚把数据发送出去,然后 epoll 继续在后台监听。
int AdapterProxy::invoke(ReqMessage * msg)
{
if(_pTimeoutQueue->getSendListSize() >= _sendQueueLimit)
{
msg->eStatus = ReqMessage::REQ_EXC;
finishInvoke(msg);
return 0;
}
//生成 requestid taf 调用 而且 不是单向调用
if(!msg->bFromRpc)
{
msg->request.iRequestId = _pObjectProxy->generateId();
}
_pObjectProxy->getProxyProtocol().requestFunc(msg->request,msg->sReqData);
//交给连接发送数据 发送数据成功
if(_pTimeoutQueue->sendListEmpty()
&& _pTrans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size()) != Transceiver::eRetError)
{
if(msg->eType == ReqMessage::ONE_WAY)
{
delete msg;
return 0;
}
bool bFlag = _pTimeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout msg->iBeginTime);
if(!bFlag)
{
msg->eStatus = ReqMessage::REQ_EXC;
finishInvoke(msg);
}
}
else
{
bool bFlag = _pTimeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout msg->iBeginTime, false);
if(!bFlag)
{
msg->eStatus = ReqMessage::REQ_EXC;
finishInvoke(msg);
}
}
return 0;
}
CommunicatorEpol 的 run 方法在上面已经有简单说明,定期检查监听的套接字,事件 ready 之后,调用 handle 方法处理事件,判断事件发生的类型。上面数据在_
pTrans 发送成功之后,调用的接口处理完成会发送回包,然后 epoll 监听事件 ready 继续调用 handle 方法,这次发现是数据进来的事件调用 handleInputImp
接受数据。
void CommunicatorEpoll::handleInputImp(Transceiver * pTransceiver)
{
if(pTransceiver->isConnecting())
{
int iVal = 0;
socklen_t iLen = static_cast<socklen_t>(sizeof(int));
if (::getsockopt(pTransceiver->fd(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen) == -1 || iVal)
{
pTransceiver->close();
pTransceiver->getAdapterProxy()->addConnExc(true);
return;
}
pTransceiver->setConnected();
}
list<ResponsePacket> done;
if(pTransceiver->doResponse(done) > 0)
{
list<ResponsePacket>::iterator it = done.begin();
for (; it != done.end(); it)
{
pTransceiver->getAdapterProxy()->finishInvoke(*it);
}
}
}
无论是同步还是异步请求,请求处理最后都是回到了 AdapterProxy 的方法 finishInvoke。对于同步则是会唤醒之前的 ReqMonitor,通知处理结束;对于异步则是把请求处理结果塞回异步处理线程中的队列,异步处理线程一直监听自己任务队列,发现有任务则取出消息体进行处理。
需要注意的是在向异步线程任务队列 push 任务的时候是平均分配的,没有考虑每个异步处理线程的负载,也不考虑是那个业务线程 push 进来的(意思是同样一个业务线程接受到请求,可能一直是同一个异步处理线程处理,也可能多个异步线程同时处理来自同一个业务线程的请求)。我们遇到的多线程数据共享的问题就是来自于这里,看完源码才想通。
void AdapterProxy::finishInvoke(ReqMessage * msg)
{
if(msg->eType == ReqMessage::ONE_WAY)
{
delete msg;
return ;
}
//stat 上报调用统计
stat(msg);
if(msg->eType == ReqMessage::SYNC_CALL)
{
//handle sync,wake monitor
return ;
}
if(msg->eType == ReqMessage::ASYNC_CALL)
{
if(!msg->bCoroFlag)
{
if(msg->callback->getNetThreadProcess())
{
//如果是本线程的回调,直接本线程处理,比如获取 endpoint
ReqMessagePtr msgPtr = msg;
msg->callback->onDispatch(msgPtr);
}
else
{
//异步回调,放入回调处理线程中
_pObjectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
}
}
else
{
//开启协程
}
}
return;
}
通过阅读异步处理线程可以发现,处理逻辑是调用消息体里的回调对象的 onDispatch 方法,而该回调对象则是你最初进行异步调用的时候传递进去的。对于 onDispatch 则会根据你定义的异步对象的不同执行逻辑也不同。
void AsyncProcThread::run()
{
while (!_terminate)
{
ReqMessage * msg;
//异步请求回来的响应包处理
if(_msgQueue->empty())
{
TC_ThreadLock::Lock lock(*this);
timedWait(1000);
}
if (_msgQueue->pop_front(msg))
{
//从回调对象把线程私有数据传递到回调线程中
ServantProxyThreadData * pServantProxyThreadData = ServantProxyThreadData::getData();
if(msg->adapter)
{
snprintf(pServantProxyThreadData->_szHost, sizeof(pServantProxyThreadData->_szHost), "%s", msg->adapter->endpoint().desc().c_str());
}
ReqMessagePtr msgPtr = msg;
msg->callback->onDispatch(msgPtr);
}
}
}
这样整个异步处理逻辑就结束了。
如果你业务需要调用另外一个服务的时候,那么使用异步会很明显提升性能的。主线程接受到请求之后执行一些业务必要逻辑处理就调用异步处理直接返回了,比较耗时网络 io 和之后的处理都放在了异步处理线程,主线程可以解放出来继续接受接下来的链接。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。