C++编程框架实战——MQTT通信框架

一、MQTT通信协议简介

MQTT(Message Queuing Telemetry Transport)消息队列遥测传输,是一种轻量级、基于发布/订阅架构的通信协议,它专为低带宽、高延迟或传输不稳定的网络通信场景而设计,被广泛应用于物联网、移动设备和嵌入式系统,从而实现设备间的异步消息传输。

1.MQTT协议的核心组件
发布者(Publisher):通过主题(Topic)发布消息的设备。
订阅者(Subscriber):订阅特定主题并接收消息的设备。
代理服务(Broker):代理服务器,负责消息的路由、存储和传递。

2.MQTT协议的Qos机制
Qos0:消息最多传递一次
工作流程:
Publisher发送消息后立即丢弃记录,Subscriber接收消息时不发送确认。
特点:
Qos0策略单向发送消息,无重传和消息确认,延迟最低,带宽占用最小,但消息可能丢失。

Qos1:消息至少传递一次
工作流程:
Publisher发送消息并存储副本。
Subscriber接收到消息后,发送PUBACK确认。
如果Publisher未收到PUBACK,它会重传消息。
特点:
Qos1策略的流程可能重复多次,直至成功或超时,容易产生重复的消息。

Qos2:消息正好一次传递
状态码:
PUBREC(Received Acknowledgement):接收方已存储消息。
PUBREL(Release Message):发送方请求释放存储。
PUBCOMP(Complete Acknowledgement):完成确认。
工作流程:
Publisher发送PUBLISH消息:Publisher发送消息并存储状态。
Subscriber回复PUBREC确认:Subscriber接收消息后,发送PUBREC,表示消息已收到但未处理完毕。
Publisher发送PUBREL释放:Publisher收到PUBREC后,发送PUBREL,指示可以释放消息状态。
Subscriber回复PUBCOMP完成:Subscriber处理完消息后,发送PUBCOMP,至此双方都确认消息已可靠投递。
特点:
Qos2策略网络开销大,消息延迟大,内部实现复杂, 适用于对数据一致性要求极高的场景。

*Qos实现细节:
QoS 1/2中每个消息携带唯一Message ID,用于跟踪消息状态。
执行QoS 1/2时,Broker需缓存未确认消息直至超时或完成交互。

3.MQTT协议的通信流程
(1).建立连接(Connection Establishment)
发布者Publisher或订阅者Subscriber向MQTT服务Broker发送CONNECT报文,以此来发起连接。该报文包含客户端ID、用户名、密码、心跳间隔等参数。
Broker收到CONNECT报文后,开始验证客户端身份和权限,如果验证通过,则返回对应的ACK报文,用来表示连接确认,比如返回0表示连接成功,返回非0表示连接失败。

(2).订阅主题(Subscription)
订阅者Subscriber向Broker发送SUBSCRIBE订阅报文,订阅报文中包含一个或多个要订阅的主题(Topic)以及对应的QoS级别。Broker处理订阅请求,如果订阅成功,则返回SUBACK报文,用来表示订阅确认。

(3).消息发布(Publishing)
发布者Publisher向Broker发送PUBLISH报文,报文中包含主题(Topic)、主题对应的消息内容、QoS级别、消息ID等。
Broker接收到PUBLISH报文后,根据主题匹配所有已经订阅了该主题的订阅者Subscriber,并将主题对应的消息转发给订阅了该主题的订阅者Subscriber。

(4).消息分发(Message Delivery)
消息分发过程依赖Qos策略等级,比如:
QoS=0:Broker直接将消息发送给订阅者,不等待确认,也不重发。
Qos=1:订阅者收到消息后会发送PUBACK确认。Broker将消息发送给订阅者并等待PUBACK确认,如果在超时时间内未收到PUBACK,则重发直到收到确认为止。

(5).断开连接(Disconnection)
正常断开时,发布者Publisher或订阅者Subscriber发送DISCONNECT报文,Broker清理客户端并释放资源。

4.MQTT协议的优缺点
MQTT协议的优点:
低功耗:适用于电池供电设备。
高扩展性:支持大规模设备连接。
实时性高:基于事件驱动,减少延迟。

MQTT协议的缺点:
依赖Broker:可能导致单点故障。
安全性待增强:默认无加密,需结合TLS/SSL增强。

