1 MQTT基础知识
1.1 MQTT的概念
– MQTT即英文“Message Queuing Telemetry Transport”的缩写,中文翻译为“消息队列遥测传输”
– MQTT协议通过轻量级的消息发布和订阅的模型实现消息传递
– MQTT协议适用于物联网的消息传递(低功率传感器或移动设备,如电话、嵌入式计算机或微控制器)
1.2 MQTT架构的原理
– MQTT协议基于消息发布和订阅主题(pub和sub)的模型实现
– 多个客户端连接到代理并订阅他们感兴趣的主题
– 多个客户端可订阅相同的主题并根据自己的需求处理信息
– 客户端亦可连接到代理并将消息发布到主题
– 代理(broker)与MQTT充当连接所有内容的简单通用接口
关于以上的特性,意味着,
– 如果将订阅的消息转存储到数据库、Twitter、Cosm或简单文本文件的客户端
– 那么将新传感器或其他数据添加到数据库、Twiter等将变得十分简单
1.3 MQTT主题的层次结构
sensors/COMPUTER_NAME/temperature/HARDDRIVE_NAME
如上所示,
– 主题是通过“/”来分隔的体现父子之间的层次关系(类似文件夹的层次结构)
– 以上层次结构可体现不同Computer的不同的硬盘温度
1.4 MQTT订阅消息的匹配方式
1.4.1 显式主题匹配
由于显式主题的订阅是完全匹配的,所以该订阅仅接收到完全匹配的主题消息
1.4.2 模式主题匹配
– 即主题可使用通配符(“+”或者“#”)来模糊匹配主题
– “+”可用于单层次结构的通配符
– “#”可用于剩余层次结构的通配符(注意,必须是订阅主题最后面的层次结构才能匹配)
1.4.3 单层次结构的通配符使用
使用范例1,
sensors/+/temperature/+
如上所示,
– 通过“COMPUTER_NAME”与“HARDDRIVE_NAME”层使用单层次的通配符“+”
– 该模式可获取包含任意计算机名称和任意硬盘驱动器名称的主题消息
使用范例2,
a/b/c/d
如上所示,该主题可以通过以下任意一种模式匹配成功,
+/b/c/d a/+/c/d a/+/+/d +/+/+/+
以下模式的任意一种不能匹配成功,
a/b/c b/+/c/d +/+/+
1.4.4 剩余层次结构的通配符使用
使用范例,
a/b/c/d
如上所示,该主题可以通过以下任意一种模式匹配成功,
# a/# a/b/# a/b/c/# +/b/c/#
1.5 MQTT的QoS
1.5.1 QoS的概念
– QoS即Quality of service的英文缩写,中文翻译服务质量
– QoS用于定义代理与客户端尝试确保接收消息的难度级别
1.5.2 QoS的级别
– MQTT的QoS分三个级别的,分别用QoS 0、QoS 1和QoS2表示
– 更高级别的QoS意味着需要更低的延迟和更高的带宽
– QoS 0,代理与客户端将只传递一次消息且不确认消息是否接受成功
– QoS 1,代理与客户端将只传递一次消息并且确认消息是否接受成功
– QoS 2,代理与客户端将使用四步握手的方式进行一次性消息传递
1.5.3 QoS的接收规则
– 如果以QoS 2发布的消息,客户端以QoS 0订阅,消息以QoS 0传递给客户端
– 如果以QoS 2发布的消息,客户端以QoS 2订阅,消息以QoS 2传递给客户端
– 如果以QoS 0发布的消息,客户端以QoS 2订阅,消息以QoS 0传递给客户端
注:笔者通过以上实例注意到就低原则
1.6 MQTT的消息性能
– MQTT支持将所有消息设置为保留
– 设置保留后,消息发送给所有当前订阅者后,代理将保留该消息
– 如果新订阅与保留消息匹配,则消息将发送到客户端
– 保留设置适用于不经常更新的主题
– 相对于没有保留消息的设置,客户端将需要等待更长的时间才能接收到消息
1.7 MQTT的会话模式
– 建立连接时,客户端设置“clean session”标志(也称“clean start”)
– 如果“clean session”设置为“false”,则建立持久连接
– 如果“clean session”设置为“true”,则建立非持久连接
– 持久连接模式下,客户端断开连接,则保留客户端的所有订阅并存储后续的QoS 1或2消息,直到再次连接
– 非持久连接模式下,客户端断开连接,则删除客户端的所有订阅
1.8 MQTT的消息嘱咐
– 客户端可以向代理申请嘱咐
– 代理在客户端意外断开时发送具有与任何其他消息相同的主题、QoS和保留状态
2 EMQX的基础知识
1.1 软件简介
– EMQX是完全开源、高度可扩展、高度可用的分布式MQTT消息代理
– MQTT适用于物联网、M2M和移动应用程序,支持处理数千万个并发客户端
– EMQX是云、边、端统一的MQTT平台
– EMQX连接、处理并实时传输百万设备数据
– EMQX轻松介入云、AI与分析系统,将海量物联网数据转化为可落地的业务洞察
– EMQX是全球汽车、制造和AI基础设置领导者的信赖之选
1.2 软件的功能
– 持久化消息流,将MQTT消息持久化到自定义命名的专属数据流,并支持配置消息保留策略。
– 基于偏移量的消息回溯,消费者可通过流偏移量订阅属性,自主控制消息的回放起始位置。
– 普通流 & 末值压缩流,同时支持完整事件流;启用末值语义后,可自动压缩数据流,仅保留每个消息键的最新一条消息。
– 按键保序,同一流键下的所有消息,严格按照发布顺序投递。
– 原生兼容 MQTT,无需改造现有 MQTT 发布端和客户端,直接兼容接入。
1.2 授权
– EMQX在v5.9.0之前基于Apache 2.0协议发布
– EMQX从v5.9.0开始将开源版和企业版所有功能统一到一个功能强大的产品中
– EMQX从v5.9.0开始变更为Business Source License (BSL) 1.1协议
– EMQX的BSL 1.1协议源码可见、可修改、可非生产自由使用
– EMQX的BSL 1.1协议免费生产(单节点),在 “附加使用授权” 范围内,单节点生产环境可免费、无需授权码
– EMQX的BSL 1.1协议集群/商业托管,即集群≥2节点、对外托管服务、嵌入式商用等场景,需申请商业许可证
– MQX的BSL 1.1协议自动开源,即每个版本发布满4年后,该版本自动转为Apache 2.0开源许可证
3 最佳实践
3.1 环境信息
OS = Oracle Linux 8.x x86_64
HostName = emqx01.cmdschool.org
IP Addresses = any
3.2 部署操作
3.2.1 安装软件包
dnf install -y https://www.emqx.com/en/downloads/enterprise/v4.3.8/emqx-ee-centos8-4.3.8-amd64.rpm
3.2.2 查询软件当前配置
egrep -v "^#|^$" /etc/emqx/emqx.conf
可见如下默认配置,
node.name = emqx@127.0.0.1 node.cookie = emqxsecretcookie node.data_dir = /var/lib/emqx node.global_gc_interval = 15m node.crash_dump = /var/log/emqx/crash.dump node.dist_listen_min = 6369 node.dist_listen_max = 6369 node.backtrace_depth = 16 allow_anonymous = true acl_nomatch = allow acl_file = /etc/emqx/acl.conf enable_acl_cache = on acl_cache_max_size = 32 acl_cache_ttl = 1m acl_deny_action = ignore flapping_detect_policy = 30, 1m, 5m mqtt.max_packet_size = 1MB mqtt.max_clientid_len = 65535 mqtt.max_topic_levels = 128 mqtt.max_qos_allowed = 2 mqtt.max_topic_alias = 65535 mqtt.retain_available = true mqtt.wildcard_subscription = true mqtt.shared_subscription = true mqtt.ignore_loop_deliver = false mqtt.strict_mode = false plugins.etc_dir = /etc/emqx/plugins/ plugins.loaded_file = /var/lib/emqx/loaded_plugins plugins.expand_plugins_dir = /var/lib/emqx/plugins/ broker.sys_interval = 1m broker.sys_heartbeat = 30s broker.session_locking_strategy = quorum broker.shared_subscription_strategy = random broker.shared_dispatch_ack_enabled = false broker.route_batch_clean = off broker.perf.trie_compaction = false license.file = /etc/emqx/emqx.lic license.connection_high_watermark_alarm = 80% license.connection_low_watermark_alarm = 75% include /etc/emqx/rpc.conf include /etc/emqx/logger.conf include /etc/emqx/listeners.conf include /etc/emqx/zones.conf include /etc/emqx/cluster.conf include /etc/emqx/sys_mon.conf
3.2.3 启动服务并设置为自启动
systemctl start emqx.service systemctl enable emqx.service systemctl status emqx.service
如果遇到如下错误提示,
May 06 11:23:45 emqx01.cmdschool.org bash[6052]: /usr/lib/emqx/erts-11.1.8/bin/beam.smp: error while loading shared libraries: libtinfo.so.5: cannot open shared object file: No such file or directory
你可能需要使用如下命令解决依赖关系,
find /usr/ -name \*libtinfo.so\*
可见如下显示,
/usr/lib64/libtinfo.so.6 /usr/lib64/libtinfo.so.6.1
然后你需要使用如下命令将“libtinfo.so.6”伪装成“libtinfo.so.5”,
ln -s /usr/lib64/libtinfo.so.6 /usr/lib64/libtinfo.so.5
服务启动后,你可以使用如下命令查看进程,
pgrep -u emqx -a beam.smp
可见如下信息,
2106 emqx -P 2097152 -Q 1048576 -e 256000 -spp true -A 4 -IOt 4 -SDio 8 -- -root /usr/lib/emqx -progname usr/bin/emqx -- -home /var/lib/emqx -- -boot /usr/lib/emqx/releases/4.3.8/start -mode embedded -boot_var ERTS_LIB_DIR /usr/lib/emqx/erts-11.1.8/../lib -mnesia dir "/var/lib/emqx/mnesia/emqx@127.0.0.1" -config /var/lib/emqx/configs/app.2026.05.06.11.40.34.config -kernel net_ticktime 120 -shutdown_time 30000 -pa /var/lib/emqx/patches -pa /usr/lib/emqx/releases/4.3.8/consolidated -setcookie emqxsecretcookie -name emqx@127.0.0.1 -- -vm_args /var/lib/emqx/configs/vm.2026.05.06.11.40.34.args -start_epmd false -epmd_module ekka_epmd -proto_dist ekka -- console --
然后,你可以使用如下命令查看进程倾听的端口,
ss -antp | grep -f <(pgrep -u emqx beam.smp)
可见如下信息,
LISTEN 0 1024 0.0.0.0:1883 0.0.0.0:* users:(("beam.smp",pid=2106,fd=29))
LISTEN 0 1024 0.0.0.0:8083 0.0.0.0:* users:(("beam.smp",pid=2106,fd=30))
LISTEN 0 512 0.0.0.0:8081 0.0.0.0:* users:(("beam.smp",pid=2106,fd=33))
LISTEN 0 1024 0.0.0.0:8084 0.0.0.0:* users:(("beam.smp",pid=2106,fd=32))
LISTEN 0 512 127.0.0.1:11883 0.0.0.0:* users:(("beam.smp",pid=2106,fd=28))
LISTEN 0 1024 0.0.0.0:18083 0.0.0.0:* users:(("beam.smp",pid=2106,fd=34))
LISTEN 0 1024 0.0.0.0:8883 0.0.0.0:* users:(("beam.smp",pid=2106,fd=31))
LISTEN 0 128 0.0.0.0:4370 0.0.0.0:* users:(("beam.smp",pid=2106,fd=17))
LISTEN 0 5 0.0.0.0:5370 0.0.0.0:* users:(("beam.smp",pid=2106,fd=23))
根据以上显示,你可以执行如下命令开放mqtt的服务端口1883/tcp,
firewall-cmd --permanent --add-service mqtt firewall-cmd --reload firewall-cmd --list-all
参阅文档
===============
官方主页
—————
https://www.emqx.com/zh
官方文档
————-
https://docs.emqx.com/en/emqx/latest/
软件的下载
—————-
https://www.emqx.com/en/downloads/enterprise
https://www.emqx.com/en/downloads-and-install/enterprise?os=RHEL
软件安装
————–
https://docs.emqx.com/en/emqx/latest/deploy/install-rhel.html
没有评论