消息队列是LinuxIPC中很常用的一种通信方式,今天分析一下Posix消息队列,本文中所讲的消息队列均为Posix消息队列。什么是Posix消息队 消息队列可以认为它是一个消息链表,有足够写权限的进程可以往队列中发送消息,有足够读权限的进程可以往队列中接收消息。 每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这也就说明消息队列具有随内核的持续性,也就是说进程关闭后,消息队列依然存在,除非内核重新自举。 Posix消息队列有如下特点:对Posix消息队列的读总是返回优先级最高最早的消息。当往空的消息队列中放置一个消息时,Posix消息队列允许产生一个信号或者启动一个接收线程。 Posix消息队列中的每条消息通常具有以下属性:一个表示优先级的整数;消息的数据部分的长度;消息数据本身; 消息队列的基本操作 打开或创建一个posix消息队列操作接口 mqdtmqopen(constcharname,intoflag,modetmode,structmqattrattr); Linkwithlrt。 参数name为posixIPC名字,即将要被打开或创建的消息队列对象,为了便于移植,需要指定为name的格式。 参数oflag必须要有ORDONLY(只读)、标志ORDWR(读写),OWRONLY(只写)之一,除此之外还可以指定OCREAT(没有该对象则创建)、OEXCL(如果OCREAT指定,但name不存在,就返回错误),ONONBLOCK(以非阻塞方式打开消息队列,在正常情况下mqreceive和mqsend函数会阻塞的地方,使用该标志打开的消息队列会返回EAGAIN错误)。 当操作一个新队列时,使用OCREAT标识,此时后面两个参数需要被指定,参数mode为指定权限位,attr指定新创建队列的属性。 关闭进程描述符操作接口intmqclose(mqdtmqdes); 关闭之后告诉进程不在使用该描述符,但消息队列不会从系统中删除。 系统中删除某个消息队列操作接口intmqunlink(constcharname); 参数为mqopen()函数第一个参数,调用该接口后删除会马上发生,即使该队列的描述符引用计数仍然大于0。 关于消息队列中设置和和获取消息队列属性接口mqdtmqgetattr(mqdtmqdes,structmqattrattr);mqdtmqsetattr(mqdtmqdes,structmqattrnewattr,structmqattroldattr); 每个消息队列有四个属性,mqgetattr返回所有的这些属性,mqsetattr设置其中的某个属性 消息队列的消息具体属性如下structmqattr{longmqflags;Flags:0orONONBLOCKlongmqmaxmsg;Max。ofmessagesonqueuelongmqmsgsize;Max。messagesize(bytes)longmqcurmsgs;ofmessagescurrentlyinqueue} 指向mqattr的指针可以作为mqopen函数的第四个参数传递,从而在创建队列初就设置好每个消息的最大长度和允许存在的最大消息数量,另外两个成员被忽略。 mqsetattr给所指定队列设置属性,但是只使用由attr指向的mqattr结构的mqflags成员,以设置或清除非阻塞标志,其他三个成员则被忽略(其中两个只能在创建队列时指定,还有一个及时获取)。当然,mqsetattr的最后一个参数用于接收之前的属性和当前状态 向消息队列放置和取走消息的操作接口intmqsend(mqdtmqdes,constcharmsgptr,sizetmsglen,unsignedmsgprio);ssizetmqreceive(mqdtmqdes,charmsgptr,sizetmsglen,unsignedmsgprio); 参数msgptr为指向消息的指针。 msglen为消息长度,该值不能大于属性值中mqmsgsize的值。 msgprio为优先级,消息在队列中将按照优先级大小顺序来排列消息。 如果消息队列已满,mqsend()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果ONONBLOCK被指定,mqsend()那么将不会阻塞,而是返回EAGAIN错误。 如果队列空,mqreceive()函数将阻塞,直到消息队列中有新的消息;如果ONONBLOCK被指定,mqreceive()那么将不会阻塞,而是返回EAGAIN错误。消息队列的原理分析 消息队列的初始化staticintinitinitmqueuefs(void){。。。注册消息队列文件系统errorregisterfilesystem(mqueuefstype);构建structvfsmount结构主要是获取文件系统的superblock对象与根目录的inode与dentry对象,并将这些对象加入到系统链表if(ISERR(mqueuemntkernmount(mqueuefstype))){。。。}queuescount0;spinlockinit(mqlock);return0;outfilesystem:outsysctl:returnerror;}initcall(initmqueuefs); 消息队列文件系统初始化很简单,主要工作如下:注册文件系统,把mqueuefstype加入到filesystems链表中。构建structvfsmount结构,把获取的文件系统的superblock对象与根目录的inode与dentry对象,并将这些对象加入到系统链表中。 mqopen接口分析asmlinkagelongsysmqopen(constcharuseruname,intoflag,modetmode,structmqattruseruattr){。。。获取一个未使用的文件描述符fdgetunusedfd();mutexlock(mqueuemntmntrootdinodeimutex);获取一个名字为name的dentry结构dentrylookuponelen(name,mqueuemntmntroot,strlen(name));mntget(mqueuemnt);若是新创建if(oflagOCREAT){if(dentrydinode){entryalreadyexistsauditinode(name,dentry);errorEEXIST;if(oflagOEXCL)gotoout;若已经存在,则直接打开filefilpdoopen(dentry,oflag);}else{创建一个file结构,把inode和dentry与之关联filpdocreate(mqueuemntmntroot,dentry,oflag,mode,uattr);}}else{否则,直接获取errorENOENT;if(!dentrydinode)gotoout;auditinode(name,dentry);filpdoopen(dentry,oflag);}if(ISERR(filp)){errorPTRERR(filp);gotooutputfd;}给描述符设置closeonexec标志setcloseonexec(fd,1);文件描述符与file进行关联fdinstall(fd,filp);gotooutupsem;。。。returnfd;} mqopen的操作很简单,操作如下:获取一个未使用的文件描述符fd;根据参数name获取一个dentry;根据oflag表示判断是否是新创建还是使用已存在的file,若新创建,则生成一个file结构,同时与fd、inode、dentry进行关联;否则打开一个已存在的file。 mqsend接口分析 当使用mqsend发送消息时,比如如下调用,mqsend(mqd,msg,msglen,msgprio) 发送消息时,最终会调用如下函数mqtimedsend(mqd,msg,msglen,msgprio,NULL) asmlinkagelongsysmqtimedsend(mqdtmqdes,constcharuserumsgptr,sizetmsglen,unsignedintmsgprio,conststructtimespecuseruabstimeout){。。。获取file结构filpfget(mqdes);if(unlikely(!filp))gotoout;inodefilpfpath。dentrydinode;获取inode下的mqueueinodeinfoinfoMQUEUEI(inode);把用户传来的消息转换成内核消息链表,若用户消息长度大于PAGESIZE,则链表结构为msgmsgmsgmsgsegmsgmsgseg每个节点都已一个页大小,除了最后一个节点若用户消息大小小于PAGESIZE,链表只有一个节点msgmsgmsgptr指向链表头节点msgmsgmsgptrloadmsg(umsgptr,msglen);msgmsg中记录消息的总长度,和消息优先级msgptrmtsmsglen;msgptrmtypemsgprio;spinlock(infolock);若消息数量达到最大值if(infoattr。mqcurmsgsinfoattr。mqmaxmsg){若非阻塞,则返回if(filpfflagsONONBLOCK){spinunlock(infolock);retEAGAIN;若超时时间小于0,则返回超时时间}elseif(unlikely(timeout0)){spinunlock(infolock);rettimeout;}else{阻塞调用,则进程休眠,把wait加入到info中的SEND等待队列中wait。taskcurrent;wait。msg(void)msgptr;wait。stateSTATENONE;retwqsleep(info,SEND,timeout,wait);}}else{若info接收队列中有阻塞的进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程receiverwqgetfirstwaiter(info,RECV);if(receiver){pipelinedsend(info,msgptr,receiver);}else{把消息挂到消息队列上,并进行通知msginsert(msgptr,info);donotify(info);}inodeiatimeinodeimtimeinodeictimeCURRENTTIME;spinunlock(infolock);ret0;}returnret;} sysmqtimedsend功能如下:发送消息前先把用户消息转换成消息链表。若消息队列满了,则根据条件进行是否阻塞。消息队列未满,若接收队列中存在阻塞的接收进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程;否则把消息加到消息队列中。 mqreceive接口分析 当使用mqreceive接收消息时,比如如下调用, 接收消息时,最终会调用如下函数mqtimedreceive(mqd,msg,msglen,msgprio,NULL)具体实现如下asmlinkagessizetsysmqtimedreceive(mqdtmqdes,charuserumsgptr,sizetmsglen,unsignedintuserumsgprio,conststructtimespecuseruabstimeout){。。。获取file结构filpfget(mqdes);inodefilpfpath。dentrydinode;获取inode下的mqueueinodeinfoinfoMQUEUEI(inode);auditinode(NULL,filpfpath。dentry);spinlock(infolock);消息队列当前没有消息if(infoattr。mqcurmsgs0){若非阻塞,则返回if(filpfflagsONONBLOCK){spinunlock(infolock);retEAGAIN;msgptrNULL;若超时时间小于0,则返回超时时间}elseif(unlikely(timeout0)){}else{阻塞调用,则进程休眠,把wait加入到info中的RECV等待队列中wait。taskcurrent;wait。stateSTATENONE;retwqsleep(info,RECV,timeout,wait);msgptrwait。msg;}}else{从消息队列的最高优先级中获取一个消息msgptrmsgget(info);inodeiatimeinodeimtimeinodeictimeCURRENTTIME;由于从消息队列中已经取出一个消息了,有剩余的空间,因此把等待SEND队列上的进程的消息挂到消息队里中,然后唤醒发送消息的进程pipelinedreceive(info);spinunlock(infolock);ret0;}if(ret0){retmsgptrmts;把消息拷贝到用户态if((umsgprioputuser(msgptrmtype,umsgprio))storemsg(umsgptr,msgptr,msgptrmts)){retEFAULT;}释放消息空间freemsg(msgptr);}outfput:fput(filp);out:returnret;} sysmqtimedreceive功能如下:获取消息前,先判断消息队列中是否有消息,若没有,则根据条件进行是否阻塞。若消息队列中有消息,则从消息队列的获取一个最高优先级的消息。唤醒因为消息队列满而阻塞的发送进程。把消息拷贝到用户缓冲区中。 mqunlink接口分析asmlinkagelongsysmqunlink(constcharuseruname){。。。引用计数减1,删除目录项errvfsunlink(dentrydparentdinode,dentry);returnerr;} 该函数作用很简单,就是减少引用计数,删除目录项。 经过上述的分析,有关消息队列的内存结构可以总结如下: