网络协议有很多协议族,常见的有AF_INET和AF_UNIX:前者对应IPv4,后者用于本机进程间通信。
Unix Socket用于同一台主机上的进程间通信,数据不经过网络层、数据链路层和物理层,也不经过网卡,只是内核缓冲区之间的数据拷贝,因此比走环回地址的网络socket效率更高。
unix socket是基于文件系统和缓冲区实现的,内核中有一个全局数组unix_datas,长度为NSOCKETS,每个unix socket占用其中一个槽位。
/* Address-family identifiers; the family selects which protocol operations a socket uses. */
#define AF_UNSPEC 0 /* unspecified family */
#define AF_UNIX 1 /* Unix-domain sockets (local communication) */
#define AF_INET 2 /* IPv4 sockets */
/* Protocol-family names are plain aliases of the address-family values. */
#define PF_UNIX AF_UNIX
#define PF_INET AF_INET
//
/* Table of Unix-domain socket state; it holds NSOCKETS slots. */
extern struct unix_proto_data unix_datas[NSOCKETS];
/*
 * Per-socket state of a Unix-domain socket: the bound name, the receive
 * buffer with its ring indices, and a link to the peer's state.
 */
struct unix_proto_data {
int refcnt; /* cnt of reference 0=free */
/* -1=not initialised -bgm */
struct socket *socket; /* socket we're bound to */
int protocol; /* protocol passed to unix_proto_create (only 0 is accepted there) */
struct sockaddr_un sockaddr_un;// Unix sockets are named through the file system; this holds the address (path) copied in by bind
short sockaddr_len; /* >0 if name bound */
char *buf;// data buffer; unix_proto_create allocates it with get_free_page
int bp_head, bp_tail;// ring-buffer indices: writers advance bp_head, readers advance bp_tail
struct inode *inode; /* inode of the bound socket file, set by unix_proto_bind */
struct unix_proto_data *peerupd; // state (and buffer) of the other end of the connection
struct wait_queue *wait; /* Lock across page faults (FvK) */
int lock_flag;
};
/*
 * Operations table for the Unix-domain protocol family.
 * The initializer is positional, so the order of the entries must match the
 * member order of struct proto_ops (declared elsewhere, not visible here).
 */
struct proto_ops unix_proto_ops = {
unix_proto_init,
unix_proto_create,
unix_proto_dup,
unix_proto_release,
unix_proto_bind,
unix_proto_connect,
unix_proto_socketpair,
unix_proto_accept,
unix_proto_getname,
unix_proto_read,
unix_proto_write,
unix_proto_select,
unix_proto_ioctl
};
调用socket()系统调用时需要指定family,内核据此找到对应协议族的操作函数集合(unix socket对应unix_proto_ops)。创建时先从unix_datas上分配一个空闲槽位,再为它分配一页内存作为接收数据的buffer。
/*
 * Create the Unix-domain state for a new socket.
 *
 * sock      socket being created; on success UN_DATA(sock) points at its state
 * protocol  must be 0, anything else is rejected
 *
 * Returns 0 on success, -EINVAL for a non-zero protocol, and -ENOMEM when
 * either the state slot or the buffer page cannot be allocated.
 */
