该系列博客只是为了记录自己在写Lab时的思路,按照课程要求不会在Github和博客中公开源代码。欢迎与我一起讨论交流!

这个project和之前就不一样了,开始深入数据库内核的实现了。需要理清楚一条sql语句是如何被执行的,方才能写出代码。

前置奶酪

一条SQL语句的执行

这里需要去看看一条sql语句传入bustub内部之后的代码:src/common/bustub_instance.cpp:ExecuteSqlTxn

auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn,
                                   std::shared_ptr<CheckOptions> check_options) -> bool {
  if (!sql.empty() && sql[0] == '\\') {
    // 处理元命令
    ... 
  }

  // binder,但是在其中会使用libpg_query来解析sql语句
  bustub::Binder binder(*catalog_);
  binder.ParseAndSave(sql);

  // 经过上一步后,binder中的statement_nodes_存储着所有的语句解析节点
  for (auto *stmt : binder.statement_nodes_) {
    // 将stmt转换成BoundStatement对象,方便后面处理数据
    auto statement = binder.BindStatement(stmt);
    
    // 只有不需要构建plan树、不需要进行优化的sql语句才会在switch之后继续执行
    switch (statement->type_) {
      ...
    }

	// 生成初步的执行计划
    bustub::Planner planner(*catalog_);
    planner.PlanQuery(*statement);

	// 优化刚刚的执行计划
    bustub::Optimizer optimizer(*catalog_, IsForceStarterRule());
    auto optimized_plan = optimizer.Optimize(planner.plan_);

	...

	// 执行优化后的plan,这里会使用火山模型去根据下面节点的Next函数来执行相应的算子
	execution_engine_->Execute(optimized_plan, &result_set, txn, exec_ctx.get());

	// 将执行结果输出至指定位置
	...
  }
  return 是否执行成功;	
}

在binder之后,我们就有了一条sql的语句解析节点,例如执行select * from (select * from test_2 where colA > 10) where colB > 2;,其statement node如下:

