当前位置: 首页>移动开发>正文

sparkSQL插入表的部分字段 spark sql语句

前言

大数据领域“SQL化开发”的风气方兴未艾,这是因为SQL是一种通用、学习成本低的语言,并且还有较强的数据描述能力。不少大数据框架早已支持了SQL化开发,如Spark、Flink、Kafka等。

之前笔者操刀的多数Spark程序都是用传统的RDD API写的,Spark SQL用得很少,Flink也如是。最近抽空对这两门“SQL”做了一些了解,并且断断续续研究了Spark SQL的部分原理,了解到它的内部也是存在基于规则优化(Rule-based optimization, RBO)和基于代价优化(Cost-based optimization, CBO)的,与传统关系型数据库和大数据领域的有些组件(Hive/Presto等)异曲同工。

Spark SQL的核心是Catalyst,即它专属的查询解析和优化器。下面是Spark SQL原始论文中给出的Catalyst执行流程图。





sparkSQL插入表的部分字段 spark sql语句,sparkSQL插入表的部分字段 spark sql语句_SQL,第1张


可见主要分为语句解析、逻辑计划与优化、物理计划与优化、代码生成4个阶段,前3个阶段都由Catalyst负责。其中,逻辑计划的优化采用RBO的思路,物理计划的优化则采用CBO的思路。本文只来看RBO,顺便也介绍一下它之前的语句解析、逻辑计划过程,并不会具体到源码分析的级别。物理计划与CBO比起逻辑计划与RBO更加灵活和复杂,等忙过这一阵之后择期再写。

SQL语句解析

Catalyst使用开源的语法分析器Antlr解析SQL语句,并生成未解析的逻辑计划(Unresolved Logical Plan),对应到源码中的类为SqlBaseLexer和SqlBaseParser。

未解析的逻辑计划其实就是一棵原生的抽象语法树(Abstract Syntax Tree, AST),只与语句本身有关,而与表的元数据没有任何关系。用以下基于TPC-H数据集的查询为例,在其Q3查询的基础上简化而来。TPC-H数据集的导入可以参见这篇文章。

select avg(revenue) from (
  select l_extendedprice * (100 - 99 - l_discount) as revenue
  from tpch.customer c 
  join tpch.orders o on c.c_mktsegment = 'BUILDING' and c.c_custkey = o.o_custkey 
  join tpch.lineitem l on l.l_orderkey = o.o_orderkey
  where o_orderdate <= '1995-03-17' and l_shipdate >= '1995-03-18'
) temp;

调用SparkSession.sql().explain(true)方法,查看执行计划。下面就是未解析的逻辑计划的全貌。

== Parsed Logical Plan ==
'Project [unresolvedalias('avg('revenue), None)]
+- 'SubqueryAlias temp
   +- 'Project [('l_extendedprice * ((100 - 99) - 'l_discount)) AS revenue#0]
      +- 'Filter (('o_orderdate <= 1995-03-17) && ('l_shipdate >= 1995-03-18))
         +- 'Join Inner, ('l.l_orderkey = 'o.o_orderkey)
            :- 'Join Inner, (('c.c_mktsegment = BUILDING) && ('c.c_custkey = 'o.o_custkey))
            :  :- 'SubqueryAlias c
            :  :  +- 'UnresolvedRelation `tpch`.`customer`
            :  +- 'SubqueryAlias o
            :     +- 'UnresolvedRelation `tpch`.`orders`
            +- 'SubqueryAlias l
               +- 'UnresolvedRelation `tpch`.`lineitem`

如果这样不容易阅读的话,我们手动将这棵抽象语法树画出来,就简明得多。别名逻辑操作符(SubqueryAlias)就不画了。



sparkSQL插入表的部分字段 spark sql语句,sparkSQL插入表的部分字段 spark sql语句_hadoop_02,第2张


由上图可见,所有的表都用UnresolvedRelation来表示,也就是仅仅知道它们是表,而对其他信息(表的结构、数据类型、存储位置等等)都一无所知,Project、Filter等操作符中的列名对应的信息自然也是不清楚的。这些东西都要在生成逻辑计划的同时弄明白。

逻辑计划生成

逻辑计划的生成由Analyzer类来实现,它利用SessionCatalog(具体到这里就是Hive的Catalog,即元数据集合)将上一节AST中所有Unresolved的东西解析出来。解析完毕的逻辑计划如下所示。