static int
unix_proto_create(struct socket *sock, int protocol)
{
    struct unix_proto_data *data;

    dprintf(1, "UNIX: create: socket 0x%x, proto %d\n", sock, protocol);

    if (protocol != 0) {
        dprintf(1, "UNIX: create: protocol != 0\n");
        return -EINVAL;
    }

    /* Step 1: obtain a state slot. */
    data = unix_data_alloc();
    if (!data) {
        printk("UNIX: create: can't allocate buffer\n");
        return -ENOMEM;
    }

    /* Step 2: obtain one page to hold incoming data. */
    data->buf = (char*) get_free_page(GFP_USER);
    if (!data->buf) {
        printk("UNIX: create: can't get page!\n");
        unix_data_deref(data);
        return -ENOMEM;
    }

    /* Step 3: link socket and state together. */
    data->protocol = protocol;
    data->socket = sock;
    UN_DATA(sock) = data;

    /* The reference count is set last, once everything else is filled in. */
    data->refcnt = 1;

    dprintf(1, "UNIX: create: allocated data 0x%x\n", data);
    return 0;
}
unix socket是基于文件系统实现的,server socket调用bind时绑定文件路径并创建文件,client socket根据文件路径连接server socket。
static int unix_proto_bind(struct socket* sock,
struct sockaddr* umyaddr,
int sockaddr_len) {
//1 基于文件系统实现,绑定一个文件路径
char fname[sizeof(((struct sockaddr_un*)0)->sun_path) + 1];
struct unix_proto_data* upd = UN_DATA(sock);
unsigned long old_fs;
int i;
int er;
dprintf(1, "UNIX: bind: socket 0x%x, len=%d\n", sock, sockaddr_len);
if (sockaddr_len <= UN_PATH_OFFSET ||
sockaddr_len > sizeof(struct sockaddr_un)) {
dprintf(1, "UNIX: bind: bad length %d\n", sockaddr_len);
return (-EINVAL);
}
if (upd->sockaddr_len || upd->inode) {
printk("UNIX: bind: already bound!\n");
return (-EINVAL);
}
er = verify_area(VERIFY_WRITE, umyaddr, sockaddr_len);
if (er)
return er;
//2 从用户空间拷贝数据
memcpy_fromfs(&upd->sockaddr_un, umyaddr, sockaddr_len);
upd->sockaddr_un.sun_path[sockaddr_len - UN_PATH_OFFSET] = '\0';
if (upd->sockaddr_un.sun_family != AF_UNIX) {
dprintf(1, "UNIX: bind: family is %d, not AF_UNIX(%d)\n",
upd->sockaddr_un.sun_family, AF_UNIX);
return (-EINVAL);
}
memcpy(fname, upd->sockaddr_un.sun_path, sockaddr_len - UN_PATH_OFFSET);
fname[sockaddr_len - UN_PATH_OFFSET] = '\0';
old_fs = get_fs();
set_fs(get_ds());
//3 创建文件
i = do_mknod(fname, S_IFSOCK | S_IRWXUGO, 0);
if (i == 0)
i = open_namei(fname, 0, S_IFSOCK, &upd->inode, NULL);
set_fs(old_fs);
if (i < 0) {
printk("UNIX: bind: can't open socket %s\n", fname);
return (i);
}
upd->sockaddr_len = sockaddr_len; /* now its legal */
dprintf(1, "UNIX: bind: bound socket address: ");
sockaddr_un_printk(&upd->sockaddr_un, upd->sockaddr_len);
dprintf(1, "to inode 0x%x\n", upd->inode);
return (0);
}
/*
 * listen() hook for Unix-domain sockets.
 * Performs no work: both arguments are ignored and 0 is always returned.
 */
static int unix_proto_listen(struct socket* sock, int backlog) {
    (void) sock;
    (void) backlog;
    return 0;
}
connect主要逻辑如下:
/*
 * Connect a Unix-domain socket to the server bound at a file-system path.
 *
 * The caller's sockaddr_un is copied in from user space, the path is opened
 * to obtain its inode, and unix_data_lookup() is used to find the server
 * state for that inode.  The socket then waits in sock_awaitconn() and, once
 * sock->conn is set, takes a reference on the peer's state and records it
 * in peerupd.
 *
 * sock          client socket
 * uservaddr     user-space address of the server's sockaddr_un
 * sockaddr_len  number of bytes supplied at uservaddr
 * flags         not used by this function
 *
 * Returns 0 on success; -EINVAL for a bad length, wrong family or when no
 * server is found; -EINPROGRESS / -EISCONN when the socket is already
 * connecting / connected; otherwise the error reported by verify_area(),
 * open_namei() or sock_awaitconn().
 */