BoundSelect {
  table=BoundSubqueryRef {
    alias=__subquery#0,
    subquery=BoundSelect {
      table=BoundBaseTableRef { table=test_2, oid=23 },
      columns=["test_2.colA", "test_2.colB", "test_2.colC"],
      groupBy=[],
      having=,
      where=(test_2.colA>10),
      limit=,
      offset=,
      order_by=[],
      is_distinct=false,
      ctes=,
    },
    columns=["test_2.colA", "test_2.colB", "test_2.colC"],
  },
  columns=["__subquery#0.test_2.colA", "__subquery#0.test_2.colB", "__subquery#0.test_2.colC"],
  groupBy=[],
  having=,
  where=(__subquery#0.test_2.colB>2),
  limit=,
  offset=,
  order_by=[],
  is_distinct=false,
  ctes=,
}

其中的子查询和where的条件还有需要哪些列都能非常清楚的看到。

Iterator Model

通常一个 SQL 会被组织成树状的查询计划,数据从叶子节点流到根节点,查询结果在根节点中得出。

bustub中采用的数据库查询执行模型叫做迭代器模型,也叫火山模型。

查询计划(query plan)中的每步operator对应的executor都实现一个next函数,每次调用时,operator返回一个tuple或者 null,后者表示数据已经遍历完毕。operator 本身实现一个循环,每次调用其 child operators 的 next 函数,从它们那边获取下一条数据供自己操作,这样整个 query plan 就被从上至下地串联起来。

但是像Joins, Aggregates, Subqueries, Order By这样的操作需要等所有children返回它们的tuple。虽然一次调用请求一条数据,占用内存较小,但函数调用开销大。

Catalog, Table and Index

下图出处:https://www.cnblogs.com/joey-wang/p/17351258.html

索引index

在Bustub中,索引用于加速数据访问。索引通过维护表中数据的有序结构,使得查询可以更快地定位到所需的记录。

结构

索引的结构图和上面表的结构图类似。在catalog中,可以获取到一个表对应的所有IndexInfo,每个IndexInfo中包含着这个索引的信息,这里讲两个个我认为比较重要的成员变量:

  • key_schema_:索引对应的列的结构,例如使用其ToString()函数时,其会返回(添加了索引的列的名称:该列的数据类型)
  • index_:这是一个指针,指向一个Index类的对象,也就是真正的索引
// catalog.h

class Catalog {
 public:
  template <class KeyType, class ValueType, class KeyComparator>
  auto CreateIndex(Transaction *txn, const std::string &index_name, const std::string &table_name, const Schema &schema,
                   const Schema &key_schema, const std::vector<uint32_t> &key_attrs, std::size_t keysize,
                   HashFunction<KeyType> hash_function, bool is_primary_key = false,
                   IndexType index_type = IndexType::HashTableIndex) -> IndexInfo *;
  auto GetIndex(const std::string &index_name, const std::string &table_name) -> IndexInfo *;
  auto GetIndex(const std::string &index_name, const table_oid_t table_oid) -> IndexInfo *;
  auto GetIndex(index_oid_t index_oid) -> IndexInfo *;
  auto GetTableIndexes(const std::string &table_name) const -> std::vector<IndexInfo *>;
  ...

 private:
  ...
  
  /**
   * Map index identifier -> index metadata.
   *
   * NOTE: that `indexes_` owns all index metadata.
   */
  std::unordered_map<index_oid_t, std::unique_ptr<IndexInfo>> indexes_;

  /** Map table name -> index names -> index identifiers. */
  std::unordered_map<std::string, std::unordered_map<std::string, index_oid_t>> index_names_;

  /** The next index identifier to be used. */
  std::atomic<index_oid_t> next_index_oid_{0};
};

struct IndexInfo {
  ...
  /** The schema for the index key */
  Schema key_schema_;
  /** The name of the index */
  std::string name_;
  /** An owning pointer to the index */
  std::unique_ptr<Index> index_;
  /** The unique OID for the index */
  index_oid_t index_oid_;
  /** The name of the table on which the index is created */
  std::string table_name_;
  /** The size of the index key, in bytes */
  const size_t key_size_;
  /** Is primary key index? */
  bool is_primary_key_;
  /** The index type */
  [[maybe_unused]] IndexType index_type_{IndexType::BPlusTreeIndex};
};

Index中有着三个虚函数供其派生类去实现,其唯一的成员变量的类型为IndexMeta,用来存储一些元信息,例如这个索引的名称,它所属的表的名称,最重要的还有一个key_attrs_,稍后就说谈论它。

// index.h

class IndexMeta {
 ...
 private:
  /** The name of the index */
  std::string name_;
  /** The name of the table on which the index is created */
  std::string table_name_;
  /** The mapping relation between key schema and tuple schema */
  const std::vector<uint32_t> key_attrs_;
  /** The schema of the indexed key */
  std::shared_ptr<Schema> key_schema_;
  /** Is primary key? */
  bool is_primary_key_;
};

class Index {
 ...
 private:
  /** The Index structure owns its metadata */
  std::unique_ptr<IndexMetadata> metadata_;
};

fall2023我们使用的是哈希索引,底层使用的就是在project2中实现的可拓展哈希。

// extendible_hash_table_index.h

#define HASH_TABLE_INDEX_TYPE ExtendibleHashTableIndex<KeyType, ValueType, KeyComparator>

template <typename KeyType, typename ValueType, typename KeyComparator>
class ExtendibleHashTableIndex : public Index {
 public:
  ExtendibleHashTableIndex(std::unique_ptr<IndexMetadata> &&metadata, BufferPoolManager *buffer_pool_manager,
                           const HashFunction<KeyType> &hash_fn);

  ~ExtendibleHashTableIndex() override = default;

  auto InsertEntry(const Tuple &key, RID rid, Transaction *transaction) -> bool override;

  void DeleteEntry(const Tuple &key, RID rid, Transaction *transaction) override;

  void ScanKey(const Tuple &key, std::vector<RID> *result, Transaction *transaction) override;

 protected:
  // comparator for key
  KeyComparator comparator_;
  // container
  DiskExtendibleHashTable<KeyType, ValueType, KeyComparator> container_;
};

更新索引

当插入新记录时,不仅需要将记录插入到表中,还需要将相应的索引条目插入到索引中。这样,后续的查询操作可以利用索引快速定位到目标记录。如果不更新索引,后续的查询操作可能会错过新插入的记录,导致查询结果不准确。

bustub中有哈希索引和B+Tree索引,fall2023版本使用的是可拓展哈希作为作为索引。不过这两个具体的实现都有一个基类Index,其中有以下虚函数需要子类去实现:

  • InsertEntry(const Tuple &key, RID rid, Transaction *transaction): 插入一个索引条目。
  • DeleteEntry(const Tuple &key, RID rid, Transaction *transaction): 删除一个索引条目。
  • ScanKey(const Tuple &key, std::vector<RID> *result, Transaction *transaction): 根据索引键搜索记录,并将结果RID存储在指定的向量中。

所以不管用的是哈希还是B+Tree,在操作索引时用的接口都相同。

如何理解索引

就如网上很多介绍索引的博客所描述的那样,数据库索引是用来加速检索速度的,就如同新华字典中的音节索引一样:

如同table_info,catalog中也有许多的index_info,每个index_info就如同上图音节表中的一个字母。我们对一个字段(列)构建一个索引,就如同在上图中音节表中多加一个字母(例如X)。

  • 需要插入一条记录时,就往对应的索引下插入(记录, 对应记录的地址)这样的键值对,例如上图的(xian, 519),这里的地址为RID
  • 需要删除一条记录时,在对应的索引下删掉匹配的键值对。
  • 需要更新一条记录时,由于bustub没有提供更新索引的API,所以可以用先删除再插入的方式模拟更新。

执行器如何使用索引获取数据

当执行器需要从表中获取数据时,如果查询计划中包含索引扫描操作,执行器会通过索引来快速定位数据。以下是具体的步骤:

  1. 解析查询计划
    • 执行器根据查询计划确定需要使用的索引。
    • 获取索引的元数据,包括索引键的模式和表列的映射关系。
  2. 构建索引键
    • 根据查询条件和索引的元数据,构建索引键。这通常涉及到从查询条件中提取列值,并根据索引键的模式进行转换。
  3. 使用索引进行搜索
    • 调用索引的 ScanKey 方法,传入构建好的索引键和一个结果RID向量。
    • 索引会根据键值查找对应的记录,并将找到的RID存储在结果向量中。
  4. 读取数据页
    • 使用结果向量中的RID,从缓冲池中查找对应的页。如果页不在缓冲池中,则从磁盘加载到缓冲池。
    • 从页中读取数据并创建 Tuple 对象。
  5. 处理和返回结果
    • 使用 Tuple 对象的方法(如 GetValueIsNull 等)访问和处理元组中的数据。
    • 将处理后的数据作为结果返回给用户或进一步处理。

谓词下推

谓词下推(Predicate Pushdown)是数据库查询优化中的一种技术,其核心思想是将查询中的过滤条件(即谓词)尽可能早地应用到查询执行计划的底部,也就是数据生成的地方。这样做的目的是为了减少数据的传输量和处理量,从而提高查询效率。

具体来说,谓词下推包括以下几个方面:

  1. 过滤条件前移:在查询执行的过程中,尽早地对数据进行过滤,这样不需要将所有数据都传递到上层操作中,只传递满足条件的数据。
  2. 减少数据传输:通过在数据生成的阶段就进行过滤,可以减少从数据库存储引擎到查询处理引擎之间的数据传输量。
  3. 减少CPU处理:不需要对所有数据进行后续的处理,只需要处理已经过滤的数据,这样可以减少CPU的工作量。
  4. 利用索引:如果过滤条件可以利用现有的索引,谓词下推可以使得查询直接利用索引来快速定位数据,而不是扫描整个表。
  5. 优化查询计划:数据库优化器会根据谓词下推的原则重新规划查询的执行步骤,生成更高效的查询计划。

例如:

SELECT * FROM employees WHERE department_id = 5 AND salary > 50000;

在这个查询中,WHERE子句包含了两个过滤条件。如果不进行谓词下推,数据库可能会先扫描整个employees表,然后将所有行传递给上层操作,之后再应用过滤条件。而通过谓词下推,数据库可以在扫描表的时候直接应用这些过滤条件,只返回部门ID为5且薪资大于50000的员工记录。

谓词下推是数据库查询优化中非常重要的一环,它有助于提高查询性能,特别是在处理大规模数据集时。数据库优化器会尝试自动应用谓词下推,但有时开发者也可以通过编写更优化的查询条件来帮助优化器更好地进行谓词下推。

Task1 - Access Method Executors

SeqScan

顺序扫描指定的表,表的遍历可以使用TableIterator

每次找到一条没有被标记为“删除”或者不是where之类的过滤子句匹配(这里会在delete操作中说明)的tuple(记录)就并返回,如果已经扫描到了表的结束位置则返回false。

Insert

为什么Insert等Executor有child而SeqScan没有?

InsertExecutor 的主要职责是将一条或多条记录插入到指定的表中。它可能需要依赖于其他 Executor 来获取要插入的数据。例如,如果 INSERT 操作是从一个 SELECT 查询的结果集中插入数据,那么 InsertExecutor 可能会有一个子 Executor(如 SeqScanExecutor 或其他类型的 Executor),该子 Executor 负责执行 SELECT 操作并提供数据给 InsertExecutor

因此,InsertExecutor 有 child 是因为它可能需要从另一个查询的结果中获取数据。

SeqScanExecutor的主要职责是对表进行全表扫描,即按顺序读取表中的所有记录。这是一个基本的操作,通常不需要其他 Executor 的支持来完成其工作。

它直接作用于存储层,遍历表中的每一行数据,因此没有子 Executor。它的任务相对简单,就是遍历和返回表中的所有记录。

简而言之,InsertExecutor 需要 child 是因为它的操作可能涉及从其他查询结果中获取数据,而 SeqScanExecutor 不需要 child 是因为它的任务是独立完成的,只需遍历表中的所有记录即可。这反映了数据库执行计划中不同操作之间的依赖关系和交互方式。其他Executor同理。

举个批量插入的🌰:

假设我们有一个 orders 表,包含以下列:

  • order_id (主键)
  • customer_id
  • product_id
  • quantity
  • order_date

我们希望通过一个子查询(select)来获取一批订单记录,并将这些记录插入到 orders 表中:

INSERT INTO orders (customer_id, product_id, quantity, order_date)
SELECT customer_id, product_id, quantity, order_date
FROM pending_orders
WHERE status = 'approved';

很明显,我们在插入之前要从select子句中获取数据,因此这个子查询操作就是insert操作的child_executor

没有子操作时,需要插入的数据从哪里获取?

比如执行如下SQL时:

insert into test_1 values (202, 1, 2, 3);

从肉眼看可以知道需要插入的数据为(202, 1, 2, 3),但是在代码中又是从哪里获取的呢?

让我们使用一下explain工具来看看这条SQL语句在bustub内部做了什么:

bustub> explain insert into test_1 values (202, 1, 2, 3);
=== BINDER ===
BoundInsert {
  table=BoundBaseTableRef { table=test_1, oid=22 },
  select=  BoundSelect {
    table=BoundExpressionListRef { identifier=__values#0, values=[["202", "1", "2", "3"]] },
    columns=["__values#0.0", "__values#0.1", "__values#0.2", "__values#0.3"],
    groupBy=[],
    having=,
    where=,
    limit=,
    offset=,
    order_by=[],
    is_distinct=false,
    ctes=,
  }
}
=== PLANNER ===
Insert { table_oid=22 } | (__bustub_internal.insert_rows:INTEGER)
  Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)
    Values { rows=1 } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)
