蘭陵N梓記

一指流沙,程序年华


  • 首页

  • 归档

  • 关于

  • 搜索
close

飞哥讲代码22:C++线程安全队列

时间: 2021-02-17   |   分类: 技术     |   阅读: 5838 字 ~12分钟

本文虽是C++代码讲解,但JDK也有对应的两种实现,学习Java的同学也可阅读一并了解一下。

背景

在多线程的并发模型中,无论是CSP还是Actor模式,都需要借助一个通道来在多个线程间传递消息来通讯。队列在计算机中是非常重要的一种数据结构,队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。

节前定位某一C++开发的部件的性能问题,涉及到阻塞队列唤醒延迟问题。队列是采用ACE提供ACE_Message_Queue,使用场景是单生产/单消费。

ACE_Message_Queue的模型是仿照System V streams提供的队列设施设计的。消息块ACE_Message_Block是消息队列中的固定的对象结构。ACE大量采用了设计模式,代码一层套一层的,这也使得代码变得复杂不容易看懂。ACE_Message_Queue为了支持在多线程或单线程不同场景使用,采用了基于traits策略,通过模板参数来指定是需否要支持多线程。

在Java语言中,JDK中有ArrayBlockingQueue/LinkedBlockingQueue(有锁,有界)与ConcurrentLinkedQueue/LinkedTransferQueue(无锁,无界),开源高性能的Disruptor框架实现了无锁队列。

为了判断ACE提供的队列是否存在调度性能问题(纯入队到出队需要40~50us延迟,并且出现抖动历害),需要进行对比测试,但由于此部件编译环境是suse11,gcc的版本是4.X,不支持c++11,也未采用Boost库(它提供无锁队列)。那我们就来参考一些开源实现来实现自己的多线程安全的队列,顺带学习一下队列背后的机制。

有锁队列

队列的底层结构通常是一个链表,C++程序通常对内存极其敏感,动态地创建删除链表节点会申请与释放内存,长时间运行会导致内存空洞问题。肯定不能采用普通链表了,同时考虑队列长度一定要有上限设置,否则过大会出现内存问题。即两点要求:

  • 上限设置
  • 固定内存分配

上述两点要求很快联想到循环队列,循环队列基于数组实现,借用一张图如下:

circle_qeueue

不同的线程之间通讯,需要完成如下逻辑:

  • 队列满时,消息生产者阻塞,直到队列有空闲时释放并加入新消息
  • 队列空时,消息消费者阻塞,直到队列有消息时释放并取出消息

考虑到不能采用C++11(它提供了更为友好的多线程支持),那只能采用较为底层的pthread库,涉及到两个知识点:

  • 互斥锁(pthread_mutex_t):用于对竞争资源加锁保护
  • 条件变量(pthread_cond_t): 用于等待阻塞与通知唤醒

精减的代码如下,基本等同实现了一个简化版本的ACE_Message_Queue(它的实现也是基于pthread,为了跨平台还支持其它,不是本文重点):

#include <pthread.h>
#include <vector>

typedef unsigned long ulong;
template <typename T>
class BlockingQueue {
private:
    pthread_mutex_t mutex_;
    pthread_cond_t not_full_;
    pthread_cond_t not_empty_;
    volatile ulong   head_;
    volatile ulong   tail_;
    ulong capacity_;
    std::vector<T> queue_;
public:
    explicit BlockingQueue(ulong capacity): 
        capacity_(capacity), 
        queue_(capacity + 1), 
        head_(0), 
        tail_(0) {
        pthread_mutex_init(&mutex_, NULL) ;
        pthread_cond_init(&not_full_, NULL) ;
        pthread_cond_init(&not_empty_, NULL) ;
    }

    ~BlockingQueue() {
        queue_.clear();
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&not_full_);
        pthread_cond_destroy(&not_empty_);
    }

    ulong size() const {
        // 注tail_/head_采用volatile修饰,此方法并没有加锁保护
        return (tail_ - head_ + capacity_) % capacity_;
    }

    void push(const T& e) {
        pthread_mutex_lock(&mutex_);
        while (is_full()) { // [1] 为什么要使用while
            pthread_cond_wait(&not_full_, &mutex_); // [2]为什么要传入互斥锁
        }

        queue_[tail++] = e;
        tail_ %= (capacity_ + 1);
        pthread_cond_signal(&not_empty_); // [3] 采用signal还是broadcast
        pthread_mutex_unlock(&mutex_);
    }

    void pop(T& t) {
        pthread_mutex_lock(&mutex_);
        while (is_empty()) { 
            pthread_cond_wait(&not_empty_, &mutex_); 
        }

        T res = queue_[head_++];
        head_ %= (capacity_ + 1);
        pthread_cond_signal(&not_full_);
        pthread_mutex_unlock(&mutex_);
        t = res;
    }

