【Linux】消息传递的艺术:探索Linux消息队列机制
本文所讲的共享内存为System V
版的消息队列
0.前言
共享内存没有进行同步于互斥以及异步
System V 是一种经典的 UNIX 进程间通信(IPC)机制,提供了一套 API 来支持进程之间的高效数据交换和同步。消息队列 和 信号量 是其中的两个关键部分,它们各自解决了不同的通信和同步问题,但都基于 System V 的 IPC 框架。
虽然 System V IPC 功能强大,但其接口较为复杂,现代系统中逐渐被 POSIX IPC 替代。
1.什么是消息队列
消息队列(Message Queue)是进程间通信(IPC)的一种方式,通过将消息存入内核维护的队列中,实现异步的进程数据传递。与管道不同,消息队列不仅允许不同大小的数据块传递,还支持消息的优先级排序,从而提供了更灵活的通信机制。
1.1 消息队列的特点
- 异步通信:发送方和接受方不需要同时进行,消息会存储在队列中,直到接收方读取。
- 持久性:消息队列由内核维护,即使发送方或者接收方意外退出,消息仍然保留在队列中。
- 消息分类:每个消息都有一个消息类型,可以根据类型有选择地读取特定地消息。
- 容量限制:消息队列地大小是有限制地,需要合理地管理和清空,避免队列满导致堵塞。
1.2 消息队列地核心概念
- 标识符:消息队列使用一个唯一地标识符(Queue ID)来区分。
- 消息类型:每条消息包含一个正整数的类型,用户可以根据类型选择性地读取消息。
- 结构:
- 消息队列:实际传输地数据内容。
- 消息长度:消息地字节数。
- 消息类型地分类标识。
遍历消息时,存数据块还是取数据块,取决于数据块中的类型type
1.3 消息队列的数据结构
我们使用man
手册来查看,输入man msgctl
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of creation or last
modification by msgctl() */
unsigned long msg_cbytes; /* # of bytes in queue */
msgqnum_t msg_qnum; /* # number of messages in queue */
msglen_t msg_qbytes; /* Maximum # of bytes in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
我们要注意类型为struct ipc_prem
这个类型,该类型在共享内存的数据结构也出现过。
下面是它的具体内容:
struct ipc_perm {
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
1.4 消息队列的使用
如果你已经学习过共享内存,那么消息队列的使用也一定会得心应手的。
1.4.1 创建消息队列
使用msgget
函数来创建消息队列。
msgget
是用于创建或访问 System V 消息队列 的系统调用。它根据指定的键值创建或获取一个消息队列标识符,用于后续的消息操作。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
使用方法和shmget
高度相似
参数说明:
key
- 一个唯一标识消息队列的值。
- 常通过
ftok
函数生成,也可以直接使用整数值(不推荐)
msgflg
- 用于指定消息队列和权限和行为。
- 访问权限:权限掩码
- 行为控制:
IPC_CREAT
:如果队列不存在那么创建IPC_RXCL
:需要与IPC_CREAT
结合使用,单独使用没有意义。表示如果队列已经存在,那么返回错误。
返回值
- 成功:返回消息队列的标识符
msgid
。 - 失败:返回-1。
是不是和shmget
相似,简直一模一样啊!
如果你没有看过我的共享内存文章,推荐一看,会对你理解消息队列也是有帮助的哦~
【Linux】「共享内存揭秘」:高效进程通信的终极指南-CSDN博客
还是老样子,我们先创建一个共享区域:common.hpp
#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <assert.h>
#define PATHNAME "./"
#define PROJ_ID 0x3f
const mode_t mode = 0666;
key_t getKey()
{
/**
* :return 返回key值
* @return
*/
key_t ret = ftok(PATHNAME,PROJ_ID);
assert(ret!=-1);
return ret;
}
int creatMsg()
{
/**:function 创建消息队列
* :return 返回msgid
* @return
*/
int msgid = msgget(getKey(),IPC_CREAT|IPC_EXCL|mode);
assert(msgid!=-1);
return msgid;
}
然后创建一个消息队列
#include "common.hpp"
int main()
{
//创建消息队列
int msgid = creatMsg();
return 0;
}
运行后,我们在命令行输入指令ipcs -q
由于现在我们并没有使用消息队列,其used-bytes
和消息数messages
都是0
消息队列的生命周期也是跟随操作系统的,并不会因为进程的结束而结束
1.4.2 释放消息队列
同共享内存一样,你可以通过指令或者程序中的函数来将消息队列释放。
释放指令:ipcrm -q msqid
函数释放:msgctl
例行惯例,我们先在介绍msgctl
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数说明
msqid
- 消息队列的标识符(由
msgget
返回)。
- 消息队列的标识符(由
cmd
- 指定要执行的操作:
IPC_STAT
:获取消息队列的当前状态,并将其存储到buf
中。IPC_SET
:设置消息队列的属性(使用buf
中的数据)。IPC_RMID
:删除消息队列。
- 指定要执行的操作:
buf
- 指向
struct msqid_ds
的指针,表示消息队列的状态和属性。 - 对于
IPC_STAT
和IPC_SET
,需要使用此参数;对IPC_RMID
则可以为NULL
。
返回值
成功返回0,失败返回-1
只是释放消息队列,函数的使用就是套路了。
- 指向
msgctl(msqid,IPC_RMID,nullptr);
1.4.2 使用消息队列
发送消息
为了将数据发送到消息队列,现在需要的函数为msgsnd
msgsnd
是用于向 System V 消息队列 发送消息的系统调用。它将一条消息放入消息队列中,以实现进程间通信。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数说明:
msqid
- 消息队列的标识符,由
msgget
返回。
- 消息队列的标识符,由
msgp
- 指向消息数据的指针。这个指针指向的结构必须包含一个
long
类型的成员(用于存储消息类型),后面跟随实际的数据。
- 指向消息数据的指针。这个指针指向的结构必须包含一个
msgsz
- 消息正文部分的大小(以字节为单位,不包括消息类型字段)。
msgflg
- 标志选项,用于控制消息发送行为:
IPC_NOWAIT
:如果消息队列已满,则不阻塞,立即返回错误。- 默认情况下,如果队列满,调用会阻塞直到有空间可用。
返回值
成功:0。失败:-1
关于参数2
表示的是待发送的数据块,是一个结构体剋们需要自己定义
- 标志选项,用于控制消息发送行为:
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
接受
为了接受消息,同样我们还需要一个函数msgrcv
msgrcv
是一个用于从 System V 消息队列 中接收消息的系统调用。通过它,进程可以从指定的消息队列中读取一条符合条件的消息。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数说明:
-
msqid
- 消息队列的标识符,由
msgget
返回。用于标识从哪个队列中接收消息。
- 消息队列的标识符,由
-
msgp
- 指向一个用户定义的消息缓冲区。该缓冲区的第一个字段必须是
long
类型的消息类型mtype
,后续是消息正文。
示例消息缓冲区定义:
struct msgbuf { long mtype; // 消息类型 char mtext[MSGSZ]; // 消息正文 };
- 指向一个用户定义的消息缓冲区。该缓冲区的第一个字段必须是
-
msgsz
- 指定消息正文
mtext
的大小(以字节为单位)。如果接收到的消息大小超过该值,行为将根据msgflg
参数决定。
- 指定消息正文
-
msgtyp
- 指定要接收的消息类型:
>0
:接收队列中类型等于msgtyp
的第一条消息。0
:接收队列中的第一条消息,不论类型。<0
:接收队列中类型绝对值小于等于|msgtyp|
的第一条消息。
- 指定要接收的消息类型:
-
msgflg
- 控制消息接收行为的标志,可选值包括:
IPC_NOWAIT
:如果没有符合条件的消息,函数立即返回 -1,而不是阻塞。MSG_NOERROR
:如果消息正文大小超过msgsz
,多余部分将被截断,而不是导致错误。
common.hpp
- 控制消息接收行为的标志,可选值包括:
#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <cstring>
#include <unistd.h>
#include <assert.h>
#define PATHNAME "./"
#define PROJ_ID 0x3f
#define MSGSZ 128
const mode_t mode = 0666;
key_t getKey()
{
/**
* :return 返回key值
* @return
*/
key_t ret = ftok(PATHNAME,PROJ_ID);
assert(ret!=-1);
return ret;
}
int creatMsg()
{
/**:function 创建消息队列
* :return 返回msgid
* @return
*/
int msgid = msgget(getKey(),IPC_CREAT|IPC_EXCL|mode);
assert(msgid!=-1);
return msgid;
}
int getMsg()
{
/**:function 获取消息队列
* :return 返回msgid
* @return
*/
int msgid = msgget(getKey(),IPC_CREAT);
assert(msgid!=-1);
return msgid;
}
client.cc
用于向server.cc
发送消息
/**
* 用户端发送消息
*/
#include "common.hpp"
struct my_msgbuf{
long mtype;
char mtext[MSGSZ];//MSGSZ为新定义的标识常量:128
};
int main()
{
int msgid = creatMsg();
struct my_msgbuf msg;
msg.mtype = 1;//设置消息类型
strcpy(msg.mtext,"hello,Message Queue!");
//开始发送消息
int n = msgsnd(msgid,&msg,strlen(msg.mtext)+1,0);
assert(n!=-1);
std::cout<<"消息已发送"<<std::endl;
sleep(10);
msgctl(msgid,IPC_RMID,nullptr);//释放
return 0;
}
server.cc
用于接收client.cc
消息
/**
* 服务端接收消息
*/
#include "common.hpp"
struct my_msgbuf{
long mtype;
char mtext[MSGSZ];//MSGSZ为新定义的标识常量:128
};
int main()
{
int msgid = getMsg();
struct my_msgbuf msg;
//开始接受信息
ssize_t n = msgrcv(msgid,&msg,MSGSZ,1,0);
assert(n!=-1);
std::cout<<"已接收消息:"<<msg.mtext<<std::endl;
return 0;
}
执行效果:
2. 总结
消息队列的大部分接口都与共享内存类似,如果你使用过更现代的POSIX IPC
,可能会觉得System V
已经落后,事实上也确实是如此,System V
已经过于老旧,现在用的很少,对于此版本,不需要太过深入了解,在实际情况中遇到,查查文档就是了。
- 点赞
- 收藏
- 关注作者
评论(0)