关于多线程消息队列的一些思考

9 分钟阅读

软件开发过程中非常多的场景需要用到消息队列,我自己也实现过一些,也看过一些开源的实现,最近对于多线程消息队列的实现方式,使用场景等有了一些新的思考,遂抽空记录一下,也是为我之后的新消息队列的实现做铺垫。

下面提到两个多线程消息队列uorb和disruptor,在这里有个uORB和disruptor的简要介绍

多任务之间应该如何共享消息

这里的多任务可以是多进程或多线程。主要描述基于任务本身的队列和基于消息类型的队列的特点。

一个任务一个队列

在嵌入式软件开发中,一些实时操作系统(ucos,ti-rtos,freertos)大多都提供一些线程间共享数据的机制,有些叫邮箱,有些就叫队列,这是实时操作系统中普遍实现的通信机制,这种队列一般是每个线程一个消息队列,然后由其他线程给自己发送消息。

这种队列的一般使用方法是:假设有任务A和B,A想让B做点事(发送指令)可以直接发送消息到B的队列中。但如果有A有一个状态要共享给五个其他任务,它就要发送五条相同的消息到五个其他任务的队列中。 每个任务各自有自己的队列接收其他任务的消息,在原理上可以看作是一种多发布者单订阅者的队列。这样的通信方式常见于使用这些实时操作系统的嵌入式软件中,估计开发人员手头没有更好的数据共享机制就用这个了(还有用全局变量)。

其实如果每个任务都非常独立,互相之间不需要共享很多信息,这种方式也可行,但有些软件系统中需要在多任务间共享很多数据,这时候单任务单队列的架构就会慢慢导致整个软件的复杂度升高,变得难以理解,任务间的关系更加耦合,消息的发送者需要了解谁是消息的接受者。

降低这种场景复杂度的一种重构方式就是从任务中剥离出数据,让任务仅仅是任务,任务和任务间共享的数据另外的东西,于是一些新的消息队列机制诞生了。

一种消息一个队列

目前常见的消息队列都是一种消息一个队列,比如大名鼎鼎的ROS(机器人操作系统,核心是多进程通信),uorb和disruptor,这些现代消息队列都是中介者模式和观察者模式的典型应用。

uorb和ROS这种消息队列的目的是为了更好的组织代码,软件内大部分任务间的信息共享都通过各种消息类型的队列实现,这种机制使得代码的编写更独立,开发人员思想负担更小,各个模块之间的消息共享架构也更加清晰,甚至对于技术管理来说开发任务的分配都更容易了。

这样在开发多任务的软件系统时,大部分开发人员仅需要关心自己的任务需要获取那些数据(主动订阅),然后根据这些数据需要产出哪些数据(主动发布)就可以。

disruptor的常见应用场景是处理某一类消息,比如网络交易订单的处理(disruptor的初衷),日志记录系统log4j2使用disruptor作为日志队列核心,但disruptor也可以做到uorb做的事,目前也准备对disruptor进行扩展以支持更多场景。

我很早就了解到PX4开源飞控软件进而接触到消息队列这个概念,工作中节省很多精力,为了感谢PX4,在重写uorb的过程中的一些新想法也贡献到了上游。

多线程消息队列的设计

