进程通讯——消息队列
进程通讯——消息队列
Beyond进程通讯——消息队列
在Linux系统中,进程间通信(IPC, Inter-Process Communication)是一种允许不同进程或同一进程的不同线程之间交换数据的机制。消息队列是IPC的一种形式,它允许一个或多个进程向它写入或从中读取消息。这些消息是用户定义的数据块,它们存储在内核中,直到被接收进程取走。
1.相关函数
1.1 数据类型mqd_t
**该数据类型定义在 **mqueue.h 中,是用来记录消息队列描述符的
1 | typedef int mqd_t; |
1.2 结构体struct mq_attr
在POSIX消息队列中,mq_attr
结构体用于指定消息队列的属性。这个结构体在 <mqueue.h>
头文件中定义,并且当你创建或打开消息队列时,可以通过这个结构体来指定或获取消息队列的属性。
1 | /** |
1.3 结构体struct timespec
struct timespec
是 C 语言中用于表示时间的一个结构体,它定义在 <time.h>
头文件中。这个结构体通常用于需要高精度时间表示的场景,比如定时器、等待特定时间间隔等。struct timespec
提供了秒(tv_sec
)和纳秒(tv_nsec
)两个字段,以允许非常精确的时间表示。
1 | struct timespec { |
struct timespec
常用于需要精确时间控制的函数,如 nanosleep()
(用于使线程暂停执行指定的时间)、clock_nanosleep()
(类似于 nanosleep()
,但允许指定时钟源)、mq_timedsend()
和 mq_timedreceive()
(用于 POSIX 消息队列的带超时的发送和接收操作)等。
1.4系统调用 mq_open()
mq_open()
函数是 POSIX 消息队列接口的一部分,用于打开(如果已存在)或创建(如果不存在)一个消息队列。这个函数在 <mqueue.h>
头文件中声明,并返回一个消息队列描述符(mqd_t
类型),该描述符用于后续的 mq_send()
, mq_receive()
, mq_close()
, 和 mq_unlink()
等函数调用。
函数原型:
1 |
|
1.5系统调用mq_timedsend()
mq_timedsend()
函数是 POSIX 消息队列接口中的一个函数,用于将消息发送到指定的消息队列,并允许调用者指定一个超时时间。如果队列满并且无法立即发送消息,则调用将等待指定的时间量,直到消息被发送或超时发生。
mq_timedsend()
函数的原型如下:
1 |
|
1.6系统调用mq_timedreceive()
mq_timedreceive()
函数是 POSIX 消息队列接口的一部分,用于从消息队列中接收消息。与 mq_receive()
函数不同,mq_timedreceive()
允许调用者指定一个超时时间,如果在指定的时间内没有消息到达,函数将返回错误。
mq_timedreceive()
函数的原型如下:
1 |
|
1.7系统调用mq_unlink()
mq_unlink()
函数是 POSIX 消息队列接口的一部分,用于删除一个已存在的消息队列。这个函数会移除消息队列的名称,并释放与消息队列相关联的所有资源。但是,请注意,mq_unlink()
并不关闭任何已经打开的消息队列描述符;已经打开的消息队列描述符仍然有效,直到它们被 mq_close()
关闭。
mq_unlink()
函数的原型如下:
1 |
|
1.8系统调用clock_gettime()
clock_gettime()
函数是 POSIX 标准中定义的一个用于获取特定时钟当前时间的函数。这个函数允许你查询系统上的不同时钟,比如实时时钟(REALTIME)、单调递增时钟(MONOTONIC)等。clock_gettime()
提供了比传统 gettimeofday()
函数更高的精度和更多的时钟选项。
**函数原型定义在 **<time.h>
头文件中(在某些系统上可能是 <sys/time.h>
,但 <time.h>
是更标准的做法):
1 |
|
2.通讯流程
在Linux中,POSIX消息队列和System V消息队列是两种常见的消息队列实现。
- 指定消息队列的属性: 调用结构体mq_attr
- 创建时间: 创建结构体timespec,指定超时时间;
- 创建消息队列 :系统调用mq_open()
- 信息写入缓冲区:系统调用read()将消息写入缓冲区
- 发送消息队列:系统调用mq_timedsend(),将缓冲区写入消息队列,设置超时时间
- 接收消息队列:系统调用mq_timedreceive(),将mqd消息写入缓冲区
- 缓冲区读取信息:系统调用write(),将缓冲区写入标准输出stdout
- 关闭消息队列:系统调用mq_close(),关闭文件描述符mqd
- 删除消息队列:系统调用mq_unlink(),删除对应名称的消息队列,并回收资源
3.性质
3.1消息队列的特点
- 异步性:消息的发送和接收是异步的,发送者发送消息后可以继续执行其他任务,而无需等待接收者接收消息。
- 消息独立性:每个消息都是独立的,接收者必须按照发送者发送的顺序来接收消息,但消息本身在逻辑上是相互独立的。
- 灵活性:消息可以有优先级,这允许系统根据优先级来决定先处理哪些消息。
- 持久性:消息队列可以持久化存储消息,即使在发送者或接收者崩溃后,消息也不会丢失(取决于队列的属性和配置)。
3.2使用场景
消息队列适用于多种场景,包括但不限于:
- 任务分发:在多进程或多线程环境下,主进程/线程可以将任务以消息的形式发送到消息队列,由其他进程/线程从队列中取出并执行。
- 数据交换:不同进程间需要交换数据时,可以使用消息队列作为中介。
- 日志记录:将日志信息发送到消息队列,由专门的日志处理进程/线程从队列中取出并写入日志文件或进行其他处理。
3.3 注意事项
- 同步与互斥:虽然消息队列本身提供了基本的同步机制,但在多进程或多线程环境下,仍需要考虑额外的同步和互斥措施,以避免竞态条件。
- 性能:在高性能要求的应用中,需要仔细考虑消息队列的配置和参数,以优化性能和资源利用率。
- 安全性:确保消息队列的访问权限设置得当,避免未授权访问或数据泄露。
- 我们可以通过设置 POSIX 消息队列的模式为 O_RDWR,使它可以用于收发数据,从技术上讲,单条消息队列可以用于双向通信,但是这会导致消息混乱,无法确定队列中的数 据是本进程写入的还是读取的,因此,不会这么做,通常单条消息队列只用于单向通信。 为了实现全双工通信,我们可以使用两条消息队列,分别负责两个方向的通信。类似于管道。
4.实际案例
4.1消息队列生产者
4.1.1定义
消息队列生产者是指产生消息并将消息发布到消息队列或主题中的应用程序。它负责构建或生成消息,这些消息可以是简单的文本、复杂的结构化数据或二进制数据,具体内容、格式和结构取决于应用程序的需求和消息中间件的设计。
4.1.2主要职责
- 创建消息:消息生产者需要构建或生成消息,并根据需求设置消息的各种属性,如优先级、生存时间(TTL)、延迟发送等。
- 发送消息:一旦消息被创建,消息生产者会负责将其发送到指定的消息队列或主题。这通常涉及与消息中间件的通信,可能涉及网络传输。
- 选择目标:消息生产者需要指定消息应该发送到的目标,这可能是一个特定的消息队列、主题或分区(在支持分区的消息系统中)。
4.1.3代码实现
1 |
|
4.2消息队列消费者
4.2.1定义
消息队列消费者(Message Consumer)是消息队列系统或消息中间件中的另一个关键角色,它负责从消息队列中取出(或订阅并接收)消息,并对这些消息进行进一步的处理。
4.2.2主要职责
- 监听消息队列:消费者需要持续监听指定的消息队列或主题,以便在消息到达时能够及时接收。
- 接收消息:一旦有消息到达,消费者将从队列中取出这些消息。这通常涉及与消息中间件的交互,包括网络通信和消息确认。
- 处理消息:消费者需要能够解析和处理接收到的消息。处理的具体内容取决于应用程序的业务逻辑,可能包括数据转换、业务逻辑处理、数据存储等。
- 消息确认:在处理完消息后,消费者通常需要向消息中间件发送一个确认信号,表示消息已被成功处理。这个确认过程对于确保消息的可靠性至关重要。
4.2.3代码实现
1 |
|
5.消息队列在 mqueue 文件系统的表示
我们启动 producer 和 consumer,然后查看/dev/mqueue 目录,终止消费和生产者后,上述文件被清除。