设为首页 收藏本站
查看: 549|回复: 0

[经验分享] Linux进程通信之System V消息队列

[复制链接]

尚未签到

发表于 2016-3-16 13:16:02 | 显示全部楼层 |阅读模式
  SystemV消息队列是OpenGroup定义的XSI,不属于POSIX标准。SystemVIPC的历史相对很早,在上个世70年代后期有贝尔实验室的分支机构开发,80年代加入SystemV的系统内核中,后来商用UNIX系统基本都加入了SystemVIPC的功能。
  SystemV消息队列相对于POSIX消息队列的区别主要是:



  • POSIX消息队列的读操作总是返回消息队列中优先级最高的最早消息,而对于SystemV消息队列可以返回任意指定优先级(通过消息类型)的消息。
  • 当向一个空消息队列中写入一个消息时,POSIX消息队列允许产生一个信号或启动一个线程,SystemV消息队列不提供类似的机制。
  系统内核都会为每一个SystemV消息队列维护一个信息结构,在Linux2.6.18中的定义如下:
  

<bits/msq.h>
struct msqid_ds
{
struct ipc_perm msg_perm;        /*IPC对象的属性信息和访问权限 */
__time_t msg_stime;              /* time of last msgsnd command */
__time_t msg_rtime;              /* time of last msgrcv command */
__time_t msg_ctime;              /* time of last change */
unsigned long int __msg_cbytes;  /* 当前队列中消息的字节数 */
msgqnum_t msg_qnum;              /* 当前队列中消息的个数 */
msglen_t msg_qbytes;             /* 队列允许存放的最大字节数 */
__pid_t msg_lspid;               /* pid of last msgsnd() */
__pid_t msg_lrpid;               /* pid of last msgrcv() */
//下面是保留字段
#if __WORDSIZE == 32
unsigned long int __unused1;
unsigned long int __unused2;
unsigned long int __unused3;
#endif
unsigned long int __unused4;
unsigned long int __unused5;
};
  
  消息队列的结构可能的设计如下:



1SystemV消息队列的创建和打开

  SystemV消息队列的创建和使用会使用下面的函数接口:
  

#include <sys/msg.h>
int msgget(key_t key, int oflg);
//成功返回非负消息队列描述符,失败返回-1
  
  key:消息队列的键,用来创建一个消息队列。SystemIPC都有一个key,作为IPC的外部标识符,创建成功后返回的描述符作为IPC的内部标识符使用。key的主要目的就是使不同进程在同一IPC汇合。key具体说可以有三种方式生成:



  • 不同的进程约定好的一个值;
  • 通过相同的路径名和项目ID,调用ftok()函数,生成一个键;
  • 还可以设置为IPC_PRIVATE,这样就会创建一个新的,唯一的IPC对象;然后将返回的描述符通过某种方式传递给其他进程;
  oflg:指定创建或打开消息队列的标志和读写权限(ipc_perm中的mode成员)。我们知道SystemVIPC定义了自己的操作标志和权限设置标志,而且都是通过该参数传递,这和open函数存在差别,open函数第三个参数mode用于传递文件的权限标志。SystemVIPC的操作标志包含:IPC_CREATIPC_EXCL,权限设置标志如下图:
  

  下面是创建消息队列的测试代码:
  

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/msg.h>
using namespace std;
#define  PATH_NAME "/tmp/anonymQueue"
int main(int argc, char **argv)
{
key_t key;
int fd;
if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
{
cout<<"open file "<<PATH_NAME<<"failed.";
cout<<strerror(errno)<<endl;
return -1;
}
close(fd);
key = ftok(PATH_NAME, 0);
int msgID;
if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
{
cout<<"open message queue failed...";
cout<<strerror(errno)<<endl;
return -1;
}
cout<<"key:0x"<<hex<<key<<endl;
cout<<"descriptor id:"<<dec<<msgID<<endl;
}
  
  在Linux2.6.18下运行结果:

key:0x8015
descriptor id:917511
  实现SystemVIPC的任何系统都提供两个特殊的程序ipcsipcrmipcs输出IPC的各种信息,ipcrm则用于删除各种SystemVIPC。由于SystemVIPC不属于POSIX标准,所以这两个命令也未被标准化。下面是通过ipcs命令来查看刚刚创建的消息队列。
  

[iyunv@idcserver program]# ipcs -q -i 917511
Message Queue msqid=917511
uid=0   gid=0   cuid=0  cgid=0  mode=0666
cbytes=0        qbytes=65536    qnum=0  lspid=0 lrpid=0
send_time=Not set                  
rcv_time=Not set                  
change_time=Wed Aug  7 16:39:46 2013  
  

2SystemV消息队列的使用

  SystemV消息队列的写入消息使用下面的函数接口:
  

#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
//成功返回0,失败返回-1
  
  msqid:消息队列的描述符;
  msgp:指向存放消息的缓冲区,该缓冲区中包含消息类型消息体两部分内容。该缓冲区的结构是由用户定义的,在<sys/msg.h>中有关于该缓冲区结构定义的参版考模
  