消息队列的设计过程中经常会考虑这几个问题场景,我用问答的形式讨论下。

  • 队列长度是否有限制?
    • 为了防止内存占用太多,队列一般都有最长限制,有些使用数组ring_buffer刚开始就规定好长度,有些使用链表实现可以扩张一部分,但有最大限制。我更倾向于使用固定长度的ring_buffer实现,它内存占用稳定,且速度比链表快。
  • *队列中的数据能不能丢?
    • disruptor本身设计用来处理交易订单的,这种场景中肯定不能丢数据,所以disruptor本身的设计就是可靠队列。uorb本身设计用来共享机器人的各种传感器数据,状态信息等等,大部分传感器数据丢掉一些并不会影响系统整体稳定,所以uorb会把旧数据直接用新数据覆盖。uorb没有策略来保证队列中的所有消息都被订阅者拿到了,然而一些关键数据是一个都不能丢的,在这一点上uorb还需要改进。我准备在未来的某个空闲时间贡献一下。
  • 如果队列可以丢弃数据,丢新的 or 丢旧的?
    • 在我的大部分场景中是新的数据更重要,所以我选择丢弃旧的数据。比如uorb传递的传感器数据,肯定是新的数据更重要,所以它选择覆盖旧的。
  • 当队列满写入时和队列空读取时,使用怎样的等待策略?
    • 虽然disruptor中有各种各样的等待策略,我认为大体分为两类:
      • 1. 用cpu轮询,这种策略是disruptor常用的,它就是为并发而生的,它需要发挥每一台服务器的最大潜力把cpu跑到100%。我测试过这种压榨cpu的策略一般可以保证它每秒的并发量是用互斥锁策略的数倍。
      • 2. 用互斥锁和条件变量通知,这种通知机制的单次延迟也很低,从发布者通知到接受者收到平均延迟都可以到0.1ms左右(未开启linux实时优化,大部分平台甚至几百M主频的cpu), 由于每次发布数据都需要通知一次, 经实测处理能力(每秒处理量)相比cpu轮询下降一半左右。
    • 所以根据场景选择就是:在乎单位时间内的高并发速度,且cpu不需要做别的任务,就选择用轮询榨干cpu。 其他场景我都推荐使用互斥锁和条件变量配合,这个通知的效率也足以做很多事了。
  • 无锁 or 有锁?
    • 无锁更快,具有更少的线程间数据竞争。无锁的实现一般使用cas指令实现,这种方式在等待的时候比较耗费cpu,但哪怕等待策略使用条件变量的方式,还是无锁队列快,还有一些使用多个多个队列实现无锁的,这种队列的处理是无序的。
  • 如何支持偷窥者的角色?(偶尔消费的消费者)
    • 与消费者相同,正常拿数据,只不过拿数据的进度不被生产者跟踪。在这种情况下,数据的可靠性可以由消费者自己保证,具体就是常规无锁编程做法:1. 用自己的数据索引位置直接先拿数据出来,2. 判断数据索引是否已经失效,如果已经失效就返回第一步重新拿数据。
  • 如何支持变长数据?
    • 首先把发布的数据单位变为单字节,把消息队列变为字节流。
    • 生产者:在每次发布时添加一个4字节的数据长度头,紧接着放数据,然后一起提交。
    • 消费者:假设消息队列是可靠的,总是能保证消费者处于一个新的数据包头的位置,先拿4字节长度,再根据长度拿数据。
    • 偷窥者(不可靠消费者):如果消费者准备拿的数据被覆盖了,如何重新找到数据头位置? 方法是让生产者在生产时总是维护一个尾节点的位置。如果消费者数据失效,那么它总是可以拿到最旧的数据。
    • 如何满足只想要最新数据的消费者? 还是需要依靠生产者每次发布数据缓存一下上次的发布位置。

uORB和disruptor的简要介绍

目前我最熟悉的两个消息队列实现一个是嵌入式领域PX4开源飞控的uORB库,一个是java语言领域的disruptor,我需要这两个库来描述我的思考,所以简单介绍一下这俩消息队列。

uORB和disruptor的共同点:都提供了线程间发布订阅式的消息队列,都基于数组ring buffer实现,都支持多生产者多消费者。

比较大的不同点:

  1. uORB的实现是一个纯粹的广播队列,生产者只管把数据放进队列里,队列满了旧的消息就会被新的替代,而不管消费者是否使用。disruptor是可靠的消息队列,它保证消息不会丢失,如果队列满了,生产者会等待消费者先把旧数据消费完。也可类比于tcp和udp网络协议。
  2. uORB使用互斥锁保证数据读写原子性,disruptor使用CAS指令及无锁算法实现。

为了学习和其他一些原因(uORB是PX4的一部分且实现不够通用。disruptor在c++领域没有成熟实现,且原理简单),我都根据他们的原理实现了新的库:参考原uORB库的API基于POSIX重新实现:uorb,disruptor的c++实现:lockfree_queue

留下评论