本文虽分析的是C++源码,但是对Evnoy的设计思想分析,并不影响其它语言开发者阅读。
案例
Envoy是Service Mesh框架Istio推荐的SideCar,基于C++开发(大量使用了Google开源C++项目absl),具有高性能的特点,被广大微服务框架爱好者所熟悉。它的高性能一方面也源自它的优秀线程模型,我们可以通过这篇 Envoy为什么能战胜Ngnix——线程模型分析篇 可以进一步了解它的设计思路。这是对Envoy架构师的 博文 翻译,原文内容较深入较长,总结如下:
- 采用单进程多线程的线程模型,其中一个主线程控制一些零散的协作任务
- 若干worker线程负责连接监听,以及连接请求消息的过滤、转发
- 一旦监听器接受了连接,连接的后续生命周期都绑定到单个工作线程
- 使用非阻塞的网络调用,配置的Worker数与CPU核数(线程线)一致,即可完成大部分工作负载
上述做法也是最为常见的多线程设计模式,似乎没有什么特别之处。但写过C++多线程的同学就会知道其中的痛苦,虽然主线程与Worker线程职责分离了,但是xDS会随时动态更新监听器/路由/集群等信息,Worker线程又需要实时查询这些信息以作出变更。这涉及到数据的线程安全,若采用传统的多线程加锁获取共享数据,必然会影响到连接处理请求的性能。
出于高性能的诉求,Envoy采用无锁编程(针对上述数据访问)方案。主线程把需要同步的数据通过Slot推送给Worker线程,由Worker线程在沉寂期完成数据更新,这样需要访问此类数据时就不需要加锁。这就是Envoy中线程模型中最为巧妙的地方。先借用原博文中的一张图片描述一下:
我们以Envoy 1.13.8的源码来看一下如何做到无锁编程,Slot数据推送是结合C++的TLS(thread local stroage)特性使用的。核心代码实现相当的优雅和高内聚,我们可以在 这里 与这里 找到他们。
- 前者主要定义了Slot与SlotAllocator接口
- 后者主要实现Slot对象的管理与其数据推送
- SlotAllocator接口实者InstanceImp:对Slot对象的管理,以及支持集中管理各线程TLS上的Slot数据
- Slot接口实现者SlotImpl:提供set/get函数来设置与获取数据,屏蔽对象上的数据的存储与同步细节
实现逻辑核心思路如下:
- 主线程维护一个Slot对象列表,可以通过index直接访问Slot对象
- 主线程分配置Slot槽位,每个Slot上可以存储不同类型的数据,如xDS数据
- 主线程对Slot对象设置数据时,则把数据发送到所有worker线程作为一个正常的事件循环处理,让Worker线程也把数据设置到它的TLS上,从来在TLS上具有与主线程相同的Slot对象
- Worker线程处理其它事件时,可以从对应的Slot上获取它的所能获取的数据
Dispatcher
为了弄清楚主线程与Worker线程的交互,先要了解一下Enovy中的Dispatcher机制,我们可以从 源码分析——Envoy Dispatcher 了解到:
- 每一个worker线程运行一个非阻塞的EventLoop
- Dispatcher是一接口,它本质上实现是一个EventLoop,其承担了任务队列、网络事件处理、定时器、信号处理等核心功能
- Envoy复用了Libevent中的event_base,Envoy在Libevent的基础上进行了二次封装并抽象出一些事件类,如FileEvent、SignalEvent、Timer等
- Dispatcher的内部有一个任务队列,会有一个工作线程专门处理任务队列中的任务
- Worker外可以通过Dispatcher的post函数将任务投递到worker的任务队列中,交给Dispatcher的线程去处理
Dispatcher是一个典型生产-消费设计模式。
Slot
SlotImpl实现了Slot接口,是InstanceImpl的内部类。它负责维护数据,我们先来看一下其set函数实现:
void InstanceImpl::SlotImpl::set(InitializeCb cb) { // [4] 参数InitializeCb
ASSERT(Thread::MainThread::isMainThread()); // [1] 是否主线程调用
ASSERT(!parent_.shutdown_);
// [2] dispatcher.post推送事件
for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
// See the header file comments for still_alive_guard_ for why we capture index_.
dispatcher.post(wrapCallback(
[index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }));
}
// [3] Handle main thread.
setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}
从上述代码中可以看出:
- [1] Slot对象存放数据只能是主线程调用
- [2] 给每个注册的工作线程推送数据,调用dispatcher.post压入一个lambda的回调函数,在此回调函数会调用setThreadLocal函数来把数据放在Worker线程的TLS中,其中index_是Slot对象槽位号
- [3] 再调用主线程的setThreadLocal来把数据放在主线程的TLS中,因为主线程也可以在收到其它事件来获了相应的槽位数据
- [4]
InitializeCb
是一个回调函数指针,它传递dispatcher参数,返回要存储的对象指针。主线程与工作线程实际存储的数据都是通过调用InitializeCb
函数来拿到真实要存放的数据ThreadLocalObjectSharedPtr
,对于同一个InitializeCb
函数,会在不同的工作线程中调用多次,相当于对槽位数据进行拷贝
InitializeCb参见:using InitializeCb = std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)>
再来看一下setThreadLocal函数的实现:
// [1] 把对象加入到TLS对象data_列表中
void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
if (thread_local_data_.data_.size() <= index) {
thread_local_data_.data_.resize(index + 1);
}
thread_local_data_.data_[index] = object;
}
// [2]前面的thread_local_data_类型如下
struct ThreadLocalData {
Event::Dispatcher* dispatcher_{};
std::vector<ThreadLocalObjectSharedPtr> data_;
};
// [3]thread_local_data_
thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;
从上述代码中可以看出:
- [1]
thread_local_data_.data_
支持存储多个对象,数组大小不够,则扩充,每个index对应Slot的槽位号 - [2]
thread_local_data_.data_
的类型是一个struct,包括dispatcher指针(每个注册的线程),以及存储的对象列表(不同的Slot存储不同的数据) - [3]
thread_local_data_
是InstanceImpl的静态成员,它是一个TLS对象。thread_local是C++11新引入,thread_local关键字修饰的变量具有线程周期(thread duration),这些变量(或者说对象)在线程开始的时候被生成(allocated),在线程结束的时候被销毁(deallocated),thread_local的变量绑定的对象即TLS对象
再来看一下get函数实现:
bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) {
return thread_local_data_.data_.size() > index;
}
ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {
ASSERT(currentThreadRegisteredWorker(index)); // [1] 判断是否当前工作线程的Slot是否存在
return thread_local_data_.data_[index]; // [2] 根据槽位号取对应的数据
}
ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }
从上述代码中可以看出:
- get函数直接调用getWorker函数
- [1] 判断Slot的槽位在当前线程的TLS是否存在
- [2] 根据槽位号index_从当前线程的TLS上取对应的数据
Slot接口还提供两个函数来直接传递数据给所有注册的线程:
- 相同点:机制与set函数实现类似,采用
dispatcher.post
把数据分发给各注册线程处理,通过回调UpdateCb
函数指针来产生数据 - 不同点:不会把
UpdateCb
函数指针产生数据更新到TLS;TLS中存储的数据适合于多次需要读取的场景,runOnAllThreads
函数适合一次数据传递,不需要长期存储
void runOnAllThreads(const UpdateCb& cb) override; // 更新,UpdateCb产生数据
void runOnAllThreads(const UpdateCb& cb, const Event::PostCb& complete_cb) override; // 所有线程完成回调complete_cb
小结:
- Slot.set函数是根据slot的槽位号index_设置数据到TLS上,通过dispatcher.post让各自的注册的dispatcher调用InitializeCb来产生数据,并设置到各自的线程TLS上,即把数据同步到各自线程的TLS中存储
- Slot.get函数是根据slot的槽位号index_从TLS上获取数据,而不是从一个共享的地方获取数据,实现了无锁读取数据
SlotAllocator
InstanceImpl实现SlotAllocator接口,先来看一下allocateSlot函数实现:
SlotPtr InstanceImpl::allocateSlot() {
ASSERT(Thread::MainThread::isMainThread()); // [1] 判断是否主线程调用
ASSERT(!shutdown_);
if (free_slot_indexes_.empty()) { // [2] 空闲的Slot槽位号不够
SlotPtr slot = std::make_unique<SlotImpl>(*this, slots_.size());
slots_.push_back(slot.get());
return slot;
}
const uint32_t idx = free_slot_indexes_.front(); // [3] 从空闲队首取一个
free_slot_indexes_.pop_front();
ASSERT(idx < slots_.size());
SlotPtr slot = std::make_unique<SlotImpl>(*this, idx);
slots_[idx] = slot.get();
return slot;
}
主要代码逻辑:
- [1] 分配Slot槽位,只能是主线程,分配Slot与给Slot设置数据都在主线程上,所以不需要加锁
- [2] InstanceImpl维护一个空闲槽位号列表,当空闲不够,则分配一个新的
- [3] 有空闲槽位,则创建Slot对象,并获取Slot在主线程TLS上存储的对象
再来看一下registerThread函数实现:
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT(Thread::MainThread::isMainThread()); // [1] 判断是否主线程调用
ASSERT(!shutdown_);
if (main_thread) { // [2] 当是注册主线程时
main_thread_dispatcher_ = &dispatcher;
thread_local_data_.dispatcher_ = &dispatcher;
} else { // [3] 当是注册工作线程时
ASSERT(!containsReference(registered_threads_, dispatcher));
registered_threads_.push_back(dispatcher);
dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
}
}
主要代码逻辑:
- [1] 注册工作线程只能在主线程中完成,即主线程创建工作线程时,由工作线程的WorkerImpl实现者在其构造函数中调用registerThread
- [2] 主线程也需要注册,因为allocateSlot函数也会调用slot.get(),用于获取主线程产生的数据
- [3] 工作线程不允许重复注册,并通过dispatcher.post,把工作线程的dispatcher设置到其线程的TLS对象thread_local_data_上
此外,InstanceImpl还提供removeSlot函数,会删除Slot对象,在SlotImpl析构函数中调用removeSlot函数,释放一个空闲槽位。
小结:
- 主线程创建工作线程,在工作线程的构造方法中,调用InstanceImpl.registerThread注册工作线程的dispatcher给InstanceImpl
- 主线程创建其它对象(xDS/runtime/statflush/admin等),在这些对象的构造方法中,调用InstanceImpl.allocateSlot来产生槽位存储相应的数据
- 主线程接收取xDS等更新消息时,调用对应的对象持有的slot对象的set函数,把数据同步给所有注册的工作线程
结语
Envoy通过InstanceImpl对象集中管理多个工作线程需要使用的数据,通过对不同类型数据分配Slot对象来存储,所有线程采用TLS方式来持有相同的Slot对象,Slot对象屏蔽了对象的存储与同步。当Slot在设置数据的时候,会同时分发到所有线程执行数据设置逻辑,从而所有线程的Slot对象数据异步更新。天下没有完美的方案,其本质是采用复制来减少对共享数据加锁访问,达到数据最终一致性。