== Analyzed Logical Plan ==
avg(revenue): double
Aggregate [avg(revenue#0) AS avg(revenue)#72]
+- SubqueryAlias temp
   +- Project [(l_extendedprice#60 * (cast((100 - 99) as double) - l_discount#61)) AS revenue#0]
      +- Filter ((cast(o_orderdate#50 as string) <= 1995-03-17) && (l_shipdate#65 >= 1995-03-18))
         +- Join Inner, (l_orderkey#55 = o_orderkey#46)
            :- Join Inner, ((c_mktsegment#44 = BUILDING) && (c_custkey#38 = o_custkey#47))
            :  :- SubqueryAlias c
            :  :  +- SubqueryAlias customer
            :  :     +- HiveTableRelation `tpch`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#38, c_name#39, c_address#40, c_nationkey#41, c_phone#42, c_acctbal#43, c_mktsegment#44, c_comment#45]
            :  +- SubqueryAlias o
            :     +- SubqueryAlias orders
            :        +- HiveTableRelation `tpch`.`orders`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [o_orderkey#46, o_custkey#47, o_orderstatus#48, o_totalprice#49, o_orderdate#50, o_orderpriority#51, o_clerk#52, o_shippriority#53, o_comment#54]
            +- SubqueryAlias l
               +- SubqueryAlias lineitem
                  +- HiveTableRelation `tpch`.`lineitem`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [l_orderkey#55, l_partkey#56, l_suppkey#57, l_linenumber#58, l_quantity#59, l_extendedprice#60, l_discount#61, l_tax#62, l_returnflag#63, l_linestatus#64, l_shipdate#65, l_commitdate#66, l_receiptdate#67, l_shipinstruct#68, l_shipmode#69, l_comment#70]

解析出来的东西有:

  • 各表的元数据(HiveTableRelation)及包含的字段;
  • 聚合操作及对应的函数(Aggregate、avg);
  • 各字段的数据类型与类型转换(as double、as string)。

用树形结构表示如下图。



sparkSQL插入表的部分字段 spark sql语句,sparkSQL插入表的部分字段 spark sql语句_hadoop_03,第3张


接下来就要靠RBO对这棵树进行优化了。

基于规则优化

所谓基于规则优化,就是指通过一系列预先定义好的规则(Rule)对逻辑计划进行等价转换,以提高查询效率。

RBO的两个主要思路是:减少参与计算的数据量、降低重复计算的代价。RBO相对于CBO而言要成熟得多,常用的规则都基于经验制定,可以覆盖大部分查询场景,并且方便扩展。其缺点则是不够灵活,毕竟这个阶段对物理上的特征(如表的底层存储形式和真正的数据量)还没有感知。

下面先列出文章开头的查询优化过的逻辑计划。

== Optimized Logical Plan ==
Aggregate [avg(revenue#0) AS avg(revenue)#72]
+- Project [(l_extendedprice#60 * (1.0 - l_discount#61)) AS revenue#0]
   +- Join Inner, (l_orderkey#55 = o_orderkey#46)
      :- Project [o_orderkey#46]
      :  +- Join Inner, (c_custkey#38 = o_custkey#47)
      :     :- Project [c_custkey#38]
      :     :  +- Filter ((isnotnull(c_mktsegment#44) && (c_mktsegment#44 = BUILDING)) && isnotnull(c_custkey#38))
      :     :     +- HiveTableRelation `tpch`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#38, c_name#39, c_address#40, c_nationkey#41, c_phone#42, c_acctbal#43, c_mktsegment#44, c_comment#45]
      :     +- Project [o_orderkey#46, o_custkey#47]
      :        +- Filter (((isnotnull(o_orderdate#50) && (cast(o_orderdate#50 as string) <= 1995-03-17)) && isnotnull(o_custkey#47)) && isnotnull(o_orderkey#46))
      :           +- HiveTableRelation `tpch`.`orders`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [o_orderkey#46, o_custkey#47, o_orderstatus#48, o_totalprice#49, o_orderdate#50, o_orderpriority#51, o_clerk#52, o_shippriority#53, o_comment#54]
      +- Project [l_orderkey#55, l_extendedprice#60, l_discount#61]
         +- Filter ((isnotnull(l_shipdate#65) && (l_shipdate#65 >= 1995-03-18)) && isnotnull(l_orderkey#55))
            +- HiveTableRelation `tpch`.`lineitem`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [l_orderkey#55, l_partkey#56, l_suppkey#57, l_linenumber#58, l_quantity#59, l_extendedprice#60, l_discount#61, l_tax#62, l_returnflag#63, l_linestatus#64, l_shipdate#65, l_commitdate#66, l_receiptdate#67, l_shipinstruct#68, l_shipmode#69, l_comment#70]

优化过的逻辑计划与原本的逻辑计划相比有了很大变化。为了对比清晰,将两棵树都画在下面了。



sparkSQL插入表的部分字段 spark sql语句,sparkSQL插入表的部分字段 spark sql语句_apache_04,第4张


上面的图中包含了3种最常见也是最有效的RBO方式,分别简单阐述一下。英文名称是Spark SQL源码中的字段名称。

  • 常量折叠(ConstantFolding)
    上述语句中有一个纯常量运算表达式,即100 - 99。如果行数很多的话,每行都要计算一次该表达式的值,积少成多就浪费了很多时间(因为该表达式可以更加复杂)。所以通过常量折叠可以将它预先转化为1.0,消除了很多不必要的重复计算。图中红色箭头即是。
  • 谓词下推(PushdownPredicate)
    谓词下推的概念在前面讲解HiveQL优化时已经说过了。如果能够将SQL语句中的谓词逻辑(where条件、join on中的谓词条件)都尽量提前执行,下游处理已经过滤完毕的数据,能够减少工作量。图中绿色箭头即是。
  • 列裁剪(ColumnPruning)
    在未优化的逻辑计划中,Join Inner与Filter操作符都会扫描很多列,然后再由Project操作符筛选出结果列。但实际上,我们可以在初始单独扫描表时就只筛选出符合后续逻辑计划的最小列集合,同样能够节省很多资源。如果表物理上是用Parquet、ORC等列式存储格式持久化的,效率就会更高。图中所有标为橙色的Project操作符即是。

To be continued


https://www.xamrdz.com/mobile/4hy1964528.html

相关文章: