检查可发送该线程的相关联的所有会话上的发送缓冲区的数据,检查完后,发送会话上的发送缓冲区的数据。
VOID ExecSockDataMgr::CheckSendSessionBuffers(PEXECDATASENDTHREAD pSendThread)
{
int nErr, nRemainSize;
char *pBuffer;
PRUNGATEUSERSESSION *pSessionList = m_SessionList;
PRUNGATEUSERSESSION pSession;
pSendThread->boSendEWouldBlock = false;
pSendThread->boSendFewBuffer = false;
//优化锁处理
//(1)在开始时就分配65535的会话列表大小,在一个逻辑网关上不会有超出这个大小的有效会话数量(所以会话列表的操作不需要加没有加会话列表锁)
//(2)会话的释放会置空会话列表的成员,会话的nSocket 是在最后才初始化的,所以可以判断pSession->nSocket != INVALID_SOCKET 来判断会话的初始化
//后的有效性,根据pSession->boMarkToClose和pSession->boRemoteClosed 来判断会话是否被关闭(所以不需要加会话锁)
//(3)发送失败会标记该会话被关闭
INT_PTR nCount = m_SessionList.count();
for ( INT_PTR nIndex = pSendThread->nThreadIdx; nIndex < nCount; nIndex += m_nSendThreadCount )
{
pSession = pSessionList[nIndex];
if (!pSession)
continue;
if ( pSession->nSocket != INVALID_SOCKET && !pSession->boMarkToClose && !pSession->boRemoteClosed )//检查该会话处于正常状态
{
if ( !pSession->boSendAvaliable )
{
if ( _getTickCount() >= pSession->dwSendTimeOut )//检查会话的发送时间(等到发送的时间再发送)
{
pSession->boSendAvaliable = true;//到了可发送时间则标记可发送
pSession->dwSendTimeOut = 0;
}
else continue;
}
// 这里可以不加会话锁,因为数据接收处理线程回收会话资源是根据关闭标识并延时10s的(这里的锁需要验证),发送缓冲区的数据操作在本线程内
//(如果实在要加锁,对于数据接收处理线程会修改会话,因为不会有其他发送线程使用该会话,所以可以是互斥量)
if ( TRYLOCK_SESSION_SEND( pSession ) )
{
nRemainSize = pSession->SendBuf.nOffset;//该会话的缓冲区的有效数据大小
if ( nRemainSize > 4096 * 1024 )//发送缓冲区过长的会话会被关闭(这是异常现象,一般不会出现)
{
UNLOCK_SESSION_SEND( pSession );
CloseSession( pSession );
nRemainSize = 0;
logWarn("关闭了一个发送数据队列大于4MB的连接。");
continue;
}
if ( nRemainSize )
{
pBuffer = pSession->SendBuf.lpBuffer;
#ifdef LINUX
nErr = ::send( pSession->nSocket, pBuffer, nRemainSize, MSG_NOSIGNAL);//禁止对端连接关闭时,send()函数向系统发送异常消息brokenpipe
#else
nErr = ::send( pSession->nSocket, pBuffer, nRemainSize, 0 );
#endif
if ( nErr > 0 )//发送成功
{
pSession->nSendPacketCount++;
InterlockedExchangeAdd( (LONG*)&m_dwWaitSendUserSize, -nErr );
InterlockedExchangeAdd( (LONG*)&m_dwSendUserSize, nErr );
if ( nErr < nRemainSize )//如果还有剩余发送数据则把剩余发送数据拷贝到该会话的缓冲区的前部
{
pSendThread->boSendFewBuffer = true;
memcpy( pBuffer, &pBuffer[nErr], nRemainSize - nErr );
nRemainSize -= nErr;
pBuffer[nRemainSize] = 0;
pSession->SendBuf.nOffset = nRemainSize;
}
else
{
pBuffer[0] = 0;
pSession->SendBuf.nOffset = 0;
}
}
else if ( !nErr || WSAGetLastError() != WSAEWOULDBLOCK )//对方关闭了套接字
{
pSession->boRemoteClosed = true;
CloseSession( pSession );
InterlockedExchangeAdd( (LONG*)&m_dwWaitSendUserSize, -nRemainSize );
InterlockedExchangeAdd( (LONG*)&m_dwSendUserSize, nRemainSize );
pBuffer[0] = 0;
pSession->SendBuf.nOffset = 0;
}
else//系统发送缓冲区已满则延时发送
{
pSession->boSendAvaliable = false;
pSession->dwSendTimeOut = _getTickCount() + RUNGATE_SENDCHECK_TIMEOUT;//目前300ms作为发送间隔
pSendThread->boSendEWouldBlock = true;
}
}
UNLOCK_SESSION_SEND( pSession );
}
}
else
{
//会话关闭后减少待发送数据统计值
if ( (nRemainSize = pSession->SendBuf.nOffset) )
{
InterlockedExchangeAdd( (LONG*)&m_dwWaitSendUserSize, -nRemainSize );//减少该逻辑网关上的待发送给用户的有效数据大小
pSession->SendBuf.nOffset = 0;
}
}
}
}