进程通讯——消息队列

进程通讯——消息队列

在Linux系统中,进程间通信(IPC, Inter-Process Communication)是一种允许不同进程或同一进程的不同线程之间交换数据的机制。消息队列是IPC的一种形式,它允许一个或多个进程向它写入或从中读取消息。这些消息是用户定义的数据块,它们存储在内核中,直到被接收进程取走。

1.相关函数

1.1 数据类型mqd_t

**该数据类型定义在 **mqueue.h 中,是用来记录消息队列描述符的

1
2
typedef int mqd_t; 
//实质上是 int 类型的别名。

1.2 结构体struct mq_attr

在POSIX消息队列中,mq_attr 结构体用于指定消息队列的属性。这个结构体在 <mqueue.h> 头文件中定义,并且当你创建或打开消息队列时,可以通过这个结构体来指定或获取消息队列的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @brief 消息队列的属性信息
* mq_flags 标记,对于 mq_open,忽略它,因为这个标记是通过前者的调用传递的
* mq_maxmgs 队列可以容纳的消息的最大数量
* mq_msgsize 单条消息的最大允许大小,以字节为单位
* mq_curmsgs 当前队列中的消息数量,对于 mq_open,忽略它
*/
struct mq_attr {
long mq_flags; /* Flags (ignored for mq_open()) */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue
(ignored for mq_open()) */
};

1.3 结构体struct timespec

struct timespec 是 C 语言中用于表示时间的一个结构体,它定义在 <time.h> 头文件中。这个结构体通常用于需要高精度时间表示的场景,比如定时器、等待特定时间间隔等。struct timespec 提供了秒(tv_sec)和纳秒(tv_nsec)两个字段,以允许非常精确的时间表示。

1
2
3
4
struct timespec {  
   time_t tv_sec;  // 秒  
   long   tv_nsec; // 纳秒  
};

struct timespec 常用于需要精确时间控制的函数,如 nanosleep()(用于使线程暂停执行指定的时间)、clock_nanosleep()(类似于 nanosleep(),但允许指定时钟源)、mq_timedsend()mq_timedreceive()(用于 POSIX 消息队列的带超时的发送和接收操作)等。

1.4系统调用 mq_open()

mq_open() 函数是 POSIX 消息队列接口的一部分,用于打开(如果已存在)或创建(如果不存在)一个消息队列。这个函数在 <mqueue.h> 头文件中声明,并返回一个消息队列描述符(mqd_t 类型),该描述符用于后续的 mq_send(), mq_receive(), mq_close(), 和 mq_unlink() 等函数调用。

函数原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <mqueue.h>  
#include <fcntl.h>  
#include <sys/stat.h>  
/**
* @brief 创建或打开一个已存在的 POSIX 消息队列,消息队列是通过名称唯一标识的。
*
* @param name 消息队列的名称
* 命名规则:必须是以正斜杠/开头,以\0 结尾的字符串,中间可以包含若干字符,但
不能有正斜杠
* @param oflag 指定消息队列的控制权限,必须也只能包含以下三者之一
* O_RDONLY 打开的消息队列只用于接收消息
* O_WRONLY 打开的消息队列只用于发送消息
* O_RDWR 打开的消息队列可以用于收发消息
* 可以与以下选项中的 0 至多个或操作之后作为 oflag
* O_CLOEXEC 设置 close-on-exec 标记,这个标记表示执行 exec 时关闭文件描述符
* O_CREAT 当文件描述符不存在时创建它,如果指定了这一标记,需要额外提供 mode
和 attr 参数
* O_EXCL 创建一个当前进程独占的消息队列,要同时指定 O_CREAT,要求创建的消息
队列不存在,否则将会失败,并提示错误 EEXIST
* O_NONBLOCK 以非阻塞模式打开消息队列,如果设置了这个选项,在默认情况下收发
消息发生阻塞时,会转而失败,并提示错误 EAGAIN
* @param mode 每个消息队列在 mqueue 文件系统对应一个文件,mode 是用来指定消息
队列对应文件的权限的
* @param attr 属性信息,如果为 NULL,则队列以默认属性创建
* @return mqd_t 成功则返回消息队列描述符,失败则返回(mqd_t)-1,同时设置
errno 以指明错误原因
*/
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