private:
    inline bool is_empty() {
        return tail_ == head_;
    }

    inline bool is_full() {
        return (head_ + capacity_ - tail_) % (capacity_ + 1) == 0;
    }
};

代码只实现最基本的能力:

  • 采用模板,参数T可参实例化为ACE_Message_Block*
  • capacity_: 指定队列的容量,即上限
  • push: 消息入队列,当队列满阻塞
  • pop: 消息出队列,当队列空阻塞

代码中三处注释说明一下:

  • [1] 为什么要使用while?因为pthread_cond_wait存在虚假唤醒(Spurious Wakeup),spurious wakeup 指的是一次 signal() 调用唤醒两个或以上 wait/waiting 的线程,或者没有调用 signal() 却有线程从 wait() 返回。 pthread_cond_wait()底层是futex系统调用。在linux中,任何慢速的阻塞的系统调用当接收到信号的时候,就会返回-1,并且设置errno为EINTR。在系统调用返回前,用户程序注册的信号处理函数会被调用处理。 所以当虚假唤醒时,采用while来检查是否再次满足条件,从而避免导致问题。
  • [2] 为什么要传入互斥锁?调用pthread_cond_wait()阻塞自己,但是它持有的锁怎么办呢,如果他不归还操作系统,那么其他线程将会无法访问公有资源。pthread_cond_wait()内部实现机制会自动释放互斥锁,释放时机是线程从调用pthread_cond_wait到操作系统把他放在线程等待队列之后。
  • [3] 采用signal还是broadcast? pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;而pthread_cond_broadcast()则激活所有等待线程。

学习Java的同学,可以参考JDK中ArrayBlockingQueue的源码实现,它同样采用了上述机制。

无锁队列

无锁并不是不支持多线程,而是在需要确保操作是线程安全的地方使用CAS(Compare And Swap/Set)等原子操作。CAS是一个CPU级别的指令(X86下对应的是CMPXCHG汇编指令)。工作方式类似于乐观锁,参见Wikipedia的Compare And Swap。

C++的Boost也提供了无锁队列实现(单生产者/单消费者),Boost是一个庞大的库,我们先不引入,还是手写一个。无锁队列可以说历史悠久了,无锁队列实现主要有两篇论文可参考:

  • [论文1]数组方式:John D. Valois 1994《Implementing Lock-Free Queues》
  • [论文2]链表方式: Maged M. Michael 和 Michael L. Scott 1996《Simple, Fast, and Practical Non-Blocking and Blocking ConcurrentQueue Algorithms》,简称MSQueue

同样我需要考虑不能动态申请与释放内存,采用循环队列数组实现。C++的CAS也有像JavaJDK提供相应的高阶封装,C++11才提供。GCC编译器支持多个原子操作(GCC Atomic Builtins),我们需要使用两个函数:

  • __sync_bool_compare_and_swap: CAS,用于对原子数据的修改
  • __sync_synchronize: 内存屏障,用于对原子数据的存放与读取

熟悉CAS的同学可能知道,CAS存在ABA的问题(参见WikipediaABA_problem)。Wikipedia上给了一个方法,使用double-CAS,借助另一个辅助计数器,当ABA发生时,虽然值一样,但是计数器不一样了。采用循环数组可以避免ABA问题。

代码如下,各节点以及队列同时记录tail与head,代码改自craflin/LockFreeQueue:

#include <memory>
#include <vector>
#include <unistd.h>

typedef unsigned long ulong;
template <typename T> 
class LockFreeQueue {
private:
  struct Node
  {
    T data;
    ulong tail; // [1] 用于判断是否队列是否空,或满
    ulong head;
  };

  std::vector<Node> queue_;
  ulong capacity_;
  ulong capacity_mask_;
  ulong tail_;
  ulong head_;
public:
  explicit LockFreeQueue(ulong capacity): queue_ (capacity) {
    capacity_mask_ = capacity - 1;
    // capacity是2^N,方便 idx & capacity_mask_
    for(unsigned long i = 1; i <= sizeof(void*) * 4; i <<= 1) {
      capacity_mask_ |= capacity_mask_ >> i; 
    }

    capacity_ = capacity_mask_ + 1;  
    queue_.resize(capacity_);
    for(unsigned long i = 0; i < capacity_; ++i) {
      queue_[i].tail = i; // [2] 用于标识是否此下标被占用
      queue_[i].head = -1;
    }

    tail_ = 0;
    head_ = 0;
  }

  ~LockFreeQueue() {
    queue_.clear();
  }
  
  ulong size() const {
    ulong head = load(head_);
    return tail_ - head;
  }
  
