李铮 | December 31, 2021 | openLooKeng 动态过滤
openLooKeng 动态过滤
1 概述
如之前openLooKeng博客中所述,动态过滤的核心思想是依靠join条件以及build侧表读出的数据,运行时生成动态过滤条件(dynamic filters),应用到probe侧表的table scan阶段,从而减少参与join操作的数据量,有效地减少IO读取与网络传输。动态过滤优化方法包含4个主要阶段: partial filters的构建,partial filters合并,partial filters以及merged filters的传输,merged filters应用。
动态过滤特性系统架构如下图所示,总体架构来看,在进行join处理时,对物理执行计划进行改写,添加DynamicFilterSourceOperator,进行join时build侧表的数据收集。之后,将收集到的partial filters包装成bloomfilter发送给分布式缓存hezelcast。与此同时,构建DynamicFilterService,对收集到的partial filters进行合并。在合并完成之后,将生成好的dynamic filter推给join时probe表,作为其额外的过滤条件,在table scan时对数据进行过滤。
通常场景下,所有的join node都会生成dynamic filter,然后依靠优化器规则PredicatePushDown将生成的dynamic filter下推给TableScanNode。然而,默认的动态过滤生成与应用缺乏对于元数据信息的感知,查询分析引擎probe侧表的filter需要等待build侧表读取完成之后生成,默认join结点的probe表的读取会首先等待一个时间(如1000ms),如果没有等待到build侧的filter传入则无法利用。如果build过滤后的数据量相对大时,也会导致大量的filter在网络中传输。总的来说,直接利用join进行动态过滤生成与应用,缺乏利用统计信息以及元数据信息对于动态过滤条件生成及应用进行预估,导致选择率很低的动态过滤条件生成与使用,造成额外的系统开销。
2 动态过滤优化
2.1 基于选择率优化
当前,优化器规则中RemoveUnsupportedDynamicFilters仅仅删除那些不应该存在的动态过滤条件,比如build侧存在的动态过滤条件或者是probe侧存在动态过滤条件但是对应的JoinNode中并不存在。因此,这个规则是无法支撑我们基于元数据和CBO进行动态过滤生成与应用调整的。因此,我们需要扩展RemoveUnsupportedDynamicFilters规则:
将Metadata以及cost provider提供给RemoveUnsupportedDynamicFilters
删除我们不需要的动态过滤条件,例如,如果表很小且动态过滤无法帮助过滤,那么表应该被过滤得足够多,我们不需要启用动态过滤。
2.2 动态过滤条件生成优化
此外,openLooKeng早期版本,对于动态过滤条件的合并与生成,是为每一个DynamicFilterSourceOperator都注册一个driver id。如果DynamicFilterSourceOperator快速的完成,那么刚注册了他的driver id就很快完成,那么在coordinator上的DynamicFilterService可能会错误的以为所有worker已经完成了自身的读取任务,可以合并动态过滤条件生成最终的数据,这可能会导致错误的结果。此外,对于每个查询,与coordinator中hazelcast的交互次数为
num(Dynamic Filter) * num(Worker) * num(DynamicFilterSourceOperator per Worker) * 4(register, finish, worker, partial result)
当部署到具有高并发查询的大群集时,这可能会意味着coordinator需要为每个DynamicFilterSourceOperator执行合并,可能造成极大的网络传输数据量以及计算开销。为了解决这个问题,openLooKeng 1.1.0中在每个任务中由各个worker完成自己持有的动态过滤条件的合并。因此,DynamicFilterService只需要合并每个worker提供的过滤条件。
2.3 基本实现原理
在RemoveUnsupportedDynamicFilters的优化器中,在进行动态过滤条件检验时:
检查build侧表的预估输出行大小,并且丢弃可能会比较大的动态过滤条件(handleTooLargePredicate()方法仍将保留在DynamicFilterSourceOperator中作为保护措施)。
计算build侧的选择率,如果发现build侧的表的本身自带的过滤条件无法很好的过滤数据,即基本是全表扫描,那我们将对该build侧的表参与的JoinNode中的动态过滤条件删除。
如下图所示,红色框标记的参与join计算的build侧的表,其本身行数过大或者是全表扫描,那么就不会针对该JoinNode生成并应用动态过滤。
另外一种场景,对于tpcds q2,执行计划如下图所示。实际上,DF-3可以过滤顶部JoinNode的probe侧的表。但是由于pipeline调度的关系,因为我们配置了hivePageSouce等待时间,所以整个subplan的执行必须等待右侧的subplan完成。因此,我们可以删除DF-3,对于底部表A,可以尽快获得动态过滤条件,对于表B扫描,它可以使用表A中的动态过滤条件。由于整个左子树可以更早地利用动态过滤,语句整体的执行时间因此缩短。
对于动态过滤条件生成,我们会将每个worker负责的partial动态过滤条件在LocalDynamicFilter:addOperatorResult()中进行合并,当所有分区的读取完成之后,worker将把合并后的partial filters放入到hazelcast当中。相对应的,DynamicFilterService
中的逻辑也可以简化,我们只需要检查所有参与的worker是否完成了他们自己的任务,并将最终合并后的partial filters放入到了hazelcast中。因此,我们不再需要维护复杂的状态信息,因为coordinator参与处理的worker总数,其处理逻辑也得到很大的简化。
参考文档:
https://gitee.com/openlookeng/hetu-core/pulls/398/files
https://gitee.com/openlookeng/hetu-core/pulls/353/files
https://gitee.com/openlookeng/hetu-core/pulls/282/files
https://gitee.com/openlookeng/hetu-core/pulls/789/files
如果您有任何疑问或建议,欢迎在社区代码仓内提Issue;也欢迎加小助手微信(openLooKengoss),进入专属技术交流群。
社区代码仓
https://github.com/openlookeng
openLooKeng,让大数据更简单!