背景
最近调研低成本的大数据量的数据分析框架,搜索发现有:
- 使用Python包Vaex读入并分析100G数据
- Vaex:突破pandas,快速分析100GB大数据集
- 这场Spark、Dask、Vaex、Pandas的正面交锋,谁赢了?
- Vaex:一种具有超级字符串功能的DataFrame Python库
这些文章都在介绍Vaex,第三篇文章中有多种分析框架不同场景应用的性能对比。Vaex由于它采取内存映射、惰性计算的设计,可以在百亿级数据集上进行秒级的统计分析和可视化展示,使得它能在数据分析领域有它一席位置。
作为一名屌丝程度员,在性能倍增的背景下,极其想扒一扒它的代码,探索它是如何做到的。笔者也简单做了一些的验证(数据文件采用Parquet),它的确是秒级完成千万级数据量基于列式存储的数值统计分析(求mean,std,var等)、多列之间的计算以及按列条件过滤。这些计算不需要使用大量的内存,但象join,groupby聚合这类复杂的计算,它还是把数据加载到内存中计算。它的API也没有Pandas灵活与丰富,所以并不能完全取代Pandas。
参考多篇文章,以及走读代码,总结Veax的技术特点:
- 惰性计算:基于文件内存映射,将一个文件映射为虚拟地址,突破物理内存限制
- 零内存拷贝:基于列式的过滤/转换/计算时,无内存拷贝,数据在需要时才进行流式传输,采用C++实现无拷贝到numpy数组的转换
- 字符串优化:采用C++实现对字符串的各种计算,并基于Apache Arrow的StringArray降低CPU使用
- 聚合计算优化:采用C++实现分箱、hash,count/sum等计算,避免Python GIL影响无法真正多核并行计算,可以秒出统计图
- 并行计算:把计算按chunk_size分成多个小任务,采用异步IO多线程/多进程并行计算,本地实现map/reduce(从代码结构看还有企业版未开源)
注:Apache Arrow是内存数据交换格式,可以直接内存到磁盘映射,并且针对数据分析领域做了各种内存结构优化,如:
- 基于内存的列式数据结构:把同一列数据放在一个内存缓冲区上,以便数据分析领域列式计算不需要跨CPU切换
- 空值位图与计算:数据分析领域,会经常遇到空值,把具有空值的数组放在具有连续的内存缓冲区,采用空值位图,降低空间占用
本文重点在分析Vaex如何在有限内存下大数据量计算,对于零内存复制,字符串优化,聚合优化并未展开。
计算框架
讲代码前,先说说Vaex中几个核心对象:
- DataFrame:提供像Pandas的DataFrame API,是对本地或远程Dataset的包装。
- Dataset:具有相同Schema的多个文件的数据集合,可以是本地文件数据集,可以是远程存储S3上的文件数据集。
- Column: DataFrame的列,不同场景又有不同的泛化,如ColumnMaskedNumpy,ColumnSparse,ColumnArrowLazyCast,ColumnStringArrow,实现与numpy和Apache Arrow等映射转换操作
- Executor:Veax采用惰性计算,由它调度任务执行,开源代码提供了ExecutorLocal,猜测未开源的企业版,提供更好的分布式Executor
Pandas需要把数据加载到内存,在内存中完成计算,这是Pandas的优点也是缺点,少数据量纯内存计算非常高效,大数据量需要自已考虑数据如何分区分片计算。与Pandas不同的是,而Vaex对HDF5等格式的列式数据存储文件进行抽象封装,通过把文件映射为虚拟内存的Numpy数组,再基于Numpy数组构建Dataset/DataFrame对象。Vaex也通过采用Apache Arrow库来支持Parquet、Arrow列式数据格式。
以支持HDF5格式分析为例,其中关键类的层次是这样的:
- 继承关系:Hdf5MemoryMapped–>DatasetMemoryMapped–>DatasetFile–>Dataset
- 聚合关系:DataFrame聚合Dataset、Column与Executor
Dataset是数据操作的核心对象,定义数据操作各种API(如concat, merge等),为了支持多任务并行计算,还定义了关键的chunk_iterator接口,返回一个可以计算的chunk信息迭代器。
@abstractmethod
def chunk_iterator(self, columns, chunk_size=None, reverse=False):
pass
从ExecutorLocal的execute_async函数实现来看,Executor会编历chunk_iterator函数生成结果,把每个chunk的计算分配给多个异步多线程执行,从而实现并行计算。
async for element in self.thread_pool.map_async(self.process_part,
dataset.chunk_iterator(columns, chunk_size),
dataset.row_count,
progress=lambda p: all(self.signal_progress.emit(p)) and
all([all(task.signal_progress.emit(p)) for task in tasks]) and
all([not task.cancelled for task in tasks]),
cancel=lambda: self._cancel(run), unpack=True, run=run):
pass # just eat all element
logger.debug("executing took %r seconds" % (time.time() - t0))
ExecutorLocal是基于Python自带asynio库。要有好的并行计算能力,关键还是看各个子Dataset如何实现chunk_iterator方法。
内存映射
文件直接内存映射并不是什么黑科技,Linux下提供系统调用函数mmap。从上面的类继承关系,我们发现Vaex已提供DatasetMemoryMapped来封装文件到内存的映射。
DatasetMemoryMapped._get_mapping函数支持多个文件映射到内存:
def _get_mapping(self, path):
assert not self.nommap
if path not in self.mapping_map:
file = open(path, "rb+" if self.write else "rb")
fileno = file.fileno()
kwargs = {}
if vaex.utils.osname == "windows":
kwargs["access"] = mmap.ACCESS_READ | 0 if not self.write else mmap.ACCESS_WRITE
else:
kwargs["prot"] = mmap.PROT_READ | 0 if not self.write else mmap.PROT_WRITE
mapping = mmap.mmap(fileno, 0, **kwargs)
# TODO: we can think about adding this in py38
# mapping.madvise(mmap.MADV_SEQUENTIAL)
self.file_map[path] = file
self.fileno_map[path] = fileno
self.mapping_map[path] = mapping
return self.mapping_map[path]
代码很直白,采用python自带的mmap库,打开文件并没有立即读取数据(read调用),只是把文件名柄传给mmap函数,从而普通文件被映射到虚拟内存地址空间。
看到这段代码,也让我想起以前看Kafka以及RocketMQ的代码。对于Java语言,NIO同样也提供MappedByteBuffer来操作文件,避免使用JVM管理的堆内存。
为什么开源都喜欢文件内存映射来操作文件?从磁盘上将文件读入内存,都要经过文件系统进行数据拷贝,并且数据拷贝操作是由文件系统和硬件驱动实现的,理论上来说效率也不会差。通过内存映射的方法访问磁盘上的文件,效率要比read和write系统调用高:
- read: 是系统调用,首先将文件从磁盘拷贝到内核空间的一个缓冲区,再将这些数据拷贝到用户空间,实际上进行了两次数据拷贝
- map: 同样也是系统调用,但没有进行数据拷贝,当缺页中断发生时,直接将文件从磁盘拷贝到用户空间,只进行了一次数据拷贝。
所以,通常情况下采用内存映射的读写效率要比传统的read/write性能高。那Vaex是如何使用映射的文件对象呢,再来看一下:
def _do_map(self, path, offset, length, dtype):
if self.nommap:
file = self._get_file(path)
column = ColumnFile(file, offset, length, dtype, write=self.write, path=self.path, tls=self.tls_map[path])
else:
mapping = self._get_mapping(path)
column = np.frombuffer(mapping, dtype=dtype, count=length, offset=offset)
return column
读取HDF5文件的某列数据时,可以直接映射到为numpy的数组,通过np.frombuffer(numpy提供的API)读取数据,只须传入mmap文件对象,读取的长度count,以及偏移量offset,这样可以逐步读取需要的数据,而不需要很多内存,也没有多次的数据拷贝。
分批计算
mmap只解决部分问题,即大文件的读取不需要更多的内存,以及read的效率提升。像前面文章介绍的100GB大数据集统计分析,还得结合并行计算框架,实现分批并行计算。
我再来看一下DatasetMemoryMapped.chunk_iterator实现:
def chunk_iterator(self, columns, chunk_size=None, reverse=False):
if self.nommap:
# we expect here a path like s3 fetching, which will benefit from multithreading
pool = get_main_io_pool()
def read(i1, i2, reader):
return i1, i2, reader()
chunks_generator = self._default_lazy_chunk_iterator(self._columns, columns, chunk_size)
yield from pwait(buffer(pmap(read, chunks_generator, pool), pool._max_workers+3))
else:
yield from self._default_chunk_iterator(self._columns, columns, chunk_size, reverse=reverse)
- 当未采用mmap,只是基于进程池的多任务计算,不支持按列分块
- 当采用mmap,会调用基类的_default_chunk_iterator,其实也是调用_default_lazy_chunk_iterator
再来看一下_default_lazy_chunk_iterator的实现:
def _default_lazy_chunk_iterator(self, array_map, columns, chunk_size, reverse=False):
chunk_size = chunk_size or 1024**2
chunk_count = (self.row_count + chunk_size - 1) // chunk_size
chunks = range(chunk_count)
if reverse:
chunks = reversed(chunks)
for i in chunks:
i1 = i * chunk_size
i2 = min((i + 1) * chunk_size, self.row_count)
def reader(i1=i1, i2=i2):
chunks = {k: array_map[k][i1:i2] for k in columns}
length = i2 - i1
for name, chunk in chunks.items():
assert len(chunk) == length, f'Oops, got a chunk ({name}) of length {len(chunk)} while it is expected to be of length {length} (at {i1}-{i2}'
return chunks
yield i1, i2, reader
代码还算好理解:根据chunk_size与row_count计算chunk_count,按每列生成自己的chunk信息,以便支任务调度时,可以分批分列并行计算。每次迭代的起始位置是i * chunk_size,结束位置是min((i + 1) * chunk_size, self.row_count)。
以value_counts函数(统计函数,计算每列的唯一值的个数)的实现分析为例,内部先定义了map/reduce两个函数:
def map(thread_index, i1, i2, ar):
# 省略
def reduce(a, b):
return a+b
self.ds.map_reduce(map, reduce, [self.expression], delay=False, progress=progress, name='value_counts', info=True, to_numpy=False)
counters = [k for k in counters if k is not None]
而map_reduce的实现,是生成一个task,加入到executor中等待执行:
def map_reduce(self, map, reduce, arguments, progress=False, delay=False, info=False, to_numpy=True, ignore_filter=False, pre_filter=False, name='map reduce (custom)', selection=None):
task = tasks.TaskMapReduce(self, arguments, map, reduce, info=info, to_numpy=to_numpy, ignore_filter=ignore_filter, selection=selection, pre_filter=pre_filter)
progressbar = vaex.utils.progressbars(progress)
progressbar.add_task(task, name)
self.executor.schedule(task)
return self._delay(delay, task)
再总结一下,DataFrame提供的惰性计算的API,执行过程大概如下:
- 创建map与reduce两个函数
- 生成TaskMapReduce任务
- 把任务加入到executor等待调度执行
- 调用execute函数,让asyncio框架执行executor的execute_async函数
- execute_async从队列中取出任务
- 调用chunk_iterator产生需要计算的列的chunk信息
- 线程池(进程池)按chunk分给多个线程/进程并行计算
- 最后执行reduce函数合并计算结果
结语
Vaex针对列式数据的统计计算函数做了针对性的优化,基于文件内存映射,避免消耗内存以及内存的拷贝;实现了一套简单有效的多线程/多进程map/reduce计算框架,通过惰性延迟分批计算,从而从容地应对单机下大数据量的统计计算。