/**
* @brief 当 oflag 没有包含 O_CREAT 时方可调用
*
* @param name 同上
* @param oflag 同上
* @return mqd_t 同上
*/
mqd_t mq_open(const char *name, int oflag);

1.5系统调用mq_timedsend()

mq_timedsend() 函数是 POSIX 消息队列接口中的一个函数,用于将消息发送到指定的消息队列,并允许调用者指定一个超时时间。如果队列满并且无法立即发送消息,则调用将等待指定的时间量,直到消息被发送或超时发生。

mq_timedsend() 函数的原型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <time.h>
#include <mqueue.h>
/**
* @brief 将 msg_ptr 指向的消息追加到消息队列描述符 mqdes 指向的消息队列的尾部。
如果消息队列已满,默认情况下,调用阻塞直至有充足的空间允许新的消息入队,或者达
到 abs_timeout 指定的等待时间节点,或者调用被信号处理函数打断。需要注意的是,
正如上文提到的,如果在 mq_open 时指定了 O_NONBLOCK 标记,则转而失败,并返回错误
EAGAIN。
*
* @param mqdes 消息队列描述符
* @param msg_ptr 指向消息的指针
* @param msg_len msg_ptr 指向的消息长度,不能超过队列的 mq_msgsize 属性指定
的队列最大容量,长度为 0 的消息是被允许的
* @param msg_prio 一个非负整数,指定了消息的优先级,消息队列中的数据是按照优
先级降序排列的,如果新旧消息的优先级相同,则新的消息排在后面。
* @param abs_timeout 指向 struct timespec 类型的对象,指定了阻塞等待的最晚
时间。如果消息队列已满,且 abs_timeout 指定的时间节点已过期,则调用立即返回。
* @return int 成功返回 0,失败返回-1,同时设置 errno 以指明错误原因。
*/
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio, const struct timespec *abs_timeout);

1.6系统调用mq_timedreceive()

mq_timedreceive() 函数是 POSIX 消息队列接口的一部分,用于从消息队列中接收消息。与 mq_receive() 函数不同,mq_timedreceive() 允许调用者指定一个超时时间,如果在指定的时间内没有消息到达,函数将返回错误。

mq_timedreceive() 函数的原型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <time.h>
#include <mqueue.h>
/**
* @brief 从消息队列中取走最早入队且权限最高的消息,将其放入 msg_ptr 指向的缓
存中。如果消息队列为空,默认情况下调用阻塞,此时的行为与 mq_timedsend 同理。
*
* @param mqdes 消息队列描述符
* @param msg_ptr 接收消息的缓存
* @param msg_len msg_ptr 指向的缓存区的大小,必须大于等于 mq_msgsize 属性指
定的队列单条消息最大字节数
* @param msg_prio 如果不为 NULL,则用于接收接收到的消息的优先级
* @param abs_timeout 阻塞时等待的最晚时间节点,同 mq_timedsend
* @return ssize_t 成功则返回接收到的消息的字节数,失败返回-1,并设置 errno
指明错误原因
*/
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);

mq_unlink() 函数是 POSIX 消息队列接口的一部分,用于删除一个已存在的消息队列。这个函数会移除消息队列的名称,并释放与消息队列相关联的所有资源。但是,请注意,mq_unlink() 并不关闭任何已经打开的消息队列描述符;已经打开的消息队列描述符仍然有效,直到它们被 mq_close() 关闭。

mq_unlink() 函数的原型如下:

1
2
3
4
5
6
7
8
9
#include <mqueue.h>
/**
* @brief 清除 name 对应的消息队列,mqueue 文件系统中的对应文件被立即清除。消
息队列本身的清除必须等待所有指向该消息队列的描述符全部关闭之后才会发生。
*
* @param name 消息队列名称
* @return int 成功返回 0,失败返回-1,并设置 errno 指明错误原因
*/
int mq_unlink(const char *name);

1.8系统调用clock_gettime()

clock_gettime() 函数是 POSIX 标准中定义的一个用于获取特定时钟当前时间的函数。这个函数允许你查询系统上的不同时钟,比如实时时钟(REALTIME)、单调递增时钟(MONOTONIC)等。clock_gettime() 提供了比传统 gettimeofday() 函数更高的精度和更多的时钟选项。