5.MQTT协议的应用场景
环境监测系统:
温湿度传感器周期性发布数据到Broker,监控应用实时接收环境参数信息。

工业设备远程控制:
比如机械臂、阀门控制器,应用程序向工业设备发送控制指令,工业设备执行控制指令并回复PUBACK确认。

智能家居联动系统:
空调订阅门窗状态,自动关闭。

车联网数据传输:
GPS定位信息、车速、电池状态、故障码等海量数据传输。

二、Eclipse Mosquitto项目简介

Eclipse Mosquitto是一个开源的轻量级MQTT项目,采用EPL/EDL许可证, 允许免费商用和二次开发,是Eclipse基金会项目生态的一部分。
Eclipse Mosquitto实现了MQTT协议标准,专为物联网(IoT)场景设计,适用于从低功耗嵌入式设备到云服务器的分布式通信场景。该项目采用纯C语言编写,资源占用低,内存占用不到10Mb,适用于树莓派等嵌入式设备,硬件配置支持的情况下,可以达到单机支持数万并发连接。
Eclipse Mosquitto提供了TLS/SSL加密通道、支持WebSocket连接以及双向认证(客户端/服务端证书)等。
Eclipse Mosquitto完整支持的MQTT协议版本有:v5.0版本(支持流量控制、会话有效期等)、v3.1.1版本(主流版本)、3.1版本等。

Eclipse Mosquitto项目的核心组件:

  1. Mosquitto Broker服务进程
    MQTT 消息代理服务器,处理订阅/发布消息。

2.MQTT命令行工具
发布消息的shell命令:mosquitto_pub
订阅消息的shell命令:mosquitto_sub

命令行样例:
mosquitto_sub -h 127.0.0.1 -t “sensors/temperature”

3.C/C++实现的客户端so动态库
开发Publisher/Subscriber时可以集成库文件”libmosquitto.so”。

Eclipse Mosquitto项目TLS配置样例:
mosquitto.conf中关于TLS的配置:

listener 8883
cert_file /path/server.crt #TLS加密
key_file /path/server.key
password_file /etc/passwd #账号密码认证
Eclipse Mosquitto官网:
https://mosquitto.org/
Eclipse Mosquitto源码下载地址:
https://mosquitto.org/download/
Eclipse Mosquitto官方文档:
https://mosquitto.org/documentation/
Eclipse Mosquitto源码仓库:
https://github.com/eclipse-mosquitto/mosquitto

三、 Eclipse Mosquitto开发环境准备

1.Ubuntu环境安装mosquitto:

sudo apt install mosquitto
sudo apt install mosquitto-clients
安装成功以后,会在本地/usr/lib/目录下找到libmosquitto.so库文件。

2.mosquitto常用命令如下:

(1).手动启动mosquitto Broker服务,并打印mosquitto版本
mosquitto -v

(2).启动/关闭moquitto Broker服务
service mosquitto start
service mosquitto stop
(3).查看moquitto服务状态
systemctl status mosquitto

(4).查看moquitto命令使用帮助
mosquitto –help

(5).发布Topic对应的MQTT命令
mosquitto_pub -h -t -m
-h:指定MQTT Broker对应的域名
-t:指定Topic主题
-m:指定主题对应的消息

(6).订阅Topic对应的MQTT命令
mosquitto_sub -h -t
-h:指定MQTT Broker对应的域名
-t:指定Topic主题
3.MQTT命令行实战
step.01: 订阅主题”weather”
mosquitto_sub -v -t weather
step.02: 向主题”weather”发布三条消息
mosquitto_pub -t weather -m snow_01
mosquitto_pub -t weather -m snow_02
mosquitto_pub -t weather -m snow_03
step.03: 订阅端打印如下:

四、Eclipse Mosquitto常用API接口

1.基础功能
mosquitto_connect:连接到MQTT Broker
mosquitto_disconnect:从MQTT Broker断开连接
mosquitto_publish:发布特定主题对应的消息
mosquitto_subscribe:订阅特定主题
mosquitto_subscribe_multiple:订阅多个主题
mosquitto_unsubscribe:取消订阅特定主题
mosquitto_unsubscribe_multiple:取消订阅多个主题

