一些概念:
同步和异步
同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发I/O操作并等待或者轮询的去查看I/O操作是否就绪,而异步是指用户进程触发I/O操作以后便开始做自己的事情,而当I/O操作已经完成的时候会得到I/O完成的通知。
阻塞和非阻塞
阻塞和非阻塞是针对于进程在访问数据的时候,根据I/O操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作函数的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入函数会立即返回一个状态值。
服务器端几种模型:
1、阻塞式模型(blocking IO)
我们第一次接触到的网络编程都是从 listen()、accpet()、send()、recv() 等接口开始的。使用这些接口可以很方便的构建C/S的模型。这里大部分的 socket 接口都是阻塞型的。所谓阻塞型接口是指系统调用(一般是 IO 接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。
如下面一个简单的Server端实现:
#include <Winsock2.h>
#include <cstdio>
#include <iostream>
#include <string>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
int init_win_socket()
{
WSADATA wsaData;
if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0)
{
return -1;
}
return 0;
}
#define Server_Port 10286
void handle_client(int newfd)
{
while(1)
{
char buff[1024];
memset(buff,0,1024);
int result = recv(newfd,buff,1024,0);
if(result <= 0)
{
break;
}
else
{
printf("Receive Data %s, Size: %d \n",buff,result);
int ret = send(newfd,buff,result,0);
if(ret>0)
{
printf("Send Data %s, Size: %d \n",buff,ret);
}
else
{
break;
}
}
}
closesocket(newfd);
return;
}
int run()
{
int listener;
struct sockaddr_in addr_server;
listener = socket(AF_INET, SOCK_STREAM, 0);
//addr_server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
addr_server.sin_addr.S_un.S_addr = ADDR_ANY;
addr_server.sin_family = AF_INET;
addr_server.sin_port = htons(Server_Port);
if(bind(listener,(const sockaddr *)&addr_server,sizeof(addr_server)) < 0)
{
perror("bind error");
return -1;
}
if (listen(listener, 10)<0)
{
perror("listen error");
return -1;
}
printf("Server is listening ... \n");
bool runing = true;
while(runing)
{
sockaddr_in addr_client;
int clientlen = sizeof(addr_client);
int client_sock;
if ((client_sock = accept(listener, (struct sockaddr *) &addr_client, &clientlen)) < 0)
{
printf("Failed to accept client connection \n");
}
fprintf(stdout, "Client connected: %s \n", inet_ntoa(addr_client.sin_addr));
/*Handle this connect */
handle_client(client_sock);
}
closesocket(listener);
return 0;
}
int main(int c, char **v)
{
#ifdef WIN32
init_win_socket();
#endif
run();
getchar();
return 0;
}
示意图如下:
这里的socket的接口是阻塞的(blocking),在线程被阻塞期间,线程将无法执行任何运算或响应任何的网络请求,这给多客户机、多业务逻辑的网络编程带来了挑战。
2、多线程的服务器模型(Multi-Thread)
应对多客户机的网络应用,最简单的解决方式是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
多线程Server端的实现:
#include <Winsock2.h>
#include <cstdio>
#include <iostream>
#include <string>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
int init_win_socket()
{
WSADATA wsaData;
if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0)
{
return -1;
}
return 0;
}
#define Server_Port 10286
DWORD WINAPI handle_client(LPVOID lppara)
{
int *newfd = (int *)lppara;
while(1)
{
char buff[1024];
memset(buff,0,1024);
int result = recv(*newfd,buff,1024,0);
if(result <= 0)
{
break;
}
else
{
printf("Receive Data %s, Size: %d \n",buff,result);
int ret = send(*newfd,buff,result,0);
if(ret>0)
{
printf("Send Data %s, Size: %d \n",buff,ret);
}
else
{
break;
}
}
Sleep(10);
}
closesocket(*newfd);
return 0;
}
int run()
{
int listener;
struct sockaddr_in addr_server;
int sock_clients[1024]; //max number for accept client connection;
listener = socket(AF_INET, SOCK_STREAM, 0);
//addr_server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
addr_server.sin_addr.S_un.S_addr = ADDR_ANY;
addr_server.sin_family = AF_INET;
addr_server.sin_port = htons(Server_Port);
if(bind(listener,(const sockaddr *)&addr_server,sizeof(addr_server)) < 0)
{
perror("bind error");
return -1;
}
if (listen(listener, 10)<0)
{
perror("listen error");
return -1;
}
printf("Server is listening ... \n");
int fd_count = 0;
bool runing = true;
while(runing)
{
sockaddr_in addr_client;
int clientlen = sizeof(addr_client);
int client_sock;
if ((client_sock = accept(listener, (struct sockaddr *) &addr_client, &clientlen)) < 0)
{
printf("Failed to accept client connection \n");
}
fprintf(stdout, "Client connected: socket fd %d , %s \n", client_sock,inet_ntoa(addr_client.sin_addr));
/*Handle this connect */
if(fd_count<1024)
{
sock_clients[fd_count] = client_sock;
if(CreateThread(NULL,0,handle_client,&sock_clients[fd_count],0,NULL)==NULL)
return -1;
++ fd_count;
}
Sleep(10);
}
closesocket(listener);
return 0;
}
int main(int c, char **v)
{
#ifdef WIN32
init_win_socket();
#endif
run();
getchar();
return 0;
}
上述多线程的服务器模型可以解决一些连接量不大的多客户端连接请求,但是如果要同时响应成千上万路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率。
在多线程的基础上,可以考虑使用“线程池”或“连接池”,“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统。
3、非阻塞式模型(Non-blocking IO)
非阻塞的接口相比于阻塞型接口的显著差异在于,在被调用之后立即返回。
非阻塞型IO的示意图如下:
从应用程序的角度来说,blocking read 调用会延续很长时间。在内核执行读操作和其他工作时,应用程序会被阻塞。
非阻塞的IO可能并不会立即满足,需要应用程序调用许多次来等待操作完成。这可能效率不高,因为在很多情况下,当内核执行这个命令时,应用程序必须要进行忙碌等待,直到数据可用为止。
另一个问题,在循环调用非阻塞IO的时候,将大幅度占用CPU,所以一般使用select等来检测”是否可以操作“。
4、多路复用IO
支持I/O复用的系统调用有select、poll、epoll、kqueue等,
这里以Select函数为例,select函数用于探测多个文件句柄的状态变化,以下为一个使用了使用了Select函数的Server实现:
#include <Winsock2.h>
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <iostream>
#include <string>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
int init_win_socket()
{
WSADATA wsaData;
if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0)
{
return -1;
}
return 0;
}
#define Server_Port 10286
#define MAX_LINE 16384
#define FD_SETSIZE 1024
struct fd_state
{
char buffer[MAX_LINE];
size_t buffer_used;
int writing;
size_t n_written;
size_t write_upto;
};
struct fd_state * alloc_fd_state(void)
{
struct fd_state *state = (struct fd_state *)malloc(sizeof(struct fd_state));
if (!state)
return NULL;
state->buffer_used = state->n_written = state->writing =
state->write_upto = 0;
memset(state->buffer,0,MAX_LINE);
return state;
}
void free_fd_state(struct fd_state *state)
{
free(state);
}
int set_socket_nonblocking(int fd)
{
unsigned long mode = 1;
int result = ioctlsocket(fd, FIONBIO, &mode);
if (result != 0)
{
return -1;
printf("ioctlsocket failed with error: %ld\n", result);
}
return 0;
}
int do_read(int fd, struct fd_state *state)
{
char buf[1024];
int i;
int result;
while (1)
{
memset(buf,0,1024);
result = recv(fd, buf, sizeof(buf), 0);
if (result <= 0)
break;
for (i=0; i < result; ++i)
{
if (state->buffer_used < sizeof(state->buffer))
state->buffer[state->buffer_used++] = buf[i];
}
}
state->writing = 1;
state->write_upto = state->buffer_used;
printf("Receive data: %s size: %d\n",state->buffer+state->n_written,state->write_upto-state->n_written);
if (result == 0)
{
return 1;
}
else if (result < 0)
{
#ifdef WIN32
if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
return 0;
#else
if (errno == EAGAIN)
return 0;
#endif
return -1;
}
return 0;
}
int do_write(int fd, struct fd_state *state)
{
while (state->n_written < state->write_upto)
{
int result = send(fd, state->buffer + state->n_written,
state->write_upto - state->n_written, 0);
if (result < 0)
{
#ifdef WIN32
if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
return 0;
#else
if (errno == EAGAIN)
return 0;
#endif
return -1;
}
assert(result != 0);
printf("Send data: %s \n",state->buffer+ state->n_written);
state->n_written += result;
}
if (state->n_written == state->buffer_used)
state->n_written = state->write_upto = state->buffer_used = 0;
state->writing = 0;
return 0;
}
void run()
{
int listener;
struct fd_state *state[FD_SETSIZE];
struct sockaddr_in sin;
int i, maxfd;
fd_set readset, writeset, exset;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(Server_Port);
for (i = 0; i < FD_SETSIZE; ++i)
state[i] = NULL;
listener = socket(AF_INET, SOCK_STREAM, 0);
set_socket_nonblocking(listener);
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,(const char *)&one, sizeof(one));
if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0)
{
perror("bind");
return;
}
if (listen(listener, 16)<0)
{
perror("listen");
return;
}
printf("Server is listening ... \n");
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);
while (1)
{
maxfd = listener;
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);
FD_SET(listener, &readset);
for (i=0; i < FD_SETSIZE; ++i)
{
if (state[i])
{
if (i > maxfd)
maxfd = i;
FD_SET(i, &readset);
if (state[i]->writing)
{
FD_SET(i, &writeset);
}
}
}
if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0)
{
perror("select");
return;
}
//check if listener can accept
if (FD_ISSET(listener, &readset))
{
struct sockaddr_in ss;
int slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0)
{
perror("accept");
}
else if(fd > FD_SETSIZE)
{
closesocket(fd);
}
else
{
printf("Accept socket %d, address %s \n",fd,inet_ntoa(ss.sin_addr));
set_socket_nonblocking(fd);
state[fd] = alloc_fd_state();
assert(state[fd]);
}
}
//process read and write socket
for (i=0; i < maxfd+1; ++i)
{
int r = 0;
if (i == listener)
continue;
if (FD_ISSET(i, &readset))
{
r = do_read(i, state[i]);
}
if (r == 0 && FD_ISSET(i, &writeset))
{
r = do_write(i, state[i]);
}
if (r)
{
free_fd_state(state[i]);
state[i] = NULL;
closesocket(i);
}
}
}
}
int main(int c, char **v)
{
#ifdef WIN32
init_win_socket();
#endif
run();
return 0;
}
示意图如下:
这里Select监听的socket都是Non-blocking的,所以在do_read() do_write()中对返回为EAGAIN/WSAEWOULDBLOCK都做了处理。
从代码中可以看出使用Select返回后,仍然需要轮训再检测每个socket的状态(读、写),这样的轮训检测在大量连接下也是效率不高的。因为当需要探测的句柄值较大时,select () 接口本身需要消耗大量时间去轮询各个句柄。
很多操作系统提供了更为高效的接口,如 linux 提供 了 epoll,BSD 提供了 kqueue,Solaris 提供了 /dev/poll …。如果需要实现更高效的服务器程序,类似 epoll 这样的接口更被推荐。遗憾的是不同的操作系统特供的 epoll 接口有很大差异,所以使用类似于 epoll 的接口实现具有较好跨平台能力的服务器会比较困难。