spark里的filter和projection pushdown

1. 概述

predicate pushdown也叫filter pushdown。

所以,有两种pushdown。

  • projection pushdown, 用于select 的pushdown
  • filter pushdown, 用于filter的pushdown

先来明确一下这俩在干嘛。

1.1 projection pushdown

Projection Pushdown minimizes data transfer between MapR Database and the Apache Spark engine by omitting unnecessary fields from table scans. It is especially beneficial when a table contains many columns.

projection pushdown通过在table scan过程中忽略不需要的列来减少从数据源读取的数据量。把projection下堆到数据源处。

当使用select的时候,会进行projection pushdown,比如

1
2
3
4
from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")

1.2 filter pushdown

把筛选行的filter下推到数据源处。

也是会减少从数据源传输到spark engine的数据量,但减少的单位是“行”,而projection pushdown减少的单位是“列”。

比如

1
2
3
4
from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")

支持以下filter的pushdown:

  • =!=
  • <, >, >=, <=
  • IN
  • LIKE
  • ANDOR
  • NOT

1.3 限制

filter pushdown不支持复杂类型:array, map, struct, 比如

scala

1
df.filter($"address.city" === "Milpitas")

java

1
df.filter(col("address.city").equalTo("Milpitas"));

projection pushdown也不支持这些复杂类型, 比如

scala

1
ds.select($"hobbies" (0))

java

1
df.select(col("hobbies").getItem(0));

但是spark3.0进行了一些改进。

2. databricks关于FilterPushdown的例子

databricks有一些例子How logical plan optimizations work in Catalyst

2.1 more interesting example

创建两个DataFrame

1
2
3
4
5
val items = Seq((0, "Macbook Pro", 1999.0), (1, "Macbook Air", 1500.0), (2, "iPad Air", 1200.0)).toDF("id", "name", "price")
val orders = Seq((100, 0, 1), (100, 1, 1), (101, 2, 3)).toDF("id", "itemid", "count")

items.createOrReplaceTempView("item")
orders.createOrReplaceTempView("order")

然后搞一个简单的join以及filter

1
2
3
4
SELECT order.id, item.name, item.price, order.count
FROM item
JOIN order
WHERE item.id = order.itemid and item.price < 1400 and order.count > 2 - 1

2.1.1 analyzed plan

然后看analyzed plan

1
val analyzedPlan = sql("SELECT order.id, item.name, item.price, order.count FROM item JOIN order WHERE item.id = order.itemid and item.price < 1400 and order.count > 2 - 1").queryExecution.analyzed

结果为