static int unix_proto_connect(struct socket* sock,
                              struct sockaddr* uservaddr,
                              int sockaddr_len,
                              int flags) {
    /* One byte larger than sun_path so a full-length path can be terminated. */
    char fname[sizeof(((struct sockaddr_un*)0)->sun_path) + 1];
    struct sockaddr_un sockun;
    struct unix_proto_data* serv_upd;
    struct inode* inode;
    unsigned long old_fs;
    int path_len;
    int i;
    int er;

    dprintf(1, "UNIX: connect: socket 0x%x, servlen=%d\n", sock, sockaddr_len);
    if (sockaddr_len <= UN_PATH_OFFSET ||
        sockaddr_len > sizeof(struct sockaddr_un)) {
        dprintf(1, "UNIX: connect: bad length %d\n", sockaddr_len);
        return (-EINVAL);
    }
    if (sock->state == SS_CONNECTING)
        return (-EINPROGRESS);
    if (sock->state == SS_CONNECTED)
        return (-EISCONN);
    er = verify_area(VERIFY_READ, uservaddr, sockaddr_len);
    if (er)
        return er;

    /* Step 1: copy the server address in from user space. */
    path_len = sockaddr_len - UN_PATH_OFFSET;
    memcpy_fromfs(&sockun, uservaddr, sockaddr_len);
    /*
     * Terminate the copied path only when the terminator still lies inside
     * sun_path.  With a maximum-length address the index equals the array
     * size (assuming UN_PATH_OFFSET is the offset of sun_path), and sockun
     * is a local, so writing there would corrupt the stack.  fname gets its
     * own terminator below.
     */
    if (path_len < (int) sizeof(sockun.sun_path))
        sockun.sun_path[path_len] = '\0';
    if (sockun.sun_family != AF_UNIX) {
        dprintf(1, "UNIX: connect: family is %d, not AF_UNIX(%d)\n",
                sockun.sun_family, AF_UNIX);
        return (-EINVAL);
    }

    /*
     * Try to open the name in the filesystem - this is how we
     * identify ourselves and our server. Note that we don't
     * hold onto the inode that long, just enough to find our
     * server. When we're connected, we mooch off the server.
     */
    memcpy(fname, sockun.sun_path, path_len);
    fname[path_len] = '\0';
    /* fname lives in kernel memory, so switch the fs segment for the name lookup. */
    old_fs = get_fs();
    set_fs(get_ds());
    i = open_namei(fname, 0, S_IFSOCK, &inode, NULL);
    set_fs(old_fs);
    if (i < 0) {
        dprintf(1, "UNIX: connect: can't open socket %s\n", fname);
        return (i);
    }

    /* Step 2: find the server state that belongs to this inode. */
    serv_upd = unix_data_lookup(&sockun, sockaddr_len, inode);
    iput(inode);
    if (!serv_upd) {
        /* inode is only printed here, it is not dereferenced after iput(). */
        dprintf(1, "UNIX: connect: can't locate peer %s at inode 0x%x\n", fname,
                inode);
        return (-EINVAL);
    }

    /* Step 3: queue on the server socket and wait for the connection to be completed. */
    if ((i = sock_awaitconn(sock, serv_upd->socket)) < 0) {
        dprintf(1, "UNIX: connect: can't await connection\n");
        return (i);
    }

    /* Step 4: remember the peer's state and hold a reference on it. */
    if (sock->conn) {
        unix_data_ref(UN_DATA(sock->conn));
        UN_DATA(sock)->peerupd = UN_DATA(sock->conn); /* ref server */
    }
    return (0);
}
accept主要逻辑:
/*
 * Accept one pending connection on a listening Unix-domain socket.
 *
 * sock     listening socket whose iconn list holds the waiting clients
 * newsock  socket that becomes the server-side end of the connection
 * flags    when O_NONBLOCK is set the call does not sleep
 *
 * Returns 0 on success, -EAGAIN when nothing is pending and O_NONBLOCK is
 * set, and -ERESTARTSYS when a signal arrives while sleeping.
 */
