Filter and Projection Pushdown in Spark

1. Overview

Predicate pushdown is also known as filter pushdown.

So there are two kinds of pushdown:

  • projection pushdown, for pushing down select (columns)
  • filter pushdown, for pushing down filter (rows)

Let's first clarify what each of them does.

1.1 projection pushdown

Projection Pushdown minimizes data transfer between MapR Database and the Apache Spark engine by omitting unnecessary fields from table scans. It is especially beneficial when a table contains many columns.

Projection pushdown reduces the amount of data read from the data source by skipping unneeded columns during the table scan; in other words, the projection is pushed down to the data source.

Projection pushdown kicks in when you use select, for example:

from pyspark.sql import SparkSession

# spark_session is an existing SparkSession; loadFromMapRDB is provided by the
# MapR Database Connector for Apache Spark, not by vanilla Spark
df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")

1.2 filter pushdown

Filter pushdown pushes row-filtering predicates down to the data source.

It also reduces the amount of data transferred from the data source to the Spark engine, but here the unit of reduction is rows, whereas projection pushdown reduces columns.

For example:

from pyspark.sql import SparkSession

# same spark_session / MapR DB connector as above
df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")

Pushdown is supported for the following filters (a quick way to check what actually gets pushed down is sketched right after this list):

  • =, !=
  • <, >, >=, <=
  • IN
  • LIKE
  • AND, OR
  • NOT
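
Whether a given predicate is actually pushed down depends on the data source. For a file source such as Parquet (a different source than the MapR DB connector used above), the easiest sanity check is to look at the physical plan: the scan node lists the pushed predicates under PushedFilters and the pruned columns under ReadSchema. A minimal sketch, with a made-up path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("pushdown-check").getOrCreate()

// Hypothetical Parquet dataset with the same columns as the examples above
val df = spark.read.parquet("/tmp/user_profiles_parquet")
  .select("_id", "first_name", "last_name")   // candidate for projection pushdown
  .filter("first_name = 'Bill'")              // candidate for filter pushdown

// The scan node in the printed plan should show something like
//   PushedFilters: [IsNotNull(first_name), EqualTo(first_name,Bill)]
//   ReadSchema: struct<_id:string,first_name:string,last_name:string>
df.explain(true)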

1.3 Limitations

Filter pushdown does not support complex types such as array, map, and struct. For example:

scala

df.filter($"address.city" === "Milpitas")

java

df.filter(col("address.city").equalTo("Milpitas"));

Projection pushdown does not support these complex types either. For example:

scala

ds.select($"hobbies"(0))

java

df.select(col("hobbies").getItem(0));

However, Spark 3.0 has made some improvements in this area.
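
To see the complex-type limitation concretely, one can write out a small dataset with a struct column and inspect the plan; whether the nested predicate shows up under PushedFilters (and whether only address.city is read) depends on the Spark version and the data source. A hedged sketch using Parquet, with made-up class and path names:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Hypothetical nested schema: address is a struct column
case class Address(city: String, zip: String)
case class Profile(id: Int, first_name: String, address: Address)

Seq(Profile(1, "Bill", Address("Milpitas", "95035")))
  .toDF()
  .write.mode("overwrite").parquet("/tmp/profiles_nested")

val profiles = spark.read.parquet("/tmp/profiles_nested")

// On older Spark versions the predicate on address.city is evaluated after the scan;
// Spark 3.0 can push nested-field predicates / prune nested columns for some sources.
profiles.filter($"address.city" === "Milpitas").select($"address.city").explain(true)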

2. Databricks' FilterPushdown example

Databricks has an example notebook on this topic, How logical plan optimizations work in Catalyst.

2.1 A more interesting example

Create two DataFrames:

// toDF on a local Seq relies on spark.implicits._ (imported automatically in spark-shell / notebooks)
val items = Seq((0, "Macbook Pro", 1999.0), (1, "Macbook Air", 1500.0), (2, "iPad Air", 1200.0)).toDF("id", "name", "price")
val orders = Seq((100, 0, 1), (100, 1, 1), (101, 2, 3)).toDF("id", "itemid", "count")

items.createOrReplaceTempView("item")
orders.createOrReplaceTempView("order")

Then run a simple join with a filter:

SELECT order.id, item.name, item.price, order.count
FROM item
JOIN order
WHERE item.id = order.itemid and item.price < 1400 and order.count > 2 - 1

2.1.1 analyzed plan

Now look at the analyzed plan:

val analyzedPlan = sql("SELECT order.id, item.name, item.price, order.count FROM item JOIN order WHERE item.id = order.itemid and item.price < 1400 and order.count > 2 - 1").queryExecution.analyzed

The result is:

analyzedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [id#15942, name#15929, price#15930, count#15944]
+- Filter (((id#15928 = itemid#15943) && (price#15930 < cast(1400 as double))) && (count#15944 > (2 - 1)))
   +- Join Inner
      :- SubqueryAlias item
      :  +- Project [_1#15924 AS id#15928, _2#15925 AS name#15929, _3#15926 AS price#15930]
      :     +- LocalRelation [_1#15924, _2#15925, _3#15926]
      +- SubqueryAlias order
         +- Project [_1#15938 AS id#15942, _2#15939 AS itemid#15943, _3#15940 AS count#15944]
            +- LocalRelation [_1#15938, _2#15939, _3#15940]

Here Filter is the parent node of Join, which means the conditions item.price < 1400 and order.count > 2 - 1 in the filter are evaluated only after the join.
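
As a side note, the same QueryExecution object exposes the plan at every compilation stage, which makes it easy to compare them side by side. A small sketch reusing the query above:

val qe = sql("SELECT order.id, item.name, item.price, order.count FROM item JOIN order " +
  "WHERE item.id = order.itemid and item.price < 1400 and order.count > 2 - 1").queryExecution

println(qe.analyzed)       // the analyzed plan shown above
println(qe.optimizedPlan)  // after the optimizer (next subsection)
println(qe.sparkPlan)      // the physical plan Spark will actually run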

2.1.2 optimized plan

Next, the optimized plan:

// Apply Spark SQL optimizations
val optimizedPlan = SimpleTestOptimizer.execute(analyzedPlan)

The result is:

optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [id#15942, name#15929, price#15930, count#15944]
+- Join Inner, (id#15928 = itemid#15943)
   :- Project [_1#15924 AS id#15928, _2#15925 AS name#15929, _3#15926 AS price#15930]
   :  +- Filter (_3#15926 < 1400.0)
   :     +- LocalRelation [_1#15924, _2#15925, _3#15926]
   +- Project [_1#15938 AS id#15942, _2#15939 AS itemid#15943, _3#15940 AS count#15944]
      +- Filter (_3#15940 > 1)
         +- LocalRelation [_1#15938, _2#15939, _3#15940]

A few changes can be seen here (a small sketch of applying one of these rules in isolation follows the list):

  1. The Filter has been pushed down so that it acts directly on the LocalRelation. The rule responsible is PushDownPredicate.
  2. 2 - 1 has been replaced with 1. The rule responsible is ConstantFolding.
  3. The SubqueryAlias nodes have been optimized away entirely.
  4. cast(1400 as double) has been replaced with 1400.0.
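
Each of these rules is itself just a function from LogicalPlan to LogicalPlan, so you can apply one in isolation and watch its effect. A small sketch (ConstantFolding lives in org.apache.spark.sql.catalyst.optimizer; rule names here follow the Spark 2.x layout used in the notebook):

import org.apache.spark.sql.catalyst.optimizer.ConstantFolding

// Applying only ConstantFolding to the analyzed plan folds the literal-only expressions:
// (2 - 1) becomes 1 and cast(1400 as double) becomes 1400.0, while the Filter stays above the Join.
println(ConstantFolding(analyzedPlan).treeString)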

2.1.3 Write rules for logical plan

Catalyst operates with logical plans and expressions, even an attribute is an expression. Below are some examples of how to convert one expression into another using rules. Each logical plan is essentially a bunch of QueryPlan[LogicalPlan] instances and Expression expressions. Some examples of logical plans are Project, Generate, Filter, etc.

A couple of things here are not entirely clear to me:

  1. What exactly is QueryPlan[LogicalPlan]?
  2. What is the difference between a plan and an expression?

Let's keep reading.

2.1.3.1 Transforming expressions

import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Subtract}

// Simple expression transform
val add = Add(Literal(2), Literal(3))

val subtract = add transform {
  case Add(left, right) => Subtract(left, right)
}

add: org.apache.spark.sql.catalyst.expressions.Add = (2 + 3)
subtract: org.apache.spark.sql.catalyst.expressions.Expression = (2 - 3)

Analyzer is built from rules, each rule is essentially one operation that takes logical plan and returns logical plan with very minimal change, hopefully better change. See example below:

In other words, a rule is an operator whose input and output are both logical plans; the output differs only slightly from the input, ideally for the better.

The transform method is defined as:

def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  transformDown(rule)
}

And this part,

{
  case Add(left, right) => Subtract(left, right)
}

is a rule. add and subtract are both Expressions. Through this hand-written rule, the Add expression is converted into a Subtract expression.
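
Note that transform walks the entire tree (transformDown applies the rule to a node and then recurses into the children of the result), so nested Adds get rewritten too. A quick sketch:

import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Subtract}

// Both the outer and the inner Add are matched by the rule
val nested = Add(Add(Literal(1), Literal(2)), Literal(3))
val rewritten = nested transform {
  case Add(left, right) => Subtract(left, right)
}
// rewritten prints as ((1 - 2) - 3)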

2.1.3.2 TreeNode

transform is a method that Add inherits from TreeNode.

The type parameter of TreeNode is a bit mind-bending.

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// scalastyle:on
  self: BaseType =>

Two things to note here:

  1. TreeNode is invariant in its type parameter: it is neither covariant nor contravariant.

  2. TreeNode requires that it itself is a BaseType (the self-type self: BaseType =>). In other words, BaseType always refers to the class itself or one of its subclasses, and method definitions can rely on this constraint. For example, TreeNode's foreach method is defined as:

    /**
     * Runs the given function on this node and then recursively on [[children]].
     * @param f the function to be applied to each node in the tree.
     */
    def foreach(f: BaseType => Unit): Unit = {
      f(this)
      children.foreach(_.foreach(f))
    }

    The reason f(this) is allowed here is precisely the self-type annotation self: BaseType =>. Without that self-type constraint, f(this) would not type-check. A tiny standalone sketch of this pattern follows below.
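
Here is a tiny standalone sketch of the same F-bounded-type-parameter plus self-type pattern, with made-up names (not Spark code), just to show why f(this) type-checks and why each concrete hierarchy stays closed over its own node type:

// A minimal imitation of TreeNode: BaseType must be the concrete subclass itself
abstract class Node[BaseType <: Node[BaseType]] {
  self: BaseType =>

  def children: Seq[BaseType]

  // f expects a BaseType; passing `this` compiles only because of the self-type
  def foreach(f: BaseType => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }
}

// A small "expression-like" hierarchy built on top of it
sealed abstract class Expr extends Node[Expr]
case class Lit(value: Int) extends Expr { def children: Seq[Expr] = Nil }
case class Plus(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }

object Demo extends App {
  // Visits every node: Plus(...), then Lit(1), then Lit(2)
  Plus(Lit(1), Lit(2)).foreach(println)
}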

Back in Spark, TreeNode's direct subclasses are all defined following this pattern:

Expression

abstract class Expression extends TreeNode[Expression]

Block

trait Block extends TreeNode[Block] with JavaCode

QueryPlan

abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
  self: PlanType =>

Defined this way, any TreeNode method that Expression inherits and that takes a BaseType parameter will only accept subclasses of Expression, never subclasses of Block.

So the following is not allowed:

val plan3 = add transform {
  case foo: LogicalPlan => foo
}

It fails to compile with:

pattern type is incompatible with expected type;
found : org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
required: org.apache.spark.sql.catalyst.expressions.Expression
case foo: LogicalPlan => foo

In other words, the rule accepted by add's transform is effectively required to be a PartialFunction[Expression, Expression] (note the variance of PartialFunction here).

This design helps avoid wrong matches when rules are applied recursively; for example, a rule meant for Expressions being accidentally applied to a LogicalPlan would cause trouble.
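
On the variance point: PartialFunction[-A, +B] is contravariant in its input and covariant in its output, so a rule written against a more general node type can still be used where a PartialFunction[Expression, Expression] is expected. A small sketch (the rule itself is meaningless, it only demonstrates the typing):

import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}
import org.apache.spark.sql.catalyst.trees.TreeNode

// A rule written against the more general TreeNode[_] and returning the more specific Literal...
val generic: PartialFunction[TreeNode[_], Literal] = { case _: Literal => Literal(0) }

// ...conforms to the PartialFunction[Expression, Expression] that Expression.transform expects
val asExpressionRule: PartialFunction[Expression, Expression] = generic

val zeroed = Add(Literal(2), Literal(3)) transform asExpressionRule  // becomes (0 + 0)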

2.1.4 Existing filter optimizations

val logicalPlan = LocalRelation(int('a), str('b)).
  select('a).
  where(GreaterThan(Add('a, Literal(1)), Literal(2)))

val analyzedPlan = logicalPlan.analyze
val optimizedPlan = SimpleTestOptimizer.execute(analyzedPlan)

The result is:

logicalPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
'Filter (('a + 1) > 2)
+- 'Project ['a]
   +- LocalRelation <empty>, [a#21651, b#21652]

analyzedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter ((a#21651 + 1) > 2)
+- Project [a#21651]
   +- LocalRelation <empty>, [a#21651, b#21652]

optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [a#21651]
+- Filter (isnotnull(a#21651) && ((a#21651 + 1) > 2))
   +- LocalRelation <empty>, [a#21651, b#21652]

Here you can see that ((a#21651 + 1) > 2) is not in its optimal form. So next we create a rule of our own to rewrite this expression.

2.1.5 Optimize filter folding

Create a Rule called SimpleFilterFolding:

import org.apache.spark.sql.catalyst.expressions.{Add, GreaterThan, Literal, Subtract}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Let's call our rule SimpleFilterFolding
object SimpleFilterFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // We take logical plan and only apply our rule when we encounter filter with a simple `add` condition
    case filter @ Filter(condition, _) => filter transformExpressionsUp {
      // What we need to do is replacing our filter where `expr` is greater than right side - literal
      case GreaterThan(Add(expr, literal: Literal), right) =>
        GreaterThan(expr, Subtract(right, literal))
    }
  }
}

Then register this rule in our optimizer:

import org.apache.spark.sql.catalyst.rules.RuleExecutor

object SimpleOptimizer extends RuleExecutor[LogicalPlan] {
  val batches = Batch("Filter folding", Once,
    SimpleFilterFolding) :: Nil
}

// We take analyzed plan and run through Optimizer object
val optimizedPlan = SimpleOptimizer.execute(analyzedPlan)

The result is:

defined module SimpleOptimizer
optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter (a#21651 > (2 - 1))
+- Project [a#21651]
   +- LocalRelation <empty>, [a#21651, b#21652]

Here (2 - 1) has not been folded away. But an existing rule, ConstantFolding, can do exactly that, so let's run it together with our own rule:

import org.apache.spark.sql.catalyst.optimizer.ConstantFolding

object SimpleOptimizer extends RuleExecutor[LogicalPlan] {
  val batches = Batch("Filter folding", Once,
    SimpleFilterFolding, ConstantFolding) :: Nil
}

// We take analyzed plan and run through Optimizer object
val optimizedPlan = SimpleOptimizer.execute(analyzedPlan)

Now the result is what we wanted:

defined module SimpleOptimizer
optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter (a#21651 > 1)
+- Project [a#21651]
   +- LocalRelation <empty>, [a#21651, b#21652]

Spark 3.0 has made further progress on filter pushdown; I will look into that in the next blog post.

References