2.TLS配置
mosquitto_tls_set:证书的SSL/TLS配置
mosquitto_tls_opts_set:配置高级的SSL/TLS选项

3.回调函数设置
mosquitto_connect_callback_set:设置连接成功时触发的回调函数
mosquitto_disconnect_callback_set:设置断开连接时触发的回调函数
mosquitto_publish_callback_set:设置消息发布时触发的回调函数
mosquitto_message_callback_set:设置接收消息时触发的回调函数
mosquitto_subscribe_callback_set:设置订阅时触发的回调函数
mosquitto_unsubscribe_callback_set:设置取消时触发的回调函数

五、Eclipse Mosquitto C++代码实战

Demo1: 发布者Publisher C++代码

include

include

include

include

int main() {
// 初始化mosquitto库
mosquitto_lib_init();

// 创建一个mosquitto客户端实例
struct mosquitto *mosq = mosquitto_new("publisher", true, nullptr);
if (!mosq) {
    std::cerr << "Failed to create mosquitto instance" << std::endl;
    return-1;
}

// 连接到MQTT代理服务器(使用默认端口1883)
if (mosquitto_connect(mosq, "localhost", 1883, 60) != MOSQ_ERR_SUCCESS) {
    std::cerr << "Unable to connect to broker" << std::endl;
    mosquitto_destroy(mosq);
    return-1;
}

// 开启一个线程处理网络流量(异步)
mosquitto_loop_start(mosq);

// 开始发布消息
constchar* topic = "customer/topic_01";
constchar* message = "MQTT MSG TEST1";
mosquitto_publish(mosq, nullptr, topic, strlen(message), message, 0, false);
std::cout << "Message: " << message << ", published to topic: " << topic << std::endl;

sleep(1);
message = "MQTT MSG TEST2";
mosquitto_publish(mosq, nullptr, topic, strlen(message), message, 0, false);
std::cout << "Message: " << message << ", published to topic: " << topic << std::endl;

sleep(1);
message = "MQTT MSG TEST3";
mosquitto_publish(mosq, nullptr, topic, strlen(message), message, 0, false);
std::cout << "Message: " << message << ", published to topic: " << topic << std::endl;

// 断开连接,清理资源
mosquitto_loop_stop(mosq, true);
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

return0;

}
Demo2: 订阅者SubscriberC++代码

include

include

include

// 收到消息时的回调函数
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message msg) { std::cout << “Received message on topic: ” << msg->topic << “, message: ” << (char)msg->payload << std::endl;
}

int main() {
// 初始化mosquitto库
mosquitto_lib_init();

// 创建一个mosquitto客户端实例
struct mosquitto *mosq = mosquitto_new("subscriber", true, nullptr);
if (!mosq) {
    std::cerr << "Failed to create mosquitto instance" << std::endl;
    return-1;
}

// 设置消息回调函数
mosquitto_message_callback_set(mosq, on_message);

// 连接到MQTT代理服务器(这里使用本地的mosquitto,默认端口1883)
if (mosquitto_connect(mosq, "localhost", 1883, 60) != MOSQ_ERR_SUCCESS) {
    std::cerr << "Unable to connect to broker" << std::endl;
    mosquitto_destroy(mosq);
    return-1;
}

// 订阅主题
constchar* topic = "customer/topic_01";
if (mosquitto_subscribe(mosq, nullptr, topic, 0) != MOSQ_ERR_SUCCESS) {
    std::cerr << "Failed to subscribe to topic: " << topic << std::endl;
    mosquitto_destroy(mosq);
    return-1;
} else {
    std::cout << "Subscribed to topic: " << topic << std::endl;
}

// 循环处理网络流量
mosquitto_loop_forever(mosq, -1, 1);

// 断开连接,释放资源
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

return0;

}

运行结果:
Broker端打印:

发布者端打印:

订阅者端打印:

参考阅读:
https://mqtt.org/
https://mosquitto.org/api/files/mosquitto-h.html
https://www.eginnovations.com/glossary/mosquitto_mqtt
https://www.cavliwireless.com/blog/nerdiest-of-things/what-is-the-mqtt-protocol

声明:来自程序员与背包客,仅代表创作者观点。链接:https://eyangzhen.com/3700.html

程序员与背包客的头像程序员与背包客

相关推荐

关注我们
关注我们
购买服务
购买服务
返回顶部