
在数字宇宙的某个角落,存在着一个名为Ubuntu的奇妙世界。这里的居民们称它为“Linux for human beings”,但鲜少有人知道,这个世界实际上是一个巨大的“婚恋平台”——专门为那些孤独的进程灵魂寻找合适的伴侣。
Ubuntu,这个源自非洲祖鲁语的概念,意为“人道待人”,在技术世界里却演变成了一个让进程们能够和谐共处的生态系统。想象一下,当一个进程被fork()出来的那一刻,就像一个新生的灵魂被抛入这个世界,它茫然四顾,寻找着能够与之对话的伙伴。
# 新生进程的啼哭
$ ./newborn_process
Hello, world! Is there anybody out there?但世界并非总是如此友好。在早期的Unix系统中,进程们生活在各自的孤岛上,就像被囚禁在单人牢房中的囚徒,只能通过墙壁上凿开的小孔(也就是标准输入输出)与外界进行极为有限的交流。
让我们来看一个真实的悲剧案例。在某大型互联网公司的生产环境中,一个负责处理用户订单的进程因为无法及时与数据库进程通信,导致数万笔交易挂起。监控系统显示一切正常,CPU在转,内存没满,网络通畅,但系统就是不工作。
// 订单处理进程的内心独白
while (1) {
order = get_new_order(); // 获取新订单
result = process_order(order); // 处理订单
// 尝试通知数据库进程
if (notify_database(result) == FAILED) {
// 如果通知失败,订单状态将永远悬置
log_error("Database unreachable!");
// 但进程别无选择,只能继续工作
continue;
}
}这个案例暴露了进程间通信(IPC)的根本重要性。就像人类社会一样,当个体之间无法有效沟通时,再强大的个体能力也无法组成一个高效的系统。
在Ubuntu世界中,信号(Signal)是最古老也最基础的通信方式。它就像进程间的眉目传情——简单、快速,但信息量有限。
信号的本质是一种异步通知机制,用于通知进程某个事件已经发生。在Linux中,共有64种不同的信号,每个都有特定的含义:
#include <signal.h>
#include <stdio.h>
#include <unistd.h>
// 信号处理函数 - 相当于进程的"耳朵"
void signal_handler(int sig) {
switch(sig) {
case SIGUSR1:
printf("收到约会邀请信号! 😊\n");
break;
case SIGUSR2:
printf("收到分手信号! 😭\n");
break;
case SIGINT:
printf("被无情拒绝... 💔\n");
exit(0);
}
}
int main() {
// 注册信号处理器 - 告诉内核我对什么信号感兴趣
signal(SIGUSR1, signal_handler);
signal(SIGUSR2, signal_handler);
signal(SIGINT, signal_handler);
printf("进程PID: %d 正在等待信号...\n", getpid());
// 无限等待,就像在酒吧等待搭讪的单身汉
while(1) {
pause(); // 暂停,直到收到信号
}
return 0;
}在另一个终端,我们可以向这个进程发送信号:
# 发送约会邀请
$ kill -SIGUSR1 1234
# 发送分手通知
$ kill -SIGUSR2 1234
# 残忍拒绝
$ kill -SIGINT 1234信号的局限性很明显:它只能传递"发生了什么",而不能传递"具体内容"。就像你只能对心仪的对象眨眨眼,却无法说出"周末一起看电影吗?"这样具体的邀请。
当信号这种"眉目传情"不够用时,进程们发明了管道(Pipe)——一种能够传输实际数据的通信方式。
匿名管道就像两个进程之间的耳语:
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/wait.h>
int main() {
int pipefd[2];
pid_t pid;
char buffer[256];
// 创建管道 - 建立悄悄话通道
if (pipe(pipefd) == -1) {
perror("管道创建失败");
return 1;
}
// 生孩子进程 - 现在有人可以说话了
pid = fork();
if (pid == 0) {
// 子进程 - 关闭写端,只读
close(pipefd[1]);
// 读取父进程的悄悄话
read(pipefd[0], buffer, sizeof(buffer));
printf("孩子听到: %s", buffer);
close(pipefd[0]);
} else {
// 父进程 - 关闭读端,只写
close(pipefd[0]);
char message[] = "儿子,我是你爸爸!\n";
write(pipefd[1], message, strlen(message));
close(pipefd[1]);
wait(NULL); // 等待子进程结束
}
return 0;
}命名管道(FIFO)则更进一步,允许无亲缘关系的进程通信:
# 创建命名管道
$ mkfifo /tmp/my_pipe
# 进程A写入管道
$ echo "Hello through named pipe!" > /tmp/my_pipe
# 进程B从管道读取
$ cat < /tmp/my_pipe管道的内部实现基于内核缓冲区,本质上是一个先进先出(FIFO)的队列。在Linux内核中,管道的实现涉及以下关键数据结构:
// Linux内核中的管道结构(简化版)
struct pipe_inode_info {
unsigned int head; // 读位置
unsigned int tail; // 写位置
unsigned int max_usage; // 最大使用量
unsigned int buffers; // 缓冲区数量
struct page *pages[]; // 内存页数组
};管道的局限性在于它是半双工的,并且容量有限。当缓冲区满时,写操作会被阻塞;当缓冲区空时,读操作也会被阻塞。
System V IPC提供了一套更加正式的通信机制,包括消息队列、共享内存和信号量。这就像是进程世界的商务社交——结构化、可靠,但稍显繁琐。
消息队列为进程提供了一种结构化的通信方式,类似于人类通过信件交流:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
// 定义消息结构
struct message {
long mtype; // 消息类型
char mtext[256]; // 消息内容
int sender_pid; // 发送者PID
int priority; // 优先级
};
// 创建或获取消息队列
int create_message_queue() {
key_t key = ftok("/tmp", 'A');
int msgid = msgget(key, 0666 | IPC_CREAT);
if (msgid == -1) {
perror("消息队列创建失败");
exit(1);
}
return msgid;
}
// 消息发送者
void message_sender(int msgid) {
struct message msg;
msg.mtype = 1; // 消息类型
msg.sender_pid = getpid();
msg.priority = 10;
strcpy(msg.mtext, "你好,这是我的名片,交个朋友吧!");
// 发送消息
if (msgsnd(msgid, &msg, sizeof(msg) - sizeof(long), 0) == -1) {
perror("消息发送失败");
return;
}
printf("消息已发送: %s\n", msg.mtext);
}
// 消息接收者
void message_receiver(int msgid) {
struct message msg;
// 接收消息
if (msgrcv(msgid, &msg, sizeof(msg) - sizeof(long), 1, 0) == -1) {
perror("消息接收失败");
return;
}
printf("收到来自PID %d 的消息: %s (优先级: %d)\n",
msg.sender_pid, msg.mtext, msg.priority);
}
int main() {
int msgid = create_message_queue();
pid_t pid = fork();
if (pid == 0) {
sleep(1); // 让父进程先发送消息
message_receiver(msgid);
} else {
message_sender(msgid);
wait(NULL);
// 清理消息队列
msgctl(msgid, IPC_RMID, NULL);
}
return 0;
}消息队列的优势:
共享内存是最快的IPC方式,它让多个进程能够直接访问同一块内存区域,就像人类之间的共同记忆:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/sem.h>
// 定义共享数据结构
struct shared_data {
int counter;
char message[256];
int data_ready; // 数据就绪标志
};
// 创建信号量用于同步
int create_semaphore() {
key_t key = ftok("/tmp", 'S');
int semid = semget(key, 1, 0666 | IPC_CREAT);
if (semid == -1) {
perror("信号量创建失败");
exit(1);
}
// 初始化信号量值为1(互斥锁)
if (semctl(semid, 0, SETVAL, 1) == -1) {
perror("信号量初始化失败");
exit(1);
}
return semid;
}
// P操作 - 获取锁
void semaphore_lock(int semid) {
struct sembuf op = {0, -1, SEM_UNDO};
semop(semid, &op, 1);
}
// V操作 - 释放锁
void semaphore_unlock(int semid) {
struct sembuf op = {0, 1, SEM_UNDO};
semop(semid, &op, 1);
}
int main() {
// 创建共享内存
key_t key = ftok("/tmp", 'M');
int shmid = shmget(key, sizeof(struct shared_data), 0666 | IPC_CREAT);
if (shmid == -1) {
perror("共享内存创建失败");
exit(1);
}
// 附加共享内存
struct shared_data *shared = shmat(shmid, NULL, 0);
if (shared == (void*)-1) {
perror("共享内存附加失败");
exit(1);
}
// 创建信号量
int semid = create_semaphore();
pid_t pid = fork();
if (pid == 0) {
// 子进程 - 读者
for (int i = 0; i < 5; i++) {
semaphore_lock(semid);
if (shared->data_ready) {
printf("读者: 计数器=%d, 消息=%s\n",
shared->counter, shared->message);
shared->data_ready = 0;
}
semaphore_unlock(semid);
sleep(1);
}
} else {
// 父进程 - 写者
for (int i = 0; i < 5; i++) {
semaphore_lock(semid);
if (!shared->data_ready) {
shared->counter = i;
snprintf(shared->message, sizeof(shared->message),
"消息编号 %d", i);
shared->data_ready = 1;
printf("写者: 写入数据 %d\n", i);
}
semaphore_unlock(semid);
sleep(1);
}
wait(NULL);
// 清理
shmdt(shared);
shmctl(shmid, IPC_RMID, NULL);
semctl(semid, 0, IPC_RMID);
}
return 0;
}共享内存的技术细节:
在Linux内核中,共享内存的实现基于内存管理子系统。当进程调用shmat()时,内核会:
共享内存的优势在于性能,但需要额外的同步机制(如信号量)来避免竞态条件。
POSIX IPC提供了更加现代和文件系统集成的IPC机制,在Ubuntu等现代Linux系统中更为推荐。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#define QUEUE_NAME "/my_posix_queue"
#define MAX_SIZE 1024
int main() {
mqd_t mq;
struct mq_attr attr;
char buffer[MAX_SIZE];
unsigned int priority;
// 设置消息队列属性
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = MAX_SIZE;
attr.mq_curmsgs = 0;
// 创建或打开消息队列
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, &attr);
if (mq == (mqd_t)-1) {
perror("POSIX消息队列打开失败");
exit(1);
}
pid_t pid = fork();
if (pid == 0) {
// 子进程 - 接收者
printf("接收者等待消息...\n");
ssize_t bytes_read = mq_receive(mq, buffer, MAX_SIZE, &priority);
if (bytes_read >= 0) {
buffer[bytes_read] = '\0';
printf("收到消息: %s (优先级: %u)\n", buffer, priority);
}
} else {
// 父进程 - 发送者
char message[] = "Hello POSIX Message Queue!";
if (mq_send(mq, message, strlen(message), 1) == -1) {
perror("消息发送失败");
} else {
printf("消息发送成功\n");
}
wait(NULL);
// 清理
mq_close(mq);
mq_unlink(QUEUE_NAME);
}
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#define SHM_NAME "/my_posix_shm"
#define SHM_SIZE 4096
int main() {
int fd;
char *ptr;
// 创建共享内存对象
fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
if (fd == -1) {
perror("shm_open失败");
exit(1);
}
// 设置共享内存大小
if (ftruncate(fd, SHM_SIZE) == -1) {
perror("ftruncate失败");
exit(1);
}
// 内存映射
ptr = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (ptr == MAP_FAILED) {
perror("mmap失败");
exit(1);
}
pid_t pid = fork();
if (pid == 0) {
// 子进程 - 读取数据
sleep(1); // 等待父进程写入
printf("子进程读取: %s\n", ptr);
} else {
// 父进程 - 写入数据
snprintf(ptr, SHM_SIZE, "这是POSIX共享内存的测试数据!");
printf("父进程写入完成\n");
wait(NULL);
// 清理
munmap(ptr, SHM_SIZE);
close(fd);
shm_unlink(SHM_NAME);
}
return 0;
}当进程不在同一台机器上时,它们需要通过网络套接字进行通信,这就像是一场异地恋。
TCP提供面向连接的可靠通信,确保数据按序到达:
服务器端代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define PORT 8080
#define BUFFER_SIZE 1024
void tcp_server() {
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[BUFFER_SIZE] = {0};
// 创建套接字文件描述符
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket创建失败");
exit(EXIT_FAILURE);
}
// 设置套接字选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,
&opt, sizeof(opt))) {
perror("setsockopt失败");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定套接字到端口
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind失败");
exit(EXIT_FAILURE);
}
// 开始监听
if (listen(server_fd, 3) < 0) {
perror("listen失败");
exit(EXIT_FAILURE);
}
printf("TCP服务器监听端口 %d...\n", PORT);
// 接受连接
if ((new_socket = accept(server_fd, (struct sockaddr *)&address,
(socklen_t*)&addrlen)) < 0) {
perror("accept失败");
exit(EXIT_FAILURE);
}
// 读取客户端数据
read(new_socket, buffer, BUFFER_SIZE);
printf("服务器收到消息: %s\n", buffer);
// 发送响应
char *hello = "Hello from TCP server!";
send(new_socket, hello, strlen(hello), 0);
printf("服务器发送响应\n");
close(new_socket);
close(server_fd);
}
void tcp_client() {
int sock = 0;
struct sockaddr_in serv_addr;
char buffer[BUFFER_SIZE] = {0};
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("套接字创建错误\n");
return;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// 转换IP地址
if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
printf("无效地址/地址不受支持\n");
return;
}
// 连接服务器
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
printf("连接失败\n");
return;
}
// 发送数据
char *hello = "Hello from TCP client!";
send(sock, hello, strlen(hello), 0);
printf("客户端发送消息\n");
// 读取响应
read(sock, buffer, BUFFER_SIZE);
printf("客户端收到: %s\n", buffer);
close(sock);
}
int main() {
pid_t pid = fork();
if (pid == 0) {
sleep(1); // 等待服务器启动
tcp_client();
} else {
tcp_server();
wait(NULL);
}
return 0;
}UDP提供无连接的不可靠通信,但速度更快:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define PORT 8080
#define BUFFER_SIZE 1024
void udp_server() {
int sockfd;
struct sockaddr_in servaddr, cliaddr;
char buffer[BUFFER_SIZE];
socklen_t len;
// 创建UDP套接字
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket创建失败");
exit(EXIT_FAILURE);
}
memset(&servaddr, 0, sizeof(servaddr));
memset(&cliaddr, 0, sizeof(cliaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT);
// 绑定套接字
if (bind(sockfd, (const struct sockaddr *)&servaddr,
sizeof(servaddr)) < 0) {
perror("bind失败");
exit(EXIT_FAILURE);
}
printf("UDP服务器监听端口 %d...\n", PORT);
len = sizeof(cliaddr);
// 接收数据
int n = recvfrom(sockfd, (char *)buffer, BUFFER_SIZE,
MSG_WAITALL, (struct sockaddr *)&cliaddr, &len);
buffer[n] = '\0';
printf("服务器收到: %s\n", buffer);
// 发送响应
char *hello = "Hello from UDP server!";
sendto(sockfd, (const char *)hello, strlen(hello),
MSG_CONFIRM, (const struct sockaddr *)&cliaddr, len);
printf("服务器发送响应\n");
close(sockfd);
}
void udp_client() {
int sockfd;
struct sockaddr_in servaddr;
char buffer[BUFFER_SIZE];
socklen_t len;
// 创建UDP套接字
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket创建失败");
exit(EXIT_FAILURE);
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
servaddr.sin_addr.s_addr = INADDR_ANY;
char *hello = "Hello from UDP client!";
// 发送数据
sendto(sockfd, (const char *)hello, strlen(hello),
MSG_CONFIRM, (const struct sockaddr *)&servaddr,
sizeof(servaddr));
printf("客户端发送消息\n");
len = sizeof(servaddr);
// 接收响应
int n = recvfrom(sockfd, (char *)buffer, BUFFER_SIZE,
MSG_WAITALL, (struct sockaddr *)&servaddr, &len);
buffer[n] = '\0';
printf("客户端收到: %s\n", buffer);
close(sockfd);
}
int main() {
pid_t pid = fork();
if (pid == 0) {
sleep(1);
udp_client();
} else {
udp_server();
wait(NULL);
}
return 0;
}在长期的通信关系中,进程需要各种同步机制来协调彼此的行动,避免冲突和混乱。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define BUFFER_SIZE 5
// 共享缓冲区
struct shared_buffer {
int data[BUFFER_SIZE];
int count;
int in;
int out;
pthread_mutex_t mutex;
pthread_cond_t not_empty;
pthread_cond_t not_full;
};
// 初始化缓冲区
void buffer_init(struct shared_buffer *buf) {
buf->count = 0;
buf->in = 0;
buf->out = 0;
pthread_mutex_init(&buf->mutex, NULL);
pthread_cond_init(&buf->not_empty, NULL);
pthread_cond_init(&buf->not_full, NULL);
}
// 生产者线程
void* producer(void *arg) {
struct shared_buffer *buf = (struct shared_buffer*)arg;
for (int i = 0; i < 10; i++) {
pthread_mutex_lock(&buf->mutex);
// 等待缓冲区不满
while (buf->count == BUFFER_SIZE) {
printf("生产者: 缓冲区满,等待...\n");
pthread_cond_wait(&buf->not_full, &buf->mutex);
}
// 生产数据
buf->data[buf->in] = i;
buf->in = (buf->in + 1) % BUFFER_SIZE;
buf->count++;
printf("生产者: 生产数据 %d, 缓冲区大小: %d\n", i, buf->count);
// 通知消费者
pthread_cond_signal(&buf->not_empty);
pthread_mutex_unlock(&buf->mutex);
usleep(100000); // 100ms
}
return NULL;
}
// 消费者线程
void* consumer(void *arg) {
struct shared_buffer *buf = (struct shared_buffer*)arg;
for (int i = 0; i < 10; i++) {
pthread_mutex_lock(&buf->mutex);
// 等待缓冲区不空
while (buf->count == 0) {
printf("消费者: 缓冲区空,等待...\n");
pthread_cond_wait(&buf->not_empty, &buf->mutex);
}
// 消费数据
int item = buf->data[buf->out];
buf->out = (buf->out + 1) % BUFFER_SIZE;
buf->count--;
printf("消费者: 消费数据 %d, 缓冲区大小: %d\n", item, buf->count);
// 通知生产者
pthread_cond_signal(&buf->not_full);
pthread_mutex_unlock(&buf->mutex);
usleep(200000); // 200ms
}
return NULL;
}
int main() {
pthread_t prod_thread, cons_thread;
struct shared_buffer buf;
buffer_init(&buf);
// 创建生产者和消费者线程
pthread_create(&prod_thread, NULL, producer, &buf);
pthread_create(&cons_thread, NULL, consumer, &buf);
// 等待线程结束
pthread_join(prod_thread, NULL);
pthread_join(cons_thread, NULL);
// 清理资源
pthread_mutex_destroy(&buf.mutex);
pthread_cond_destroy(&buf.not_empty);
pthread_cond_destroy(&buf.not_full);
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#define READER_COUNT 3
#define WRITER_COUNT 2
struct shared_data {
char message[256];
int version;
pthread_rwlock_t rwlock;
};
void* reader(void *arg) {
struct shared_data *data = (struct shared_data*)arg;
for (int i = 0; i < 5; i++) {
// 获取读锁
pthread_rwlock_rdlock(&data->rwlock);
printf("读者 %lu: 版本=%d, 消息=%s\n",
pthread_self(), data->version, data->message);
// 释放读锁
pthread_rwlock_unlock(&data->rwlock);
usleep(100000); // 100ms
}
return NULL;
}
void* writer(void *arg) {
struct shared_data *data = (struct shared_data*)arg;
for (int i = 0; i < 3; i++) {
// 获取写锁
pthread_rwlock_wrlock(&data->rwlock);
// 更新数据
data->version++;
snprintf(data->message, sizeof(data->message),
"写入时间戳: %ld", time(NULL));
printf("写者 %lu: 更新版本为 %d\n", pthread_self(), data->version);
// 释放写锁
pthread_rwlock_unlock(&data->rwlock);
usleep(200000); // 200ms
}
return NULL;
}
int main() {
pthread_t readers[READER_COUNT];
pthread_t writers[WRITER_COUNT];
struct shared_data data;
// 初始化共享数据和读写锁
strcpy(data.message, "初始消息");
data.version = 1;
pthread_rwlock_init(&data.rwlock, NULL);
// 创建读者线程
for (int i = 0; i < READER_COUNT; i++) {
pthread_create(&readers[i], NULL, reader, &data);
}
// 创建写者线程
for (int i = 0; i < WRITER_COUNT; i++) {
pthread_create(&writers[i], NULL, writer, &data);
}
// 等待所有读者线程
for (int i = 0; i < READER_COUNT; i++) {
pthread_join(readers[i], NULL);
}
// 等待所有写者线程
for (int i = 0; i < WRITER_COUNT; i++) {
pthread_join(writers[i], NULL);
}
// 清理资源
pthread_rwlock_destroy(&data.rwlock);
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define THREAD_COUNT 4
pthread_barrier_t barrier;
void* worker(void *arg) {
int id = *(int*)arg;
printf("线程 %d: 第一阶段工作开始\n", id);
sleep(id + 1); // 模拟工作负载
printf("线程 %d: 第一阶段工作完成,等待其他线程\n", id);
// 等待所有线程到达屏障
int result = pthread_barrier_wait(&barrier);
if (result == PTHREAD_BARRIER_SERIAL_THREAD) {
printf("线程 %d 是序列线程,负责协调工作\n", id);
}
printf("线程 %d: 第二阶段工作开始\n", id);
sleep(id + 1);
printf("线程 %d: 第二阶段工作完成\n", id);
return NULL;
}
int main() {
pthread_t threads[THREAD_COUNT];
int thread_ids[THREAD_COUNT];
// 初始化屏障
pthread_barrier_init(&barrier, NULL, THREAD_COUNT);
// 创建工作线程
for (int i = 0; i < THREAD_COUNT; i++) {
thread_ids[i] = i;
pthread_create(&threads[i], NULL, worker, &thread_ids[i]);
}
// 等待所有线程完成
for (int i = 0; i < THREAD_COUNT; i++) {
pthread_join(threads[i], NULL);
}
// 清理屏障
pthread_barrier_destroy(&barrier);
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define THREAD_COUNT 4
#define ITERATIONS 100000
int counter = 0;
pthread_spinlock_t spinlock;
void* incrementer(void *arg) {
for (int i = 0; i < ITERATIONS; i++) {
pthread_spin_lock(&spinlock);
counter++;
pthread_spin_unlock(&spinlock);
}
return NULL;
}
int main() {
pthread_t threads[THREAD_COUNT];
// 初始化自旋锁
pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE);
// 创建线程
for (int i = 0; i < THREAD_COUNT; i++) {
pthread_create(&threads[i], NULL, incrementer, NULL);
}
// 等待线程完成
for (int i = 0; i < THREAD_COUNT; i++) {
pthread_join(threads[i], NULL);
}
printf("最终计数器值: %d (期望值: %d)\n", counter, THREAD_COUNT * ITERATIONS);
// 清理自旋锁
pthread_spin_destroy(&spinlock);
return 0;
}在Linux系统中,进程之间存在复杂的家族关系,形成了一个庞大的进程树。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <signal.h>
void print_process_info(const char *name) {
printf("进程: %s, PID: %d, PPID: %d, PGID: %d, SID: %d\n",
name, getpid(), getppid(), getpgid(0), getsid(0));
}
void child_process(int generation) {
char process_name[32];
snprintf(process_name, sizeof(process_name), "第%d代子进程", generation);
print_process_info(process_name);
if (generation < 3) {
// 创建下一代进程
pid_t pid = fork();
if (pid == 0) {
child_process(generation + 1);
exit(0);
} else if (pid > 0) {
wait(NULL); // 等待子进程结束
}
} else {
// 最后一代进程执行一些工作
sleep(2);
}
}
// 进程监控器
void process_monitor() {
printf("\n=== 进程家族树监控 ===\n");
pid_t pid = fork();
if (pid == 0) {
// 孙子进程
print_process_info("孙子进程");
// 创建守护进程
pid_t daemon_pid = fork();
if (daemon_pid == 0) {
// 重孙子进程成为守护进程
setsid(); // 创建新会话
chdir("/");
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
// 守护进程工作
while(1) {
// 在实际应用中,这里会有具体的工作逻辑
sleep(10);
}
exit(0);
} else {
exit(0);
}
} else if (pid > 0) {
// 子进程
print_process_info("子进程");
wait(NULL); // 等待孙子进程结束
}
}
int main() {
printf("=== 进程家族关系演示 ===\n");
print_process_info("父进程");
// 演示多代进程
pid_t pid = fork();
if (pid == 0) {
// 第一代子进程
child_process(1);
exit(0);
} else if (pid > 0) {
wait(NULL); // 等待子进程结束
}
// 演示进程监控
process_monitor();
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
void create_process_group() {
printf("\n=== 创建进程组演示 ===\n");
pid_t pid1 = fork();
if (pid1 == 0) {
// 第一个子进程 - 创建新进程组
setpgid(0, 0);
printf("进程1: PID=%d, PGID=%d\n", getpid(), getpgid(0));
sleep(2);
exit(0);
}
pid_t pid2 = fork();
if (pid2 == 0) {
// 第二个子进程 - 加入第一个进程的进程组
setpgid(0, pid1);
printf("进程2: PID=%d, PGID=%d\n", getpid(), getpgid(0));
sleep(2);
exit(0);
}
printf("父进程: PID=%d, 子进程1 PID=%d, 子进程2 PID=%d\n",
getpid(), pid1, pid2);
// 等待所有子进程
wait(NULL);
wait(NULL);
}
void create_session() {
printf("\n=== 创建会话演示 ===\n");
pid_t pid = fork();
if (pid == 0) {
// 子进程创建新会话
pid_t session_id = setsid();
printf("新会话领导者: PID=%d, SID=%d, PGID=%d\n",
getpid(), session_id, getpgid(0));
// 检查是否是控制终端
if (tcgetpgrp(STDIN_FILENO) == -1) {
printf("已脱离控制终端\n");
}
sleep(3);
exit(0);
} else {
wait(NULL);
}
}
int main() {
printf("当前进程: PID=%d, PPID=%d, PGID=%d, SID=%d\n",
getpid(), getppid(), getpgid(0), getsid(0));
create_process_group();
create_session();
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#define PORT 8080
#define MAX_CLIENTS 10
#define BUFFER_SIZE 1024
// 全局变量,用于在信号处理函数中访问
int server_fd;
// 信号处理函数
void signal_handler(int sig) {
printf("\n收到信号 %d,正在关闭服务器...\n", sig);
close(server_fd);
exit(0);
}
// 处理客户端连接
void handle_client(int client_socket) {
char buffer[BUFFER_SIZE];
char response[] = "HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: 13\r\n"
"\r\n"
"Hello, World!";
// 读取客户端请求
read(client_socket, buffer, BUFFER_SIZE - 1);
printf("收到请求:\n%s\n", buffer);
// 发送响应
write(client_socket, response, strlen(response));
// 关闭连接
close(client_socket);
}
// 预创建进程池
void create_process_pool(int pool_size) {
for (int i = 0; i < pool_size; i++) {
pid_t pid = fork();
if (pid == 0) {
// 子进程 - 工作进程
printf("工作进程 %d 启动 (PID: %d)\n", i, getpid());
while (1) {
int client_socket;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
// 接受客户端连接
client_socket = accept(server_fd,
(struct sockaddr*)&client_addr,
&client_len);
if (client_socket < 0) {
perror("accept失败");
continue;
}
printf("工作进程 %d 处理客户端 %s:%d\n",
i, inet_ntoa(client_addr.sin_addr),
ntohs(client_addr.sin_port));
// 处理客户端请求
handle_client(client_socket);
}
exit(0);
} else if (pid < 0) {
perror("fork失败");
exit(1);
}
}
}
int main() {
struct sockaddr_in address;
int opt = 1;
// 注册信号处理函数
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// 创建套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket创建失败");
exit(1);
}
// 设置套接字选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,
&opt, sizeof(opt))) {
perror("setsockopt失败");
exit(1);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定套接字
if (bind(server_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
perror("bind失败");
exit(1);
}
// 开始监听
if (listen(server_fd, 10) < 0) {
perror("listen失败");
exit(1);
}
printf("服务器启动,监听端口 %d\n", PORT);
// 创建进程池
create_process_pool(4);
// 父进程等待所有子进程
while (wait(NULL) > 0);
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/msg.h>
#include <sys/sem.h>
#include <pthread.h>
#include <signal.h>
#define SHM_KEY 0x1234
#define MSG_KEY 0x5678
#define SEM_KEY 0x9ABC
// 共享数据结构
struct shared_data {
int request_count;
int active_workers;
char status[256];
};
// 消息结构
struct message {
long mtype;
int client_id;
char request[256];
char response[256];
};
// 信号量操作
void semaphore_op(int semid, int op) {
struct sembuf sb = {0, op, SEM_UNDO};
semop(semid, &sb, 1);
}
// 工作者进程
void worker_process(int worker_id, int shmid, int msgid, int semid) {
struct shared_data *shared = shmat(shmid, NULL, 0);
struct message msg;
if (shared == (void*)-1) {
perror("shmat失败");
exit(1);
}
printf("工作者 %d 启动\n", worker_id);
while (1) {
// 等待消息
if (msgrcv(msgid, &msg, sizeof(msg) - sizeof(long), 1, 0) == -1) {
perror("msgrcv失败");
continue;
}
// 获取信号量
semaphore_op(semid, -1);
// 更新共享数据
shared->request_count++;
printf("工作者 %d 处理请求: %s\n", worker_id, msg.request);
// 模拟处理时间
sleep(1);
// 准备响应
snprintf(msg.response, sizeof(msg.response),
"Worker %d processed: %s", worker_id, msg.request);
msg.mtype = msg.client_id;
// 发送响应
if (msgsnd(msgid, &msg, sizeof(msg) - sizeof(long), 0) == -1) {
perror("msgsnd失败");
}
// 释放信号量
semaphore_op(semid, 1);
}
}
// 客户端进程
void client_process(int client_id, int msgid) {
struct message msg;
for (int i = 0; i < 3; i++) {
// 准备请求
msg.mtype = 1; // 发送到工作者
msg.client_id = client_id;
snprintf(msg.request, sizeof(msg.request),
"Request %d from client %d", i, client_id);
// 发送请求
if (msgsnd(msgid, &msg, sizeof(msg) - sizeof(long), 0) == -1) {
perror("msgsnd失败");
continue;
}
printf("客户端 %d 发送请求: %s\n", client_id, msg.request);
// 等待响应
if (msgrcv(msgid, &msg, sizeof(msg) - sizeof(long), client_id, 0) == -1) {
perror("msgrcv失败");
continue;
}
printf("客户端 %d 收到响应: %s\n", client_id, msg.response);
sleep(2);
}
}
// 监控进程
void monitor_process(int shmid) {
struct shared_data *shared = shmat(shmid, NULL, 0);
if (shared == (void*)-1) {
perror("shmat失败");
exit(1);
}
while (1) {
printf("=== 系统监控 ===\n");
printf("总请求数: %d\n", shared->request_count);
printf("活跃工作者: %d\n", shared->active_workers);
printf("系统状态: %s\n", shared->status);
printf("================\n\n");
sleep(5);
}
}
int main() {
int shmid, msgid, semid;
// 创建共享内存
shmid = shmget(SHM_KEY, sizeof(struct shared_data), 0666 | IPC_CREAT);
if (shmid == -1) {
perror("shmget失败");
exit(1);
}
// 创建消息队列
msgid = msgget(MSG_KEY, 0666 | IPC_CREAT);
if (msgid == -1) {
perror("msgget失败");
exit(1);
}
// 创建信号量
semid = semget(SEM_KEY, 1, 0666 | IPC_CREAT);
if (semid == -1) {
perror("semget失败");
exit(1);
}
// 初始化信号量
if (semctl(semid, 0, SETVAL, 1) == -1) {
perror("semctl失败");
exit(1);
}
// 初始化共享数据
struct shared_data *shared = shmat(shmid, NULL, 0);
if (shared == (void*)-1) {
perror("shmat失败");
exit(1);
}
shared->request_count = 0;
shared->active_workers = 3;
strcpy(shared->status, "系统运行中");
// 创建工作进程
for (int i = 0; i < 3; i++) {
pid_t pid = fork();
if (pid == 0) {
worker_process(i, shmid, msgid, semid);
exit(0);
}
}
// 创建监控进程
pid_t monitor_pid = fork();
if (monitor_pid == 0) {
monitor_process(shmid);
exit(0);
}
// 创建客户端进程
for (int i = 0; i < 2; i++) {
pid_t pid = fork();
if (pid == 0) {
client_process(100 + i, msgid);
exit(0);
}
}
// 等待所有客户端进程结束
for (int i = 0; i < 2; i++) {
wait(NULL);
}
// 给时间让所有请求处理完成
sleep(5);
// 清理资源
shmctl(shmid, IPC_RMID, NULL);
msgctl(msgid, IPC_RMID, NULL);
semctl(semid, 0, IPC_RMID);
// 杀死工作进程和监控进程
system("pkill -P $");
return 0;
}# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:alpine
ports:
- "6379:6379"
networks:
- app-network
app:
build: .
ports:
- "8080:8080"
depends_on:
- redis
networks:
- app-network
volumes:
- /tmp:/tmp # 用于共享内存通信
networks:
app-network:
driver: bridge// 容器间共享内存示例
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#define SHM_NAME "/tmp/container_shm"
#define SHM_SIZE 4096
int main() {
int fd;
char *ptr;
// 创建共享内存对象
fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
if (fd == -1) {
perror("shm_open失败");
exit(1);
}
// 设置共享内存大小
if (ftruncate(fd, SHM_SIZE) == -1) {
perror("ftruncate失败");
exit(1);
}
// 内存映射
ptr = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (ptr == MAP_FAILED) {
perror("mmap失败");
exit(1);
}
// 写入数据
snprintf(ptr, SHM_SIZE, "Hello from container! PID: %d", getpid());
printf("数据已写入共享内存: %s\n", ptr);
// 保持运行以便观察
printf("按Enter键退出...\n");
getchar();
// 清理
munmap(ptr, SHM_SIZE);
close(fd);
shm_unlink(SHM_NAME);
return 0;
}# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ipc-demo
spec:
replicas: 3
selector:
matchLabels:
app: ipc-demo
template:
metadata:
labels:
app: ipc-demo
spec:
containers:
- name: main-app
image: ipc-demo:latest
ports:
- containerPort: 8080
env:
- name: REDIS_HOST
value: "redis-service"
- name: SHARED_MEM_PATH
value: "/dev/shm"
volumeMounts:
- name: shared-memory
mountPath: /dev/shm
- name: sidecar
image: sidecar:latest
volumeMounts:
- name: shared-memory
mountPath: /dev/shm
volumes:
- name: shared-memory
emptyDir:
medium: Memory
---
apiVersion: v1
kind: Service
metadata:
name: redis-service
spec:
selector:
app: redis
ports:
- port: 6379
targetPort: 6379#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/msg.h>
#include <fcntl.h>
#define ITERATIONS 100000
#define MESSAGE_SIZE 1024
// 时间测量函数
double get_time() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec + tv.tv_usec / 1000000.0;
}
// 管道性能测试
void test_pipe() {
int pipefd[2];
char buffer[MESSAGE_SIZE];
double start, end;
if (pipe(pipefd) == -1) {
perror("pipe失败");
return;
}
pid_t pid = fork();
if (pid == 0) {
// 子进程 - 读取端
close(pipefd[1]);
for (int i = 0; i < ITERATIONS; i++) {
read(pipefd[0], buffer, MESSAGE_SIZE);
}
close(pipefd[0]);
exit(0);
} else {
// 父进程 - 写入端
close(pipefd[0]);
start = get_time();
for (int i = 0; i < ITERATIONS; i++) {
write(pipefd[1], buffer, MESSAGE_SIZE);
}
end = get_time();
close(pipefd[1]);
wait(NULL);
printf("管道: %d次通信耗时 %.6f秒\n", ITERATIONS, end - start);
}
}
// 共享内存性能测试
void test_shared_memory() {
int shmid;
char *shared_memory;
double start, end;
// 创建共享内存
shmid = shmget(IPC_PRIVATE, MESSAGE_SIZE, 0666 | IPC_CREAT);
if (shmid == -1) {
perror("shmget失败");
return;
}
// 附加共享内存
shared_memory = shmat(shmid, NULL, 0);
if (shared_memory == (void*)-1) {
perror("shmat失败");
return;
}
pid_t pid = fork();
if (pid == 0) {
// 子进程 - 读取端
for (int i = 0; i < ITERATIONS; i++) {
while (shared_memory[0] == 0); // 等待数据
shared_memory[0] = 0; // 确认接收
}
exit(0);
} else {
// 父进程 - 写入端
start = get_time();
for (int i = 0; i < ITERATIONS; i++) {
shared_memory[0] = 1; // 写入数据
while (shared_memory[0] != 0); // 等待确认
}
end = get_time();
wait(NULL);
printf("共享内存: %d次通信耗时 %.6f秒\n", ITERATIONS, end - start);
// 清理
shmdt(shared_memory);
shmctl(shmid, IPC_RMID, NULL);
}
}
// POSIX共享内存性能测试
void test_posix_shm() {
int fd;
char *shared_memory;
double start, end;
const char *shm_name = "/posix_shm_test";
// 创建共享内存对象
fd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
if (fd == -1) {
perror("shm_open失败");
return;
}
// 设置大小
ftruncate(fd, MESSAGE_SIZE);
// 内存映射
shared_memory = mmap(NULL, MESSAGE_SIZE, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (shared_memory == MAP_FAILED) {
perror("mmap失败");
return;
}
pid_t pid = fork();
if (pid == 0) {
// 子进程
for (int i = 0; i < ITERATIONS; i++) {
while (shared_memory[0] == 0);
shared_memory[0] = 0;
}
exit(0);
} else {
// 父进程
start = get_time();
for (int i = 0; i < ITERATIONS; i++) {
shared_memory[0] = 1;
while (shared_memory[0] != 0);
}
end = get_time();
wait(NULL);
printf("POSIX共享内存: %d次通信耗时 %.6f秒\n", ITERATIONS, end - start);
// 清理
munmap(shared_memory, MESSAGE_SIZE);
close(fd);
shm_unlink(shm_name);
}
}
int main() {
printf("=== IPC机制性能比较测试 ===\n");
printf("测试配置: %d次迭代, 消息大小: %d字节\n\n", ITERATIONS, MESSAGE_SIZE);
test_pipe();
test_shared_memory();
test_posix_shm();
return 0;
}#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
// IPC错误处理包装器
#define IPC_CHECK(call, message) \
do { \
if ((call) == -1) { \
fprintf(stderr, "%s: %s\n", message, strerror(errno)); \
exit(EXIT_FAILURE); \
} \
} while(0)
// 资源自动清理宏
#define AUTO_CLEAN __attribute__((cleanup(cleanup_function)))
void cleanup_function(void *ptr) {
// 通用的资源清理函数
// 在实际使用中需要根据具体类型进行特化
printf("自动清理资源\n");
}
// 安全的共享内存分配
void* safe_shm_alloc(size_t size) {
void *ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (ptr == MAP_FAILED) {
perror("共享内存分配失败");
return NULL;
}
// 初始化内存
memset(ptr, 0, size);
return ptr;
}
// 安全的共享内存释放
void safe_shm_free(void *ptr, size_t size) {
if (ptr != NULL) {
munmap(ptr, size);
}
}
// IPC配置结构
struct ipc_config {
size_t shm_size;
int msg_queue_size;
int semaphore_count;
int timeout_ms;
};
// IPC上下文
struct ipc_context {
void *shm_ptr;
size_t shm_size;
int msg_id;
int sem_id;
struct ipc_config config;
};
// 初始化IPC上下文
int ipc_context_init(struct ipc_context *ctx, const struct ipc_config *config) {
if (!ctx || !config) {
return -1;
}
memcpy(&ctx->config, config, sizeof(struct ipc_config));
// 分配共享内存
ctx->shm_ptr = safe_shm_alloc(config->shm_size);
if (!ctx->shm_ptr) {
return -1;
}
ctx->shm_size = config->shm_size;
// 创建消息队列(这里简化处理)
ctx->msg_id = -1; // 实际应用中需要正确初始化
// 创建信号量(这里简化处理)
ctx->sem_id = -1; // 实际应用中需要正确初始化
return 0;
}
// 清理IPC上下文
void ipc_context_cleanup(struct ipc_context *ctx) {
if (ctx) {
safe_shm_free(ctx->shm_ptr, ctx->shm_size);
// 实际应用中还需要清理消息队列和信号量
}
}
// 使用示例
int main() {
struct ipc_config config = {
.shm_size = 4096,
.msg_queue_size = 10,
.semaphore_count = 1,
.timeout_ms = 5000
};
struct ipc_context ctx;
if (ipc_context_init(&ctx, &config) == 0) {
printf("IPC上下文初始化成功\n");
// 使用IPC资源...
strcpy((char*)ctx.shm_ptr, "Hello, IPC Best Practices!");
printf("共享内存内容: %s\n", (char*)ctx.shm_ptr);
// 自动清理
ipc_context_cleanup(&ctx);
} else {
printf("IPC上下文初始化失败\n");
}
return 0;
}从最初的信号到现代的容器间通信,Linux进程通信技术的发展映射了人类社会交流方式的演进。信号如同原始社会的烽火传讯,简单但信息有限;管道好比两人之间的耳语,亲密但范围有限;共享内存像是一个公共黑板,高效但需要精心协调;网络套接字则如同现代通信网络,打破了地理限制。
在Ubuntu这个精心构建的Linux世界中,进程们通过各种IPC机制建立联系、协调工作、共享数据,共同构建了复杂的软件生态系统。理解这些通信机制不仅有助于我们编写更好的软件,也让我们对分布式系统、并行计算等现代计算范式有更深的理解。
正如人类社会一样,进程间的良好通信需要恰当的协议、可靠的传输和有效的冲突解决机制。在技术的星辰大海中,进程通信的故事还将继续书写,而Ubuntu将继续作为这个精彩故事的舞台,见证着数字世界中一个个"灵魂"的相遇、相知与相爱。
无论是单机上的进程间通信,还是跨网络的分布式通信,其核心思想都是相通的:在独立的实体之间建立可靠、高效的信息交换通道。这正是计算机科学魅力的体现——用严谨的技术解决复杂的协调问题,让独立的组件能够协同工作,创造出远超个体能力之和的系统价值。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。