  bool push_without_wait(const T& data) {
    Node* node;
    ulong next, tail = tail_;
    while(tail != next) { 
      // [3] 定位尾节点,并判断队列是否已满
      node = &queue_[tail & capacity_mask_];
      if(load(node->tail) != tail) { // 双指针,一个是入队时tail_+ 1, 一个是出队时node->tail = head + capacity_
        return false;
      }

      // [4] 更新tail_ +=1,操作之前的值next与tail相同,可以插入,否则说明多线程修改了,需要循环再次判断
      if((next = compare_and_swap(tail_, tail, tail + 1)) == tail) {
        break;
      }
    }

    // [5] 插入,节点的head指向原tail,即数组的下标,标识有值
    node->data = data;
    store(node->head, tail); 
    return true;
  }

  bool pop_without_wait(T& result) {
    Node* node;
    ulong next, head = head_;
    while(head != next) {
      // [6] 定位首节点,并判断是否为空
      node = &queue_[head & capacity_mask_];
      if(load(node->head) != head) {
        return false;
      }

      // [7] head_ += 1
      if((next = compare_and_swap(head_, head, head + 1)) == head) {
          break;
      }
    }

    result = node->data;
    store(node->tail, (head + capacity_);
    return true;
  }

  void pop(T& t) {
      // 队列空则休眠
      while(!pop_without_wait(t)) {
          usleep(5);
      }
  }

private:
    inline ulong load(const ulong volatile& var) {
        __sync_synchronize(); 
        return var;
    }

    inline void store(ulong volatile& var, ulong value) {
        __sync_synchronize(); 
        var = value;
    }

    inline ulong compare_and_swap(ulong volatile& var, ulong oldVal, ulong newVal) {
        // 返回值是操作之前的值
        return __sync_val_compare_and_swap(&var, oldVal, newVal);
    }
};

代码逻辑结合了数组与链表两种实现思想方式:

  • [1] 数组的元素是一个结构体Node,data存放数据,head/tail指针记录循环数据数组的首尾下标
  • [2] 节点初始化,tail标实为数组下标,表示此节点未被占用
  • [3] 入队:定位尾节点,tail节点的tail值若为tail_值,说明没有被占用,若占用,说明队列已满
  • [4] 循环队列的tail_ + 1,可能存在多线程操作,通过CAS确保只有一个线程在获以得的系统调度时间片上原子操作
  • [5] 通过CAS之后,可以真正插入值,则节点的head指向队列的tail,标识有值

上面的注释还是难以理解,先忽略数组方式,假定有一个堆栈,节点如下定义(参见设计不使用互斥锁的并发数据结构):

struct Node { 
    T data; 
    Node* next;
    Node(const T& d) : data(d), next(0) { } 
} Node;

插入操作:

push(const T& data) { 
    Node *n = new Node(data); 
    while (1) { 
        n->next = top;
        if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS
            break;
        }
    }
}

若是多线程,逻辑如下,采用CAS,关键是需要采用循环不断判断(Re-Try-Loop):

  • 可能有两个或更多线程同时试图把数据压入堆栈, 假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30
  • 而线程 A 先获得了时间片,在 n->next = top 指令结束之后,调度程序暂停了线程 A
  • 现在线程 B 获得了时间片, 能够完成 CAS,把 30 压入堆栈后结束
  • 接着线程 A 恢复执行,显然对于这个线程 *top 和 n->next 不匹配,因为线程 B 修改了 top 位置的内容
  • 代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的)调用 CAS,把 20 压入堆栈后结束

学习Java的同学,可以参考JDK中ConcurrentLinkedQueue源码,它是基于论文2实现,你可以对着它的源码进一步了解一下其机制。

等待策略

我们通常会认为,无锁队列会比较有锁队列的性能高,但无锁队列有一个缺点,当队列为空或满时,调用者怎么处理?这涉及到线程的等待,通常的等待策略是采用nanosleep/usleep休眠。但它们与pthread_cond_wait面临的问题是一样的,需要系统调度唤醒。像Linux这种非实时操作系统,只要存在唤醒就会出现调度上的延迟。

事实上,我们在单生产/单消费场景下,对有锁队列与无锁队列进行性能测试:

  • 带usleep的无锁队列 与 有锁队列是在平均唤醒延迟上是差不多的,这取决于系统的调度算法
  • 不带usleep的无锁队列,会导致空闲时CPU空转,因为不存在唤醒,也就不存在唤醒延迟问题。平均延迟的确下降了,但CPU占用也上升了

无锁队列并不是为了性能而生(要看使用的场景),它能解决有锁队列可能代码问题导致出现死锁或崩溃(假设会崩溃)的问题,因为锁是一定需要释放才能被其它的线程使用。无锁队列也可能带来一定的性能问题,因为它的实现需要Re-Try-Loop,若这个Loop中的CAS长期处理失败状态(生产或消费的线程过多),会加重无谓的CPU占用。