static int unix_proto_accept(struct socket* sock,
                             struct socket* newsock,
                             int flags) {
    struct socket* pending;

    dprintf(1, "UNIX: accept: socket 0x%x accepted via socket 0x%x\n", sock,
            newsock);

    /* Step 1: wait until a client shows up on the pending list. */
    for (;;) {
        pending = sock->iconn;
        if (pending)
            break;
        if (flags & O_NONBLOCK)
            return -EAGAIN;
        /* Sleep on the listening socket's wait queue until woken. */
        interruptible_sleep_on(sock->wait);
        if (current->signal & ~current->blocked) {
            dprintf(1, "UNIX: accept: sleep was interrupted\n");
            return -ERESTARTSYS;
        }
    }

    /* Step 2: unlink the first pending client. */
    sock->iconn = pending->next;
    pending->next = NULL;

    /* Step 3: cross-link both ends and mark them connected. */
    newsock->conn = pending;
    pending->conn = newsock;
    pending->state = SS_CONNECTED;
    newsock->state = SS_CONNECTED;

    /* Step 4: the new socket references the client's state as its peer. */
    unix_data_ref(UN_DATA(pending));
    UN_DATA(newsock)->peerupd = UN_DATA(pending);

    /* The new socket takes over the listening socket's bound address. */
    UN_DATA(newsock)->sockaddr_un = UN_DATA(sock)->sockaddr_un;
    UN_DATA(newsock)->sockaddr_len = UN_DATA(sock)->sockaddr_len;

    /* Step 5: wake whoever sleeps on the client's wait queue. */
    wake_up_interruptible(pending->wait);
    return 0;
}
sock_close函数最后会调用对应协议族的release函数。unix socket对应的是unix_proto_release:它释放绑定的inode,并把自己和对端的unix_data引用计数各减1;引用计数减到0时(见unix_data_deref)才真正释放buffer。
/*
 * Release hook for Unix-domain sockets.
 *
 * Detaches the state from the socket, drops the inode held for a bound
 * name, and drops one reference on both the peer's state (if any) and the
 * socket's own state.
 *
 * sock  socket being released
 * peer  not used by this function
 *
 * Returns 0 on success (including when the socket has no state attached)
 * and -EINVAL when the state does not point back at sock.
 */
static int unix_proto_release(struct socket* sock, struct socket* peer) {
    struct unix_proto_data* data = UN_DATA(sock);

    dprintf(1, "UNIX: release: socket 0x%x, unix_data 0x%x\n", sock, data);

    if (!data)
        return 0;

    if (data->socket != sock) {
        printk("UNIX: release: socket link mismatch!\n");
        return -EINVAL;
    }

    if (data->inode) {
        dprintf(1, "UNIX: release: releasing inode 0x%x\n", data->inode);
        /* Give back the inode reference taken when the name was bound. */
        iput(data->inode);
        data->inode = NULL;
    }

    /* Break the links between socket and state in both directions. */
    UN_DATA(sock) = NULL;
    data->socket = NULL;

    /* Drop the reference held on the peer, then the socket's own one. */
    if (data->peerupd) {
        unix_data_deref(data->peerupd);
    }
    unix_data_deref(data);

    return 0;
}
/*
 * Drop one reference on a unix_proto_data.
 *
 * When the count is exactly 1 on entry, the buffer page is freed and the
 * ring indices are reset before the count is decremented.  A NULL upd is
 * logged and ignored.
 *
 * NOTE(review): a count that is already <= 0 is still decremented here;
 * confirm that callers never pass such a slot.
 */
