基于RocksDB实现高可靠、低时延的MQTT数据持久化

引言:原生MQTT会话持久化支持

MQTT协议标准中规定Broker必须存储离线客户端的消息。在之前的版本中,EMQX开源版采用了基于内存的会话存储,企业版则在此基础上进一步提供了外部数据库存储方案,借此实现数据持久化。

这种基于内存、非持久化的会话存储方式虽然是基于吞吐量和延迟之间相互权衡下的最优解,但在某些场景下仍会给用户使用带来一定的限制。

本着关注社区反馈、不断完善为用户带来更易用产品的理念,我们在EMQX 5.x的产品规划中增加了基于RocksDB的原生MQTT会话持久化支持。目前这一功能已进入正式开发阶段,预计将在5.1.0版本中和各位用户见面。

本文是对这一特性的抢鲜技术分享。通过对MQTT会话相关概念以及EMQX会话持久化功能设计原理的介绍,帮助读者了解这一更加高可靠、低时延的数据持久化方案。同时,我们还将基于RocksDB持久化能力进行更多新功能探索。

了解MQTT会话

在协议规范中,QoS 1和QoS 2消息首先会在客户端与Broker存储起来,在最终确认抵达订阅端后才会被删除,此过程需要Broker将状态与客户端相关联,这称为会话状态。除了消息存储外,订阅信息(客户端订阅的主题列表)也是会话状态的一部分。

QoS 1消息流程示意图

QoS 2消息流程示意图


客户端中的会话状态包括:

  • 已发送到服务器,但尚未完全确认的QoS 1和QoS 2消息
  • 已从服务器收到但尚未完全确认的QoS 2消息

服务器中的会话状态包括:

  • 会话的存在状态,即使会话为空
  • 客户订阅信息
  • 已发送到客户端,但尚未完全确认的QoS 1和QoS 2消息
  • 等待传输到客户端的QoS 0(可选)、QoS 1和QoS 2消息
  • 已从客户端收到但尚未完全确认的QoS 2消息,Will Message(遗嘱消息)和Will Delay Interval(遗嘱延时间隔)

会话生命周期与会话存储

会话是MQTT协议通信的关键,MQTT协议要求网络连接打开时必须保留会话状态;当网络连接关闭后,则根据Clean Session(MQTT 3.1.1)以及Clean Start+会话过期间隔(MQTT 5.0)的设置情况控制实际的丢弃时机。

MQTT 3.1.1中会话生命周期与Clean Session的关系

MQTT 5.0中会话生命周期与Session Expiry Interval的关系

本文不再赘述不同机制下会话生命周期差异,相关内容可以参考文章《Clean Start与Session Expiry Interval-MQTT 5.0新特性》。

总而言之,当Broker中存在会话的时候,消息将持续进入会话,当会话对应的客户端断开连接或不具备消息处理能力时,消息将在会话中堆积。

MQTT协议并未规定会话持久性上的实现,这意味着客户端和Broker可以根据场景需求和自身设计,选择将其存储在内存或磁盘中。

过往版本的EMQX会话持久化设计

在此前的版本中,EMQX并未支持Broker内部消息持久化,这是吞吐量和延迟之间的权衡以及架构设计选择:

  1. EMQX解决的核心问题是连接与路由,极少情况下需要将消息持久存储,而保留消息作为一种特例是支持存储在磁盘的。
  2. EMQX作为云端服务,这类环境下服务器稳定性足够可靠,即使消息都在内存中也不会有太大的丢失风险。
  3. 内置持久化设计需要权衡高吞吐场景下内存与磁盘的使用、多服务器分布集群架构下数据的存储与复制设计,在快速发展的项目中很难确保持久化设计一步到位。

尽管从性能的角度来看将所有消息存储在内存中是有益的,但基于内存的会话存储仍不可避免地会带来一些问题:大量的连接和可能存在的会话消息堆积将带来较高的内存占用,这将限制用户大规模使用持久会话功能(Clean Session=0);同时,在对EMQX进行重启操作或者EMQX意外宕机时也可能会导致会话数据丢失,从而对数据可靠性带来一定影响。

随着服务器市场SSD磁盘的大量应用,内存与磁盘两种方案之间的差距其实已经可以做到很小了。另外LevelDB和RocksDB基础架构的繁荣发展以及在Erlang中的成熟使用也为原生会话持久化支持的实现奠定了基础。

EMQX自5.0正式开启了亿级物联网连接时代,无论在功能还是性能方面均以匹配行业最新需求为目标进行了规划设计,一个新的会话持久化能力支持设计方案也因此被提上日程。

Why RocksDB:全新会话层选型

结合EMQX接入的数据特性,对比各种存储引擎后我们最终选择RocksDB作为新的持久化层。

RocksDB简介

