一次openLooKeng的Pipeline之旅:上游算子处理能力加快会使得下游算子也加快么?
| April 29, 2022 |
1 背景 在大数据处理的软件中,如openLooKeng或presto,数据处理往往是以pipeline的方式,通过将data在不同的Operator之间流转,从而实现对数据的处理。 以处理以下sql为例:select sum(ss_sales_price), sum(ss_list_price) from store_sales_partition_1 where ss_sold_date_sk > 0 group by ss_hdemo_sk; 以上sql会形成三个stage,在读数据的source stage主要包含Scan、Agg(partial)、FilterAndProject、PartitionOutput四个算子,数据处理的大部分时间都在这个stage(因为partial agg后,数据量大大减少),一种典型的pipeline如下: 2 问题 那么在pipeline的模式下,TableScanOperator的处理速度加快后,是否其他算子的处理速度也会加快呢? 为了验证这个猜想,我们构造了以下两种场景: 正常的table scan,读取ORC文件; 使用ORC Cache,将读取后生成的block缓存在内存中,下次读取无需读取原始文件。 以下为测试数据: «test result.xlsx» 3 结论 结论:从以上数据可知,通过ORC Cache后,Scan的CPU时间减少了约一半,E2E的时间也因为Scan算子的减少而相应减少,但是其他算子的处理速度却不会因此而提升。 理论分析:A算子从上游算子获取的数据是一定的,无论上游算子是在1s内还是10s内给算子A,那么算子A执行addinput和getoutput进行数据处理的时间就是一定的,因为假设10s某些时间上游算子没有喂给A数据,那么A是不参与运算的,即既不记录CPU时间也不计wall time。 以下为scan和agg处理速度的截图,从图中直观看到scan处理速度加快,而下游的hashagg几乎没有变化: 几个发散的思考 ORC cache 开启hetu.split-cache-map.enabled=true后并执行cache table,那么table cache后的split会cache,会使用cache的split调度策略,而非默认的调度策略,此时会突破node-scheduler.max-splits-per-node的限制,这看起来更像是个bug。 hive.orc.row-data.block.cache.enabled开启后,split和row data都会cache,且cache的data是解压后的block,同时filetail、footer等元数据信息没有开启,因此仍然会去读元数据信息,但是感觉已经没有必要了,因为读这些信息并不能过滤数据,仍然会使用cache的数据?—-这是一个bug? 调度影响 当node-scheduler.max-splits-per-node使用默认值100时,由于单节点处理的数据量很小,因而queued和running的split最大为100,需要分多批次调度,而hetu.split-cache-map.enabled=true后queue和running的splits无限大,为何分批次调度的影响这么大,值得进一步分析。 文件系统Cache row data cache后减少的这一半时间主要是ORC读取上来后的解压缩时间,因为实际数据已经缓存在文件系统cache中,因而数据从内核态拷贝到用户态是很快的。 在最早的分析中,我们发现算子级别的wall和cpu time一样,而端到端时间差异几倍,怀疑是IO wait,但是实际上通过IOstat等命令看,没有io wait,甚至没有io读写,发现是因为文件只有700M,基本全缓存到文件系统cache了,从而进一步理解了文件系统cache。 同时我们怀疑是否有异步线程的时间没有统计到driver的process中,实际上也没有这种情况发生,因为哪怕是异步线程处理,那么在cpu图上一定能看到痕迹。 通过以下两篇文章,能较好的了解异步IO: [译] Linux 异步 I/O 框架 io_uring:基本原理、程序示例与性能压测(2020) 来自 https://arthurchiao.