static void unix_data_deref(struct unix_proto_data* upd) {
    int last_reference;

    if (!upd) {
        dprintf(1, "UNIX: data_deref: upd = NULL\n");
        return;
    }

    last_reference = (upd->refcnt == 1);
    if (last_reference) {
        dprintf(1, "UNIX: data_deref: releasing data 0x%x\n", upd);
        if (upd->buf) {
            free_page((unsigned long)upd->buf);
            upd->buf = NULL;
            upd->bp_tail = 0;
            upd->bp_head = 0;
        }
    }

    upd->refcnt -= 1;
}
发送端需要先获取接收端的unix buffer,如果没有剩余空间就需要阻塞等待接收端读进程唤醒。只要能够写入数据,哪怕一个字节,那么都不会阻塞,发送数据本质上是内核buffer之间的拷贝,不需要经过网络协议层和网络,简单高效。
/*
 * Write data from a user buffer into the peer's receive buffer.
 *
 * The data is copied straight into the ring buffer of the peer
 * (UN_DATA(sock)->peerupd).  The call sleeps only while that buffer has no
 * free space at all; once some space is available it copies as much as fits
 * and returns.
 *
 * sock      connected socket to write on
 * ubuf      user-space source buffer
 * size      number of bytes the caller wants to write
 * nonblock  non-zero: return -EAGAIN instead of sleeping
 *
 * Returns the number of bytes copied (possibly fewer than size), 0 when
 * size <= 0, -EINVAL when the socket is not connected, -EPIPE (after
 * sending SIGPIPE) when it is disconnecting, -EAGAIN when the buffer is
 * full and nonblock is set, -ERESTARTSYS when a signal interrupts the
 * sleep, or the error reported by verify_area().
 */
static int unix_proto_write(struct socket* sock,
                            char* ubuf,
                            int size,
                            int nonblock) {
    struct unix_proto_data* pupd;
    int todo, space;
    int er;

    if ((todo = size) <= 0)
        return (0);
    if (sock->state != SS_CONNECTED) {
        dprintf(1, "UNIX: write: socket not connected\n");
        if (sock->state == SS_DISCONNECTING) {
            send_sig(SIGPIPE, current, 1);
            return (-EPIPE);
        }
        return (-EINVAL);
    }

    /* Step 1: the destination is the peer's buffer, not our own. */
    pupd = UN_DATA(sock)->peerupd; /* safer than sock->conn */

    /* Step 2: sleep while the peer's buffer has no free space. */
    while (!(space = UN_BUF_SPACE(pupd))) {
        dprintf(1, "UNIX: write: no space left...\n");
        if (nonblock)
            return (-EAGAIN);
        interruptible_sleep_on(sock->wait);
        if (current->signal & ~current->blocked) {
            dprintf(1, "UNIX: write: interrupted\n");
            return (-ERESTARTSYS);
        }
        if (sock->state == SS_DISCONNECTING) {
            dprintf(1, "UNIX: write: disconnected(SIGPIPE)\n");
            send_sig(SIGPIPE, current, 1);
            return (-EPIPE);
        }
    }

    /*
     * Copy from the user's buffer to the write buffer,
     * watching for wraparound. Then we wake up the reader.
     */
    /* Step 3: hold the peer's lock while its buffer is being filled. */
    unix_lock(pupd);
    do {
        int part, cando;

        if (space <= 0) {
            printk("UNIX: write: SPACE IS NEGATIVE!!!\n");
            send_sig(SIGKILL, current, 1);
            /* Every exit from the locked region must give the lock back. */
            unix_unlock(pupd);
            return (-EPIPE);
        }

        /*
         * We may become disconnected inside this loop, so watch
         * for it (peerupd is safe until we close).
         */
        if (sock->state == SS_DISCONNECTING) {
            send_sig(SIGPIPE, current, 1);
            unix_unlock(pupd);
            return (-EPIPE);
        }

        /* Limit the chunk to the free space and to the end of the buffer. */
        if ((cando = todo) > space)
            cando = space;
        if (cando > (part = BUF_SIZE - pupd->bp_head))
            cando = part;
        dprintf(1, "UNIX: write: space=%d, todo=%d, cando=%d\n", space, todo,
                cando);
        er = verify_area(VERIFY_READ, ubuf, cando);
        if (er) {
            unix_unlock(pupd);
            return er;
        }

        /* Step 4: copy from user space straight into the peer's buffer. */
        memcpy_fromfs(pupd->buf + pupd->bp_head, ubuf, cando);
        /* The mask wraps the index; this relies on BUF_SIZE being a power of two. */
        pupd->bp_head = (pupd->bp_head + cando) & (BUF_SIZE - 1);
        ubuf += cando;
        todo -= cando;

        /* Step 5: wake the other end so it can read the new data. */
        if (sock->state == SS_CONNECTED)
            wake_up_interruptible(sock->conn->wait);
        space = UN_BUF_SPACE(pupd);
    } while (todo && space);
    unix_unlock(pupd);
    return (size - todo);
}
读取时直接从自己的buffer读取数据,并唤醒写进程,本质上是生产者消费者队列,需要加锁同步读写操作。
/*
 * Read data from the socket's own receive buffer into a user buffer.
 *
 * The call sleeps only while the buffer is empty; once some data is
 * available it copies as much as requested and present, wakes the other
 * end, and returns.
 *
 * sock      socket to read from
 * ubuf      user-space destination buffer
 * size      maximum number of bytes to read
 * nonblock  non-zero: return -EAGAIN instead of sleeping
 *
 * Returns the number of bytes copied (possibly fewer than size), 0 when
 * size <= 0 or when the buffer is empty and the socket is disconnecting,
 * -EINVAL when the buffer is empty and the socket is in any other
 * non-connected state, -EAGAIN when the buffer is empty and nonblock is
 * set, -ERESTARTSYS when a signal interrupts the sleep, or the error
 * reported by verify_area().
 */