RocksDB是一个嵌入式、持久化的键值存储引擎。它针对快速、低延迟的存储进行了优化,具有很高的写入吞吐。RocksDB支持预写日志,范围扫描和前缀搜索,在高并发读写以及大容量存储时能够提供一致性的保证。

选型依据

在EMQX会话层设计中,会话存储于本地节点,我们倾向于在EMQX内部存储数据,而不是把EMQX作为外部数据库的一个前端,因此选型范围限制在嵌入式数据库中。

除了RocksDB之外,我们还主要考察了以下数据库:

  • Mnesia:Mnesia是Erlang/OTP自带的分布式实时数据库系统,在Mnesia集群中,所有节点都是平等的。它们中的每一个节点都可以存储一份数据副本,也可以启动事务或执行读写操作。Mnesia可以凭借复制特性而支持极高的读取吞吐,但这一特性也限制了其写入吞吐,因为这意味着MQTT消息基本上是在集群内广播的,广播并不能横向扩展。

  • LevelDB:RocksDB是LevelDB的一个改进分支,从功能上来说它们大多是等同的,但LevelDB在Erlang中缺少积极维护的驱动(Erlang NIF)因此没有被采用。

相比之下,RocksDB的优势非常明显:

  • 极高的写入吞吐:RocksDB基于为数据写入而优化的LSM-Tree结构,能够支持EMQX海量消息吞吐与快速订阅时高频的数据写入
  • 迭代器和快速范围查询:RocksDB支持对排序的键进行迭代,基于此特性EMQX可以扩展更多功能
  • 支持Erlang:用于RocksDB的NIF库已经成熟并得到积极支持

在对RocksDB会话持久化方案的初步测试中,RocksDB的性能优势得到充分发挥,相比内存存储,在其他模块达到瓶颈之前即可达到相同的发布率。

EMQX基于RocksDB的会话持久化设计

RocksDB将替换当前apps/emqx/src/persistent_session目录下的所有模块,以使用RocksDB来存储MQTT会话数据。

EMQX允许全部客户端或使用QoS、主题前缀等过滤器配置需要启用持久化的客户端以及主题。在磁盘性能不足或可以接受消息丢失、需要极端性能的场景中,允许用户关闭持久化功能使用内存存储方案。

数据分发

RocksDB作为嵌入式数据库,不具备集群内数据分发的能力。在需要节点间传递数据的操作中,如会话从一个节点移动至另一个节点,会通过EMQX的消息分发机制处理。

我们将Mnesia的复制特性与RocksDB的持久化特性结合到一起,会话可以存储到RocksDB,但是使用的是Mnesia的API,RocksDB只是Mnesia的一个后端。

哪些数据可以通过RocksDB持久化

  1. 以Clean Start=0连接的客户端的会话记录
  2. 订阅数据(Subscriptions),在订阅时写入RocksDB,取消订阅时从RocksDB删除
  3. 每次客户端发布消息QoS 1、QoS 2消息时,数据会写入RocksDB,保留至确认后删除
  4. 作为其他高吞吐低延迟场景的Storage,如保留消息、数据桥接缓存队列

持久化能力拓展

RocksDB的引入为EMQX提供了一个高性能、可靠的持久化层,在此基础上EMQX可以扩展更多的功能。

消息重放

在某些场景下,发布端不需要关心订阅者是否在线,但又要求消息必须到达订阅端,即使订阅端不在线甚至会话不存在。

通过持久层的支持,EMQX能够扩展MQTT协议实现以支持类似Kafka的消息重放功能:消息发布时允许设置特殊的标志位以持久保存在发布目标主题中,订阅者携带非标准的订阅属性时,允许获取主题中指定位置之后的消息。

消息重放能够用于设备初始化、OTA升级这类不关心指令时效性的场景中,在发布者和订阅者之间更灵活的传输数据。

消息重放典型流程

  1. 发布端发布一条持久性消息
  2. EMQX将消息存储至重放队列中,无需关心订阅者是否在线
  3. 订阅端发起订阅
  4. EMQX从指定位置读取消息
  5. 重放消息发布到订阅者

数据桥接缓存队列

将持久层用于数据桥接的缓存队列,当桥接资源不可用时可以将数据存储至缓存队列,等待资源恢复后再继续传输,避免大量数据在内存中堆积。

结语

基于RocksDB实现的原生MQTT会话持久化是EMQX发布以来的一项突破性的重要功能变革,这一能力将为开源用户提供更可靠的业务保证,可以不受限制地充分利用MQTT协议特性进行物联网应用开发。使用外部数据存储的企业用户则可以迁移到RocksDB,从而获得更低时延的数据持久化方案。

同时,结合物联网实际使用场景,EMQX还将围绕持久化能力扩展更多的功能支持,以满足日益多样化的物联网数据需求。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章