**函数原型定义在 **<time.h> 头文件中(在某些系统上可能是 <sys/time.h>,但 <time.h> 是更标准的做法):

1
2
3
4
5
6
7
8
9
10
#include <time.h>
/**
* @brief 获取以 struct timespec 形式表示的 clockid 指定的时钟
*
* @param clockid 特定时钟的标识符,常用的是 CLOCK_REALTIME,表示当前真实时
间的时钟
* @param tp 用于接收时间信息的缓存
* @return int 成功返回 0,失败返回-1,同时设置 errno 以指明错误原因
*/
int clock_gettime(clockid_t clockid, struct timespec *tp);

2.通讯流程

在Linux中,POSIX消息队列和System V消息队列是两种常见的消息队列实现。

  1. 指定消息队列的属性: 调用结构体mq_attr
  2. 创建时间: 创建结构体timespec,指定超时时间;
  3. 创建消息队列 :系统调用mq_open()
  4. 信息写入缓冲区:系统调用read()将消息写入缓冲区
  5. 发送消息队列:系统调用mq_timedsend(),将缓冲区写入消息队列,设置超时时间
  6. 接收消息队列:系统调用mq_timedreceive(),将mqd消息写入缓冲区
  7. 缓冲区读取信息:系统调用write(),将缓冲区写入标准输出stdout
  8. 关闭消息队列:系统调用mq_close(),关闭文件描述符mqd
  9. 删除消息队列:系统调用mq_unlink(),删除对应名称的消息队列,并回收资源

3.性质

3.1消息队列的特点

  1. 异步性:消息的发送和接收是异步的,发送者发送消息后可以继续执行其他任务,而无需等待接收者接收消息。
  2. 消息独立性:每个消息都是独立的,接收者必须按照发送者发送的顺序来接收消息,但消息本身在逻辑上是相互独立的。
  3. 灵活性:消息可以有优先级,这允许系统根据优先级来决定先处理哪些消息。
  4. 持久性:消息队列可以持久化存储消息,即使在发送者或接收者崩溃后,消息也不会丢失(取决于队列的属性和配置)。

3.2使用场景

消息队列适用于多种场景,包括但不限于:

  • 任务分发:在多进程或多线程环境下,主进程/线程可以将任务以消息的形式发送到消息队列,由其他进程/线程从队列中取出并执行。
  • 数据交换:不同进程间需要交换数据时,可以使用消息队列作为中介。
  • 日志记录:将日志信息发送到消息队列,由专门的日志处理进程/线程从队列中取出并写入日志文件或进行其他处理。

3.3 注意事项

  • 同步与互斥:虽然消息队列本身提供了基本的同步机制,但在多进程或多线程环境下,仍需要考虑额外的同步和互斥措施,以避免竞态条件。
  • 性能:在高性能要求的应用中,需要仔细考虑消息队列的配置和参数,以优化性能和资源利用率。
  • 安全性:确保消息队列的访问权限设置得当,避免未授权访问或数据泄露。
  • 我们可以通过设置 POSIX 消息队列的模式为 O_RDWR,使它可以用于收发数据,从技术上讲,单条消息队列可以用于双向通信,但是这会导致消息混乱,无法确定队列中的数 据是本进程写入的还是读取的,因此,不会这么做,通常单条消息队列只用于单向通信。 为了实现全双工通信,我们可以使用两条消息队列,分别负责两个方向的通信。类似于管道。

4.实际案例

4.1消息队列生产者

4.1.1定义

消息队列生产者是指产生消息并将消息发布到消息队列或主题中的应用程序。它负责构建或生成消息,这些消息可以是简单的文本、复杂的结构化数据或二进制数据,具体内容、格式和结构取决于应用程序的需求和消息中间件的设计。

4.1.2主要职责

  1. 创建消息:消息生产者需要构建或生成消息,并根据需求设置消息的各种属性,如优先级、生存时间(TTL)、延迟发送等。
  2. 发送消息:一旦消息被创建,消息生产者会负责将其发送到指定的消息队列或主题。这通常涉及与消息中间件的通信,可能涉及网络传输。
  3. 选择目标:消息生产者需要指定消息应该发送到的目标,这可能是一个特定的消息队列、主题或分区(在支持分区的消息系统中)。

