【CMU15-445 Fall2023】Project3 Query Execution 小结

该系列博客只是为了记录自己在写Lab时的思路,按照课程要求不会在Github和博客中公开源代码。欢迎与我一起讨论交流!

这个project和之前就不一样了,开始深入数据库内核的实现了。需要理清楚一条sql语句是如何被执行的,方才能写出代码。

前置奶酪

一条SQL语句的执行

这里需要去看看一条sql语句传入bustub内部之后的代码:src/common/bustub_instance.cpp:ExecuteSqlTxn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn,
std::shared_ptr<CheckOptions> check_options) -> bool {
if (!sql.empty() && sql[0] == '\\') {
// 处理元命令
...
}

// binder,但是在其中会使用libpg_query来解析sql语句
bustub::Binder binder(*catalog_);
binder.ParseAndSave(sql);

// 经过上一步后,binder中的statement_nodes_存储着所有的语句解析节点
for (auto *stmt : binder.statement_nodes_) {
// 将stmt转换成BoundStatement对象,方便后面处理数据
auto statement = binder.BindStatement(stmt);

// 只有不需要构建plan树、不需要进行优化的sql语句才会在switch之后继续执行
switch (statement->type_) {
...
}

// 生成初步的执行计划
bustub::Planner planner(*catalog_);
planner.PlanQuery(*statement);

// 优化刚刚的执行计划
bustub::Optimizer optimizer(*catalog_, IsForceStarterRule());
auto optimized_plan = optimizer.Optimize(planner.plan_);

...

// 执行优化后的plan,这里会使用火山模型去根据下面节点的Next函数来执行相应的算子
execution_engine_->Execute(optimized_plan, &result_set, txn, exec_ctx.get());

// 将执行结果输出至指定位置
...
}
return 是否执行成功;
}