static int unix_proto_read(struct socket* sock,
                           char* ubuf,
                           int size,
                           int nonblock) {
    struct unix_proto_data* upd;
    int todo, avail;
    int er;

    if ((todo = size) <= 0)
        return (0);
    upd = UN_DATA(sock);

    /* Step 1: sleep while there is nothing to read. */
    while (!(avail = UN_BUF_AVAIL(upd))) {
        if (sock->state != SS_CONNECTED) {
            dprintf(1, "UNIX: read: socket not connected\n");
            return ((sock->state == SS_DISCONNECTING) ? 0 : -EINVAL);
        }
        dprintf(1, "UNIX: read: no data available...\n");
        if (nonblock)
            return (-EAGAIN);
        interruptible_sleep_on(sock->wait);
        if (current->signal & ~current->blocked) {
            dprintf(1, "UNIX: read: interrupted\n");
            return (-ERESTARTSYS);
        }
    }

    /*
     * Copy from the read buffer into the user's buffer,
     * watching for wraparound. Then we wake up the writer.
     */
    /* Step 2: hold the lock while the buffer is being drained. */
    unix_lock(upd);
    do {
        int part, cando;

        if (avail <= 0) {
            printk("UNIX: read: AVAIL IS NEGATIVE!!!\n");
            send_sig(SIGKILL, current, 1);
            /* Every exit from the locked region must give the lock back. */
            unix_unlock(upd);
            return (-EPIPE);
        }

        /* Limit the chunk to the available data and to the end of the buffer. */
        if ((cando = todo) > avail)
            cando = avail;
        if (cando > (part = BUF_SIZE - upd->bp_tail))
            cando = part;
        dprintf(1, "UNIX: read: avail=%d, todo=%d, cando=%d\n", avail, todo,
                cando);
        if ((er = verify_area(VERIFY_WRITE, ubuf, cando)) < 0) {
            unix_unlock(upd);
            return er;
        }
        memcpy_tofs(ubuf, upd->buf + upd->bp_tail, cando);
        /* The mask wraps the index; this relies on BUF_SIZE being a power of two. */
        upd->bp_tail = (upd->bp_tail + cando) & (BUF_SIZE - 1);
        ubuf += cando;
        todo -= cando;

        /* Step 3: wake the other end so it can continue writing. */
        if (sock->state == SS_CONNECTED)
            wake_up_interruptible(sock->conn->wait);
        avail = UN_BUF_AVAIL(upd);
    } while (todo && avail);
    unix_unlock(upd);
    return (size - todo);
}
Unix Socket只能用于本地通信,无法用于网络。连接是通过文件系统路径来绑定的,读写数据是通过两个缓冲区来实现的。
Unix Socket与Pipe的不同在于Pipe是单缓冲区,只能用于单向通信,而Unix Socket是双缓冲区,能够双向通信。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。