// 原文是C++写的,我改写成了C版本。 (The original was C++; rewritten here in C.)
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <stdbool.h>
// 连接socket读缓冲区大小
#define BUFFER_SIZE 1024
// 进程池容量
const int MAX_PROCESS_NUMBER = 16;
// 处理连接任务的任务对象的最大数量
const int USER_PER_PROCESS = 65536;
// 最大epoll监听事件数量
const int MAX_EVENT_NUMBER = 10000;
// 进程数组中的进程容器
// One slot in the process pool: a forked child's pid plus the socketpair
// the parent uses to hand new connections to that child.
struct process {
    pid_t m_pid;       // child pid in the parent; set to -1 once the child is reaped
    int m_pipefd[2];   // parent keeps end [0], child keeps end [1]
};
// 进程池对象
// Process-pool state; one singleton instance exists per process (copied by fork).
struct processpoll {
    int m_process_number;           // number of children requested
    int m_idx;                      // -1 in the parent, pool index inside each child
    int m_epollfd;                  // per-process epoll instance (created in setup_sig_pipe)
    int m_listenfd;                 // shared listening socket
    int m_stop;                     // exit flag for run_parent()/run_child() loops
    struct process* m_sub_process;  // array of m_process_number child slots
};
// 连接任务处理对象
// Per-connection task state used by the child's conn_task_process().
struct conn_task {
    int m_epollfd;                 // the owning child's epoll fd
    int m_sockfd;                  // accepted client socket
    struct sockaddr_in m_address;  // client address (stored, not otherwise used here)
    char m_buf[BUFFER_SIZE];       // accumulated client input
    int m_read_idx;                // number of bytes already in m_buf
};
// 进程池唯一实例
static struct processpoll* m_instance = NULL;
// 信号管道
static int sig_pipefd[2];
// 非阻塞
// Switch fd to non-blocking mode; returns the flag set it had before.
static int setnonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    return flags;
}
// 注册epoll事件
// Register fd with the given epoll instance for edge-triggered reads and
// make it non-blocking (ET mode requires draining reads until EAGAIN).
static void addfd(int epollfd, int fd) {
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}
// 移除epoll事件
// Unregister fd from the epoll instance, then close it.
static void removefd(int epollfd, int fd) {
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
    close(fd);
}
// 信号处理函数
// Async-signal handler: forward the signal number over the self-pipe so the
// event loop can handle it synchronously. Only 1 byte is sent, i.e. the low
// byte of the int — NOTE(review): assumes little-endian layout; the reader
// compares single chars, which matches on x86 — confirm for other targets.
static void sig_handler(int sig) {
    // Preserve errno: send() may clobber it mid-interrupt.
    int save_errno = errno;
    int msg = sig;
    send(sig_pipefd[1], (char*) &msg, 1, 0);
    errno = save_errno;
}
// 添加信息函数
// Install `handler` for signal `sig` with all signals blocked during
// delivery; SA_RESTART makes interrupted syscalls resume automatically.
static void addsig(int sig, void(handler)(int), bool restart) {
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if (restart) {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    assert(sigaction(sig, &sa, NULL) != -1);
}
// 创建进程自己的epoll并且监听信号管道 以及添加各类信号处理函数
// Create this process's epoll instance, the signal self-pipe, and install
// the signal handlers. Called separately by parent and each child because
// each needs its own epoll fd and pipe.
void setup_sig_pipe() {
    m_instance->m_epollfd = epoll_create(5);
    assert(m_instance->m_epollfd != -1);
    int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
    assert(ret != -1);
    // Write end non-blocking so the handler can never block; the read end
    // is made non-blocking inside addfd().
    setnonblocking(sig_pipefd[1]);
    addfd(m_instance->m_epollfd, sig_pipefd[0]);
    // Route these signals through the self-pipe.
    addsig(SIGCHLD, sig_handler, true);
    addsig(SIGTERM, sig_handler, true);
    addsig(SIGINT, sig_handler, true);
    // Writing to a closed socket must not kill the process.
    addsig(SIGPIPE, SIG_IGN, true);
}
// 进程池的构造函数
// Initialize the singleton pool and fork `process_number` children.
// m_idx stays -1 in the parent; each child records its slot index and
// breaks out of the loop so parent and children run different main loops.
void processpoll_constructor(int listenfd, int process_number) {
    m_instance->m_idx = -1;
    m_instance->m_listenfd = listenfd;
    m_instance->m_process_number = process_number;
    // BUG FIX: the pool comes from malloc, so m_stop was indeterminate and
    // later read by run_parent()/run_child() — undefined behavior.
    m_instance->m_stop = 0;
    assert((process_number > 0) && (process_number <= MAX_PROCESS_NUMBER));
    m_instance->m_sub_process = (struct process*)malloc(sizeof(struct process)*process_number);
    assert(m_instance->m_sub_process);
    for (int i = 0; i < process_number; ++i) {
        // Full-duplex channel between the parent and child i.
        int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_instance->m_sub_process[i].m_pipefd);
        assert(ret == 0);
        m_instance->m_sub_process[i].m_pid = fork();
        assert(m_instance->m_sub_process[i].m_pid >= 0);
        if (m_instance->m_sub_process[i].m_pid > 0) {
            // Parent: keep end [0], close the child's end, fork the next child.
            close(m_instance->m_sub_process[i].m_pipefd[1]);
            continue;
        } else {
            // Child: keep end [1], remember our pool index, stop forking.
            close(m_instance->m_sub_process[i].m_pipefd[0]);
            m_instance->m_idx = i;
            break;
        }
    }
}
// 进程池销毁
// Release the pool's memory. Resetting the singleton pointer prevents a
// dangling m_instance (and a double free if processpool_create runs again).
void processpoll_free() {
    free(m_instance->m_sub_process);
    free(m_instance);
    m_instance = NULL;
}
// 创建进程池
// Lazily create the process-pool singleton; returns the shared instance,
// or NULL on allocation failure.
struct processpoll* processpool_create(int listenfd, int process_number) {
    if (!m_instance) {
        m_instance = (struct processpoll*)malloc(sizeof(struct processpoll));
        // BUG FIX: the original passed an unchecked malloc result straight
        // into the constructor, which dereferences it.
        if (!m_instance) {
            return NULL;
        }
        processpoll_constructor(listenfd, process_number);
    }
    return m_instance;
}
// 父进程的主体逻辑
// Parent main loop: watch the listen socket, hand each new connection to a
// round-robin-chosen live child over its pipe, and handle signals (reap
// children, forward termination) delivered via the self-pipe.
void run_parent() {
    setup_sig_pipe();
    // Only the parent polls the listen socket.
    addfd(m_instance->m_epollfd, m_instance->m_listenfd);
    struct epoll_event events[MAX_EVENT_NUMBER];
    int sub_process_counter = 0;
    int new_conn = 1;
    int number = 0;
    int ret = -1;
    while(!m_instance->m_stop) {
        number = epoll_wait(m_instance->m_epollfd, events, MAX_EVENT_NUMBER, -1);
        if (number < 0 && errno != EINTR) {
            printf("epoll failture\n");
            break;
        }
        for(int i = 0;i < number; ++i) {
            int sockfd = events[i].data.fd;
            if (sockfd == m_instance->m_listenfd) {
                // Pick the next child that has not exited (m_pid becomes -1
                // once reaped), resuming where the previous round stopped.
                int i = sub_process_counter;
                do {
                    if (m_instance->m_sub_process[i].m_pid != -1) {
                        break;
                    }
                    i = (i+1) % m_instance->m_process_number;
                } while(i != sub_process_counter);
                // Every child has already exited: the parent stops too.
                if (m_instance->m_sub_process[i].m_pid == -1) {
                    m_instance->m_stop = true;
                    break;
                }
                // Next round starts after the child we just picked.
                sub_process_counter = (i+1)%m_instance->m_process_number;
                // Tell child i over its pipe to accept the pending connection.
                send(m_instance->m_sub_process[i].m_pipefd[0], (char*)&new_conn, sizeof(new_conn), 0);
                printf("send request to child %d\n", i);
            }
            else if (sockfd == sig_pipefd[0] && (events[i].events & EPOLLIN)) {
                // Signals forwarded by sig_handler(), one byte each.
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if (ret <= 0) {
                    continue;
                }
                else {
                    for(int i=0;i<ret;++i) {
                        switch(signals[i]) {
                            case SIGCHLD:
                            {
                                // Some child exited: reap all of them non-blockingly.
                                pid_t pid;
                                int stat;
                                while((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
                                    for(int i=0;i<m_instance->m_process_number;++i) {
                                        if (m_instance->m_sub_process[i].m_pid == pid) {
                                            // Close our end of that child's pipe and
                                            // mark the slot as dead with pid -1.
                                            printf("child %d join\n", i);
                                            close(m_instance->m_sub_process[i].m_pipefd[0]);
                                            m_instance->m_sub_process[i].m_pid = -1;
                                        }
                                    }
                                }
                                // If no child remains, the parent stops as well.
                                m_instance->m_stop = true;
                                for(int i=0;i<m_instance->m_process_number;++i) {
                                    if (m_instance->m_sub_process[i].m_pid != -1) {
                                        m_instance->m_stop = false;
                                    }
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:
                            {
                                // Termination requested: forward SIGTERM to all
                                // live children; their exits trigger SIGCHLD above.
                                printf("kill all the child now\n");
                                for(int i=0;i<m_instance->m_process_number;++i) {
                                    int pid = m_instance->m_sub_process[i].m_pid;
                                    if (pid != -1) {
                                        kill(pid, SIGTERM);
                                    }
                                }
                                break;
                            }
                            default:
                            {
                                break;
                            }
                        }
                    }
                }
            }
            else {
                continue;
            }
        }
    }
    close(m_instance->m_epollfd);
}
// 处理连接任务的task
// Bind a connection-task object to an accepted client socket and the
// owning child's epoll fd, resetting its read buffer.
void conn_task_init(struct conn_task* task, int epollfd, int connfd, struct sockaddr_in* client_address) {
    task->m_epollfd = epollfd;
    task->m_sockfd = connfd;
    task->m_address = *client_address;
    task->m_read_idx = 0;
    memset(task->m_buf, 0, sizeof(task->m_buf));
}
// CGI loop for one connection: recv() until a '\n'-terminated line arrives,
// treat the line as the path of a CGI executable, then fork a grandchild
// that execs it with stdout redirected onto the client socket.
void conn_task_process(struct conn_task* task) {
    int idx = 0;
    int ret = -1;
    while (1) {
        idx = task->m_read_idx;
        ret = recv(task->m_sockfd, task->m_buf + idx, BUFFER_SIZE - 1 - idx, 0);
        if (ret < 0) {
            // EAGAIN only means the edge-triggered socket is drained for now.
            if (errno != EAGAIN) {
                removefd(task->m_epollfd, task->m_sockfd);
            }
            break;
        } else if (ret == 0) {
            // Peer closed the connection.
            removefd(task->m_epollfd, task->m_sockfd);
            break;
        } else {
            task->m_read_idx += ret;
            printf("user content is: %s\n", task->m_buf);
            // A newline marks the end of the client's input line.
            for (; idx < task->m_read_idx; ++idx) {
                if (idx >= 1 && task->m_buf[idx] == '\n') {
                    break;
                }
            }
            // No newline yet: keep reading.
            if (idx == task->m_read_idx) {
                continue;
            }
            // Replace the newline with a string terminator.
            task->m_buf[idx] = '\0';
            // The received line is the full path of a CGI executable.
            char* file_name = task->m_buf;
            if (access(file_name, F_OK) == -1) {
                removefd(task->m_epollfd, task->m_sockfd);
                break;
            }
            // Fork a grandchild to run the CGI program; the child only
            // keeps the loop running for other connections.
            ret = fork();
            if (ret == -1) {
                removefd(task->m_epollfd, task->m_sockfd);
                break;
            } else if (ret > 0) {
                // Worker child: drop its copy of the socket and return to epoll.
                removefd(task->m_epollfd, task->m_sockfd);
                break;
            } else {
                // Grandchild: redirect stdout to the client socket, then exec.
                close(STDOUT_FILENO);
                // dup() picks the lowest free descriptor, i.e. fd 1 (stdout).
                dup(task->m_sockfd);
                // BUG FIX: execl's variadic list must end with (char*)NULL.
                // The original passed `0, NULL`; a bare int 0 is not a valid
                // pointer sentinel on all ABIs (undefined behavior on LP64).
                execl(task->m_buf, task->m_buf, (char*)NULL);
                // Only reached if exec failed; terminate the grandchild.
                exit(0);
            }
        }
    }
}
// 子进程的主体逻辑
// Child main loop: wait for "new connection" tokens from the parent, accept
// and watch those sockets, and run conn_task_process() when client data
// arrives. Signal handlers are re-registered here because each child needs
// its own self-pipe and epoll instance.
void run_child() {
    setup_sig_pipe();
    // Our end of the parent<->child pipe, found via our pool index.
    int pipefd = m_instance->m_sub_process[m_instance->m_idx].m_pipefd[1];
    addfd(m_instance->m_epollfd, pipefd);
    struct epoll_event events[MAX_EVENT_NUMBER];
    // One task slot per possible connection fd value (indexed by fd).
    struct conn_task* users = (struct conn_task*)malloc(sizeof(struct conn_task)*USER_PER_PROCESS);
    assert(users);
    int number = 0;
    int ret = -1;
    while(!m_instance->m_stop) {
        number = epoll_wait(m_instance->m_epollfd, events, MAX_EVENT_NUMBER, -1);
        if (number < 0 && errno != EINTR) {
            printf("epoll failture\n");
            break;
        }
        for(int i = 0;i < number; ++i) {
            int sockfd = events[i].data.fd;
            if (sockfd == pipefd && (events[i].events & EPOLLIN)) {
                // Data on the parent pipe means a connection is pending.
                int client = 0;
                ret = recv(sockfd, (char*) &client, sizeof(client), 0);
                if ((ret < 0 && errno != EAGAIN) || ret == 0) {
                    continue;
                }
                else {
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof(client_address);
                    // The child performs the accept itself.
                    int connfd = accept(m_instance->m_listenfd, (struct sockaddr*) &client_address, &client_addrlength);
                    if (connfd < 0) {
                        printf("errno is: %d\n", errno);
                        continue;
                    }
                    // Watch the new connection on this child's epoll.
                    addfd(m_instance->m_epollfd, connfd);
                    // Bind the task slot for this fd to the connection.
                    conn_task_init(&users[connfd], m_instance->m_epollfd, connfd, &client_address);
                }
            }
            else if (sockfd == sig_pipefd[0] && (events[i].events & EPOLLIN)) {
                // Per-child signal handling via the self-pipe.
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if (ret <= 0) {
                    continue;
                }
                else {
                    for(int i=0;i<ret;++i) {
                        switch(signals[i]) {
                            case SIGCHLD:
                            {
                                // This CGI server forks grandchildren; reap them.
                                pid_t pid;
                                int stat;
                                while((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
                                    continue;
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:
                            {
                                // Terminate this child's loop.
                                m_instance->m_stop = true;
                                break;
                            }
                            default:
                            {
                                break;
                            }
                        }
                    }
                }
            }
            else if (events[i].events & EPOLLIN) {
                // Client data ready on a connection socket: process it.
                conn_task_process(&users[sockfd]);
            }
            else {
                continue;
            }
        }
    }
    // Release everything this child allocated or opened.
    free(users);
    users = NULL;
    close(pipefd);
    close(m_instance->m_epollfd);
}
// Dispatch to the correct main loop: m_idx stays -1 only in the parent;
// each child recorded its pool index during construction.
void processpoll_run() {
    if (m_instance->m_idx == -1) {
        run_parent();
    } else {
        run_child();
    }
}
// Entry point: build the listening socket, create the process pool (which
// forks), then run the per-process event loop.
int main(int argc, char *argv[])
{
    // BUG FIX: the original tested argc < 2 but reads argv[2] below, so a
    // single-argument invocation dereferenced past argv. Both are required.
    if (argc <= 2) {
        // NOTE(review): basename() is declared in <libgen.h>, which this
        // file does not include — verify it compiles cleanly on the target.
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);
    // Allow quick restarts on the same port.
    int reuse = 1;
    ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,(const void *)&reuse , sizeof(int));
    assert(ret != -1);
    ret = bind(listenfd, (struct sockaddr*) &address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);
    struct processpoll* pool = processpool_create(listenfd, 8);
    if (pool) {
        // processpool_create() forked, so parent and children all reach this
        // point and block in their respective loops until terminated.
        processpoll_run();
        processpoll_free();
    }
    // Whoever opened the fd closes it.
    close(listenfd);
    return 0;
}
// 原文是C++写的,我改写成了C版本。 (Second program begins here: thread-pool HTTP server.)
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/epoll.h>
#include <stdbool.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <stdarg.h>
// 最大任务处理对象的数量
#define MAX_FD 65536
// epoll可注册最大事件数量
#define MAX_EVENT_NUMBER 10000
// 请求的html文件资源名字的最大长度
#define FILENAME_LEN 200
// socket可读缓冲区最大值
#define READ_BUFFER_SIZE 2048
// socket可写缓冲区最大值
#define WRITE_BUFFER_SIZE 1024
// 全局唯一 epollfd
static int m_epollfd;
// 用户连接数量
static int m_user_count = 0;
// http请求行中的方法
enum METHOD {GET = 0, POST, HEAD, PUT, DELETE, TRACE, OPTIONS, CONNECT, PATCH};
// 状态机解析当前读取的字符所处的状态枚举
enum CHECK_STATE {CHECK_STATE_REQUESTLINE = 0, CHECK_STATE_HEADER, CHECK_STATE_CONTENT };
// 读取完一段字符后得到的状态码
enum HTTP_CODE {NO_REQUEST, GET_REQUEST, BAD_REQUEST, NO_RESOURCE, FORBIDDEN_REQUEST, FILE_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION};
// 读到当前字符串结尾或者完整的一行时候返回的行状态枚举
enum LINE_STATUS {LINE_OK = 0, LINE_BAD, LINE_OPEN};
// 一些固定的http响应报文字符串 状态 提示等
const char* ok_200_title = "OK";
const char* error_400_title = "Bad Request";
const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";
const char* error_403_title = "Forbidden";
const char* error_403_form = "You do not have permission to get file from this server.\n";
const char* error_404_title = "Not Found";
const char* error_404_form = "The requested file was not found on this server.\n";
const char* error_500_title = "Internal Error";
const char* error_500_form = "There was an unusual problem serving the requested file.\n";
// 系统目录中存放静态资源的前面一截路径 最终解析完请求数据后得到 例如 /var/www/html/请求行中的path部分/xxxx.html 的地址
const char* doc_root = "/var/www/html";
// 前向声明
struct listnode;
struct http_conn;
struct threadpool;
bool sem();
void free_sem();
bool wait();
bool post();
bool locker();
void free_locker();
bool lock();
bool unlock();
bool cond();
bool free_cond();
bool cond_wait();
bool cond_signal();
bool list_empty(struct threadpool*);
void list_append(struct threadpool*, struct http_conn*);
void list_free(struct threadpool*);
struct http_conn* list_front(struct threadpool*);
struct threadpool* threadpool_contstruct( int, int );
void threadpool_free(struct threadpool*);
// bool pool_append_work(struct threadpool*, struct http_coon*);
void* worker(void*);
void pool_run(struct threadpool*);
int setnonblocking( int );
void addfd( int , int , bool );
void modfd( int , int , int );
void removefd( int , int );
void http_conn_init(struct http_conn* , int , const struct sockaddr_in* );
void init(struct http_conn*);
void http_conn_close(struct http_conn* , bool );
void http_conn_process(struct http_conn*);
bool http_conn_read(struct http_conn*);
bool http_conn_write(struct http_conn*);
bool process_write(struct http_conn* , enum HTTP_CODE );
enum HTTP_CODE process_read(struct http_conn*);
enum HTTP_CODE parse_request_line(struct http_conn* , char* );
enum HTTP_CODE parse_headers(struct http_conn* , char* );
enum HTTP_CODE parse_content(struct http_conn* , char* );
enum HTTP_CODE do_request(struct http_conn* );
char* get_line(struct http_conn*);
enum LINE_STATUS parse_line(struct http_conn*);
void unmap(struct http_conn* );
bool add_response(struct http_conn* , const char* , ... );
bool add_content(struct http_conn* , const char* );
bool add_status_line(struct http_conn* , int , const char* );
bool add_headers(struct http_conn* , int );
bool add_content_length(struct http_conn* , int );
bool add_linger(struct http_conn* );
// bool add_blank_line(struct http_conn* ,);
// void m_addsig(int, void()(int), bool);
void show_error(int , const char* );
// 线程之间由于会相互修改共享的变量 导致引入竞争态 需要需要一些方法来同步彼此之间的执行顺序
// locker 锁相关代码
// Task-queue semaphore: counts queued requests; workers block on it.
sem_t m_queuestat;
// Initialize the semaphore. NOTE(review): like locker(), this returns
// true on FAILURE (sem_init != 0), the inverse of the other bool helpers.
bool sem() {
    return sem_init(&m_queuestat, 0, 0) != 0;
}
// Destroy the semaphore.
void free_sem() {
    sem_destroy(&m_queuestat);
}
// Block until the semaphore is positive, then decrement it.
// NOTE(review): the names wait/post shadow POSIX wait(2) — this file does
// not include <sys/wait.h>, so it links, but renaming would be safer.
bool wait() {
    return sem_wait(&m_queuestat) == 0;
}
// Increment the semaphore, waking one blocked waiter if any.
bool post() {
    return sem_post(&m_queuestat) == 0;
}
// Mutex protecting the shared task list.
pthread_mutex_t m_queuelocker;
// Initialize the mutex (true on FAILURE, matching sem()).
bool locker() {
    return pthread_mutex_init(&m_queuelocker, NULL) != 0;
}
// Destroy the mutex.
void free_locker() {
    pthread_mutex_destroy(&m_queuelocker);
}
// Acquire the queue mutex, blocking if another thread holds it.
bool lock() {
    return pthread_mutex_lock(&m_queuelocker) == 0;
}
// Release the queue mutex.
bool unlock() {
    return pthread_mutex_unlock(&m_queuelocker) == 0;
}
// Condition variable and its companion mutex (unused by main, kept for API parity).
pthread_mutex_t m_cond_mutex;
pthread_cond_t m_cond;
// Initialize both; like sem()/locker(), returns true on FAILURE.
bool cond() {
    // BUG FIX: the original combined the two inits with &&, so a successful
    // mutex init (returns 0) short-circuited and pthread_cond_init was
    // never called, leaving m_cond uninitialized.
    return pthread_mutex_init( &m_cond_mutex, NULL ) != 0 || pthread_cond_init( &m_cond, NULL ) != 0;
}
// Destroy the condition variable and its mutex.
bool free_cond() {
    pthread_mutex_destroy( &m_cond_mutex );
    pthread_cond_destroy( &m_cond );
    // BUG FIX: declared bool but the original fell off the end — using its
    // return value was undefined behavior.
    return true;
}
// Block the caller until the condition is signalled.
bool cond_wait() {
    int ret = 0;
    pthread_mutex_lock(&m_cond_mutex);
    ret = pthread_cond_wait(&m_cond, &m_cond_mutex);
    pthread_mutex_unlock(&m_cond_mutex);
    return ret == 0;
}
// Wake one thread blocked in cond_wait().
bool cond_signal() {
    return pthread_cond_signal(&m_cond) == 0;
}
// end
// threadpool 线程池代码
// 一个简单的双向链表节点
// Doubly-linked list node holding one queued request.
struct listnode {
    struct http_conn* task;   // the queued connection, NULL only in sentinels
    struct listnode* prev;
    struct listnode* next;
};
// Thread-pool state: worker threads plus a sentinel-delimited task list.
struct threadpool {
    int m_thread_number;      // number of worker threads
    int m_max_requests;       // queue capacity limit checked in pool_append_work
    pthread_t* m_threads;     // worker thread ids
    struct listnode* head;    // head sentinel (no task)
    struct listnode* tail;    // tail sentinel (no task)
    int size;                 // current queue length
    bool m_stop;              // tells pool_run loops to exit
};
// 链表判空
// The queue is empty when only the two sentinels remain linked.
bool list_empty(struct threadpool* pool) {
    struct listnode* first = pool->head->next;
    return first == pool->tail;
}
// 链表尾部插入一个任务
// Append a request just before the tail sentinel.
void list_append(struct threadpool* pool, struct http_conn* request) {
    struct listnode* node = (struct listnode*)malloc(sizeof(struct listnode));
    // BUG FIX: the original dereferenced the malloc result unconditionally.
    // On OOM, drop the request; a spuriously woken worker handles an empty
    // list gracefully (pool_run re-checks list_empty under the lock).
    if (!node) {
        return;
    }
    struct listnode* p = pool->tail->prev;
    node->task = request;
    node->next = pool->tail;
    node->prev = p;
    p->next = node;
    pool->tail->prev = node;
    // Queue length bookkeeping.
    pool->size += 1;
}
// 释放链表
// Free all pending task nodes, leaving the two sentinels intact and linked.
void list_free(struct threadpool* pool) {
    // BUG FIX: the original walked until next == NULL, but tail->next is
    // never initialized anywhere, so it read indeterminate memory and also
    // freed the tail sentinel — which threadpool_free() then double-freed.
    struct listnode* node = pool->head->next;
    while (node != pool->tail) {
        struct listnode* next = node->next;
        free(node);
        node = next;
    }
    pool->head->next = pool->tail;
    pool->tail->prev = pool->head;
    pool->size = 0;
}
// 从链表中取出第一个来交给任务处理对象去处理
// Pop and return the first queued task. Caller must hold the queue lock
// and have verified the list is non-empty (see pool_run).
struct http_conn* list_front(struct threadpool* pool) {
    struct listnode* node = pool->head->next;
    struct http_conn* task = node->task;
    pool->head->next = node->next;
    node->next->prev = pool->head;
    free(node);
    pool->size -= 1;
    return task;
}
// 线程池构造函数
// Build a thread pool: a sentinel-delimited task list plus `thread_number`
// detached worker threads running pool_run(). Returns NULL on failure.
struct threadpool* threadpool_contstruct( int thread_number, int max_requests ) {
    struct threadpool* pool = (struct threadpool*)malloc(sizeof(struct threadpool));
    if (!pool) {
        return NULL;
    }
    assert(thread_number> 0);
    assert(max_requests > 0);
    pool->m_thread_number = thread_number;
    pool->m_max_requests = max_requests;
    // Task queue implemented as a doubly-linked list with two sentinels.
    pool->head = (struct listnode*)malloc(sizeof(struct listnode));
    pool->tail = (struct listnode*)malloc(sizeof(struct listnode));
    assert(pool->head && pool->tail);
    pool->head->next = pool->tail;
    pool->head->prev = NULL;
    pool->tail->prev = pool->head;
    // BUG FIX: the original never set the sentinels' outer links, leaving
    // tail->next indeterminate (read later by the original list_free).
    pool->tail->next = NULL;
    pool->size = 0;
    pool->m_stop = false;
    // BUG FIX: the original allocated room for a single pthread_t but wrote
    // thread_number of them — a heap buffer overflow.
    pool->m_threads = (pthread_t*)malloc(sizeof(pthread_t) * thread_number);
    assert(pool->m_threads);
    // Detached workers reclaim their own resources on exit.
    for (int i = 0; i < thread_number; ++i) {
        printf("create the %dth thread\n", i);
        // BUG FIX: on failure the original freed m_threads and kept looping,
        // then wrote through the freed pointer; bail out cleanly instead.
        if (pthread_create(pool->m_threads + i, NULL, worker, pool) != 0
            || pthread_detach(pool->m_threads[i]) != 0) {
            free(pool->m_threads);
            free(pool->head);
            free(pool->tail);
            free(pool);
            return NULL;
        }
    }
    return pool;
}
// 释放线程池
// Tear down the pool. The stop flag is raised FIRST so detached workers
// stop touching pool state; the original freed the memory and set m_stop
// last, letting running threads use freed pointers.
void threadpool_free(struct threadpool* pool) {
    pool->m_stop = true;
    free(pool->m_threads);
    list_free(pool);
    free(pool->head);
    free(pool->tail);
}
// 工作队列中插入一个任务
// Queue one ready connection for the workers and wake one of them.
// BUG FIX: the original parameter type was `struct http_coon*` — a typo
// creating a distinct incomplete type; every caller passes http_conn*.
bool pool_append_work(struct threadpool* pool, struct http_conn* request) {
    // The queue is shared with all workers: mutate it under the lock.
    lock();
    if (pool->size > pool->m_max_requests) {
        unlock();
        return false;
    }
    list_append(pool, request);
    unlock();
    // Wake one thread blocked in wait() to pick up the task.
    post();
    return true;
}
// Thread entry point: unpack the pool pointer and run the worker loop.
void* worker(void* arg) {
    struct threadpool* pool = arg;
    pool_run(pool);
    return pool;
}
// 子线程就简单的阻塞在wait 等待被唤醒去处理task 处理完后 如果没有被终止 就继续wait等待
// Worker loop: block on the task semaphore, then pop one request under the
// queue lock and process it; repeat until the pool is stopped.
void pool_run(struct threadpool* pool) {
    while(!pool->m_stop) {
        wait();
        lock();
        // Re-check under the lock: another worker may have drained the queue.
        if (list_empty(pool)) {
            unlock();
            continue;
        }
        struct http_conn* request = list_front(pool);
        unlock();
        if (!request) {
            continue;
        }
        http_conn_process(request);
    }
}
// end
// http请求的处理任务
// 非阻塞
// Switch fd to non-blocking mode; returns the previous flag set.
int setnonblocking( int fd )
{
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    return flags;
}
// 注册事件 EPOLLONESHOT 类型保证:如果有一个线程正在处理某个socket上的数据 在这个线程未处理完之前不会触发这个socket的可读事件从而导致其他线程来处理这个socket, 避免出现2个线程操作一个socket的数据的情况
// Register fd for edge-triggered reads (+EPOLLRDHUP for peer shutdown).
// EPOLLONESHOT guarantees only one thread handles a socket at a time: the
// event will not fire again until re-armed with modfd().
void addfd( int epollfd, int fd, bool one_shot )
{
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
    if( one_shot )
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}
// Re-arm a one-shot fd with the given event mask (EPOLLIN or EPOLLOUT).
void modfd( int epollfd, int fd, int ev )
{
    struct epoll_event event;
    event.data.fd = fd;
    event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}
// Unregister fd and close it.
void removefd( int epollfd, int fd )
{
    epoll_ctl( epollfd, EPOLL_CTL_DEL, fd, 0 );
    close( fd );
}
// http处理对象
// Per-connection HTTP state machine data.
struct http_conn {
    int m_sockfd;                          // client socket, -1 when closed
    struct sockaddr_in m_address;          // client address
    char m_read_buf[ READ_BUFFER_SIZE ];   // raw request bytes
    int m_read_idx;                        // bytes received so far
    int m_checked_idx;                     // next byte for parse_line() to examine
    int m_start_line;                      // start of the line currently being parsed
    char m_write_buf[ WRITE_BUFFER_SIZE ]; // response status line + headers
    int m_write_idx;                       // bytes queued in m_write_buf
    enum CHECK_STATE m_check_state;        // current parser state
    enum METHOD m_method;                  // only GET is accepted
    char m_real_file[ FILENAME_LEN ];      // doc_root + url path
    char* m_url;                           // points into m_read_buf
    char* m_version;                       // points into m_read_buf
    char* m_host;                          // points into m_read_buf
    int m_content_length;                  // from Content-Length header
    bool m_linger;                         // keep-alive requested
    char* m_file_address;                  // mmap of the requested file
    struct stat m_file_stat;               // stat of the requested file
    struct iovec m_iv[2];                  // [0]=headers, [1]=file body
    int m_iv_count;                        // 1 or 2
};
// 初始化一个task对象 建立和目标socket的联系
// Bind a task object to an accepted socket, register it one-shot with the
// global epoll instance, and bump the connection count.
void http_conn_init(struct http_conn* task, int sockfd, const struct sockaddr_in* addr) {
    task->m_sockfd = sockfd;
    task->m_address = *addr;
    // Read (and thereby clear) any pending error on the socket.
    int error = 0;
    socklen_t len = sizeof(error);
    getsockopt( task->m_sockfd, SOL_SOCKET, SO_ERROR, &error, &len );
    // NOTE(review): SO_REUSEADDR on an already-connected socket has no
    // effect — this looks like debug leftover from the book; confirm.
    int reuse = 1;
    setsockopt( task->m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ) );
    addfd( m_epollfd, sockfd, true );
    m_user_count++;
    init(task);
}
// 重置读写辅助字段的值
// Reset the per-request parser/buffer state so the object can handle a
// fresh request (called at connect time and between keep-alive requests).
void init(struct http_conn* task) {
    memset( task->m_read_buf, 0, sizeof task->m_read_buf );
    memset( task->m_write_buf, 0, sizeof task->m_write_buf );
    memset( task->m_real_file, 0, sizeof task->m_real_file );
    task->m_check_state = CHECK_STATE_REQUESTLINE;
    task->m_method = GET;
    task->m_linger = false;
    task->m_url = NULL;
    task->m_version = NULL;
    task->m_host = NULL;
    task->m_content_length = 0;
    task->m_start_line = 0;
    task->m_checked_idx = 0;
    task->m_read_idx = 0;
    task->m_write_idx = 0;
}
// 终止一个sockfd连接
// Close the connection: remove it from epoll, mark the task slot free with
// m_sockfd = -1, and decrement the global connection count.
void http_conn_close(struct http_conn* task, bool real_close) {
    if (real_close && (task->m_sockfd != -1)) {
        removefd(m_epollfd, task->m_sockfd);
        task->m_sockfd = -1;
        m_user_count--;
    }
}
// task处理函数
// Worker-thread entry for one task: parse whatever has been read so far,
// and either re-arm for more input or build a response and arm for write.
void http_conn_process(struct http_conn* task) {
    enum HTTP_CODE read_ret = process_read(task);
    // Request still incomplete: re-arm the one-shot read event and wait.
    if ( read_ret == NO_REQUEST )
    {
        modfd( m_epollfd, task->m_sockfd, EPOLLIN );
        return;
    }
    // Build the response for the determined status.
    bool write_ret = process_write(task, read_ret);
    if (!write_ret) {
        http_conn_close(task ,true);
    }
    // Arm the write event; the main thread's EPOLLOUT branch sends it.
    modfd( m_epollfd, task->m_sockfd, EPOLLOUT );
}
// 把当前socket缓冲区的数据都读到task的读缓冲区中
// Drain the socket into m_read_buf (edge-triggered: loop until EAGAIN).
// Returns false on EOF, hard error, or a full buffer.
bool http_conn_read(struct http_conn* task) {
    if (task->m_read_idx >= READ_BUFFER_SIZE) {
        return false;
    }
    int bytes_read = 0;
    while(true) {
        bytes_read = recv( task->m_sockfd, task->m_read_buf + task->m_read_idx, READ_BUFFER_SIZE - task->m_read_idx, 0 );
        if (bytes_read == -1) {
            // EAGAIN/EWOULDBLOCK: kernel buffer drained, stop for now.
            if( errno == EAGAIN || errno == EWOULDBLOCK )
            {
                break;
            }
            return false;
        }
        else if (bytes_read == 0) {
            // Peer closed the connection.
            return false;
        }
        task->m_read_idx += bytes_read;
    }
    return true;
}
// 把准备好的响应报文数据写入到socket的内核写缓冲区中
// Send the prepared response (headers in m_write_buf, optional mmap'ed file
// body) with writev, re-arming EPOLLOUT when the kernel buffer fills.
// Returns false to tell the caller to close the connection.
bool http_conn_write(struct http_conn* task) {
    int temp = 0;
    int bytes_have_send = 0;
    int bytes_to_send = task->m_write_idx;
    if (bytes_to_send == 0) {
        // Nothing to send: reset state and wait for the next request.
        modfd(m_epollfd, task->m_sockfd, EPOLLIN);
        init(task);
        return true;
    }
    while(1) {
        temp = writev(task->m_sockfd, task->m_iv, task->m_iv_count);
        if (temp <= -1) {
            // Kernel send buffer full: re-arm EPOLLOUT and retry later.
            if( errno == EAGAIN )
            {
                modfd( m_epollfd, task->m_sockfd, EPOLLOUT );
                return true;
            }
            unmap(task);
            return false;
        }
        bytes_to_send -= temp;
        bytes_have_send += temp;
        // Two iovecs: [0] headers, [1] file body. writev may send partially,
        // so advance base/len by what was actually written.
        // NOTE(review): this accounting matches the book's original, which
        // has known errata — bytes_to_send starts at m_write_idx only (file
        // size not included) and m_iv[0].iov_len is compared after being
        // zeroed; verify partial-write behavior before relying on it.
        if (bytes_have_send >= task->m_iv[0].iov_len)
        {
            task->m_iv[0].iov_len = 0;
            task->m_iv[1].iov_base = task->m_file_address + (bytes_have_send - task->m_write_idx);
            task->m_iv[1].iov_len = bytes_to_send;
        }
        else
        {
            task->m_iv[0].iov_base = task->m_write_buf + bytes_have_send;
            task->m_iv[0].iov_len = task->m_iv[0].iov_len - bytes_have_send;
        }
        // Everything sent.
        if (bytes_to_send <= 0) {
            unmap(task);
            // Keep-alive decides whether the connection survives.
            if (task->m_linger) {
                init(task);
                modfd( m_epollfd, task->m_sockfd, EPOLLIN );
                return true;
            }
            else {
                modfd( m_epollfd, task->m_sockfd, EPOLLIN );
                return false;
            }
        }
    }
}
// 处理task的读缓冲区的状态机函数
// Master state machine over the read buffer: pull out one complete line at
// a time with parse_line() and feed it to the parser for the current state.
enum HTTP_CODE process_read(struct http_conn* task) {
    enum LINE_STATUS line_status = LINE_OK;
    enum HTTP_CODE ret = NO_REQUEST;
    char* text = 0;
    // In CONTENT state the body is not line-delimited, so the loop is
    // entered on the previous LINE_OK instead of calling parse_line().
    while ( ( ( task->m_check_state == CHECK_STATE_CONTENT ) && ( line_status == LINE_OK ) )
            || ( (line_status = parse_line(task)) == LINE_OK ) )
    {
        // A complete line is available; text points at its first byte.
        text = get_line(task);
        // Remember where the next line will start.
        task->m_start_line = task->m_checked_idx;
        printf( "got 1 http line: %s\n", text );
        // State transitions; parsing starts in CHECK_STATE_REQUESTLINE.
        switch ( task->m_check_state )
        {
            case CHECK_STATE_REQUESTLINE:
            {
                // "METHOD url version" line.
                ret = parse_request_line(task, text );
                if ( ret == BAD_REQUEST )
                {
                    return BAD_REQUEST;
                }
                break;
            }
            case CHECK_STATE_HEADER:
            {
                // One header line (or the blank separator line).
                ret = parse_headers(task, text );
                if ( ret == BAD_REQUEST )
                {
                    return BAD_REQUEST;
                }
                else if ( ret == GET_REQUEST )
                {
                    return do_request(task);
                }
                break;
            }
            case CHECK_STATE_CONTENT:
            {
                // Request body.
                ret = parse_content(task, text );
                if ( ret == GET_REQUEST )
                {
                    return do_request(task);
                }
                line_status = LINE_OPEN;
                break;
            }
            default:
            {
                return INTERNAL_ERROR;
            }
        }
    }
    // No definitive result yet: the request is still incomplete.
    return NO_REQUEST;
}
// 把响应报文根据返回状态码写入到task的写缓冲区中
// Fill the write buffer (and iovecs) according to the parse result.
bool process_write(struct http_conn* task, enum HTTP_CODE ret ) {
    switch ( ret )
    {
        case INTERNAL_ERROR:
        {
            add_status_line(task, 500, error_500_title );
            add_headers(task, strlen( error_500_form ) );
            if ( ! add_content(task, error_500_form ) )
            {
                return false;
            }
            break;
        }
        case BAD_REQUEST:
        {
            add_status_line(task, 400, error_400_title );
            add_headers(task, strlen( error_400_form ) );
            if ( ! add_content(task, error_400_form ) )
            {
                return false;
            }
            break;
        }
        case NO_RESOURCE:
        {
            add_status_line(task, 404, error_404_title );
            add_headers(task, strlen( error_404_form ) );
            if ( ! add_content(task, error_404_form ) )
            {
                return false;
            }
            break;
        }
        case FORBIDDEN_REQUEST:
        {
            add_status_line(task, 403, error_403_title );
            add_headers(task, strlen( error_403_form ) );
            if ( ! add_content(task, error_403_form ) )
            {
                return false;
            }
            break;
        }
        case FILE_REQUEST:
        {
            // Success: headers go in iovec 0, the mmap'ed file in iovec 1.
            add_status_line(task, 200, ok_200_title );
            if ( task->m_file_stat.st_size != 0 )
            {
                add_headers(task, task->m_file_stat.st_size );
                task->m_iv[ 0 ].iov_base = task->m_write_buf;
                task->m_iv[ 0 ].iov_len = task->m_write_idx;
                task->m_iv[ 1 ].iov_base = task->m_file_address;
                task->m_iv[ 1 ].iov_len = task->m_file_stat.st_size;
                task->m_iv_count = 2;
                return true;
            }
            else
            {
                // Empty file: send a stock empty page instead.
                const char* ok_string = "<html><body></body></html>";
                add_headers(task, strlen( ok_string ) );
                if ( ! add_content(task, ok_string ) )
                {
                    return false;
                }
            }
            // The book's original omitted this break, falling into default
            // and closing the connection; break so the empty page is sent
            // through the single-iovec path below.
            break;
        }
        default:
        {
            return false;
        }
    }
    // Error/empty-page responses need only the header buffer.
    task->m_iv[ 0 ].iov_base = task->m_write_buf;
    task->m_iv[ 0 ].iov_len = task->m_write_idx;
    task->m_iv_count = 1;
    return true;
}
// 解析请求行信息
// Parse the request line, e.g. "GET http://localhost/index.html HTTP/1.1".
// Splits the line in place with '\0' and leaves m_url/m_version pointing
// into the read buffer.
enum HTTP_CODE parse_request_line(struct http_conn* task, char* text ) {
    // Find the first space/tab after the method.
    task->m_url = strpbrk( text, " \t" );
    if ( !task->m_url )
    {
        return BAD_REQUEST;
    }
    // Terminate the method token.
    *task->m_url++ = '\0';
    // Only GET is supported.
    char* method = text;
    if ( strcasecmp( method, "GET" ) == 0 )
    {
        task->m_method = GET;
    }
    else
    {
        return BAD_REQUEST;
    }
    // Skip any further whitespace before the URL.
    task->m_url += strspn( task->m_url, " \t" );
    // Find the space/tab separating URL and version.
    task->m_version = strpbrk( task->m_url, " \t" );
    if ( !task->m_version )
    {
        return BAD_REQUEST;
    }
    // Terminate the URL token.
    *task->m_version++ = '\0';
    // Skip whitespace before "HTTP/1.1".
    task->m_version += strspn( task->m_version, " \t" );
    if ( strcasecmp( task->m_version, "HTTP/1.1" ) != 0 )
    {
        return BAD_REQUEST;
    }
    // Strip an absolute-URI prefix, keeping only the path part.
    if ( strncasecmp( task->m_url, "http://", 7 ) == 0 )
    {
        task->m_url += 7;
        task->m_url = strchr( task->m_url, '/' );
    }
    if ( !task->m_url || task->m_url[ 0 ] != '/' )
    {
        return BAD_REQUEST;
    }
    // Request line done: move on to the headers.
    task->m_check_state = CHECK_STATE_HEADER;
    return NO_REQUEST;
}
// 解析头部字段
// Parse one header line. An empty line ends the header section: either the
// request is complete (no body) or we switch to CONTENT state.
enum HTTP_CODE parse_headers(struct http_conn* task, char* text ) {
    // Blank line: end of headers.
    if( text[ 0 ] == '\0' )
    {
        if ( task->m_method == HEAD )
        {
            return GET_REQUEST;
        }
        if ( task->m_content_length != 0 )
        {
            task->m_check_state = CHECK_STATE_CONTENT;
            return NO_REQUEST;
        }
        return GET_REQUEST;
    }
    // Recognized headers: skip the name, then leading whitespace.
    else if ( strncasecmp( text, "Connection:", 11 ) == 0 )
    {
        text += 11;
        text += strspn( text, " \t" );
        if ( strcasecmp( text, "keep-alive" ) == 0 )
        {
            task->m_linger = true;
        }
    }
    else if ( strncasecmp( text, "Content-Length:", 15 ) == 0 )
    {
        text += 15;
        text += strspn( text, " \t" );
        task->m_content_length = atol( text );
    }
    else if ( strncasecmp( text, "Host:", 5 ) == 0 )
    {
        text += 5;
        text += strspn( text, " \t" );
        task->m_host = text;
    }
    else
    {
        // Everything else is ignored (logged only).
        printf( "oop! unknow header %s\n", text );
    }
    return NO_REQUEST;
}
// 解析请求报文的内容字段
// "Parse" the body: no real analysis, just check whether all
// Content-Length bytes have arrived after the headers.
enum HTTP_CODE parse_content(struct http_conn* task, char* text ) {
    if ( task->m_read_idx >= ( task->m_content_length + task->m_checked_idx ) )
    {
        text[ task->m_content_length ] = '\0';
        // Full request (headers + body) received.
        return GET_REQUEST;
    }
    return NO_REQUEST;
}
// 咋解析请求报文的头部和数据部分时候可能得到一个完整的请求 我们要获取请求中对应的资源
enum HTTP_CODE do_request(struct http_conn* task) {
strcpy( task->m_real_file, doc_root );
int len = strlen( doc_root );
// 得到完整的路径 如 /var/www/html/index.html
strncpy( task->m_real_file + len, task->m_url, FILENAME_LEN - len - 1 );
if ( stat( task->m_real_file, &task->m_file_stat ) < 0 )
{
return NO_RESOURCE;
}
if ( !( task->m_file_stat.st_mode & S_IROTH ) )
{
return FORBIDDEN_REQUEST;
}
// 不支持目录
if ( S_ISDIR( task->m_file_stat.st_mode ) )
{
return BAD_REQUEST;
}
int fd = open( task->m_real_file, O_RDONLY );
// m_file_address 指向文件存储的内存区间映射 为发送操作建立好数据来源
task->m_file_address = ( char* )mmap( NULL, task->m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0 );
close( fd );
return FILE_REQUEST;
}
// 下一个可以分析的完整行的起始地址
// Start of the most recently completed line inside the read buffer.
char* get_line(struct http_conn* task) {
    return &task->m_read_buf[task->m_start_line];
}
// 从task读缓冲区中尝试读出一个完整的一行
// Scan forward from m_checked_idx looking for a complete "\r\n"-terminated
// line; replaces the terminator with "\0\0" in place. Returns LINE_OPEN if
// more data is needed, LINE_BAD on a malformed terminator.
enum LINE_STATUS parse_line(struct http_conn* task) {
    char temp;
    for(;task->m_checked_idx < task->m_read_idx;++task->m_checked_idx) {
        temp = task->m_read_buf[task->m_checked_idx];
        if (temp == '\r') {
            // '\r' is the last byte so far: the '\n' may still be in flight.
            if ( ( task->m_checked_idx + 1 ) == task->m_read_idx )
            {
                return LINE_OPEN;
            }
            else if (task->m_read_buf[ task->m_checked_idx + 1 ] == '\n') {
                // Overwrite "\r\n" with terminators; m_checked_idx ends up
                // at the first byte of the next line.
                task->m_read_buf[ task->m_checked_idx++ ] = '\0';
                task->m_read_buf[ task->m_checked_idx++ ] = '\0';
                return LINE_OK;
            }
            return LINE_BAD;
        }
        else if (temp == '\n') {
            // '\n' whose preceding '\r' arrived in an earlier recv.
            // NOTE(review): the guard is `> 1`; a line ending at index 1
            // ("X\r\n" split oddly) may be rejected — the book uses the
            // same check; confirm whether `>= 1` was intended.
            if( ( task->m_checked_idx > 1 ) && ( task->m_read_buf[ task->m_checked_idx - 1 ] == '\r' ) )
            {
                task->m_read_buf[ task->m_checked_idx-1 ] = '\0';
                task->m_read_buf[ task->m_checked_idx++ ] = '\0';
                return LINE_OK;
            }
            return LINE_BAD;
        }
    }
    return LINE_OPEN;
}
// 释放共享映射
// Release the mmap'ed file body, if any.
void unmap(struct http_conn* task) {
    if (task->m_file_address == NULL) {
        return;
    }
    munmap( task->m_file_address, task->m_file_stat.st_size );
    task->m_file_address = NULL;
}
// 把目标字符串按照指定格式写入task的写缓冲区中
// vsnprintf `format` into the write buffer at m_write_idx. Returns false
// when the buffer is already full, the output would not fit, or an
// encoding error occurs.
bool add_response(struct http_conn* task, const char* format, ... ) {
    if( task->m_write_idx >= WRITE_BUFFER_SIZE )
    {
        return false;
    }
    va_list arg_list;
    va_start( arg_list, format );
    int len = vsnprintf( task->m_write_buf + task->m_write_idx, WRITE_BUFFER_SIZE - 1 - task->m_write_idx, format, arg_list );
    // BUG FIX: every va_start must be paired with va_end; the original
    // leaked the va_list on the truncation path. Also treat a negative
    // vsnprintf return (encoding error) as failure.
    va_end( arg_list );
    if( len < 0 || len >= ( WRITE_BUFFER_SIZE - 1 - task->m_write_idx ) )
    {
        return false;
    }
    task->m_write_idx += len;
    return true;
}
// 以下几个函数分别对应响应报文中的各个字段
// Append the response body.
bool add_content(struct http_conn* task, const char* content ) {
    return add_response(task, "%s", content );
}
// Append "HTTP/1.1 <status> <title>\r\n".
bool add_status_line(struct http_conn* task, int status, const char* title ) {
    return add_response(task, "%s %d %s\r\n", "HTTP/1.1", status, title );
}
// Append the blank line separating headers from the body.
bool add_blank_line(struct http_conn* task) {
    return add_response(task, "%s", "\r\n" );
}
// Append the standard header block (length, keep-alive, separator).
// BUG FIX: declared bool but the original returned nothing — using its
// value was undefined behavior. All three parts are still attempted even
// if an earlier one fails, matching the original call sequence.
bool add_headers(struct http_conn* task, int content_length ) {
    bool ok = add_content_length(task, content_length );
    ok = add_linger(task) && ok;
    ok = add_blank_line(task) && ok;
    return ok;
}
// Append the Content-Length header.
bool add_content_length(struct http_conn* task, int content_length ) {
    return add_response(task, "Content-Length: %d\r\n", content_length );
}
// Append the Connection header based on the client's keep-alive request.
bool add_linger(struct http_conn* task) {
    return add_response(task, "Connection: %s\r\n", ( task->m_linger == true ) ? "keep-alive" : "close" );
}
// end
// 添加信号处理函数
// Install `handler` for `sig` with all signals blocked during delivery;
// SA_RESTART resumes interrupted syscalls. (Prefixed m_ to avoid clashing
// with the process-pool server's addsig.)
void m_addsig(int sig, void(handler)(int), bool restart) {
    struct sigaction sa;
    memset( &sa, '\0', sizeof( sa ) );
    sa.sa_handler = handler;
    if( restart )
    {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset( &sa.sa_mask );
    assert( sigaction( sig, &sa, NULL ) != -1 );
}
// 发送错误提示
// Log the message locally, echo it to the client, then drop the connection.
void show_error(int connfd, const char* info) {
    size_t n = strlen(info);
    printf("%s", info);
    send(connfd, info, n, 0);
    close(connfd);
}
// Entry point of the thread-pool server: the main thread owns the epoll
// loop (listen socket + all connections); workers compete for queued tasks.
int main(int argc, char* argv[]) {
    if (argc <= 2) {
        // NOTE(review): basename() is declared in <libgen.h>, which this
        // file does not include — verify it compiles cleanly on the target.
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);
    m_addsig(SIGPIPE, SIG_IGN, false);
    // Initialize the queue semaphore and mutex.
    sem();
    locker();
    struct threadpool* pool = NULL;
    pool = threadpool_contstruct(8, 10000);
    assert(pool);
    // One task slot per possible connection fd value (indexed by fd).
    struct http_conn* users = (struct http_conn*)malloc(sizeof(struct http_conn)*MAX_FD);
    assert(users);
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    // SO_LINGER {1, 0}: close() returns immediately and discards unsent
    // data; connection sockets inherit this from the listen socket.
    struct linger tmp = {1, 0};
    setsockopt(listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
    int ret = 0;
    int reuse = 1;
    ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,(const void *)&reuse , sizeof(int));
    assert(ret != -1);
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret >= 0 );
    ret = listen( listenfd, 5 );
    assert( ret >= 0 );
    struct epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, false );
    // Publish the single global epoll fd used by the http_conn helpers.
    m_epollfd = epollfd;
    while(true) {
        // The main thread watches the listen socket and every connection;
        // workers only process tasks queued here (unlike the process pool).
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if ((number < 0) && (errno != EINTR)) {
            printf("epoll failtrue\n");
            break;
        }
        for(int i = 0;i < number; ++i) {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd) {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* ) &client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d\n", errno );
                    continue;
                }
                if (m_user_count >= MAX_FD) {
                    show_error(connfd, "Internal server busy");
                    continue;
                }
                // Bind the task slot for this fd; workers will compete for it.
                http_conn_init(&users[connfd], connfd, &client_address);
            }
            else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
                // Peer hung up or the socket errored.
                http_conn_close(&users[sockfd], true);
            }
            else if (events[i].events & EPOLLIN) {
                // Data ready: drain it, then queue the task for the workers.
                if (http_conn_read(&users[sockfd])) {
                    pool_append_work(pool, users + sockfd);
                }
                else {
                    http_conn_close(&users[sockfd], true);
                }
            }
            else if (events[i].events & EPOLLOUT) {
                // Socket writable: send the prepared response; the return
                // value decides whether the connection is closed.
                if (!http_conn_write(&users[sockfd])) {
                    http_conn_close(&users[sockfd], true);
                }
            }
            else {
            }
        }
    }
    // Release resources.
    free_sem();
    free_locker();
    // free_cond();
    close(epollfd);
    close(listenfd);
    free(users);
    threadpool_free(pool);
    free(pool);
    return 0;
}
// 编译的时候记得带上2个共享库 (remember the two shared libraries when compiling):
//   sudo gcc -g threadpool_server.c -lrt -lpthread
// 最后,再次感谢 游双 大佬的《Linux高性能服务器编程》。
// 原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
// 如有侵权,请联系 cloudcommunity@tencent.com 删除。