1
2
3
4
5
6
7
8
9
10
analyzedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [id#15942, name#15929, price#15930, count#15944]
+- Filter (((id#15928 = itemid#15943) && (price#15930 < cast(1400 as double))) && (count#15944 > (2 - 1)))
+- Join Inner
:- SubqueryAlias item
: +- Project [_1#15924 AS id#15928, _2#15925 AS name#15929, _3#15926 AS price#15930]
: +- LocalRelation [_1#15924, _2#15925, _3#15926]
+- SubqueryAlias order
+- Project [_1#15938 AS id#15942, _2#15939 AS itemid#15943, _3#15940 AS count#15944]
+- LocalRelation [_1#15938, _2#15939, _3#15940]

这里边FilterJoin的父节点,意味着filter条件里的item.price < 1400 and order.count > 2 - 1是在join之后才被执行。

2.1.2 optimized plan

看optimized plan

1
2
// Apply Spark SQL optimizations
val optimizedPlan = SimpleTestOptimizer.execute(analyzedPlan)

结果为

1
2
3
4
5
6
7
8
9
optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [id#15942, name#15929, price#15930, count#15944]
+- Join Inner, (id#15928 = itemid#15943)
:- Project [_1#15924 AS id#15928, _2#15925 AS name#15929, _3#15926 AS price#15930]
: +- Filter (_3#15926 < 1400.0)
: +- LocalRelation [_1#15924, _2#15925, _3#15926]
+- Project [_1#15938 AS id#15942, _2#15939 AS itemid#15943, _3#15940 AS count#15944]
+- Filter (_3#15940 > 1)
+- LocalRelation [_1#15938, _2#15939, _3#15940]

这里可以看到几点变化:

  1. Filter被下推,直接作用于LocalRelation。 适用的规则为PushDownPredicate
  2. 2 - 1 被替换成了1。 适用的规则为ConstantFolding
  3. SubqueryAlia节点被直接优化掉了
  4. cast(1400 as double)被换成了1400.0

2.1.3 Write rules for logical plan

Catalyst operates with logical plans and expressions, even attribute is an expression. Below are some examples how to convert one expression into another another using rules. Each logical plan is essentially a bunch of QueryPlan[LogicalPlan] instances and Expression expressions. Some examples of logical plans are Project, Generate, Filter, etc.

翻译一下:

Catalyst操作的对象是_logical plans_以及_expressions_, 即使是attribute也是一种expression。下边是一些使用规则(rule)将一个表达式转换成另一个的例子。每个logical plan本质是就是一组QueryPlan[LogicalPlan]实例和Expression表达式。logical plan的例子包括Project, Generate, Filter等。

这里有一些东东不大明白:

  1. QueryPlan[LogicalPlan]是个什么东东?
  2. plan这个概念和expression有啥区别?

继续往下看

2.1.3.1 expression的转换

1
2
3
4
5
6
7
8
// Simple expression transform
val add = Add(Literal(2), Literal(3))

val subtract = add transform {
case Add(left, right) => Subtract(left, right)
}
add: org.apache.spark.sql.catalyst.expressions.Add = (2 + 3)
subtract: org.apache.spark.sql.catalyst.expressions.Expression = (2 - 3)

Analyzer is built from rules, each rule is essentially one operation that takes logical plan and returns logical plan with very minimal change, hopefully better change. See example below:

是说rule就是一个operator,输入和输出都是logical plan, 不过这俩logical plan会有些许不同,目的是进行一些优化。

transform方法的定义为

1
2
3
def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
transformDown(rule)
}

这里的

1
2
3
{
case Add(left, right) => Subtract(left, right)
}

就是一个rule。addsubtract就是两个Expression。这里通过一个自己实现的rule,将Add表达式转成了Subtract表达式。

2.1.3.2 TreeNode

transformAdd继承自TreeNode的方法。

这里TreeNode的类型参数有点绕。

1
2
3
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// scalastyle:on
self: BaseType =>

在这里要注意两点

  1. TreeNode是一个invariant class,不是协变类,也不是逆变类。

  2. TreeNode要求其自身是一个BaseType。也就是说, BaseType实际上就是指它自己或它的子类。这样在定义方法的时候就可以用到这个限制。比如,TreeNode的foreach方法这么定义

    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * Runs the given function on this node and then recursively on [[children]].
    * @param f the function to be applied to each node in the tree.
    */
    def foreach(f: BaseType => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
    }

    这里之所以可以f(this), 就是因为self: BaseType =>这个自身类型限制。如果去掉自身类型的限制,那么就f(this)就不合语法了。

实际上它的直接子类都这样类义的:

Expression

1
abstract class Expression extends TreeNode[Expression]

Block

1
trait Block extends TreeNode[Block] with JavaCode

QueryPlan

1
2
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
self: PlanType =>

这样写的话,Expression继承的TreeNode的方法,如果参数类型是BaseType,那么就只能传进去Expression的子类,而不能传进去Block的子类。

这样是不可以的

1
2
3
val plan3 = add transform {
case foo: LogicalPlan => foo
}

就会报错

pattern type is incompatible with expected type;
found : org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
required: org.apache.spark.sql.catalyst.expressions.Expression
case foo: LogicalPlan => foo

也就是说add接受的transform实际上的类型要求是PartialFunction[Expression, Expression] (这里要注意PartitalFunction的型变)。

这么搞,有利于递归调用rule的时候避免错误匹配,比如一个用于Expression的规则被误用到了一个LogicalPlan,就要出问题了。

2.1.4 Existing filter optimizations

1
2
3
4
5
6
val logicalPlan = LocalRelation(int('a), str('b)).
select('a).
where(GreaterThan(Add('a, Literal(1)), Literal(2)))

val analyzedPlan = logicalPlan.analyze
val optimizedPlan = SimpleTestOptimizer.execute(analyzedPlan)

结果为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
logicalPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
'Filter (('a + 1) > 2)
+- 'Project ['a]
+- LocalRelation <empty>, [a#21651, b#21652]

analyzedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter ((a#21651 + 1) > 2)
+- Project [a#21651]
+- LocalRelation <empty>, [a#21651, b#21652]

optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [a#21651]
+- Filter (isnotnull(a#21651) && ((a#21651 + 1) > 2))
+- LocalRelation <empty>, [a#21651, b#21652]

这里可以看到((a#21651 + 1) > 2))并非最优的形式。所以,下面自己创建一个rule来解析这个表达式。

2.1.5 Optimize filter folding

创建一个Rule,叫做SimpleFilterFolding。代码为

1
2
3
4
5
6
7
8
9
10
11
// Let's call our rule SimpleFilterFolding
object SimpleFilterFolding extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// We take logical plan and only apply our rule when we encounter filter with a simple `add` condition
case filter @ Filter(condition, _) => filter transformExpressionsUp {
// What we need to do is replacing our filter where `expr` is greater than right side - literal
case GreaterThan(Add(expr, literal: Literal), right) =>
GreaterThan(expr, Subtract(right, literal))
}
}
}

然后把这个rule注册到我们的optimizer中。

1
2
3
4
5
6
7
object SimpleOptimizer extends RuleExecutor[LogicalPlan] {
val batches = Batch("Filter folding", Once,
SimpleFilterFolding) :: Nil
}

// We take analyzed plan and run through Optimizer object
val optimizedPlan = SimpleOptimizer.execute(analyzedPlan)

结果为

1
2
3
4
5
defined module SimpleOptimizer
optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter (a#21651 > (2 - 1))
+- Project [a#21651]
+- LocalRelation <empty>, [a#21651, b#21652]

这里(2 - 1)并没有被优化掉。但是已有rule可以做这件事,就是叫做ConstantFolding的rule,现在把它跟我们的rule一起使用。

1
2
3
4
5
6
7
object SimpleOptimizer extends RuleExecutor[LogicalPlan] {
val batches = Batch("Filter folding", Once,
SimpleFilterFolding, ConstantFolding) :: Nil
}

// We take analyzed plan and run through Optimizer object
val optimizedPlan = SimpleOptimizer.execute(analyzedPlan)

这样结果就对了

1
2
3
4
5
defined module SimpleOptimizer
optimizedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter (a#21651 > 1)
+- Project [a#21651]
+- LocalRelation <empty>, [a#21651, b#21652]

Spark 3.0的在filter pushdown方面有了进步,这个下个blog再研究。

参考资料

如何得到一个空的Hadoop Configuration

都怪JDK, File类的delete()方法竟然不能删除非空的目录。所以,我就想用Hadoop的API,于是有了下面一段程序

1
2
3
4
5
6
7
val fileSystem = FileSystem.newInstance(new Configuration())
val warehousePath = new Path("spark-warehouse")
if(fileSystem.exists(warehousePath))
fileSystem.delete(warehousePath, true)
val metastoreDB = new Path("metastore_db")
if (fileSystem.exists(metastoreDB))
fileSystem.delete(metastoreDB)

问题是,new Configuration()默认会从classpath里找到core-site.xmlcore-default.xml来加载,所以我想,万一以后不小心把这些文件加到classpath里呢?比如哪天我想要测试连接别的机器上的HDFS。还好,Configuration类有个方法来禁止对这俩文件的加载, 正如这个类的注释所说的

1
2
3
4
Unless explicitly turned off, Hadoop by default specifies two resources, loaded in-order from the classpath:

1. `core-default.xml`: Read-only defaults for hadoop.
2. `core-site.xml`: Site-specific configuration for a given hadoop installation.

我搞了个core-site.xml到classpath下,于是这段代码就会报错说

in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: cdh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2691)
at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:420)
at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:428)
at sleepy.spark.SparkHiveExample$.main(SparkHiveExample.scala:45)
at sleepy.spark.SparkHiveExample.main(SparkHiveExample.scala)
Caused by: java.net.UnknownHostException: cdh
... 14 more

看起来程序去连接外部的HDFS时,发现无法识别cdh, 实际上它也并非是域名,而是dfs.nameservices的值。

好的,那就用Configuration(boolean)这个构造器, 这的文档是这样说的

A new configuration where the behavior of reading from the default resources can be turned off. If the parameter loadDefaults is false, the new instance will not load resources from the default files.

但是呢,执行的时候仍然在报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Exception in thread "main" java.io.IOException: failure to login
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:841)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:777)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:650)
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2828)
at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2690)
at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:420)
at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:428)
at sleepy.spark.SparkHiveExample$.main(SparkHiveExample.scala:47)
at sleepy.spark.SparkHiveExample.main(SparkHiveExample.scala)
Caused by: javax.security.auth.login.LoginException: java.lang.IllegalArgumentException: Illegal principal name foo@FOO.COM: org.apache.hadoop.security.authentication.util.KerberosName$NoMatchingRule: No rules applied to foo@FOO.COM
at org.apache.hadoop.security.UserGroupInformation$HadoopLoginModule.commit(UserGroupInformation.java:201)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:588)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:815)
... 8 more
Caused by: java.lang.IllegalArgumentException: Illegal principal name foo@FOO: org.apache.hadoop.security.authentication.util.KerberosName$NoMatchingRule: No rules applied to foo@FOO
at org.apache.hadoop.security.User.<init>(User.java:51)
at org.apache.hadoop.security.User.<init>(User.java:43)
at org.apache.hadoop.security.UserGroupInformation$HadoopLoginModule.commit(UserGroupInformation.java:199)
... 20 more
Caused by: org.apache.hadoop.security.authentication.util.KerberosName$NoMatchingRule: No rules applied to foo@FOO
at org.apache.hadoop.security.authentication.util.KerberosName.getShortName(KerberosName.java:400)
at org.apache.hadoop.security.User.<init>(User.java:48)
... 22 more

究其原因是在初始化FileSystem的时候,化调用到UserGroupInformation#ensureIntialized()

1
2
3
4
5
6
7
8
9
private static void ensureInitialized() {
if (conf == null) {
synchronized(UserGroupInformation.class) {
if (conf == null) { // someone might have beat us
initialize(new Configuration(), false);
}
}
}
}

这里直接调用了new Configuration, 而这个对象是加载了classpath里的配置文件的。

所以,直接给UGI指定个configuration就行了

1
2
3
val conf = new Configuration(false)
UserGroupInformation.setConfiguration(conf)
val fileSystem = FileSystem.newInstance(conf)

mio::poll文档 —— 翻译和注解

翻译自mio文档: Poll

Struct mio::Poll

1
pub struct Poll { /* fields omitted */ }

Polls for readiness events on all registered values.

Poll allows a program to monitor a large number of Evented types, waiting until one or more become “ready” for some class of operations; e.g. reading and writing. An Evented type is considered ready if it is possible to immediately perform a corresponding operation; e.g. read or write.

To use Poll, an Evented type must first be registered with the Poll instance using the register method, supplying readiness interest. The readiness interest tells Poll which specific operations on the handle to monitor for readiness. A Token is also passed to the register function. When Poll returns a readiness event, it will include this token. This associates the event with the Evented handle that generated the event.

Poll用于从所有注册的value里poll处于就绪状态的事件。这个跟Java里NIO里的概念可以类比下:

  • Selector <-> Poll。可以把Evented注册到Poll上,使用它来监听Evented有关的IO事件。是实现异步IO的关键组件。
  • Evented <-> SelectionKey。可以通过SelectionKey获取底层的channel进行读写。Evented可以直接进行读写。
  • Token <-> SelectionKey里attach的对象。用于将事件跟Evented关联起来。
阅读更多

关于reborrow的一个复杂的例子

在查找关于Rust的reborrow的语法时,发现这么一篇文章Stuff the Identity Function Does (in Rust)。然后……看不懂,《Programming Rust》快看完了,这篇文章还是看不懂。
但是有很多不懂之处的文章,往往是最值得读的,因为它提供了一个线索,能把遗漏的知识串连起来,这是很难得的。

还好有Google, 一路搜索过来,大体也搞清楚了。

例子

文章里例子是这样的。有一个递归的数据结构,List:

1
2
3
struct List {
next: Option<Box<List>>,
}

写一个函数来遍历它

1
2
3
4
5
6
7
8
9
10
11
impl List {
fn walk_the_list(&mut self) {
let mut current = self;
loop {
match current.next {
None => return,
Some(ref mut inner) => current = inner,
}
}
}
}

可以在Rust的playgroud里测试一下。你会发现这段代码是通不过编译的。
问题在哪呢?

实际上这短短一段代码使用了很多隐晦的语法。

阅读更多