Why openLooKeng?

跨数据中心数据分析

统一的SQL接口访问跨数据中心、跨云的数据源

极简的跨源数据分析体验

统一的SQL接口访问多种数据源

易扩展数据源

可以通过增加Connector来增加数据源

采集变连接、数据零搬迁

openLooKeng社区活力

社区成员

社区用户

贡献者

查看详情

博客

  • 一次openLooKeng的Pipeline之旅:上游算子处理能力加快会使得下游算子也加快么?

    廖登宏 | April 29, 2022

    1 背景 在大数据处理的软件中,如openLooKeng或presto,数据处理往往是以pipeline的方式,通过将data在不同的Operator之间流转,从而实现对数据的处理。 以处理以下sql为例:select sum(ss_sales_price), sum(ss_list_price) from store_sales_partition_1 where ss_sold_date_sk > 0 group by ss_hdemo_sk; 以上sql会形成三个stage,在读数据的source stage主要包含Scan、Agg(partial)、FilterAndProject、PartitionOutput四个算子,数据处理的大部分时间都在这个stage(因为partial agg后,数据量大大减少),一种典型的pipeline如下: 2 问题 那么在pipeline的模式下,TableScanOperator的处理速度加快后,是否其他算子的处理速度也会加快呢? 为了验证这个猜想,我们构造了以下两种场景: 正常的table scan,读取ORC文件; 使用ORC Cache,将读取后生成的block缓存在内存中,下次读取无需读取原始文件。 以下为测试数据: «test result.xlsx» 3 结论 结论:从以上数据可知,通过ORC Cache后,Scan的CPU时间减少了约一半,E2E的时间也因为Scan算子的减少而相应减少,但是其他算子的处理速度却不会因此而提升。 理论分析:A算子从上游算子获取的数据是一定的,无论上游算子是在1s内还是10s内给算子A,那么算子A执行addinput和getoutput进行数据处理的时间就是一定的,因为假设10s某些时间上游算子没有喂给A数据,那么A是不参与运算的,即既不记录CPU时间也不计wall time。 以下为scan和agg处理速度的截图,从图中直观看到scan处理速度加快,而下游的hashagg几乎没有变化: 几个发散的思考 ORC cache 开启hetu.split-cache-map.enabled=true后并执行cache table,那么table cache后的split会cache,会使用cache的split调度策略,而非默认的调度策略,此时会突破node-scheduler.max-splits-per-node的限制,这看起来更像是个bug。 hive.orc.row-data.block.cache.enabled开启后,split和row data都会cache,且cache的data是解压后的block,同时filetail、footer等元数据信息没有开启,因此仍然会去读元数据信息,但是感觉已经没有必要了,因为读这些信息并不能过滤数据,仍然会使用cache的数据?—-这是一个bug? 调度影响 当node-scheduler.max-splits-per-node使用默认值100时,由于单节点处理的数据量很小,因而queued和running的split最大为100,需要分多批次调度,而hetu.split-cache-map.enabled=true后queue和running的splits无限大,为何分批次调度的影响这么大,值得进一步分析。 文件系统Cache row data cache后减少的这一半时间主要是ORC读取上来后的解压缩时间,因为实际数据已经缓存在文件系统cache中,因而数据从内核态拷贝到用户态是很快的。 在最早的分析中,我们发现算子级别的wall和cpu time一样,而端到端时间差异几倍,怀疑是IO wait,但是实际上通过IOstat等命令看,没有io wait,甚至没有io读写,发现是因为文件只有700M,基本全缓存到文件系统cache了,从而进一步理解了文件系统cache。 同时我们怀疑是否有异步线程的时间没有统计到driver的process中,实际上也没有这种情况发生,因为哪怕是异步线程处理,那么在cpu图上一定能看到痕迹。 通过以下两篇文章,能较好的了解异步IO: [译] Linux 异步 I/O 框架 io_uring:基本原理、程序示例与性能压测(2020) 来自 https://arthurchiao.

  • 动态过滤测试报告

    支海东 | April 27, 2022

    1 概述 Join在SQL语言中,是数据库查询永远绕不开的话题,也是SQL中最为复杂,代价最大的操作类型。而在hash join中,如何使构建端(build –side)和探测端(probe-scan)高效运行,便成为了需要研究的问题。动态过滤(Dynamic Filtering)就是为了解决这个问题而应运而生的机制。在openLooKeng中,我们广泛地采用了这种机制。 1.1 背景 SQL语言中join大致有几种执行机制。最简单的是Nested-Loop join和Blocked Nested-Loop join。假设现在有两张表R和S, Nested-Loop join会用二重循环的方式扫描每个(r,s)对,若符合join的条件,就匹配成功,其时间复杂度也很简单地可推导为O(|R||S|)。而Blocked Nested-Loop join是基于前者的改进版,其思路是对于外层循环的表R,不再逐行扫描,而是一次加载一批(即所谓Block)数据进入内存,并且将它们按join key散列在哈希表里,然后也按批扫描表S,并与哈希表进行比对,能够对的上的行就作为join的输出。一个block数据的大小一般是一个或多个内存页的容量。这样就可以将I/O复杂度从O(|R||S|)降低到O[p(R) * p(S) / M],其中p(R)、p(S)分别代表R和S换算成页数的大小,M代表可用内存中的总页数。 此二种loop join的原理图如下所示: 除此以外,主要是Hash join和Grace Hash join。Sort-Merge join由于和动态过滤并不相关,本文在此不予以讨论。Hash join分为两个阶段,构建(build)和探测(probe)。我们将join两端中较小的那个集合R作为构建集(build set),另一个S作为探测集(probe set)。在构建阶段,我们将R的所有数据按照主键进行哈希散列,构成一张哈希表,而哈希表的值便是R原来的行数据。而在探测阶段,我们扫描探测集S,取得S的主键的哈希值并判断其是否在哈希表之中,输出结果。此二阶段的过程的示意图如下所示。易知,其时间复杂度为O(|R|+|S|)。 而Grace Hash join的原理,只是改进版,并不影响我们对于动态过滤机制的理解,因而在此处略去不表。 1.2 原理 上文中已经介绍了join本身的原理,接下来我们介绍动态过滤。 在筛选机制比较苛刻的场景中,绝大多数探测端的行是一经筛选,不匹配则直接丢弃的。但如果我们将这个谓词从计划阶段下沉到执行阶段,在构建端已经有严格筛选条件的情况下,直接减少构建集的计算,从而不去读取探测端那些不匹配的行而不是先读取再丢弃,就节约了大量的筛选时间,极大地提高了运行效率。而这个过程,就叫做动态过滤。 如上图所示,item作为一个已经被严格筛选过的构建集,我们用过滤器F将筛选机制直接传递给探测集。如此,探测端的工作效率便会提升。而动态过滤机制的主要难点就在于如何把构建端的值从inner-join操作符传递到探测端,因为操作符很可能是在不同的机器上运行的。 而在实施的时候,我们主要依靠“基于代价的优化器”(CBO, cost-based optimizer),这个优化器让我们可以使用“广播合并”(broadcast join)。在我们的案例中,构建端远小于探测端,探测端的扫描和inner-join操作符运行在同一进程中,这样信息的通信机制会容易很多。 在确保了广播合并被使用且构建端的信息也能被传送到探测端的时候,我们加入“收集操作符”(collection operator),就放在哈希构建操作符之前。 如图,收集操作符收集了构建端的值,而当构建端的值输入完毕后,我们将动态过滤机制放到探测端一边。由于探测端的哈希映射和收集操作符的哈希映射是并行执行的,这里并不需要花费多余的时间。然后哈希匹配的时候,探测端就可以直接去被筛选过后的构建端的哈希表进行查找。 2 测试 为了审视动态机制开启后的具体效果,我们进行一个测试:对同样的几个节点在先不开启后开启动态过滤机制的状态下,测试同样的几段SQL代码在相同节点上的运行时间。最终通过运行时间的变化体现执行效率的变化。值得一提的是,工作节点(worker nodes)的数量尽可能要大于1个。 2.1 环境配置 https://openlookeng.io/zh-cn/docs/docs/installation/deployment.html https://openlookeng.io/zh-cn/docs/docs/installation/deployment-ha.html https://openlookeng.io/zh-cn/docs/docs/admin/dynamic-filters.html 以上3个网页已经详细讲述了如何配置协调节点(Coordinator Node)和工作节点(Worker Node),以及是否开启动态过滤机制。需要注意的是,等待动态过滤条件生成的最长等待时间最好加上,并且设置为2s,即在etc/config.properties里加入语句 Dynamic-filtering-wait-time=2s 否则有些SQL语句可能会出现由于等待时间不合适而并未开启动态过滤机制的情况。 2.2 测试结果 在看具体数字前,我们首先看看计算图的变化。 这两张图分别是动态过滤机制开启前后,在同样的节点上运行同样的程序的计算图。可见,就像原理部分中所叙述的那样,计算图中出现了动态过滤机制,右图中标红的SQL语句就是体现。 最后,这张表是6段不同的代码在动态过滤机制开启前后的运行效率对比。 如果您有任何疑问或建议,欢迎在社区代码仓内提Issue;也欢迎加小助手微信(openLooKengoss),进入专属技术交流群。 社区代码仓

更多...

新闻

  • openLooKeng v1.6.0 正式发布

    openLooKeng | March 30, 2022

    新版本1.6.0在数据持久化、低时延、连接器、Task Recovery等方面做出了优化,无论是性能提升、可靠性、还是创新方面,都是干货满满,下面就带大家快速浏览openLooKeng新版本1.6.0的关键特性。

  • 2021“科创中国”开源创新榜单公布,openLooKeng入选年度优秀开源产品

    openLooKeng | March 1, 2022

    2021“科创中国”开源创新榜单公布,openLooKeng入选年度优秀开源产品。此次上榜也代表着“科创中国”对openLooKeng在开源创新方面取得的成绩的认可。

更多...