进程间通信(IPC)系列Posix消息队列
消息队列是 Linux IPC 中很常用的一种通信方式,今天分析一下Posix消息队列,本文中所讲的消息队列均为Posix消息队列。什么是 Posix 消息队
消息队列可以认为它是一个消息链表,有足够写权限的进程可以往队列中发送消息,有足够读权限的进程可以往队列中接收消息。
每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这也就说明消息队列具有随内核的持续性,也就是说进程关闭后,消息队列依然存在,除非内核重新自举。
Posix 消息队列有如下特点:对Posix消息队列的读总是返回优先级最高最早的消息。当往空的消息队列中放置一个消息时,Posix消息队列允许产生一个信号或者启动一个接收线程。
Posix 消息队列中的每条消息通常具有以下属性:一个表示优先级的整数;消息的数据部分的长度;消息数据本身;
消息队列的基本操作
打开或创建一个 posix 消息队列操作接口
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
Link with -lrt.
参数 name 为 posix IPC 名字, 即将要被打开或创建的消息队列对象,为了便于移植,需要指定为"/name"的格式。
参数oflag必须要有O_RDONLY(只读)、标志O_RDWR (读写), O_WRONLY(只写)之一,除此之外还可以指定 O_CREAT(没有该对象则创建)、O_EXCL(如果 O_CREAT 指定,但 name 不存在,就返回错误),O_NONBLOCK(以非阻塞方式打开消息队列,在正常情况下mq_receive和mq_send 函数会阻塞的地方,使用该标志打开的消息队列会返回 EAGAIN 错误)。
当操作一个新队列时,使用 O_CREAT 标识,此时后面两个参数需要被指定,参数 mode 为指定权限位,attr 指定新创建队列的属性。
关闭进程描述符操作接口int mq_close(mqd_t mqdes);
关闭之后告诉进程不在使用该描述符,但消息队列不会从系统中删除。
系统中删除某个消息队列操作接口int mq_unlink(const char *name);
参数为mq_open()函数第一个参数,调用该接口后删除会马上发生,即使该队列的描述符引用计数仍然大于0。
关于消息队列中设置和和获取消息队列属性接口mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr); mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
每个消息队列有四个属性, mq_getattr返回所有的这些属性, mq_setattr设置其中的某个属性
消息队列的消息具体属性如下struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ }
指向mq_attr的指针可以作为mq_open函数的第四个参数传递, 从而在创建队列初就设置好每个消息的最大长度和允许存在的最大消息数量, 另外两个成员被忽略。
mq_setattr给所指定队列设置属性, 但是只使用由attr指向的mq_attr结构的mq_flags成员, 以设置或清除非阻塞标志, 其他三个成员则被忽略(其中两个只能在创建队列时指定, 还有一个及时获取)。当然, mq_setattr的最后一个参数用于接收之前的属性和当前状态
向消息队列放置和取走消息的操作接口int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
参数msg_ptr为指向消息的指针。
msg_len为消息长度,该值不能大于属性值中mq_msgsize的值。
msg_prio为优先级,消息在队列中将按照优先级大小顺序来排列消息。
如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果O_NONBLOCK被指定,mq_send()那么将不会阻塞,而是返回EAGAIN错误。
如果队列空,mq_receive()函数将阻塞,直到消息队列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那么将不会阻塞,而是返回EAGAIN错误。消息队列的原理分析
消息队列的初始化static int __init init_mqueue_fs(void) { ... //注册消息队列文件系统 error = register_filesystem(&mqueue_fs_type); //构建struct vfsmount结构主要是获取文件系统的super_block对象与根目录的inode与dentry对象,并将这些对象加入到系统链表 if (IS_ERR(mqueue_mnt = kern_mount(&mqueue_fs_type))) { ... } queues_count = 0; spin_lock_init(&mq_lock); return 0; out_filesystem: out_sysctl: return error; } __initcall(init_mqueue_fs);
消息队列文件系统初始化很简单,主要工作如下:注册文件系统,把 mqueue_fs_type 加入到 file_systems 链表中。构建struct vfsmount结构,把获取的文件系统的super_block对象与根目录的 inode 与 dentry 对象,并将这些对象加入到系统链表中。
mq_open 接口分析asmlinkage long sys_mq_open(const char __user *u_name, int oflag, mode_t mode, struct mq_attr __user *u_attr) { ... //获取一个未使用的文件描述符 fd = get_unused_fd(); mutex_lock(&mqueue_mnt->mnt_root->d_inode->i_mutex); //获取一个名字为name的dentry结构 dentry = lookup_one_len(name, mqueue_mnt->mnt_root, strlen(name)); mntget(mqueue_mnt); //若是新创建 if (oflag & O_CREAT) { if (dentry->d_inode) { /* entry already exists */ audit_inode(name, dentry); error = -EEXIST; if (oflag & O_EXCL) goto out; //若已经存在,则直接打开file filp = do_open(dentry, oflag); } else { //创建一个file结构,把inode和dentry与之关联 filp = do_create(mqueue_mnt->mnt_root, dentry, oflag, mode, u_attr); } } else { //否则,直接获取 error = -ENOENT; if (!dentry->d_inode) goto out; audit_inode(name, dentry); filp = do_open(dentry, oflag); } if (IS_ERR(filp)) { error = PTR_ERR(filp); goto out_putfd; } //给描述符设置close_on_exec标志 set_close_on_exec(fd, 1); //文件描述符与file进行关联 fd_install(fd, filp); goto out_upsem; ... return fd; }
mq_open 的操作很简单,操作如下:获取一个未使用的文件描述符 fd;根据参数name获取一个 dentry;根据 oflag 表示判断是否是新创建还是使用已存在的 file,若新创建,则生成一个 file 结构,同时与 fd 、 inode、dentry 进行关联;否则打开一个已存在的file。
mq_send 接口分析
当使用 mq_send 发送消息时,比如如下调用,mq_send(mqd, msg, msg_len, msg_prio)
发送消息时,最终会调用如下函数mq_timedsend(mqd, msg, msg_len, msg_prio, NULL)
asmlinkage long sys_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec __user *u_abs_timeout) { ... //获取file结构 filp = fget(mqdes); if (unlikely(!filp)) goto out; inode = filp->f_path.dentry->d_inode; //获取inode 下的 mqueue_inode_info info = MQUEUE_I(inode); //把用户传来的消息转换成内核消息链表,若用户消息长度大于PAGE_SIZE,则链表结构为 msg_msg-> msg_msgseg -> msg_msgseg 每个节点都已一个页大小,除了最后一个节点 //若用户消息大小小于PAGE_SIZE,链表只有一个节点 msg_msg // msg_ptr 指向链表头节点 msg_msg msg_ptr = load_msg(u_msg_ptr, msg_len); //msg_msg 中记录消息的总长度,和 消息优先级 msg_ptr->m_ts = msg_len; msg_ptr->m_type = msg_prio; spin_lock(&info->lock); //若消息数量达到最大值 if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) { //若非阻塞,则返回 if (filp->f_flags & O_NONBLOCK) { spin_unlock(&info->lock); ret = -EAGAIN; //若超时时间小于0,则返回超时时间 } else if (unlikely(timeout < 0)) { spin_unlock(&info->lock); ret = timeout; } else { //阻塞调用,则进程休眠,把wait加入到info中的SEND等待队列中 wait.task = current; wait.msg = (void *) msg_ptr; wait.state = STATE_NONE; ret = wq_sleep(info, SEND, timeout, &wait); } } else { //若info 接收队列中有阻塞的进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程 receiver = wq_get_first_waiter(info, RECV); if (receiver) { pipelined_send(info, msg_ptr, receiver); } else { //把消息挂到消息队列上,并进行通知 msg_insert(msg_ptr, info); __do_notify(info); } inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; spin_unlock(&info->lock); ret = 0; } return ret; }
sys_mq_timedsend 功能如下:发送消息前先把用户消息转换成消息链表。若消息队列满了,则根据条件进行是否阻塞。消息队列未满,若接收队列中存在阻塞的接收进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程;否则把消息加到消息队列中。
mq_receive 接口分析
当使用 mq_receive 接收消息时,比如如下调用,
接收消息时,最终会调用如下函数mq_timedreceive(mqd, msg, msg_len, &msg_prio, NULL)具体实现如下asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr, size_t msg_len, unsigned int __user *u_msg_prio, const struct timespec __user *u_abs_timeout) { ... //获取file结构 filp = fget(mqdes); inode = filp->f_path.dentry->d_inode; //获取inode 下的 mqueue_inode_info info = MQUEUE_I(inode); audit_inode(NULL, filp->f_path.dentry); spin_lock(&info->lock); //消息队列当前没有消息 if (info->attr.mq_curmsgs == 0) { //若非阻塞,则返回 if (filp->f_flags & O_NONBLOCK) { spin_unlock(&info->lock); ret = -EAGAIN; msg_ptr = NULL; //若超时时间小于0,则返回超时时间 } else if (unlikely(timeout < 0)) { } else { //阻塞调用,则进程休眠,把wait加入到info中的RECV等待队列中 wait.task = current; wait.state = STATE_NONE; ret = wq_sleep(info, RECV, timeout, &wait); msg_ptr = wait.msg; } } else { //从消息队列的最高优先级中获取一个消息 msg_ptr = msg_get(info); inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; //由于从消息队列中已经取出一个消息了,有剩余的空间,因此把等待SEND队列上的进程的消息挂到消息队里中,然后唤醒发送消息的进程 pipelined_receive(info); spin_unlock(&info->lock); ret = 0; } if (ret == 0) { ret = msg_ptr->m_ts; //把消息拷贝到用户态 if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) || store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) { ret = -EFAULT; } //释放消息空间 free_msg(msg_ptr); } out_fput: fput(filp); out: return ret; }
sys_mq_timedreceive 功能如下:获取消息前,先判断消息队列中是否有消息,若没有,则根据条件进行是否阻塞。若消息队列中有消息,则从消息队列的获取一个最高优先级的消息。唤醒因为消息队列满而阻塞的发送进程。把消息拷贝到用户缓冲区中。
mq_unlink 接口分析asmlinkage long sys_mq_unlink(const char __user *u_name) { ... //引用计数减1,删除目录项 err = vfs_unlink(dentry->d_parent->d_inode, dentry); return err; }
该函数作用很简单,就是减少引用计数,删除目录项。
经过上述的分析,有关消息队列的内存结构可以总结如下:
古村寻踪大山深处藏古村泉水飞溅似珍珠天上二十八宿,珍珠二十八泉。淄博市博山区源泉镇的珍珠村建于明末清初,当初为了躲避战乱,附近山民迁居于此。8月14日,一片欢声笑语让这颗珍珠更加璀璨,连日的降雨使得山上泉水相继涌现,
中国大手笔!匈牙利双喜临门赚大了近日,匈牙利真的是双喜临门!第一喜是俄罗斯送的俄匈已经达成协议,俄超大规模的供气,并且过路费也给了乌克兰。在德国为了省能源都开始烧劈柴洗澡只洗四个部位和火葬场5人合炉火花之际,匈牙
山西地质博物馆一日游地质博物馆大门恐龙化石大自然的鬼斧神工大家伙人还不少漂亮大家伙大家伙今日有空,约同事一起逛一下地质博物馆,不用预约,需五日核酸检测报告,健康码。暑假期间人很多,大部分是家长带小孩子
探幽鬼谷洞鬼谷大仙披门先师修真之所,王利隐居处鬼谷洞,位于玉泉街道三桥村清溪山北面的半山腰上。这里山势陡峭,凌空交合,形成一条很长的峡谷。古时候,生长在这里的山民,沿袭着巴人洞葬的奔丧习俗。死了的人,便将尸体搬到峡谷两边山腰的
晒晒我的退休生活60岁退休后,我回到了天津,第一件事就是参加了夕阳红骑行队。日常,与老伙伴们结队跨上自行車,在城里的街道和乡间公路上飞驰。我们的身影遍布津沽大地,逛公园,看展览,游寺庙特别是春秋两
皖北往事之秦山寺作者赵汗青刘欣华一山一寺一苦僧,数年坚守敲磬钟。若非朱棣来擒山,寺名仍然叫卧龙。题秦山寺2022年8月10日上午,在表兄丙奇的带领下,我们前往宿州市埇桥区曹村镇西境大山里的秦山寺采
古村寻踪大山深处藏古村泉水飞溅似珍珠天上二十八宿,珍珠二十八泉。淄博市博山区源泉镇的珍珠村建于明末清初,当初为了躲避战乱,附近山民迁居于此。8月14日,一片欢声笑语让这颗珍珠更加璀璨,连日的降雨使得山上泉水相继涌现,
中国大手笔!匈牙利双喜临门赚大了近日,匈牙利真的是双喜临门!第一喜是俄罗斯送的俄匈已经达成协议,俄超大规模的供气,并且过路费也给了乌克兰。在德国为了省能源都开始烧劈柴洗澡只洗四个部位和火葬场5人合炉火花之际,匈牙
经常喝小米粥,到底是会升高血糖,还是能降低血糖?告诉你真相王先生今年50岁,是一名糖尿病患者,平时血糖控制得还算不错,但有一件事却成为了他的困扰,就是每次喝完小米粥,他的餐后血糖就会有所升高。一开始以为只是小问题,可最近情况逐渐加重,本来
经常喝小米粥,到底是会升高血糖,还是能降低血糖?告诉你真相王先生今年50岁,是一名糖尿病患者,平时血糖控制得还算不错,但有一件事却成为了他的困扰,就是每次喝完小米粥,他的餐后血糖就会有所升高。一开始以为只是小问题,可最近情况逐渐加重,本来
华为概念第一龙头,多重形态叠加,股价7。2元,能否冲高回暖?技术派的看过来,这货完全可以用技术解读。头肩底启明星仙人指路同时出现,操盘的人一定是个技术高手,股价已经经历了86。10的跌幅,如今只剩7。20元,能否成功变盘,拭目以待。核心概念