spark里的filter和projection pushdown
1. 概述
predicate pushdown也叫filter pushdown。
所以,有两种pushdown。
- projection pushdown, 用于
select
的pushdown - filter pushdown, 用于
filter
的pushdown
先来明确一下这俩在干嘛。
1.1 projection pushdown
Projection Pushdown minimizes data transfer between MapR Database and the Apache Spark engine by omitting unnecessary fields from table scans. It is especially beneficial when a table contains many columns.
projection pushdown通过在table scan过程中忽略不需要的列来减少从数据源读取的数据量。把projection下堆到数据源处。
当使用select
的时候,会进行projection pushdown,比如
1 | from pyspark.sql import SparkSession |
1.2 filter pushdown
把筛选行的filter下推到数据源处。
也是会减少从数据源传输到spark engine的数据量,但减少的单位是“行”,而projection pushdown减少的单位是“列”。
比如
1 | from pyspark.sql import SparkSession |
支持以下filter的pushdown:
=
和!=
<
,>
,>=
,<=
IN
LIKE
AND
,OR
NOT
1.3 限制
filter pushdown不支持复杂类型:array, map, struct, 比如
scala
1 | df.filter($"address.city" === "Milpitas") |
java
1 | df.filter(col("address.city").equalTo("Milpitas")); |
projection pushdown也不支持这些复杂类型, 比如
scala
1 | ds.select($"hobbies" (0)) |
java
1 | df.select(col("hobbies").getItem(0)); |
但是spark3.0进行了一些改进。
2. databricks关于FilterPushdown的例子
databricks有一些例子How logical plan optimizations work in Catalyst
2.1 more interesting example
创建两个DataFrame
1 | val items = Seq((0, "Macbook Pro", 1999.0), (1, "Macbook Air", 1500.0), (2, "iPad Air", 1200.0)).toDF("id", "name", "price") |
然后搞一个简单的join以及filter
1 | SELECT order.id, item.name, item.price, order.count |
2.1.1 analyzed plan
然后看analyzed plan
1 | val analyzedPlan = sql("SELECT order.id, item.name, item.price, order.count FROM item JOIN order WHERE item.id = order.itemid and item.price < 1400 and order.count > 2 - 1").queryExecution.analyzed |
结果为
1 | analyzedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = |
这里边Filter
是Join
的父节点,意味着filter条件里的item.price < 1400 and order.count > 2 - 1
是在join之后才被执行。
2.1.2 optimized plan
看optimized plan
1 | // Apply Spark SQL optimizations |
结果为
1 | optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = |
这里可以看到几点变化:
- Filter被下推,直接作用于
LocalRelation
。 适用的规则为PushDownPredicate
2 - 1
被替换成了1
。 适用的规则为ConstantFolding
SubqueryAlia
节点被直接优化掉了cast(1400 as double)
被换成了1400.0
2.1.3 Write rules for logical plan
Catalyst operates with logical plans and expressions, even attribute is an expression. Below are some examples how to convert one expression into another another using rules. Each logical plan is essentially a bunch of
QueryPlan[LogicalPlan]
instances andExpression
expressions. Some examples of logical plans areProject
,Generate
,Filter
, etc.
翻译一下:
Catalyst操作的对象是_logical plans_以及_expressions_, 即使是attribute也是一种expression。下边是一些使用规则(rule)将一个表达式转换成另一个的例子。每个logical plan本质是就是一组
QueryPlan[LogicalPlan]
实例和Expression
表达式。logical plan的例子包括Project
,Generate
,Filter
等。
这里有一些东东不大明白:
QueryPlan[LogicalPlan]
是个什么东东?- plan这个概念和expression有啥区别?
继续往下看
2.1.3.1 expression的转换
1 | // Simple expression transform |
Analyzer is built from rules, each rule is essentially one operation that takes logical plan and returns logical plan with very minimal change, hopefully better change. See example below:
是说rule就是一个operator,输入和输出都是logical plan, 不过这俩logical plan会有些许不同,目的是进行一些优化。
transform
方法的定义为
1 | def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = { |
这里的
1 | { |
就是一个rule。add
和subtract
就是两个Expression
。这里通过一个自己实现的rule,将Add
表达式转成了Subtract
表达式。
2.1.3.2 TreeNode
transform
是Add
继承自TreeNode
的方法。
这里TreeNode
的类型参数有点绕。
1 | abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { |
在这里要注意两点
TreeNode
是一个invariant class,不是协变类,也不是逆变类。TreeNode
要求其自身是一个BaseType
。也就是说,BaseType
实际上就是指它自己或它的子类。这样在定义方法的时候就可以用到这个限制。比如,TreeNode
的foreach方法这么定义1
2
3
4
5
6
7
8/**
* Runs the given function on this node and then recursively on [[children]].
* @param f the function to be applied to each node in the tree.
*/
def foreach(f: BaseType => Unit): Unit = {
f(this)
children.foreach(_.foreach(f))
}这里之所以可以
f(this)
, 就是因为self: BaseType =>
这个自身类型限制。如果去掉自身类型的限制,那么就f(this)
就不合语法了。
实际上它的直接子类都这样类义的:
Expression
1 | abstract class Expression extends TreeNode[Expression] |
Block
1 | trait Block extends TreeNode[Block] with JavaCode |
QueryPlan
1 | abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] { |
这样写的话,Expression
继承的TreeNode
的方法,如果参数类型是BaseType
,那么就只能传进去Expression
的子类,而不能传进去Block
的子类。
这样是不可以的
1 | val plan3 = add transform { |
就会报错
pattern type is incompatible with expected type;
found : org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
required: org.apache.spark.sql.catalyst.expressions.Expression
case foo: LogicalPlan => foo
也就是说add接受的transform实际上的类型要求是PartialFunction[Expression, Expression]
(这里要注意PartitalFunction的型变)。
这么搞,有利于递归调用rule的时候避免错误匹配,比如一个用于Expression
的规则被误用到了一个LogicalPlan
,就要出问题了。
2.1.4 Existing filter optimizations
1 | val logicalPlan = LocalRelation(int('a), str('b)). |
结果为
1 | logicalPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = |
这里可以看到((a#21651 + 1) > 2))
并非最优的形式。所以,下面自己创建一个rule来解析这个表达式。
2.1.5 Optimize filter folding
创建一个Rule
,叫做SimpleFilterFolding
。代码为
1 | // Let's call our rule SimpleFilterFolding |
然后把这个rule注册到我们的optimizer中。
1 | object SimpleOptimizer extends RuleExecutor[LogicalPlan] { |
结果为
1 | defined module SimpleOptimizer |
这里(2 - 1)
并没有被优化掉。但是已有rule可以做这件事,就是叫做ConstantFolding
的rule,现在把它跟我们的rule一起使用。
1 | object SimpleOptimizer extends RuleExecutor[LogicalPlan] { |
这样结果就对了
1 | defined module SimpleOptimizer |
Spark 3.0的在filter pushdown方面有了进步,这个下个blog再研究。