4.1.3代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <time.h>
#include <mqueue.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
int main(int agrc,char *argv[]){
   //设置消息属性
   struct mq_attr atter;
   atter.mq_maxmsg=10;
   atter.mq_msgsize=100;
   atter.mq_flags=0;
   atter.mq_curmsgs=0;

   //设置消息路径
   char *mq_name="/producer_mq";

   //创建消息队列
   mqd_t mqd=mq_open(mq_name,O_WRONLY | O_CREAT,0664,&atter);
   if(mqd==-1){
       perror("mq_open error");
       exit(EXIT_FAILURE);
  }

   char buffer[100];

   //创建时间结构体
   struct timespec time_info;

   while (1){
       memset(buffer,0,100);//清空缓冲区
       //读取输入
       ssize_t readCout=read(STDIN_FILENO,buffer,sizeof(buffer));
       //报错
       if (readCout==-1){
           perror("read error");
           continue;
      } else if(readCout==0){
           //ctrl+d 停止发送消息
           printf("EOF,exit\n");
           char eof = EOF;
           ssize_t sendRes=mq_timedsend(mqd,&eof,1,0,&time_info);
           if (sendRes==-1){
               perror("mq_timedsend error");
          }
           break;
      }
       int sendRes=mq_timedsend(mqd,buffer,strlen(buffer),0,&time_info);
       if (sendRes==-1){
           perror("mq_timedsend error");
      }
       printf("消息发送成功\n");
  }
   //关闭消息队列
   mq_close(mqd);
   return 0;
}

4.2消息队列消费者

4.2.1定义

消息队列消费者(Message Consumer)是消息队列系统或消息中间件中的另一个关键角色,它负责从消息队列中取出(或订阅并接收)消息,并对这些消息进行进一步的处理。

4.2.2主要职责

  1. 监听消息队列:消费者需要持续监听指定的消息队列或主题,以便在消息到达时能够及时接收。
  2. 接收消息:一旦有消息到达,消费者将从队列中取出这些消息。这通常涉及与消息中间件的交互,包括网络通信和消息确认。
  3. 处理消息:消费者需要能够解析和处理接收到的消息。处理的具体内容取决于应用程序的业务逻辑,可能包括数据转换、业务逻辑处理、数据存储等。
  4. 消息确认:在处理完消息后,消费者通常需要向消息中间件发送一个确认信号,表示消息已被成功处理。这个确认过程对于确保消息的可靠性至关重要。

4.2.3代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <time.h>
#include <mqueue.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
using namespace std;
int main(int agrc,char *argv[]){
   //设置消息属性
   struct mq_attr atter;
   atter.mq_maxmsg=10;
   atter.mq_msgsize=100;
   atter.mq_flags=0;
   atter.mq_curmsgs=0;

   //设置消息路径
   char *mq_name="/producer_mq";

   //创建消息队列
   mqd_t mqd=mq_open(mq_name,O_RDONLY | O_CREAT,0664,&atter);
   if(mqd==-1){
       perror("mq_open error");
       exit(EXIT_FAILURE);
  }

   char buffer[100];

   //创建时间结构体
   struct timespec time_info;

   while (1){
       memset(buffer,0,100);//清空缓冲区
       clock_gettime(CLOCK_REALTIME,&time_info);
       time_info.tv_sec+=5;
       //读取消息队列
       ssize_t receiveRes= mq_timedreceive(mqd,buffer,sizeof(buffer),0,&time_info);
       //报错
       if (receiveRes==-1){
           perror("mq_timedreceive error");
           continue;
      }
       if(buffer[0]==EOF){
           printf("接收到EOF,退出\n");
           break;
      }else{
           printf("接收到生产者消息:%s\n",buffer);
      }
       printf("消息接收成功\n");
  }
   //关闭消息队列
   close(mqd);

   //删除消息队列
   mq_unlink(mq_name);
   return 0;
}

5.消息队列在 mqueue 文件系统的表示

我们启动 producer 和 consumer,然后查看/dev/mqueue 目录,终止消费和生产者后,上述文件被清除。