在binder之后,我们就有了一条sql的语句解析节点,例如执行select * from (select * from test_2 where colA > 10) where colB > 2;,其statement node如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
BoundSelect {
table=BoundSubqueryRef {
alias=__subquery#0,
subquery=BoundSelect {
table=BoundBaseTableRef { table=test_2, oid=23 },
columns=["test_2.colA", "test_2.colB", "test_2.colC"],
groupBy=[],
having=,
where=(test_2.colA>10),
limit=,
offset=,
order_by=[],
is_distinct=false,
ctes=,
},
columns=["test_2.colA", "test_2.colB", "test_2.colC"],
},
columns=["__subquery#0.test_2.colA", "__subquery#0.test_2.colB", "__subquery#0.test_2.colC"],
groupBy=[],
having=,
where=(__subquery#0.test_2.colB>2),
limit=,
offset=,
order_by=[],
is_distinct=false,
ctes=,
}

其中的子查询和where的条件还有需要哪些列都能非常清楚的看到。

Iterator Model

通常一个 SQL 会被组织成树状的查询计划,数据从叶子节点流到根节点,查询结果在根节点中得出。

bustub中采用的数据库查询执行模型叫做迭代器模型,也叫火山模型。

查询计划(query plan)中的每步operator对应的executor都实现一个next函数,每次调用时,operator返回一个tuple或者 null,后者表示数据已经遍历完毕。operator 本身实现一个循环,每次调用其 child operators 的 next 函数,从它们那边获取下一条数据供自己操作,这样整个 query plan 就被从上至下地串联起来。

但是像Joins, Aggregates, Subqueries, Order By这样的操作需要等所有children返回它们的tuple。虽然一次调用请求一条数据,占用内存较小,但函数调用开销大。

Catalog, Table and Index

下图出处:https://www.cnblogs.com/joey-wang/p/17351258.html

索引index

在Bustub中,索引用于加速数据访问。索引通过维护表中数据的有序结构,使得查询可以更快地定位到所需的记录。

结构

索引的结构图和上面表的结构图类似。在catalog中,可以获取到一个表对应的所有IndexInfo,每个IndexInfo中包含着这个索引的信息,这里讲两个个我认为比较重要的成员变量:

  • key_schema_:索引对应的列的结构,例如使用其ToString()函数时,其会返回(添加了索引的列的名称:该列的数据类型)
  • index_:这是一个指针,指向一个Index类的对象,也就是真正的索引
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// catalog.h

class Catalog {
public:
template <class KeyType, class ValueType, class KeyComparator>
auto CreateIndex(Transaction *txn, const std::string &index_name, const std::string &table_name, const Schema &schema,
const Schema &key_schema, const std::vector<uint32_t> &key_attrs, std::size_t keysize,
HashFunction<KeyType> hash_function, bool is_primary_key = false,
IndexType index_type = IndexType::HashTableIndex) -> IndexInfo *;
auto GetIndex(const std::string &index_name, const std::string &table_name) -> IndexInfo *;
auto GetIndex(const std::string &index_name, const table_oid_t table_oid) -> IndexInfo *;
auto GetIndex(index_oid_t index_oid) -> IndexInfo *;
auto GetTableIndexes(const std::string &table_name) const -> std::vector<IndexInfo *>;
...

private:
...

/**
* Map index identifier -> index metadata.
*
* NOTE: that `indexes_` owns all index metadata.
*/
std::unordered_map<index_oid_t, std::unique_ptr<IndexInfo>> indexes_;

/** Map table name -> index names -> index identifiers. */
std::unordered_map<std::string, std::unordered_map<std::string, index_oid_t>> index_names_;

/** The next index identifier to be used. */
std::atomic<index_oid_t> next_index_oid_{0};
};

struct IndexInfo {
...
/** The schema for the index key */
Schema key_schema_;
/** The name of the index */
std::string name_;
/** An owning pointer to the index */
std::unique_ptr<Index> index_;
/** The unique OID for the index */
index_oid_t index_oid_;
/** The name of the table on which the index is created */
std::string table_name_;
/** The size of the index key, in bytes */
const size_t key_size_;
/** Is primary key index? */
bool is_primary_key_;
/** The index type */
[[maybe_unused]] IndexType index_type_{IndexType::BPlusTreeIndex};
};

Index中有着三个虚函数供其派生类去实现,其唯一的成员变量的类型为IndexMeta,用来存储一些元信息,例如这个索引的名称,它所属的表的名称,最重要的还有一个key_attrs_,稍后就说谈论它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// index.h

class IndexMeta {
...
private:
/** The name of the index */
std::string name_;
/** The name of the table on which the index is created */
std::string table_name_;
/** The mapping relation between key schema and tuple schema */
const std::vector<uint32_t> key_attrs_;
/** The schema of the indexed key */
std::shared_ptr<Schema> key_schema_;
/** Is primary key? */
bool is_primary_key_;
};

class Index {
...
private:
/** The Index structure owns its metadata */
std::unique_ptr<IndexMetadata> metadata_;
};

fall2023我们使用的是哈希索引,底层使用的就是在project2中实现的可拓展哈希。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// extendible_hash_table_index.h

#define HASH_TABLE_INDEX_TYPE ExtendibleHashTableIndex<KeyType, ValueType, KeyComparator>

template <typename KeyType, typename ValueType, typename KeyComparator>
class ExtendibleHashTableIndex : public Index {
public:
ExtendibleHashTableIndex(std::unique_ptr<IndexMetadata> &&metadata, BufferPoolManager *buffer_pool_manager,
const HashFunction<KeyType> &hash_fn);

~ExtendibleHashTableIndex() override = default;

auto InsertEntry(const Tuple &key, RID rid, Transaction *transaction) -> bool override;

void DeleteEntry(const Tuple &key, RID rid, Transaction *transaction) override;

void ScanKey(const Tuple &key, std::vector<RID> *result, Transaction *transaction) override;

protected:
// comparator for key
KeyComparator comparator_;
// container
DiskExtendibleHashTable<KeyType, ValueType, KeyComparator> container_;
};

更新索引

当插入新记录时,不仅需要将记录插入到表中,还需要将相应的索引条目插入到索引中。这样,后续的查询操作可以利用索引快速定位到目标记录。如果不更新索引,后续的查询操作可能会错过新插入的记录,导致查询结果不准确。

bustub中有哈希索引和B+Tree索引,fall2023版本使用的是可拓展哈希作为作为索引。不过这两个具体的实现都有一个基类Index,其中有以下虚函数需要子类去实现:

  • InsertEntry(const Tuple &key, RID rid, Transaction *transaction): 插入一个索引条目。
  • DeleteEntry(const Tuple &key, RID rid, Transaction *transaction): 删除一个索引条目。
  • ScanKey(const Tuple &key, std::vector<RID> *result, Transaction *transaction): 根据索引键搜索记录,并将结果RID存储在指定的向量中。

所以不管用的是哈希还是B+Tree,在操作索引时用的接口都相同。

如何理解索引

就如网上很多介绍索引的博客所描述的那样,数据库索引是用来加速检索速度的,就如同新华字典中的音节索引一样:

如同table_info,catalog中也有许多的index_info,每个index_info就如同上图音节表中的一个字母。我们对一个字段(列)构建一个索引,就如同在上图中音节表中多加一个字母(例如X)。

  • 需要插入一条记录时,就往对应的索引下插入(记录, 对应记录的地址)这样的键值对,例如上图的(xian, 519),这里的地址为RID
  • 需要删除一条记录时,在对应的索引下删掉匹配的键值对。
  • 需要更新一条记录时,由于bustub没有提供更新索引的API,所以可以用先删除再插入的方式模拟更新。

执行器如何使用索引获取数据

当执行器需要从表中获取数据时,如果查询计划中包含索引扫描操作,执行器会通过索引来快速定位数据。以下是具体的步骤:

  1. 解析查询计划
    • 执行器根据查询计划确定需要使用的索引。
    • 获取索引的元数据,包括索引键的模式和表列的映射关系。
  2. 构建索引键
    • 根据查询条件和索引的元数据,构建索引键。这通常涉及到从查询条件中提取列值,并根据索引键的模式进行转换。
  3. 使用索引进行搜索
    • 调用索引的 ScanKey 方法,传入构建好的索引键和一个结果RID向量。
    • 索引会根据键值查找对应的记录,并将找到的RID存储在结果向量中。
  4. 读取数据页
    • 使用结果向量中的RID,从缓冲池中查找对应的页。如果页不在缓冲池中,则从磁盘加载到缓冲池。
    • 从页中读取数据并创建 Tuple 对象。
  5. 处理和返回结果
    • 使用 Tuple 对象的方法(如 GetValueIsNull 等)访问和处理元组中的数据。
    • 将处理后的数据作为结果返回给用户或进一步处理。

谓词下推

谓词下推(Predicate Pushdown)是数据库查询优化中的一种技术,其核心思想是将查询中的过滤条件(即谓词)尽可能早地应用到查询执行计划的底部,也就是数据生成的地方。这样做的目的是为了减少数据的传输量和处理量,从而提高查询效率。

具体来说,谓词下推包括以下几个方面:

  1. 过滤条件前移:在查询执行的过程中,尽早地对数据进行过滤,这样不需要将所有数据都传递到上层操作中,只传递满足条件的数据。
  2. 减少数据传输:通过在数据生成的阶段就进行过滤,可以减少从数据库存储引擎到查询处理引擎之间的数据传输量。
  3. 减少CPU处理:不需要对所有数据进行后续的处理,只需要处理已经过滤的数据,这样可以减少CPU的工作量。
  4. 利用索引:如果过滤条件可以利用现有的索引,谓词下推可以使得查询直接利用索引来快速定位数据,而不是扫描整个表。
  5. 优化查询计划:数据库优化器会根据谓词下推的原则重新规划查询的执行步骤,生成更高效的查询计划。

例如:

1
SELECT * FROM employees WHERE department_id = 5 AND salary > 50000;

在这个查询中,WHERE子句包含了两个过滤条件。如果不进行谓词下推,数据库可能会先扫描整个employees表,然后将所有行传递给上层操作,之后再应用过滤条件。而通过谓词下推,数据库可以在扫描表的时候直接应用这些过滤条件,只返回部门ID为5且薪资大于50000的员工记录。

谓词下推是数据库查询优化中非常重要的一环,它有助于提高查询性能,特别是在处理大规模数据集时。数据库优化器会尝试自动应用谓词下推,但有时开发者也可以通过编写更优化的查询条件来帮助优化器更好地进行谓词下推。

Task1 - Access Method Executors

SeqScan

顺序扫描指定的表,表的遍历可以使用TableIterator

每次找到一条没有被标记为“删除”或者不是where之类的过滤子句匹配(这里会在delete操作中说明)的tuple(记录)就并返回,如果已经扫描到了表的结束位置则返回false。

Insert

为什么Insert等Executor有child而SeqScan没有?

InsertExecutor 的主要职责是将一条或多条记录插入到指定的表中。它可能需要依赖于其他 Executor 来获取要插入的数据。例如,如果 INSERT 操作是从一个 SELECT 查询的结果集中插入数据,那么 InsertExecutor 可能会有一个子 Executor(如 SeqScanExecutor 或其他类型的 Executor),该子 Executor 负责执行 SELECT 操作并提供数据给 InsertExecutor

因此,InsertExecutor 有 child 是因为它可能需要从另一个查询的结果中获取数据。

SeqScanExecutor的主要职责是对表进行全表扫描,即按顺序读取表中的所有记录。这是一个基本的操作,通常不需要其他 Executor 的支持来完成其工作。

它直接作用于存储层,遍历表中的每一行数据,因此没有子 Executor。它的任务相对简单,就是遍历和返回表中的所有记录。

简而言之,InsertExecutor 需要 child 是因为它的操作可能涉及从其他查询结果中获取数据,而 SeqScanExecutor 不需要 child 是因为它的任务是独立完成的,只需遍历表中的所有记录即可。这反映了数据库执行计划中不同操作之间的依赖关系和交互方式。其他Executor同理。

举个批量插入的🌰:

假设我们有一个 orders 表,包含以下列:

  • order_id (主键)
  • customer_id
  • product_id
  • quantity
  • order_date

我们希望通过一个子查询(select)来获取一批订单记录,并将这些记录插入到 orders 表中:

1
2
3
4
INSERT INTO orders (customer_id, product_id, quantity, order_date)
SELECT customer_id, product_id, quantity, order_date
FROM pending_orders
WHERE status = 'approved';

很明显,我们在插入之前要从select子句中获取数据,因此这个子查询操作就是insert操作的child_executor

没有子操作时,需要插入的数据从哪里获取?

比如执行如下SQL时:

1
insert into test_1 values (202, 1, 2, 3);

从肉眼看可以知道需要插入的数据为(202, 1, 2, 3),但是在代码中又是从哪里获取的呢?

让我们使用一下explain工具来看看这条SQL语句在bustub内部做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bustub> explain insert into test_1 values (202, 1, 2, 3);
=== BINDER ===
BoundInsert {
table=BoundBaseTableRef { table=test_1, oid=22 },
select= BoundSelect {
table=BoundExpressionListRef { identifier=__values#0, values=[["202", "1", "2", "3"]] },
columns=["__values#0.0", "__values#0.1", "__values#0.2", "__values#0.3"],
groupBy=[],
having=,
where=,
limit=,
offset=,
order_by=[],
is_distinct=false,
ctes=,
}
}
=== PLANNER ===
Insert { table_oid=22 } | (__bustub_internal.insert_rows:INTEGER)
Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)
Values { rows=1 } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)
=== OPTIMIZER ===
Insert { table_oid=22 } | (__bustub_internal.insert_rows:INTEGER)
Values { rows=1 } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)

