本文虽是C++代码讲解,但JDK也有对应的两种实现,学习Java的同学也可阅读一并了解一下。
背景
在多线程的并发模型中,无论是CSP还是Actor模式,都需要借助一个通道来在多个线程间传递消息来通讯。队列在计算机中是非常重要的一种数据结构,队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。
节前定位某一C++开发的部件的性能问题,涉及到阻塞队列唤醒延迟问题。队列是采用ACE提供ACE_Message_Queue,使用场景是单生产/单消费。
ACE_Message_Queue的模型是仿照System V streams提供的队列设施设计的。消息块ACE_Message_Block是消息队列中的固定的对象结构。ACE大量采用了设计模式,代码一层套一层的,这也使得代码变得复杂不容易看懂。ACE_Message_Queue为了支持在多线程或单线程不同场景使用,采用了基于traits策略,通过模板参数来指定是需否要支持多线程。
在Java语言中,JDK中有ArrayBlockingQueue/LinkedBlockingQueue(有锁,有界)与ConcurrentLinkedQueue/LinkedTransferQueue(无锁,无界),开源高性能的Disruptor框架实现了无锁队列。
为了判断ACE提供的队列是否存在调度性能问题(纯入队到出队需要40~50us延迟,并且出现抖动历害),需要进行对比测试,但由于此部件编译环境是suse11,gcc的版本是4.X,不支持c++11,也未采用Boost库(它提供无锁队列)。那我们就来参考一些开源实现来实现自己的多线程安全的队列,顺带学习一下队列背后的机制。
有锁队列
队列的底层结构通常是一个链表,C++程序通常对内存极其敏感,动态地创建删除链表节点会申请与释放内存,长时间运行会导致内存空洞问题。肯定不能采用普通链表了,同时考虑队列长度一定要有上限设置,否则过大会出现内存问题。即两点要求:
- 上限设置
- 固定内存分配
上述两点要求很快联想到循环队列,循环队列基于数组实现,借用一张图如下:
不同的线程之间通讯,需要完成如下逻辑:
- 队列满时,消息生产者阻塞,直到队列有空闲时释放并加入新消息
- 队列空时,消息消费者阻塞,直到队列有消息时释放并取出消息
考虑到不能采用C++11(它提供了更为友好的多线程支持),那只能采用较为底层的pthread库,涉及到两个知识点:
- 互斥锁(pthread_mutex_t):用于对竞争资源加锁保护
- 条件变量(pthread_cond_t): 用于等待阻塞与通知唤醒
精减的代码如下,基本等同实现了一个简化版本的ACE_Message_Queue(它的实现也是基于pthread,为了跨平台还支持其它,不是本文重点):
#include <pthread.h>
#include <vector>
typedef unsigned long ulong;
template <typename T>
class BlockingQueue {
private:
pthread_mutex_t mutex_;
pthread_cond_t not_full_;
pthread_cond_t not_empty_;
volatile ulong head_;
volatile ulong tail_;
ulong capacity_;
std::vector<T> queue_;
public:
explicit BlockingQueue(ulong capacity):
capacity_(capacity),
queue_(capacity + 1),
head_(0),
tail_(0) {
pthread_mutex_init(&mutex_, NULL) ;
pthread_cond_init(¬_full_, NULL) ;
pthread_cond_init(¬_empty_, NULL) ;
}
~BlockingQueue() {
queue_.clear();
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(¬_full_);
pthread_cond_destroy(¬_empty_);
}
ulong size() const {
// 注tail_/head_采用volatile修饰,此方法并没有加锁保护
return (tail_ - head_ + capacity_) % capacity_;
}
void push(const T& e) {
pthread_mutex_lock(&mutex_);
while (is_full()) { // [1] 为什么要使用while
pthread_cond_wait(¬_full_, &mutex_); // [2]为什么要传入互斥锁
}
queue_[tail++] = e;
tail_ %= (capacity_ + 1);
pthread_cond_signal(¬_empty_); // [3] 采用signal还是broadcast
pthread_mutex_unlock(&mutex_);
}
void pop(T& t) {
pthread_mutex_lock(&mutex_);
while (is_empty()) {
pthread_cond_wait(¬_empty_, &mutex_);
}
T res = queue_[head_++];
head_ %= (capacity_ + 1);
pthread_cond_signal(¬_full_);
pthread_mutex_unlock(&mutex_);
t = res;
}
private:
inline bool is_empty() {
return tail_ == head_;
}
inline bool is_full() {
return (head_ + capacity_ - tail_) % (capacity_ + 1) == 0;
}
};
代码只实现最基本的能力:
- 采用模板,参数T可参实例化为
ACE_Message_Block*
- capacity_: 指定队列的容量,即上限
- push: 消息入队列,当队列满阻塞
- pop: 消息出队列,当队列空阻塞
代码中三处注释说明一下:
- [1] 为什么要使用while?因为pthread_cond_wait存在虚假唤醒(Spurious Wakeup),spurious wakeup 指的是一次 signal() 调用唤醒两个或以上 wait/waiting 的线程,或者没有调用 signal() 却有线程从 wait() 返回。 pthread_cond_wait()底层是futex系统调用。在linux中,任何慢速的阻塞的系统调用当接收到信号的时候,就会返回-1,并且设置errno为EINTR。在系统调用返回前,用户程序注册的信号处理函数会被调用处理。 所以当虚假唤醒时,采用while来检查是否再次满足条件,从而避免导致问题。
- [2] 为什么要传入互斥锁?调用pthread_cond_wait()阻塞自己,但是它持有的锁怎么办呢,如果他不归还操作系统,那么其他线程将会无法访问公有资源。pthread_cond_wait()内部实现机制会自动释放互斥锁,释放时机是线程从调用pthread_cond_wait到操作系统把他放在线程等待队列之后。
- [3] 采用signal还是broadcast? pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;而pthread_cond_broadcast()则激活所有等待线程。
学习Java的同学,可以参考JDK中ArrayBlockingQueue的源码实现,它同样采用了上述机制。
无锁队列
无锁并不是不支持多线程,而是在需要确保操作是线程安全的地方使用CAS(Compare And Swap/Set)等原子操作。CAS是一个CPU级别的指令(X86下对应的是CMPXCHG汇编指令)。工作方式类似于乐观锁,参见Wikipedia的Compare And Swap。
C++的Boost也提供了无锁队列实现(单生产者/单消费者),Boost是一个庞大的库,我们先不引入,还是手写一个。无锁队列可以说历史悠久了,无锁队列实现主要有两篇论文可参考:
- [论文1]数组方式:John D. Valois 1994《Implementing Lock-Free Queues》
- [论文2]链表方式: Maged M. Michael 和 Michael L. Scott 1996《Simple, Fast, and Practical Non-Blocking and Blocking ConcurrentQueue Algorithms》,简称MSQueue
同样我需要考虑不能动态申请与释放内存,采用循环队列数组实现。C++的CAS也有像JavaJDK提供相应的高阶封装,C++11才提供。GCC编译器支持多个原子操作(GCC Atomic Builtins),我们需要使用两个函数:
- __sync_bool_compare_and_swap: CAS,用于对原子数据的修改
- __sync_synchronize: 内存屏障,用于对原子数据的存放与读取
熟悉CAS的同学可能知道,CAS存在ABA的问题(参见WikipediaABA_problem)。Wikipedia上给了一个方法,使用double-CAS,借助另一个辅助计数器,当ABA发生时,虽然值一样,但是计数器不一样了。采用循环数组可以避免ABA问题。
代码如下,各节点以及队列同时记录tail与head,代码改自craflin/LockFreeQueue:
#include <memory>
#include <vector>
#include <unistd.h>
typedef unsigned long ulong;
template <typename T>
class LockFreeQueue {
private:
struct Node
{
T data;
ulong tail; // [1] 用于判断是否队列是否空,或满
ulong head;
};
std::vector<Node> queue_;
ulong capacity_;
ulong capacity_mask_;
ulong tail_;
ulong head_;
public:
explicit LockFreeQueue(ulong capacity): queue_ (capacity) {
capacity_mask_ = capacity - 1;
// capacity是2^N,方便 idx & capacity_mask_
for(unsigned long i = 1; i <= sizeof(void*) * 4; i <<= 1) {
capacity_mask_ |= capacity_mask_ >> i;
}
capacity_ = capacity_mask_ + 1;
queue_.resize(capacity_);
for(unsigned long i = 0; i < capacity_; ++i) {
queue_[i].tail = i; // [2] 用于标识是否此下标被占用
queue_[i].head = -1;
}
tail_ = 0;
head_ = 0;
}
~LockFreeQueue() {
queue_.clear();
}
ulong size() const {
ulong head = load(head_);
return tail_ - head;
}
bool push_without_wait(const T& data) {
Node* node;
ulong next, tail = tail_;
while(tail != next) {
// [3] 定位尾节点,并判断队列是否已满
node = &queue_[tail & capacity_mask_];
if(load(node->tail) != tail) { // 双指针,一个是入队时tail_+ 1, 一个是出队时node->tail = head + capacity_
return false;
}
// [4] 更新tail_ +=1,操作之前的值next与tail相同,可以插入,否则说明多线程修改了,需要循环再次判断
if((next = compare_and_swap(tail_, tail, tail + 1)) == tail) {
break;
}
}
// [5] 插入,节点的head指向原tail,即数组的下标,标识有值
node->data = data;
store(node->head, tail);
return true;
}
bool pop_without_wait(T& result) {
Node* node;
ulong next, head = head_;
while(head != next) {
// [6] 定位首节点,并判断是否为空
node = &queue_[head & capacity_mask_];
if(load(node->head) != head) {
return false;
}
// [7] head_ += 1
if((next = compare_and_swap(head_, head, head + 1)) == head) {
break;
}
}
result = node->data;
store(node->tail, (head + capacity_);
return true;
}
void pop(T& t) {
// 队列空则休眠
while(!pop_without_wait(t)) {
usleep(5);
}
}
private:
inline ulong load(const ulong volatile& var) {
__sync_synchronize();
return var;
}
inline void store(ulong volatile& var, ulong value) {
__sync_synchronize();
var = value;
}
inline ulong compare_and_swap(ulong volatile& var, ulong oldVal, ulong newVal) {
// 返回值是操作之前的值
return __sync_val_compare_and_swap(&var, oldVal, newVal);
}
};
代码逻辑结合了数组与链表两种实现思想方式:
- [1] 数组的元素是一个结构体Node,data存放数据,head/tail指针记录循环数据数组的首尾下标
- [2] 节点初始化,tail标实为数组下标,表示此节点未被占用
- [3] 入队:定位尾节点,tail节点的tail值若为tail_值,说明没有被占用,若占用,说明队列已满
- [4] 循环队列的tail_ + 1,可能存在多线程操作,通过CAS确保只有一个线程在获以得的系统调度时间片上原子操作
- [5] 通过CAS之后,可以真正插入值,则节点的head指向队列的tail,标识有值
上面的注释还是难以理解,先忽略数组方式,假定有一个堆栈,节点如下定义(参见设计不使用互斥锁的并发数据结构):
struct Node {
T data;
Node* next;
Node(const T& d) : data(d), next(0) { }
} Node;
插入操作:
push(const T& data) {
Node *n = new Node(data);
while (1) {
n->next = top;
if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS
break;
}
}
}
若是多线程,逻辑如下,采用CAS,关键是需要采用循环不断判断(Re-Try-Loop):
- 可能有两个或更多线程同时试图把数据压入堆栈, 假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30
- 而线程 A 先获得了时间片,在 n->next = top 指令结束之后,调度程序暂停了线程 A
- 现在线程 B 获得了时间片, 能够完成 CAS,把 30 压入堆栈后结束
- 接着线程 A 恢复执行,显然对于这个线程 *top 和 n->next 不匹配,因为线程 B 修改了 top 位置的内容
- 代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的)调用 CAS,把 20 压入堆栈后结束
学习Java的同学,可以参考JDK中ConcurrentLinkedQueue源码,它是基于论文2实现,你可以对着它的源码进一步了解一下其机制。
等待策略
我们通常会认为,无锁队列会比较有锁队列的性能高,但无锁队列有一个缺点,当队列为空或满时,调用者怎么处理?这涉及到线程的等待,通常的等待策略是采用nanosleep/usleep休眠。但它们与pthread_cond_wait面临的问题是一样的,需要系统调度唤醒。像Linux这种非实时操作系统,只要存在唤醒就会出现调度上的延迟。
事实上,我们在单生产/单消费场景下,对有锁队列与无锁队列进行性能测试:
- 带usleep的无锁队列 与 有锁队列是在平均唤醒延迟上是差不多的,这取决于系统的调度算法
- 不带usleep的无锁队列,会导致空闲时CPU空转,因为不存在唤醒,也就不存在唤醒延迟问题。平均延迟的确下降了,但CPU占用也上升了
无锁队列并不是为了性能而生(要看使用的场景),它能解决有锁队列可能代码问题导致出现死锁或崩溃(假设会崩溃)的问题,因为锁是一定需要释放才能被其它的线程使用。无锁队列也可能带来一定的性能问题,因为它的实现需要Re-Try-Loop,若这个Loop中的CAS长期处理失败状态(生产或消费的线程过多),会加重无谓的CPU占用。
注:实际对比验证并不是前面参照开源实现的无锁队列,而是使用另一成熟老产品已实现的无锁队列,代码中部分还存在汇编代码优化(公司资产不便贴出代码)。
除了sleep函数,linux下还提供sched_yield函数,它的作用是让当前线程放弃CPU使用权,同时把这个线程移动到它静态优先级的调度队列末尾,并且从调度队列中找一个合适的线程来运行。如果当前线程的线程优先级列表是最高的,那么该函数立刻返回,当前线程将继续运行。合理地使用sched_yield将有助于减少竞争来提高性能。
参考Java的Disruptor中实现几种等待策略(高性能队列——Disruptor)
- BlockingWaitStrategy: 加锁,吞吐量和延迟并不重要的场景
- BusySpinWaitStrategy: 自旋,通过不断重试,减少切换线程导致的系统调用,而降低延迟。
- PhasedBackoffWaitStrategy: 自旋 + yield + 自定义策略,吞吐量和延迟并不重要的场景
- SleepingWaitStrategy: 自旋 + yield + sleep, 性能和CPU资源之间有很好的折中。延迟不均匀
- TimeoutBlockingWaitStrategy: 加锁 + 超时限制, 吞吐量和延迟并不重要的场景
- YieldingWaitStrategy: 自旋 + yield + 自旋, 性能和CPU资源之间有很好的折中。延迟比较均匀
系统调度
前面所提到pthread_cond_wait与usleep,它都是阻塞调用,会进入系统队列等待再次被调度。线程从创建、运行到结束总是处于下面四个状态:
- 准备:等待可用的CPU资源,其他条件一切准备好。当线程被pthread_create创建时或者阻塞状态结束后就处于准备状态
- 运行:线程已经获得CPU的使用权,并且正在运行,在多核心的机器中同时存在多个线程正在运行。
- 阻塞:指一个线程在执行过程中暂停,以等待某个条件的触发。
- 终止:线程已经从回调函数中返回,或者调用pthread_exit返回,或者被强制终止。
linux内核的三种调度策略:
- SCHED_OTHER:分时调度策略,系统默认
- SCHED_FIFO:实时调度策略,先到先服务
- SCHED_RR:实时调度策略,时间片轮转
在测试中,我们调整了进程的调度策略(SCHED_OTHER -> SCHED_RR),发现有采用有锁队列也在达到我们预期想要的结果(平均延迟下降了,延迟也平稳了)。 注:pthread_create创建线程时可以指定线程调度的策略与优先级,也可以采用chrt命令来更改调度策略与优先级,不过都需要root权限。
这里点一下SCHED_RR,操作系统对待相同优先级别的线程是平等,操作系统以轮叫循环的方式分配时间片给最高优先级的线程。当我们把进程的内的线程调度都设置为SCHED_RR,则分给他们的时间片更小,也更均匀,所以延迟也平稳了。
我们能不能把应用进程设置为实时调度策略?设计实时进程的时候要谨慎,这类进程变得至高无上,也可能轻易托跨整个系统:
- 任何一个CPU密集型循环在一个实时进程中会继续无止境地运行下去
- 实时进程会好用系统上一切资源,可能出现让系统其他进程等不到处理时间
- 如果不会出现死循环,存在调用阻塞,且延迟敏感的程序则可以考虑实时调度
网上找了三篇系统调度文章:
- 入门:Linux系统调度原理浅析
- 初阶:linux内核–进程调度
- 高阶:Linux 2.6 调度系统分析
结语
队列是多线程编程的基础设施,各种语言都有现成的实现。文章中的代码应该不是最优的,网上也有很多文章介绍无锁队列的,也有针对不同场景(SPSC/MPMC,Single/Multi Producer/Consumer)的极致优化,并不是无锁就比有锁性能好,还得看场景。这次为了把遇到的性能问题进一步验证对比缩小范围,面向搜索编程与学习,打开了解队列的实现机制。从模仿编写代码中我们可以初步了解到C++中的pthread互斥锁的用法、gcc的atomic内建能力,以及操作系统的调度机制。