注:实际对比验证并不是前面参照开源实现的无锁队列,而是使用另一成熟老产品已实现的无锁队列,代码中部分还存在汇编代码优化(公司资产不便贴出代码)。

除了sleep函数,linux下还提供sched_yield函数,它的作用是让当前线程放弃CPU使用权,同时把这个线程移动到它静态优先级的调度队列末尾,并且从调度队列中找一个合适的线程来运行。如果当前线程的线程优先级列表是最高的,那么该函数立刻返回,当前线程将继续运行。合理地使用sched_yield将有助于减少竞争来提高性能。

参考Java的Disruptor中实现几种等待策略(高性能队列——Disruptor)

  • BlockingWaitStrategy: 加锁,吞吐量和延迟并不重要的场景
  • BusySpinWaitStrategy: 自旋,通过不断重试,减少切换线程导致的系统调用,而降低延迟。
  • PhasedBackoffWaitStrategy: 自旋 + yield + 自定义策略,吞吐量和延迟并不重要的场景
  • SleepingWaitStrategy: 自旋 + yield + sleep, 性能和CPU资源之间有很好的折中。延迟不均匀
  • TimeoutBlockingWaitStrategy: 加锁 + 超时限制, 吞吐量和延迟并不重要的场景
  • YieldingWaitStrategy: 自旋 + yield + 自旋, 性能和CPU资源之间有很好的折中。延迟比较均匀

系统调度

前面所提到pthread_cond_wait与usleep,它都是阻塞调用,会进入系统队列等待再次被调度。线程从创建、运行到结束总是处于下面四个状态:

  • 准备:等待可用的CPU资源,其他条件一切准备好。当线程被pthread_create创建时或者阻塞状态结束后就处于准备状态
  • 运行:线程已经获得CPU的使用权,并且正在运行,在多核心的机器中同时存在多个线程正在运行。
  • 阻塞:指一个线程在执行过程中暂停,以等待某个条件的触发。
  • 终止:线程已经从回调函数中返回,或者调用pthread_exit返回,或者被强制终止。

linux内核的三种调度策略:

  • SCHED_OTHER:分时调度策略,系统默认
  • SCHED_FIFO:实时调度策略,先到先服务
  • SCHED_RR:实时调度策略,时间片轮转

在测试中,我们调整了进程的调度策略(SCHED_OTHER -> SCHED_RR),发现有采用有锁队列也在达到我们预期想要的结果(平均延迟下降了,延迟也平稳了)。 注:pthread_create创建线程时可以指定线程调度的策略与优先级,也可以采用chrt命令来更改调度策略与优先级,不过都需要root权限。

这里点一下SCHED_RR,操作系统对待相同优先级别的线程是平等,操作系统以轮叫循环的方式分配时间片给最高优先级的线程。当我们把进程的内的线程调度都设置为SCHED_RR,则分给他们的时间片更小,也更均匀,所以延迟也平稳了。

我们能不能把应用进程设置为实时调度策略?设计实时进程的时候要谨慎,这类进程变得至高无上,也可能轻易托跨整个系统:

  • 任何一个CPU密集型循环在一个实时进程中会继续无止境地运行下去
  • 实时进程会好用系统上一切资源,可能出现让系统其他进程等不到处理时间
  • 如果不会出现死循环,存在调用阻塞,且延迟敏感的程序则可以考虑实时调度

网上找了三篇系统调度文章:

  • 入门:Linux系统调度原理浅析
  • 初阶:linux内核–进程调度
  • 高阶:Linux 2.6 调度系统分析

结语

队列是多线程编程的基础设施,各种语言都有现成的实现。文章中的代码应该不是最优的,网上也有很多文章介绍无锁队列的,也有针对不同场景(SPSC/MPMC,Single/Multi Producer/Consumer)的极致优化,并不是无锁就比有锁性能好,还得看场景。这次为了把遇到的性能问题进一步验证对比缩小范围,面向搜索编程与学习,打开了解队列的实现机制。从模仿编写代码中我们可以初步了解到C++中的pthread互斥锁的用法、gcc的atomic内建能力,以及操作系统的调度机制。

#软件开发# #c++#
飞哥讲代码23:C/C++内存空洞
飞哥讲代码21:C++TLS在Envoy中应用
微信扫一扫交流

标题:飞哥讲代码22:C++线程安全队列
作者:兰陵子
关注:lanlingthink(览聆时刻)
声明:自由转载-非商用-非衍生-保持署名(创作共享3.0许可证)

  • 文章目录
  • 站点概览
兰陵子

兰陵子

Programmer & Architect

164 日志
4 分类
57 标签
GitHub 知乎
  • 背景
  • 有锁队列
  • 无锁队列
  • 等待策略
  • 系统调度
  • 结语
© 2009 - 2022 蘭陵N梓記
Powered by - Hugo v0.101.0
Theme by - NexT
0%