=== OPTIMIZER ===
Insert { table_oid=22 } | (__bustub_internal.insert_rows:INTEGER)
  Values { rows=1 } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)

将目标聚焦在生成的查询计划上,从上到下,在一个Insert的查询计划中,使用Projection从其输入源(如表扫描、索引扫描、连接等)中提取所需的列,最下层使用Value获取到要操作的数据!

所以对应的,InsertExecutorchild_executorProjectionExecutor,而ProjectionExecutorchild_executorValuesExecutor,使用迭代器模型就能很方便的获取到数据了(ValuesExecutor就是最后的Executor,其Next函数不会再往下调用,其所做的只是根据在解析SQL及其之后的一些步骤中得到的需要操作的数据封装成一个tuple进行返回)。

Delete

需要写的代码和insert操作的基本相同。但有个地方需要注意一下,在执行一条delete语句时,让我们看看做了些什么:

bustub> explain delete from test_1 where colA = 999;
=== BINDER ===
Delete { table=BoundBaseTableRef { table=test_1, oid=22 }, expr=(test_1.colA=999) }
=== PLANNER ===
Delete { table_oid=22 } | (__bustub_internal.delete_rows:INTEGER)
  Filter { predicate=(#0.0=999) } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
=== OPTIMIZER ===
Delete { table_oid=22 } | (__bustub_internal.delete_rows:INTEGER)
  SeqScan { table=test_1, filter=(#0.0=999) } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)

可以看到,在optimizer阶段,where子句的filter下放至SeqScan处与其合并了,也就是说,我们需要在实现SeqScanExecutor时注意处理一下filter。这里提示一下:

while (cur_tuple.first.is_deleted_ ||
	    (plan_->filter_predicate_ && !(plan_->filter_predicate_->Evaluate(tuple, GetOutputSchema()).GetAs<bool>())))

如果其返回true,说明filter匹配到了数据(就如例子中匹配到了colA列为999的tuple),如果此时这个tuple没有被标记为删除,那么就说明找到了我们需要删除的tuple。

Update

我们如何知道update需要更新的数据从哪里取呢?

bustub> explain(p, o) update test_1 set colB = 15445;
=== PLANNER ===
Update { table_oid=22, target_exprs=["#0.0", "15445", "#0.2", "#0.3"] }
  Filter { predicate=true }
    SeqScan { table=test_1 }
=== OPTIMIZER ===
Update { table_oid=22, target_exprs=["#0.0", "15445", "#0.2", "#0.3"] }
  SeqScan { table=test_1, filter=true }

看看Update中,有一个target_exprs数组,这个数组可不就是我们的一行数据吗,并且是需要更新的那行数据:update语句可以不加where子句,这样就是选中表中的所有行,也就是这里将表中colB列的数据都update为15445!

对于target_exprs这个数组,我们可以通过plan_->target_expressions_获取,然后用其构建一个新的tuple。

需要注意的是,这里并没有提供直接更新tuple的操作,所以我们的update操作可以用先删除后插入的方式来模拟。

IndexScan

我们首先需要完成OptimizeSeqScanAsIndexScan这个优化步骤。

假设现在我们有一个表叫“test_1”,其列如下:

+-------------+-------------+-------------+-------------+
| test_1.colA | test_1.colB | test_1.colC | test_1.colD |
+-------------+-------------+-------------+-------------+

现在我们希望这条SQL能执行的更快:

select * from test_1 where colB = 11;

那比较不错的方法就是给colB列加上索引:

create index v1 on test_1(colB);

这样在执行时可以更快速的查找数据。

那么为了实现这一目标,我们需要通过OptimizeSeqScanAsIndexScan将plan树中的SeqScanPlanNode转换成IndexScanPlanNode,这样我们才能使用IndexScanPlanNode对应的算子—IndexScanExecutor去使用索引。但是由于bustub的一些设计,需要遵循以下规则才可以转化:

  1. 当前的节点的类型必须是PlanType::SeqScan
  2. 当前节点必须有filter谓词,如果只是select * from test_1;这样的是不需要使用索引的
  3. 当前表中必须有索引,没有索引还玩啥呢
  4. fileter谓词中的逻辑表达式只能有一个,并且其类型必须是ComparisonType::Equal(我想这里必须是“等于”是不是因为fall2023使用的索引是哈希索引)
  5. 在当前表的索引信息中找到与filter谓词相对应的索引后才能返回一个IndexScanPlanNode

select * from test_1 where colB = 11;中,加了索引后,最需要关注的就是where colB = 11这一个过滤条件。bustub中要求IndexScan过滤运算符必须是=,且只能有一个条件。如果colAcolB都是索引,然后执行select * from test_1 where colB = 11 and colA = 1;,这样是不会走索引优化的。

我们将查询计划中的过滤谓词转化成ComparisonExpression类型,据我的理解,其可通过GetChildAt函数获取比较谓词左边的列名表达式(即这里的“colB”,有了这个列名的表达式,我们就可以获取到这个列的col_id值)和右边的值(即“11”)。之后需要去这张表中的所有索引中去找是否有colB的索引,怎么确定是否有呢,那就要看这个表中的每个Indexkey_attrs_

Index中,key_attrs_决定了索引的关键字由哪些列组成,对应了每个列的下标。例如:

  • 如果表的 schema 定义有 5 列,分别为 A, B, C, D, E
  • 某个索引的 key_attrs_[0, 2],则表示该索引使用了第 0 列(A)和第 2 列(C)作为其关键字。

如果之前我们获取的列的col_id值和某个Indexkey_attrs_中的值相同,那么就存在相应的索引!这时构造一个IndexScanPlanNode返回即可(参考merge_filter_scan.cpp中是如何做的)。

当优化器成功更换节点后,在执行时就会走索引,其底层算子就会使用到IndexScanExecutor。到这里,这个算子需要做的事情就很简单了,就是调用哈希索引的ScanKey进行查找,不过这里有3点需要注意:

  1. 在project2中,我们实现的索引引擎只支持一个键对应一个值(不只是这个版本的可拓展哈希,其他版本中的B+Tree也只要求这样实现),也就是我们的这个算子在底层索引引擎不扩展的情况下最多查到一条记录,这样的话就可以在算子的Init函数中调用ScanKey查找记录就行。
  2. 可拓展哈希中是将一个Tuple的data转化一下当作key,所以在索引中,其是将索引列的值作为key,其对应的oid作为值。在使用ScanKey时,第一个参数需要的tuple将用查询计划节点IndexScanPlanNode中的pred_key_构造。
  3. 表中的tuple的元数据中,其is_deleted_可能为true,这说明这个tuple在逻辑上已经删除了,所以如果我们通过2中获取的oid对应的tuple是这种情况,就不用向上返回数据。

Task2 - Aggregation & Join Executors

Aggregation

分析一个例子:

bustub> EXPLAIN SELECT MAX(colC), MIN(colB) FROM test_1 GROUP BY colA HAVING MAX(colB) > 10;
=== BINDER ===
BoundSelect {
  table=BoundBaseTableRef { table=test_1, oid=22 },
  columns=["max([\"test_1.colC\"])", "min([\"test_1.colB\"])"],
  groupBy=["test_1.colA"],
  having=(max(["test_1.colB"])>10),
  where=,
  limit=,
  offset=,
  order_by=[],
  is_distinct=false,
  ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.2", "#0.3"] } | (<unnamed>:INTEGER, <unnamed>:INTEGER)
  Filter { predicate=(#0.1>10) } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
    Agg { types=["max", "max", "min"], aggregates=["#0.1", "#0.2", "#0.1"], group_by=["#0.0"] } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
      SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
=== OPTIMIZER ===
Projection { exprs=["#0.2", "#0.3"] } | (<unnamed>:INTEGER, <unnamed>:INTEGER)
  Filter { predicate=(#0.1>10) } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
    Agg { types=["max", "max", "min"], aggregates=["#0.1", "#0.2", "#0.1"], group_by=["#0.0"] } | (test_1.colA:INTEGER, agg#0:INTEGER, agg#1:INTEGER, agg#2:INTEGER)
      SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)

对于AggregationExecutor,我们可以获取到SQL语句中:

  1. 聚合操作的types(进行聚合操作的类型)和aggregates(需要聚合操作的列),二者一一对应
  2. 需要group by进行分组的列

再看看lecture中的这个例子,其对列cid使用group by进行分组,其中涉及的聚合操作为AVG,可转换成COUNTSUM操作。这里相当于:

types = ['count', 'sum']
aggregates = ['s.gpa', 's.gpa']

我的理解是根据group by的字段的值进行hash函数处理作为哈希表的键,例如图中的“15-445”,“15-826”等;然后哈希表的值为一个集合,这个集合的大小和typesaggregates的大小相同,并且对应的位置就为aggregates的值:例如图中键“15-445”的值中,第一个元素就为COUNT操作下s.gpa为15-445的个数。

在lab中需要实现countsummaxmin操作,其实就是在SimpleAggregationHashTable::CombineAggregateValues中实现对应的操作即可,本质上是对哈希表的几个很简单的操作。

aggregation通常需要对一组数据进行计算,这些计算具有以下特点:

  1. 需要完整输入:Aggregation 通常需要从下层拉取所有相关数据才能计算结果,例如计算 SUM 需要遍历所有行。
  2. 阻塞性:在传统实现中,Aggregation 算子通常被称为“阻塞算子”,因为它必须等待所有输入数据都拉取完成才能产出结果。这意味着 next() 调用会被延迟,直到聚合计算完成。

在我们的火山模型中 aggregation 是阻塞算子:

  • 当上层算子调用 next() 时,aggregation 会向下层算子连续调用 next(),直到拉取完全部数据并完成聚合。
  • 在数据尚未完全拉取并聚合完成之前,上层的 next() 调用无法直接返回结果。

需要注意的是:

  • SQL中进行group by后使用count,统计的是每组数据中的记录数,而非分组后新表的行数。
  • distinct其实就是对某个字段进行group by操作
  • count(*)统计null,而count(字段)不统计null

NestedLoopJoin

Inner Join:

bustub> EXPLAIN SELECT * FROM __mock_table_1, __mock_table_3 WHERE colA = colE;
=== BINDER ===
BoundSelect {
  table=BoundCrossProductRef { left=BoundBaseTableRef { table=__mock_table_1, oid=0 }, right=BoundBaseTableRef { table=__mock_table_3, oid=2 } },
  columns=["__mock_table_1.colA", "__mock_table_1.colB", "__mock_table_3.colE", "__mock_table_3.colF"],
  groupBy=[],
  having=,
  where=(__mock_table_1.colA=__mock_table_3.colE),
  limit=,
  offset=,
  order_by=[],
  is_distinct=false,
  ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
  Filter { predicate=(#0.0=#0.2) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
    NestedLoopJoin { type=Inner, predicate=true } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
      MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
      MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
=== OPTIMIZER ===
NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
  MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
  MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
bustub> EXPLAIN SELECT * FROM __mock_table_1 INNER JOIN __mock_table_3 ON colA = colE;
=== BINDER ===
BoundSelect {
  table=BoundJoin { type=Inner, left=BoundBaseTableRef { table=__mock_table_1, oid=0 }, right=BoundBaseTableRef { table=__mock_table_3, oid=2 }, condition=(__mock_table_1.colA=__mock_table_3.colE) },
  columns=["__mock_table_1.colA", "__mock_table_1.colB", "__mock_table_3.colE", "__mock_table_3.colF"],
  groupBy=[],
  having=,
  where=,
  limit=,
  offset=,
  order_by=[],
  is_distinct=false,
  ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
  NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
    MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
    MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
=== OPTIMIZER ===
NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
  MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
  MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)

可以看到,即便我们不加inner join,默认情况下也是使用的Inner Join:NestedLoopJoin中的type均为Inner

Left Join:

bustub> EXPLAIN SELECT * FROM __mock_table_1 LEFT OUTER JOIN __mock_table_3 ON colA = colE;
=== BINDER ===
BoundSelect {
  table=BoundJoin { type=Left, left=BoundBaseTableRef { table=__mock_table_1, oid=0 }, right=BoundBaseTableRef { table=__mock_table_3, oid=2 }, condition=(__mock_table_1.colA=__mock_table_3.colE) },
  columns=["__mock_table_1.colA", "__mock_table_1.colB", "__mock_table_3.colE", "__mock_table_3.colF"],
  groupBy=[],
  having=,
  where=,
  limit=,
  offset=,
  order_by=[],
  is_distinct=false,
  ctes=,
}
=== PLANNER ===
Projection { exprs=["#0.0", "#0.1", "#0.2", "#0.3"] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
  NestedLoopJoin { type=Left, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
    MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
    MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
=== OPTIMIZER ===
NestedLoopJoin { type=Left, predicate=(#0.0=#1.0) } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER, __mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)
  MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
  MockScan { table=__mock_table_3 } | (__mock_table_3.colE:INTEGER, __mock_table_3.colF:VARCHAR)

由于火山模型中的Next每次返回一条tuple,所以我们需要在Init中得到join后的所有tuple。Nested Loop Join其实就是用两个for循环去遍历两张表,保存满足筛选条件的tuple。

inner join只会返回两个表中满足连接条件的tuple;而left join会返回左表中的所有记录,以及右表中满足条件的tuple。所以在嵌套for循环中,如果右表中有未能满足条件的tuple,那么就保存左表中的每一列的值,并且加上右表中每一列的null值。

Task3 - HashJoin Executor and Optimization

为什么需要hash join呢?如果两张要进行join的表非常大,这时使用NLJ的时间复杂度就为O(n*m),但是如果我们使用hash join,先对一张表建立哈希映射,然后再对另一张表进行哈希检测来判断是否满足连接条件,这样就只要各扫描一次两张表,时间复杂度就降为了O(n+m)

将 NL Join 优化为 Hash Join

实验指导中说,优化器需要将 nl join 优化为 hash join 的情况为:连接条件中有多个等值 AND 操作,即 x = y AND a = b AND ...。那我们只要在碰到 NestedLoopPlanNode 时获取其谓词,判断一下是否满足该情况就好了。

需要注意的是,当谓词仅为 a = b 时,这个谓词是一个 ComparationExpression;而当它为 x = y AND a = b AND ... 时,它是一个 LogicExpression。这里在判断的是否需要区分,而在之前的实验中,有学习到可以使用标准库中的 std::dynamic_pointer_cast 函数来将谓词从 Expression基类 转换为指定的 Expression派生类,当转换失败时,会返回空指针,这样我们可以通过返回的指针来判断是否是想要的 Expression,如果为空,就代表是另一个 Expression。

由于谓词中很有可能是 LogicExpression 嵌套着 LogicExpressionComparationExpression,数量也不确定,且子 LogicExpression 很可能还有嵌套,这样的话使用递归来处理就非常方便。

实现 Hash Join

hash join 的核心就是:先对一张表建立哈希映射,然后再对另一张表进行哈希检测来判断是否满足连接条件。哈希表中的 key 为连接条件中对应的值的拼接,value 为整个 tuple。而一张表中的所有 tuple,很有可能会有几个列是完全相同的,如果这些列刚好作为连接条件,那进行哈希时就会造成 key 相同的情况,这就造成了哈希碰撞

解决方法也很简单:

  1. 使用 std::unordered_map,将拼接的列值作为 key,而值的类型为 std::vector<Tuple>;
  2. 使用std::unordered_multimap来处理。

当我们构建好哈希表后,每次获取了另一张表中的 tuple 后进行一次哈希检测,如果 key 存在,需要将所有 value 都进行连接拼接。同 nested loop join 一样,需要特殊处理连接类型为 left 的情况。

Task4 - Sort + Limit Executors + Window Functions + Top-N Optimization

Sort

这个算子的实现思路很简单,但是需要注意两点:

  1. order by 后面可以跟多个关键字,也就是需要在 std::sort 中对多个关键字进行排序(利用 for 循环)。
  2. 这些关键字中可能进行算数运算(算数对象类型为 ArithmeticExpression,搞清楚进行运算的函数就可)。

Limit

这个就更简单了,根据 limit 的返回对应数量的 tuple。

TopN

topn 算子的优化只有 sort 和 limit 同时出现的是否才会触发,这里的优化逻辑比较简单。

而具体的算子的实现其实就是一个优先队列。举个例子,现在 tuple 按照某个关键字升序排列,并且 limit 为 10,那么就可以构造一个小根堆,并维护其大小最多为10,最后留在堆中的就是结果。如果力扣刷了一点题的话很容易就能理解这里。

Window Function

说来惭愧,学 SQL 的时候并不知道窗口函数这个东西…简单来理解,就是在使用聚合函数的后面加上 over ([可选操作]) 即可对区间进行聚合操作。

对于下面的 SQL,最后输出的 schema 应该和 WindowFunc.columns 相同,并且行数也和子 executor 返回的行数相同(下文将子 executor 返回的 tuple 称为 child_tuples),只是多了一些额外的计算列。

bustub> explain(o) select v1,  min(v1) over () as min_v1, max(v1) over () as max_v1, count(v1) over () as count_v1, sum(v1) over () as sum_v1 from t1;
=== OPTIMIZER ===
WindowFunc {
  columns=#0.0, placeholder, placeholder, placeholder, placeholder, ,
  window_functions={
    1=>{ function_arg=#0.0, type=min, partition_by=[], order_by=[] },
    2=>{ function_arg=#0.0, type=max, partition_by=[], order_by=[] },
    3=>{ function_arg=#0.0, type=count, partition_by=[], order_by=[] },
    4=>{ function_arg=#0.0, type=sum, partition_by=[], order_by=[] }
  }
}
  SeqScan { table=t1 }

在这条 SQL 中,返回的 tuple 的格式应该为:

这条 tuple 的 v1 列的值所有 tuple 中最小的 v1 的值所有 tuple 中最大的 v1 的值所有 tuple 中 v1 的个数所有 tuple 中 v1 的值的和

这其实和 WindowFunc.columns 有很大的关系,placeholder 说明这只是个占位符,其应该为 window_functions[下标] 对应的窗口函数。例如第二个 placeholder 在 columns 的下表为 1,其对应的窗口函数就是 { function_arg=#0.0, type=min, partition_by=[], order_by=[] }。那么就可以通过 column 来找到每个窗口对应的哈希表。

️需要注意的是:

  • 如果按照 ORDER BY 进行排序后,每一行的窗口范围从第一行开始扩展到当前行
  • 否则每一行的窗口范围是整个child_tuples

既然如此,我们需要先从 child_executor 获取到所有的 child_tuples,然后对 child_tuples 遍历:如果有分组行为,那就需要按照某一列(或多列)进行分组,然后对于每一个 tuple,都让其执行一次 WindowFunc.window_functions 中的窗口函数。

稍有不同的是,之前的 aggregation 操作分组后可能需要完成多个聚合函数,但是这里我们分组之后,只会完成一个聚合函数,因为每个窗口中只有一个聚合函数。

现在看来,这个窗口函数也无非就是对某个范围内的 tuple 进行分组和聚合操作,实验指导中也提示我们可以去利用 task2 中写的代码。在我的实现方案中,并不像 task2 中只使用一个哈希表,因为在一条 SQL 函数中,可能有多个窗口函数,每一个窗口函数又可能又不同的分组和不同的聚合操作,因此我对于每一个窗口函数都设置了一个哈希表。

该 task 中 bustub 很仁慈地简化了难度:如果窗口函数任意一个中有 order by,那么所有窗口函数的 order by 都相同。不过在有排序和没有排序的情况下,窗口的范围有所不同,处理起来的方法也不同。

无排序的情况

这个情况下,每个窗口函数的范围就是整个 child_tuples。之后再次遍历每一条 tuple,按照输出的 schema 来构建返回的 tuple。那么实现操作应该如下:

先描所有 tuple,生成这条 tuple 对应的 key 和 value,再把 key 和 value 加入对应的哈希表。

然后对于每一条 tuple,遍历 WindowFunc.columns

  1. 如果 column 不是 placeholder,那么说明这个位置是这个 tuple 中的一列,获取这一列对应的 Value 就好。
  2. 如果 column 是 placeholder,那么说明这个位置的值应该是对应的窗口函数的执行结果,那么从 column 对应的哈希表中找出这个 tuple 对应的值就 ok。

有排序的情况

在这个情况下,排序号后每条 tuple 的范围是自身及之前的所有 tuple,而不是像之前一样的所有 tuple。这就有点像一句话:“走一步看一步”

还是遍历每一条 tuple,对每一条 tuple 又遍历 WindowFunc.columns

  1. 如果 column 不是 placeholder,那么说明这个位置是这个 tuple 中的一列,获取这一列对应的 Value 就好。这里和无排序的情况一致。
  2. 如果 column 是 placeholder,
    1. 生成这条 tuple 对应的 key
    2. 生成这条 tuple 对应的 value,并将 {key, value} 插入 column 对应的哈希表(在 task2 中可知,这里的“插入”其实是对 key 处的旧值 old_value 与新值 value 进行聚合操作)
    3. 从哈希表中取出 key 对应的 value

这里的 2.2 和 2.3 就是之前所说的“走一步看一步”:当前 tuple 的结果是在之前的 tuple 上聚合而来的

总结

这个 Project 需要我们去深入理解 bustub 的源码,知道一条 SQL 会被解析成一棵什么样的 plan 树,在经过基于规则的优化器优化后才会是最终的物理 plan 树,这时又要去理解这棵树上每个节点对应的算子应该是怎么实现的。其实知道了 plan 树是什么样子后,节点对应的算子就按照要求去设计就好了。

在众多 AI 例如 ChatGPT、DeepSeek、Kimi 等帮助下,还是慢慢理解并完成了这个 Project!但是有的地方我可能还没有做的比较好,以后有时间再优化一下。