将目标聚焦在生成的查询计划上,从上到下,在一个Insert的查询计划中,使用Projection从其输入源(如表扫描、索引扫描、连接等)中提取所需的列,最下层使用Value获取到要操作的数据!

所以对应的,InsertExecutorchild_executorProjectionExecutor,而ProjectionExecutorchild_executorValuesExecutor,使用迭代器模型就能很方便的获取到数据了(ValuesExecutor就是最后的Executor,其Next函数不会再往下调用,其所做的只是根据在解析SQL及其之后的一些步骤中得到的需要操作的数据封装成一个tuple进行返回)。

Delete

需要写的代码和insert操作的基本相同。但有个地方需要注意一下,在执行一条delete语句时,让我们看看做了些什么:

1
2
3
4
5
6
7
8
9
10
bustub> explain delete from test_1 where colA = 999;
=== BINDER ===
Delete { table=BoundBaseTableRef { table=test_1, oid=22 }, expr=(test_1.colA=999) }
=== PLANNER ===
Delete { table_oid=22 } | (__bustub_internal.delete_rows:INTEGER)
Filter { predicate=(#0.0=999) } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
=== OPTIMIZER ===
Delete { table_oid=22 } | (__bustub_internal.delete_rows:INTEGER)
SeqScan { table=test_1, filter=(#0.0=999) } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)

可以看到,在optimizer阶段,where子句的filter下放至SeqScan处与其合并了,也就是说,我们需要在实现SeqScanExecutor时注意处理一下filter。这里提示一下:

1
2
while (cur_tuple.first.is_deleted_ ||
(plan_->filter_predicate_ && !(plan_->filter_predicate_->Evaluate(tuple, GetOutputSchema()).GetAs<bool>())))

如果其返回true,说明filter匹配到了数据(就如例子中匹配到了colA列为999的tuple),如果此时这个tuple没有被标记为删除,那么就说明找到了我们需要删除的tuple。

Update

我们如何知道update需要更新的数据从哪里取呢?

1
2
3
4
5
6
7
8
bustub> explain(p, o) update test_1 set colB = 15445;
=== PLANNER ===
Update { table_oid=22, target_exprs=["#0.0", "15445", "#0.2", "#0.3"] }
Filter { predicate=true }
SeqScan { table=test_1 }
=== OPTIMIZER ===
Update { table_oid=22, target_exprs=["#0.0", "15445", "#0.2", "#0.3"] }
SeqScan { table=test_1, filter=true }

看看Update中,有一个target_exprs数组,这个数组可不就是我们的一行数据吗,并且是需要更新的那行数据:update语句可以不加where子句,这样就是选中表中的所有行,也就是这里将表中colB列的数据都update为15445!

对于target_exprs这个数组,我们可以通过plan_->target_expressions_获取,然后用其构建一个新的tuple。

需要注意的是,这里并没有提供直接更新tuple的操作,所以我们的update操作可以用先删除后插入的方式来模拟。

IndexScan

我们首先需要完成OptimizeSeqScanAsIndexScan这个优化步骤。

假设现在我们有一个表叫“test_1”,其列如下:

1
2
3
+-------------+-------------+-------------+-------------+
| test_1.colA | test_1.colB | test_1.colC | test_1.colD |
+-------------+-------------+-------------+-------------+

现在我们希望这条SQL能执行的更快:

1
select * from test_1 where colB = 11;

那比较不错的方法就是给colB列加上索引:

1
create index v1 on test_1(colB);

这样在执行时可以更快速的查找数据。

那么为了实现这一目标,我们需要通过OptimizeSeqScanAsIndexScan将plan树中的SeqScanPlanNode转换成IndexScanPlanNode,这样我们才能使用IndexScanPlanNode对应的算子—IndexScanExecutor去使用索引。但是由于bustub的一些设计,需要遵循以下规则才可以转化:

  1. 当前的节点的类型必须是PlanType::SeqScan
  2. 当前节点必须有filter谓词,如果只是select * from test_1;这样的是不需要使用索引的
  3. 当前表中必须有索引,没有索引还玩啥呢
  4. fileter谓词中的逻辑表达式只能有一个,并且其类型必须是ComparisonType::Equal(我想这里必须是“等于”是不是因为fall2023使用的索引是哈希索引)
  5. 在当前表的索引信息中找到与filter谓词相对应的索引后才能返回一个IndexScanPlanNode

select * from test_1 where colB = 11;中,加了索引后,最需要关注的就是where colB = 11这一个过滤条件。bustub中要求IndexScan过滤运算符必须是=,且只能有一个条件。如果colAcolB都是索引,然后执行select * from test_1 where colB = 11 and colA = 1;,这样是不会走索引优化的。

我们将查询计划中的过滤谓词转化成ComparisonExpression类型,据我的理解,其可通过GetChildAt函数获取比较谓词左边的列名表达式(即这里的“colB”,有了这个列名的表达式,我们就可以获取到这个列的col_id值)和右边的值(即“11”)。之后需要去这张表中的所有索引中去找是否有colB的索引,怎么确定是否有呢,那就要看这个表中的每个Indexkey_attrs_

Index中,key_attrs_决定了索引的关键字由哪些列组成,对应了每个列的下标。例如:

  • 如果表的 schema 定义有 5 列,分别为 A, B, C, D, E
  • 某个索引的 key_attrs_[0, 2],则表示该索引使用了第 0 列(A)和第 2 列(C)作为其关键字。

如果之前我们获取的列的col_id值和某个Indexkey_attrs_中的值相同,那么就存在相应的索引!这时构造一个IndexScanPlanNode返回即可(参考merge_filter_scan.cpp中是如何做的)。

当优化器成功更换节点后,在执行时就会走索引,其底层算子就会使用到IndexScanExecutor。到这里,这个算子需要做的事情就很简单了,就是调用哈希索引的ScanKey进行查找,不过这里有3点需要注意:

  1. 在project2中,我们实现的索引引擎只支持一个键对应一个值(不只是这个版本的可拓展哈希,其他版本中的B+Tree也只要求这样实现),也就是我们的这个算子在底层索引引擎不扩展的情况下最多查到一条记录,这样的话就可以在算子的Init函数中调用ScanKey查找记录就行。
  2. 可拓展哈希中是将一个Tuple的data转化一下当作key,所以在索引中,其是将索引列的值作为key,其对应的oid作为值。在使用ScanKey时,第一个参数需要的tuple将用查询计划节点IndexScanPlanNode中的pred_key_构造。
  3. 表中的tuple的元数据中,其is_deleted_可能为true,这说明这个tuple在逻辑上已经删除了,所以如果我们通过2中获取的oid对应的tuple是这种情况,就不用向上返回数据。

Task2 - Aggregation & Join Executors

Aggregation

分析一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bustub> EXPLAIN SELECT MAX(colC), MIN(colB) FROM test_1 GROUP BY colA HAVING MAX(colB) > 10;
=== BINDER ===
BoundSelect {
table=BoundBaseTableRef { table=test_1, oid=22 },
columns=["max([\"test_1.colC\"])", "min([\"test_1.colB\"])"],
groupBy=["test_1.colA"],
having=(max(["test_1.colB"])>10),
where=,
limit=,
offset=,
order_by=[],
is_distinct=false,
ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.2", "#0.3"] } | (<unnamed>:INTEGER, <unnamed>:INTEGER)
Filter { predicate=(#0.1>10) } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
Agg { types=["max", "max", "min"], aggregates=["#0.1", "#0.2", "#0.1"], group_by=["#0.0"] } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
=== OPTIMIZER ===
Projection { exprs=["#0.2", "#0.3"] } | (<unnamed>:INTEGER, <unnamed>:INTEGER)
Filter { predicate=(#0.1>10) } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
Agg { types=["max", "max", "min"], aggregates=["#0.1", "#0.2", "#0.1"], group_by=["#0.0"] } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)

对于AggregationExecutor,我们可以获取到SQL语句中:

  1. 聚合操作的types(进行聚合操作的类型)和aggregates(需要聚合操作的列),二者一一对应
  2. 需要group by进行分组的列

再看看lecture中的这个例子,其对列cid使用group by进行分组,其中涉及的聚合操作为AVG,可转换成COUNTSUM操作。这里相当于:

1
2
types = ['count', 'sum']
aggregates = ['s.gpa', 's.gpa']

我的理解是根据group by的字段的值进行hash函数处理作为哈希表的键,例如图中的“15-445”,“15-826”等;然后哈希表的值为一个集合,这个集合的大小和typesaggregates的大小相同,并且对应的位置就为aggregates的值:例如图中键“15-445”的值中,第一个元素就为COUNT操作下s.gpa为15-445的个数。

在lab中需要实现countsummaxmin操作,其实就是在SimpleAggregationHashTable::CombineAggregateValues中实现对应的操作即可,本质上是对哈希表的几个很简单的操作。

aggregation通常需要对一组数据进行计算,这些计算具有以下特点:

  1. 需要完整输入:Aggregation 通常需要从下层拉取所有相关数据才能计算结果,例如计算 SUM 需要遍历所有行。
  2. 阻塞性:在传统实现中,Aggregation 算子通常被称为“阻塞算子”,因为它必须等待所有输入数据都拉取完成才能产出结果。这意味着 next() 调用会被延迟,直到聚合计算完成。

在我们的火山模型中 aggregation 是阻塞算子:

  • 当上层算子调用 next() 时,aggregation 会向下层算子连续调用 next(),直到拉取完全部数据并完成聚合。
  • 在数据尚未完全拉取并聚合完成之前,上层的 next() 调用无法直接返回结果。

需要注意的是:

  • SQL中进行group by后使用count,统计的是每组数据中的记录数,而非分组后新表的行数。
  • distinct其实就是对某个字段进行group by操作
  • count(*)统计null,而count(字段)不统计null

NestedLoopJoin

Inner Join:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bustub> EXPLAIN SELECT * FROM __mock_table_1, __mock_table_3 WHERE colA = colE;
=== BINDER ===
BoundSelect {
table=BoundCrossProductRef { left=BoundBaseTableRef { table=__mock_table_1, oid=0 }, right=BoundBaseTableRef { table=__mock_table_3, oid=2 } },
columns=["__mock_table_1.colA", "__mock_table_1.colB", "__mock_table_3.colE", "__mock_table_3.colF"],
groupBy=[],
having=,
where=(__mock_table_1.colA=__mock_table_3.colE),
limit=,
offset=,
order_by=[],
is_distinct=false,
ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
Filter { predicate=(#0.0=#0.2) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
NestedLoopJoin { type=Inner, predicate=true } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
=== OPTIMIZER ===
NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bustub> EXPLAIN SELECT * FROM __mock_table_1 INNER JOIN __mock_table_3 ON colA = colE;
=== BINDER ===
BoundSelect {
table=BoundJoin { type=Inner, left=BoundBaseTableRef { table=__mock_table_1, oid=0 }, right=BoundBaseTableRef { table=__mock_table_3, oid=2 }, condition=(__mock_table_1.colA=__mock_table_3.colE) },
columns=["__mock_table_1.colA", "__mock_table_1.colB", "__mock_table_3.colE", "__mock_table_3.colF"],
groupBy=[],
having=,
where=,
limit=,
offset=,
order_by=[],
is_distinct=false,
ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
=== OPTIMIZER ===
NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)

可以看到,即便我们不加inner join,默认情况下也是使用的Inner Join:NestedLoopJoin中的type均为Inner

Left Join:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bustub> EXPLAIN SELECT * FROM __mock_table_1 LEFT OUTER JOIN __mock_table_3 ON colA = colE;
=== BINDER ===
BoundSelect {
table=BoundJoin { type=Left, left=BoundBaseTableRef { table=__mock_table_1, oid=0 }, right=BoundBaseTableRef { table=__mock_table_3, oid=2 }, condition=(__mock_table_1.colA=__mock_table_3.colE) },
columns=["__mock_table_1.colA", "__mock_table_1.colB", "__mock_table_3.colE", "__mock_table_3.colF"],
groupBy=[],
having=,
where=,
limit=,
offset=,
order_by=[],
is_distinct=false,
ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
NestedLoopJoin { type=Left, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
=== OPTIMIZER ===
NestedLoopJoin { type=Left, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)

由于火山模型中的Next每次返回一条tuple,所以我们需要在Init中得到join后的所有tuple。Nested Loop Join其实就是用两个for循环去遍历两张表,保存满足筛选条件的tuple。

inner join只会返回两个表中满足连接条件的tuple;而left join会返回左表中的所有记录,以及右表中满足条件的tuple。所以在嵌套for循环中,如果右表中有未能满足条件的tuple,那么就保存左表中的每一列的值,并且加上右表中每一列的null值。

Reference