Skip to content

消息队列-传递不定长的消息

李述铜

874字约3分钟

2025-08-30

邮箱仅支持传递固定大小的消息内容(在32位系统上是4字节),当需要传递较大的消息时,只能通过指针的方式去传递,在有些情况下,会加重程序中管理内存的负担。

因此,有时使用消息队列会更加的方便。


消息队列机制的用途

消息队列也可用于任务之间、任务与中断之间的通信;相比邮箱,其特点如下:

特性邮箱(Mailbox)消息队列(Message Queue)
传递内容4字节(值或指针)不定长的数据块
缓冲方式循环队列(值/指针)循环队列(结构体数据)
内存管理风险有(需手动管理指针)安全(消息会拷贝进队列中)

工作原理

RT-Thread 的消息队列由struct rt_messagequeue实现,内部维护了一个消息循环缓冲区,自动管理内存拷贝和顺序读取。其整体工作结构如下图所示:

alt text

对于上述结构图,相关说明如下:

  • 在消息队列内部,有相应的消息缓冲区可以存储消息,任务写入的消息会被拷贝到该缓冲区。因此,任务在完成消息发送之后,不需要继续持有待发消息的内存块。任务在读取消息时,消息会从缓冲中拷贝到任务自己的接收缓存。
  • 消息队例会自动管理这些消息缓冲区。在创建消息队列时,可以指定该缓冲区最大能缓存多少个消息,每个消息的最大允许的数量字节量。
  • 消息的写入和读取,可支持先进先出(FIFO),也支持后进先出(LIFO,用于发送紧急消息)
  • 当消息队列已满时,发送方可以阻塞等待直至消息写入;当消息队列为空时,接收方可以阻塞等待,直至收到消息。
特性描述
定长数据块每个消息大小固定,初始化时指定
先进先出接收者按照发送顺序获取消息
同步机制满时发送者阻塞,空时接收者阻塞

应用示例

下面对邮箱课时中的案例进行修改,使用消息队列完成同样的功能。

示例一:传递消息编号

#include "base.h"
#include <rtthread.h>

static struct rt_messagequeue mq;

static rt_ubase_t mq_buffer[8];

void sender_entry (void * param) {
    rt_ubase_t msg = 0;
    
    while (1) {
        rt_mq_send_wait(&mq, &msg, sizeof(msg), RT_WAITING_FOREVER);
        msg++;
    }
}

void recv_entry (void * param) {
    while (1) {
        // recv
        rt_ubase_t msg;
        
        rt_mq_recv(&mq, &msg, sizeof(msg), RT_WAITING_FOREVER);
        rt_kprintf("recv: %d\n", msg);
        rt_thread_mdelay(1000);
    }
}

int main(void) {
    hardware_init();
    
    rt_mq_init(&mq, "mq", mq_buffer, sizeof(rt_ubase_t), 8, RT_IPC_FLAG_FIFO);
    
    rt_thread_t t1 = rt_thread_create("t1", sender_entry, RT_NULL, 4096, 10, 10);
    rt_thread_startup(t1);
    rt_thread_t t2 = rt_thread_create("t2", recv_entry, RT_NULL, 4096, 10, 10);
    rt_thread_startup(t2);
    
    return 0;
}

示例二:传递结构化温湿度数据

#include "base.h"
#include <rtthread.h>

static struct rt_messagequeue mq;

struct sensor_data {
    int temp;
    int hum;
};

static struct sensor_data mq_buffer[8];

static void sensor_read (struct sensor_data * data) {
    data->temp = 20 + rt_tick_get();
    data->hum = 50 + rt_tick_get();
}

void sender_entry (void * param) {    
    while (1) {
        //struct sensor_data * data = (struct sensor_data *)malloc(sizeof(struct sensor_data));
        struct sensor_data data;
        sensor_read(&data);
        //rt_mb_send_wait(&mbox, (rt_ubase_t)data, RT_WAITING_FOREVER);
        rt_mq_send_wait(&mq, &data, sizeof(struct sensor_data), RT_WAITING_FOREVER);
        rt_thread_mdelay(1000);
    }
}

void recv_entry (void * param) {
    while (1) {
        // recv
        struct sensor_data data;
    
        rt_mq_recv(&mq, &data, sizeof(struct sensor_data), RT_WAITING_FOREVER);
        //rt_mb_recv(&mbox, (rt_ubase_t* )&data, RT_WAITING_FOREVER);
        
        rt_kprintf("temp: %d, hum: %d\n", data.temp, data.hum);
        //free(data);
    }
}

int main(void) {
    hardware_init();
    
    rt_mq_init(&mq, "mq", mq_buffer, sizeof(struct sensor_data), sizeof(mq_buffer), RT_IPC_FLAG_FIFO);
    
    rt_thread_t t1 = rt_thread_create("t1", sender_entry, RT_NULL, 4096, 10, 10);
    rt_thread_startup(t1);
    rt_thread_t t2 = rt_thread_create("t2", recv_entry, RT_NULL, 4096, 10, 10);
    rt_thread_startup(t2);
    
    return 0;
}