struct msgbuf
{
long int mtype;             /* type of received/sent message */
char mtext[1];              /* text of the message */
};
  
  缓冲区的开头是一个long型的消息类型,该消息类型必须是一个非负数。紧跟在消息类型后面的是消息体部分(如果消息长度大于0),参考模版中定义的mtext只是说明消息体,该部分可以自定义长度。我们自己的应用都会定义特定的消息结构。
  msgsz:缓冲区中消息体部分的长度;
  msgflg:设置操作标志。可以为0IPC_NOWAIT用于在消息队列中没有可用的空间时,调用线程采用何种操作方式
  标志为IPC_NOWAIT,表示msgsnd操作以非阻塞的方式进行,在消息队列中没有可用的空间时,msgsnd操作会立刻返回。并指定EAGAIN错误;
  标志为0,表示msgsnd操作以阻塞的方式进行,这种情况下在消息队列中没有可用的空间时调用线程会被阻塞,直到下面的情况发生:



  • 等到有存放消息的空间;
  • 消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
  • 调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;
  SystemV消息队列的读取消息使用下面的函数接口:
  

#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
//成功返回接收到的消息的消息体的字节数,失败返回-1
  
  msqid:消息队列的描述符;
  msgp:指向待存放消息的缓冲区,该缓冲区中将会存放接收到的消息的消息类型消息体两部分内容。该缓冲区的结构是由用户定义的,和msgsnd相对应。
  msgsz:缓冲区中能存放消息体部分的最大长度,这也是该函数能返回的最大数据量;该字段的大小应该是sizeof(msgbuffer)-sizeof(long)
  msgtyp:希望从消息队列中获取的消息类型。


  • msgtyp0,返回消息队列中的第一个消息;
  • msgtyp>0,返回该消息类型的第一个消息;
  • msgtyp<0,返回小于或等于msgtyp绝对值的消息中类型最小的第一个消息;
  msgflg:设置操作标志。可以为0IPC_NOWAITMSG_NOERROR用于在消息队列中没有可用的指定消息时,调用线程采用何种操作方式
  标志为IPC_NOWAIT,表示msgrcv操作以非阻塞的方式进行,在消息队列中没有可用的指定消息时,msgrcv操作会立刻返回,并设定errnoENOMSG
  标志为0,表示msgrcv操作是阻塞操作,直到下面的情况发生:



  • 消息队列中有一个所请求的消息类型可以获取;
  • 消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
  • 调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;
  标志为MSG_NOERROR,表示接收到的消息的消息体的长度大于msgsz长度时,msgrcv采取的操作。如果设置了该标志msgrcv在这种情况下回截断数据部分,而不返回错误,否则返回一个E2BIG错误。
  下面是关于消息队列读写的测试代码:
  

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/msg.h>
using namespace std;
#define  PATH_NAME "/tmp/anonymQueue"
key_t CreateKey(const char *pathName)
{
int fd;
if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
{
cout<<"open file "<<PATH_NAME<<"failed.";
cout<<strerror(errno)<<endl;
return -1;
}
close(fd);
return ftok(PATH_NAME, 0);
}
int main(int argc, char **argv)
{
key_t key;
key = CreateKey(PATH_NAME);
int msgID;
if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
{
cout<<"open message queue failed...";
cout<<strerror(errno)<<endl;
return -1;
}
if (fork() == 0)
{
char msg[] = "cute yuki...";
char *msgBuf = new char[sizeof(long) + sizeof(msg)];
long mtype = 1;
memmove(msgBuf, &mtype, sizeof(mtype));
memmove(msgBuf + sizeof(mtype), msg, sizeof(msg));
for (int i = 1; i <= 5; ++i)
{
if (msgsnd(msgID, msgBuf, sizeof(msg), 0) < 0)
{
cout<<"send message "<<i<<"failed...";
cout<<strerror(errno)<<endl;
continue;
}
cout<<"child: send message "<<i<<" success..."<<endl;
sleep(1);
}
exit(0);
}
char msgBuf[256];
long mtype;
int recvLen;
for (int i = 1; i <= 5; ++i)
{
recvLen = msgrcv(msgID, msgBuf, 256 - sizeof(long), 0, 0);
if (recvLen < 0)
{
cout<<"receive message failed...";
cout<<strerror(errno)<<endl;
continue;
}
memmove(&mtype, msgBuf, sizeof(long));
cout<<"parent receive a message:"<<endl;
cout<<"message type:"<<mtype<<endl;
cout<<"message body:"<<msgBuf + sizeof(long)<<endl;
}
return 0;
}
  
  在Linux2.6.18下的执行结果为:
  

child: send message 1 success...
parent receive a message:
message type:1
message body:cute yuki...
child: send message 2 success...
parent receive a message:
message type:1
message body:cute yuki...
child: send message 3 success...
parent receive a message:
message type:1
message body:cute yuki...
child: send message 4 success...
parent receive a message:
message type:1
message body:cute yuki...
child: send message 5 success...
parent receive a message:
message type:1
message body:cute yuki...
  

3SystemV消息队列的控制操作

  对SystemV消息队列的删除,属性的设置和获取等控制操作要使用下面的函数接口:
  

#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
//成功返回0,失败返回-1
  
  msqid:消息队列的描述符;
  cmd:控制操作的命令,SUS标准提供以下三个命令:



  • IPC_RMID,删除一个消息队列。执行该命令系统会立刻把该消息队列从内核中删除,该消息队列中的所有消息将会被丢弃。这和已经讨论过的POSIX消息队列有很大差别POSIX消息队列通过调用mq_unlink来从内核中删除一个消息队列,但消息队列的真正析构会在最后一个mq_close结束后发生。
  • IPC_SET,根据buf的所指的值来设置消息队列msqid_ds结构中的msg_perm.uidmsg_perm.gidmsg_perm.modemsg_qbytes四个成员。
  • IPC_STAT,通过buf返回当前消息队列的msqid_ds结构。
  • Linux下还有例如IPC_INFOMSG_INFO等命令,具体可以参考Linux手册;
  buf:指向msqid_ds结构的指针;
  下面是测试代码:
  

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/msg.h>
using namespace std;
#define  PATH_NAME "/tmp/anonymQueue"
key_t CreateKey(const char *pathName)
{
int fd;
if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
{
cout<<"open file "<<PATH_NAME<<"failed.";
cout<<strerror(errno)<<endl;
return -1;
}
close(fd);
return ftok(PATH_NAME, 0);
}
int main(int argc, char **argv)
{
key_t key;
key = CreateKey(PATH_NAME);
int msgID;
if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
{
cout<<"open message queue failed...";
cout<<strerror(errno)<<endl;
return -1;
}
msqid_ds msgInfo;
msgctl(msgID, IPC_STAT, &msgInfo);
cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
return 0;
}
  
  在Linux2.6.18下的执行结果为:
  

msg_qbytes:65536
msg_qnum:2
msg_cbytes:26
  
  关于消息队列中允许存放最大的字节数可以通过IPC_SET命令进行修改,该修改只能针对本消息队列生效。如下测试代码:
  

int main(int argc, char **argv)
{
key_t key;
key = CreateKey(PATH_NAME);
int msgID;
if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
{
cout<<"open message queue failed...";
cout<<strerror(errno)<<endl;
return -1;
}
msqid_ds msgInfo;
msgctl(msgID, IPC_STAT, &msgInfo);
cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
msgInfo.msg_qbytes = 6553600;
if (msgctl(msgID, IPC_SET, &msgInfo) < 0)
{
cout<<"set message queue failed...";
cout<<strerror(errno)<<endl;
return -1;
}
msgctl(msgID, IPC_STAT, &msgInfo);
cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
return 0;
}
  
  在Linux2.6.18下的执行结果为:
  

msg_qbytes:65536
msg_qnum:0
msg_cbytes:0
msg_qbytes:6553600
msg_qnum:0
msg_cbytes:0
  

4SystemV消息队列的内核限制

  对SystemVIPC,系统往往会存在一些限制,对于消息队列,在Linux2.6.18中,系统内核存在以下限制:
  

[iyunv@idcserver program]# sysctl -a |grep msg
kernel.msgmnb = 65536 //一个消息队列上允许的最大字节数
kernel.msgmni = 16 //系统范围内允许存在的最大消息队列数
kernel.msgmax = 65536     //每个消息的最大字节数
  
  对于SystemV消息队列一般内核还有一个限制:系统范围内的最大消息数,在Linux下这个限制由msgmnb*msgmni决定。
  上面已经说过可以通过IPC_SET来设置使用中的消息队列的最大字节数。但是要在系统范围内对内核限制进行修改,在Linux下面可以通过修改/etc/sysctl.conf内核参数配置文件,然后配合sysctl命令来对内核参数进行设置。例如下面示例:
  

[iyunv@idcserver program]#echo "kernel.msgmnb = 6553600" >>/etc/sysctl.conf
[iyunv@idcserver program]#echo "kernel.msgmni = 100" >>/etc/sysctl.conf
[iyunv@idcserver program]#echo "kernel.msgmax = 6553600" >>/etc/sysctl.conf
[iyunv@idcserver program]#sysctl -p
[iyunv@idcserver program]# sysctl -a |grep msg
kernel.msgmnb = 6553600
kernel.msgmni = 100
kernel.msgmax = 6553600
  



Aug 8, 2013 AM 08:54 @lab<T...T>

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-191460-1-1.html 上篇帖子: Darkstat:基于web的网络监控工具 下篇帖子: linux下删除的文件真的可以恢复,NB
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表