本文目的在于记录复盘做15445这个项目的经验总结 以及应付面试
分成2个部分 lab实现 还有 这门课的一些重要知识
先直接说5个lab
lab0 用cpp实现字典树 和下文基本没关系 不细说了
lab1 实现3个东西
可拓展哈希表
在理解可扩展哈希表之前,我们需要了解几个概念。
Directory :是存放bucket指针的容器,可动态生长(以原大小的倍数作为增长率),容器的每个元素可用哈希值来索引。
Bucket :桶。存放Key/value pair的桶,数据结构层面是一个线性表。
LRU 力扣原题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 class Node { int key; int val; Node prev; Node next; Node() { } Node(int key, int val) { this.key = key; this.val = val; } } class LRUCache { private Map<Integer, Node> cache = new HashMap<>(); private Node head = new Node(); private Node tail = new Node(); private int capacity; private int size; public LRUCache(int capacity) { this.capacity = capacity; head.next = tail; tail.prev = head; } public int get(int key) { if (!cache.containsKey(key)) { return -1; } Node node = cache.get(key); moveToHead(node); return node.val; } public void put(int key, int value) { if (cache.containsKey(key)) { Node node = cache.get(key); node.val = value; moveToHead(node); } else { Node node = new Node(key, value); cache.put(key, node); addToHead(node); ++size; if (size > capacity) { node = removeTail(); cache.remove(node.key); --size; } } } private void moveToHead(Node node) { removeNode(node); addToHead(node); } private void removeNode(Node node) { node.prev.next = node.next; node.next.prev = node.prev; } private void addToHead(Node node) { node.next = head.next; node.prev = head; head.next = node; node.next.prev = node; } private Node removeTail() { Node node = tail.prev; removeNode(node); return node; } } /** * Your LRUCache object will be instantiated and called as such: * LRUCache obj = new LRUCache(capacity); * int param_1 = obj.get(key); * obj.put(key,value); */
缓冲池的管理
这个任务需要完成缓冲池的管理,前面只是完成对页框的管理,但是访问的时候是使用页号访问的,第一次用页号访问的时候,页加载到页框中。页换出的时候,页框被复用换入的页复用。这个流程对于缓冲池的管理来说应该是透明的,缓冲池管理只需要使用页号访问和修改页
NewPage
后面的函数都需要加锁
获取一个新页的流程是:如果空闲列表 没有空闲了,需要Evict一个页→牺牲的这个页如果是脏页的情况下,需要先落盘→如果空闲列表没有空闲,需要先获取一个新页号→从空闲列表取一个页号→设置新页→RecordAccess新页→pin新页
这里为什么要pin新页,因为当外部调用这个函数的时候,就为了使用这个新页,那么这个新页就一定不能马上被牺牲的
FetchPage
如果该页在内存中,很简单,只需要根据需求,记录一下这个事件,然后返回该页就行。否则,就按照NewPage函数执行,区别是,页号对应的页在磁盘上,需要用ReadPage从磁盘读到内存中
UnpinPage
unpin之后,如果还有其他人在使用该页,那么pin_count_应该是>0的,只有当pin_count为0时,该页可以设置为可牺牲的
重要的是,需要维护is_dirty_标志位,只要之前任意一次pin之后,该页变脏了,此后一直是脏的,如果该页最后是干净的,可以不用写回磁盘
FlushPage
调用WritePage写盘
重置脏位
当前的DeallocatePage没有实现,调用后实际上不会做事
DeletePage
当某个页不再要被使用的时候,就可以不再追踪
需要注意,如果该页是被pin住的状态,是不能被删除的
FlushAllPages
lab2 P2 b+tree
lab的整体架构很清晰。先实现internal_page、leaf_page这两个数据结构,作为B+树的内部节点和叶子节点。然后实现B+树的插入、删除,最后支持并发。
内部节点与叶子节点 在这一部分中,我们要实现一系列的小函数。整个过程比较繁琐,但难度不大。 在我最初的版本中,所有的搜索都是线性时间复杂度的。这样做实现简单,可以确保正确性。有些地方可以改成二分查找。
其实最主要看那本数据库系统概论的伪代码就能够实现了
b+树优化
更乐观的乐观策略
螃蟹法则的乐观锁策略:
读取数据:首先,从数据库中读取需要更新的数据。 执行更新操作:对读取的数据进行修改,并准备将更新后的数据写回数据库。 检查冲突:在提交更新之前,再次读取数据库中的数据,并比较更新前后的数据是否发生了变化。如果发现其他事务已经修改了数据,就意味着存在冲突。 处理冲突:如果检测到冲突,系统可以选择执行回滚操作,放弃当前的更新;或者根据具体情况采取其他的解决方案,例如重试操作、通知用户冲突发生等。 提交更新:如果没有冲突,就将更新后的数据写回数据库,并提交事务。
乐观策略失效的原因应该是这三点:测试量小、螃蟹协议太垃圾、读写锁拖后腿。
其中比较重要的是第二点:这个实验谈“乐观策略”时,大多数人都会实现成“基于螃蟹协议的乐观策略”。我们知道读写锁存在“读写冲突”的问题,螃蟹协议又带一种“尽可能多的收集锁”的保守倾向,这两点搭在一起会让乐观策略效果不明显。同时,cmu的测试量其实很小,生成的b+树只有两层高,绝大多数情况下,线程都存在一段时间,握有两层的锁。这样一来,重试线程会被大量乐观线程阻塞,新的乐观线程又会被重试线程阻塞。
lab3 最主要要先把框架看懂 value,tuple,table,column,schema
查询过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 bustub> EXPLAIN SELECT * FROM __mock_table_1; === BINDER === BoundSelect { table=BoundBaseTableRef { table=__mock_table_1, oid=0 }, columns=[__mock_table_1.colA, __mock_table_1.colB], groupBy=[], having=, where=, limit=, offset=, order_by=[], is_distinct=false, } === PLANNER === Projection { exprs=[#0.0, #0.1] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER) MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER) === OPTIMIZER === MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
这段 SQL 查询的执行计划中包含了三个部分:BINDER、PLANNER 和 OPTIMIZER。
BINDER :在这一阶段,查询中的各种符号被绑定到具体的表、列或其他对象。在这个查询中,__mock_table_1
被绑定到了一个基本表引用 BoundBaseTableRef
,并且选择了该表的所有列 __mock_table_1.colA
和 __mock_table_1.colB
。其他部分,如分组、聚合、筛选条件等,都没有被设置。
PLANNER :这一步骤中,执行计划被规划和生成。在这个查询中,使用了投影操作 Projection
,表示查询结果中将会包含 __mock_table_1
的所有列。然后,使用了一个模拟扫描操作 MockScan
,表示将会从 __mock_table_1
中获取数据。最后,显示了查询结果的列信息,即 __mock_table_1.colA:INTEGER
和 __mock_table_1.colB:INTEGER
。
OPTIMIZER :在这一步骤中,执行计划被优化以提高性能。在这个查询中,优化器没有进行额外的优化操作,因为查询已经相对简单,直接执行扫描操作即可。
综上所述,该查询执行计划会从表 __mock_table_1
中选择所有列,并将结果作为查询的输出。
AbstractExecutor SQL拆分成一棵执行树之后,其中的节点的功能承担者,也是这次实验主要要完成的部分。
构造执行树时不会构造Executor,而是用后面的AbstractPlanNode来构造树,只有在执行这棵树的时候会初始化对应的Executor来执行。
1 2 3 4 5 6 7 8 9 10 11 // 每个执行器需要初始化的东西都不一样,一般是初始化指向表头的迭代器指针,或者自己定义的一些辅助循环的值 // 拿seq_scan来说,就需要在这里将迭代器指针指向表头。 void Init() // 需要做到调用一次next就输出且只输出一组tuple的功能,输出是通过赋值*tuple参数,并return true // 如果没有能输出的了,return false void Next(Tuple *tuple, RID *rid) // Schema相当于存储了表的表头名字,OutputSchema用来指出此节点需要输出哪些列(哪些表头需要考虑在内) virtual const Schema *GetOutputSchema() = 0;
ExecutorContext (AbstractExecutor构造函数的参数之一)
上下文信息,也就是说这次执行所用到的一些关键信息
1 2 3 // 这个Catalog至关重要,存储了一个数据库的全部表格信息的索引,提供了对表格的操作。 // 只有这个Catalog是我们可能会用到的,比如在seq_scan中需要利用它获取目标表 Catalog *catalog_;
AbstractPlanNode (AbstractExecutor构造函数参数之二)
用于存储节点有关的信息,AbstractExecutor利用用里面的信息来完成任务。
1 2 3 4 5 6 7 8 // Schema是表每列的表头名字,OutputSchema用来指出此节点需要输出哪些列 Schema *OutputSchema() // 获取孩子节点(我们实现executor的时候用不上,在执行树的时候才用得上) // 在一些需要从子节点获取tuple的操作用得上,比如 join AbstractPlanNode *GetChildAt(uint32_t child_idx) std::vector<AbstractPlanNode *> &GetChildren()
例子:SeqScanPlanNode成员函数,AbstractExecutor可以在这里获取TableOid和Predicate
1 2 3 4 5 6 7 8 // 这个在ExecutionEngine里面用于判断当前节点的类型 PlanType GetType() // Predicate:谓词,返回值全是真值的表达式,AbstractExpression就是一颗表达式树。 AbstractExpression *GetPredicate() // 结合Catlog可以得到当前Executor需要的表格内容 table_oid_t GetTableOid()
AbstractExpression 表达式类,一颗表达式树中的节点,比如比较,聚合,或者常量,column元素。其中column表达式也作为了column类的成员之一。
不同的表达式实现的功能差距交大,这个是非常重要的一个类,每一个executor的代码都会用到。
ComparisonExpression:用于比较,实例化后做为predicate_谓词(下面例子中的predicate就是此类的实例化),或者having(在aggregation中会用到)。 返回的是一个装载Value类中的bool值,需要用value.GetAs()得到这个值,用于判断是否满足比较的条件(比较的细节:比如是>还是<,就不用我们关心了,交给Evaluate函数就行)
1 Value value = plan_->GetPredicate()->Evaluate(tuple, schema)
ColumnValueExpression:列元素的表达值,有一个很大的作用,它的Evaluate函数能返回当前tuple中哪一个value是对应这个column的。
1 2 3 4 5 6 7 8 9 Value value = column.GetExpr()->Evaluate(tuple, schema); // 或者判断当前join是左连接还是右链接,并根据传入的左右俩tuple返回连接对应的值 Value value = plan_->OutputSchema()->GetColumn(i).GetExpr()->EvaluateJoin( &left_tuple_, plan_->GetLeftPlan()->OutputSchema(), &right_tuple_, plan_->GetRightPlan()->OutputSchema());
ConstantValueExpression:常数表达式,返回值永远是一个常数。
AggregateValueExpression:在AggregateExecutor中用到,作用和ColumnValueExpression类似,用于找出属于当前column的这个值。(AggregateExecutor的测试函数在传入outputschme的时候,用于构造的colum不是ColumnValueExpression而是AggregateValueExpression,就是为了处理不同类型的输入参数。所以才能有以下的用法)
1 Value value = column.GetExpr()->EvaluateAggregate(temp.Key().group_bys_, temp.Val().aggregates_);
Value (最小数据单位)
Index 索引保存的是(Tuple_key, RID)对,其中Tuple_key是根据传入的Tuple生成的。
也就是说索引是按照表中的每一行对应生成的。
在会对table造成修改的executor中会用到,通过提供的函数在修改table的时候顺便把对应的indx修改。
tuple 相当于表里面的一行,存储了一行value。长度由shema决定,每个column可以在tuple中对应一个value
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 // 就是将数据二进制化或者反过来,用于存储 void SerializeTo(char *storage) const; void DeserializeFrom(const char *storage); // return RID of current tuple inline RID GetRid() // 返回数据指针 inline char *GetData() // 返回tuple的长度(bits) inline uint32_t GetLength() // 返回指定colum_idx位置的值 Value GetValue(const Schema *schema, uint32_t column_idx) // 其他 Tuple KeyFromTuple(const Schema &schema, const Schema &key_schema, const std::vector<uint32_t> &key_attrs); inline bool IsNull(const Schema *schema, uint32_t column_idx) inline bool IsAllocated() { return allocated_; } std::string ToString(const Schema *schema) const;
TableHeap 相当于一张表本身
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 bool InsertTuple(const Tuple &tuple, RID *rid, Transaction *txn); // MarkDelete标记需要删除的项,调用ApplyDelete删除 bool MarkDelete(const RID &rid, Transaction *txn); void ApplyDelete(const RID &rid, Transaction *txn); bool UpdateTuple(const Tuple &tuple, const RID &rid, Transaction *txn); void RollbackDelete(const RID &rid, Transaction *txn); // 获取Tuple bool GetTuple(const RID &rid, Tuple *tuple, Transaction *txn); //可以用迭代器访问Table TableIterator Begin(Transaction *txn); TableIterator End();
TableIterator TableHeap的迭代器(指针),指向的是其中的Tuple,即可以当作Tuple指针来用。
Catalog 存储了一个数据库的全部表格信息的索引,提供了对表格的操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 // TableInfo装载某个table的相关信息,包括schema,name,table指针,oid TableInfo *GetTable(table_oid_t table_oid) // IndexInfo 同理 IndexInfo *GetIndex(table_oid_t table_oid) IndexInfo *CreateIndex(...) IndexInfo *GetIndex(index_oid_t index_oid) std::vector<IndexInfo *> GetTableIndexes(const std::string &table_name)
Tableinfo
1 2 3 4 5 6 7 8 Schema schema_; std::unique_ptr<TableHeap> table_; const table_oid_t oid_; const std::string name_;
Indexinfo
1 2 3 4 5 6 7 8 9 10 11 12 Schema key_schema_; std::string name_; std::unique_ptr<Index> index_; index_oid_t index_oid_; std::string table_name_; const size_t key_size_;
schema 模式。每列都有一个表头(名字),shema就是一组表头的集合,拿来指明这张表有哪些列或者某个executer需要处理哪些列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 const Column &GetColumn(const uint32_t col_idx) uint32_t GetColIdx(const std::string &col_name) const std::vector<uint32_t> &GetUnlinedColumns() uint32_t GetColumnCount() uint32_t GetUnlinedColumnCount() // 返回单个tuple的长度(不是schema的长度哦) inline uint32_t GetLength() // inline bool IsInlined() const { return tuple_is_inlined_; }
column 相当于一列的名字(表头),也能调用其expression完成对应的操作(前面在AbstractExpression中有说)
然后就是实现各种executors了 seq_scan seq_scan_executor构造函数需要将元信息TableInfo保存起来,之后用来查询。
Init这里扫描已经是最底层的算子了,所以没有子算子,需要保存的元信息中table_的迭代器
Next需要注意的是,迭代器中获取的tuple不一定是都能用的,检验is_deleted_标志位,如果是true,那么还需要遍历下一行行tuple和rid都需要返回,其中RID,包括page_id和slow_num,其中page_id就是页号,slow_num指的是这个tuple在这个页的偏移位置,在没有varchar的时候,每个tuple都是固定大小的,这样就可以简单通过固定偏移量,计算出tuple到底放在页的哪里
遍历表时,其中的每一个tuple都要经过plan_->GetPredicate()->Evaluate()判断是否满足条件。 拿到的tuple其实是TableHeap的迭代器,看作是一个指向tuple的指针。
1 2 Value Evaluate(const Tuple *tuple, const Schema *schema)
根据Evaluate的参数,直接传入temp不行,因为temp不等同于tuple指针,只是重载了->运算符。需要如下方式传入。
1 2 Value Evaluate(const Tuple *tuple, const Schema *schema) plan_->GetPredicate()->Evaluate(&(*temp), &table_info_->schema_)
update + delete + insert 调用子节点的next获得要更新/删除/插入点的tuple,然后执行对应的操作。
IndexScan IndexScan其实和SeqScan非常类似,SeqScan是扫描数据表。SeqScan先获取数据表,IndexScan那就先获取索引树。IndexScanPlanNode计划节点中,包含了索引的index_id:
1 2 IndexInfo *info = exec_ctx_->GetCatalog()->GetIndex(plan_->GetIndexOid()); auto tree_ = reinterpret_cast<BPlusTreeIndexForOneIntegerColumn *>(info->index_.get());
获取树的迭代器:
1 auto table_iter_ = tree_->GetBeginIterator();
回看一下Project 2,迭代器解引用返回的是什么?是不是包含RID?但是我们要得到的是tuple,很简单,从TableHeap获取:
1 2 3 4 auto table_heap_ = exec_ctx_->GetCatalog()->GetTable(info->table_name_)->table_.get(); *rid = (*table_iter_).second; table_heap_->GetTuple(*rid, tuple, exec_ctx_->GetTransaction());
Task #2 - Aggregation & Join Executors 1.Aggregation
在 aggregation_executor.h中提供了SimpleAggregationHashTable,其中的主要成员是std::unordered_map ht_{}; AggregationExecutor中的成员变量:
1 2 3 4 5 6 7 8 /** The aggregation plan node */ const AggregationPlanNode *plan_; /** The child executor that produces tuples over which the aggregation is computed */ std::unique_ptr<AbstractExecutor> child_; /** Simple aggregation hash table */ SimpleAggregationHashTable aht_; /** Simple aggregation hash table iterator */ SimpleAggregationHashTable::Iterator aht_iterator_;
join
有两张表,我们需要把他们连接起来,可以通过内外两层循环来将满足条件的记录连接起来,即左表每读取一条记录,右表就要完成一次读循环。 HashJoin NestedLoopJoin需要两层循环,而HashJoin通过将一张表映射到哈希表中减少了一层循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 namespace bustub { struct AggregateKey { std::vector<Value> group_bys_; auto operator==(const AggregateKey &other) const -> bool { for (uint32_t i = 0; i < other.group_bys_.size(); i++) { if (group_bys_[i].CompareEquals(other.group_bys_[i]) != CmpBool::CmpTrue) { return false; } } return true; } }; } namespace std { /** Implements std::hash on AggregateKey */ template <> struct hash<bustub::AggregateKey> { auto operator()(const bustub::AggregateKey &agg_key) const -> std::size_t { size_t curr_hash = 0; for (const auto &key : agg_key.group_bys_) { if (!key.IsNull()) { curr_hash = bustub::HashUtil::CombineHashes(curr_hash, bustub::HashUtil::HashValue(&key)); } } return curr_hash; } }; } // namespace std
Task #3 - Sort + Limit Executors and Top-N Optimization 比较简单 不详细说了 top-n 用堆
lab4 DBMS中实现一个Lock Manager,然后使用他来支持并发执行。锁管理器负责跟踪向事务发出的行级锁,并支持根据隔离级别适当的上或释放共享锁和排他锁。
LM的基本思想是,它维护一个关于活动事务当前持有的锁的内部数据结构,然后事务在访问数据项之前向LM发出锁请求,LM将根据情况决定锁授予该事务、阻止该事务还是终止该事务。
一、锁管理器与死锁预防
lock_manager.h中有一个lock_table_的哈希表,其存储了每个RID对应的LockRequestQueue队列。及对于每个RID,也就是每个tuple,都有一个对应的锁请求队列LockRequestQueue。
一个请求LockRequest主要记录有发出该请求的txn_id, 请求的是哪类的锁,以及该请求是否已经被授予。一个LockRequestQueue中可能有一个或多个请求已经被授予锁。
本实验需要支持事务的三种隔离级别:
READ_UNCOMMITED只有在需要时上写锁。 READ_COMMITTED要解决脏读的问题,解决方案就是读时上读锁,读完解读锁;写时上写锁,但等到commit时才解写锁;读时上读锁,读完解读锁。这样,永远不会读到未commit的数据,因为上面有写锁。 REPEATABLE_READ进一步打造可重复读。同一事务读两次数据的中途不想被其他事务的写干扰,这就需要用到巧妙的二段封锁协议(2PL)了:事务分为两个阶段(不考虑commit/abort),上锁阶段(GROWING)只上锁,解锁阶段(SHINKING)只解锁。这样,第二次读取时,前一次读取的读锁一定还在,避免了中途被修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 // include\concurrency\lock_manager.h class LockRequestQueue { public: std::list<LockRequest> request_queue_; // for notifying blocked transactions on this rid std::condition_variable cv_; // txn_id of an upgrading transaction (if any) // txn_id_t upgrading_ = INVALID_TXN_ID; bool upgrading_ = false; }; private: /** * 将事务添加到等待队列中 * @param lock_queue 等待队列 * @param txn_id 事务ID * @param lock_mode 锁的类型 */ inline void InsertTxnIntoLockQueue(LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode); /** 全局锁 */ std::mutex latch_; /** Lock table for lock requests. */ std::unordered_map<RID, LockRequestQueue> lock_table_;
死锁预防部分写的并不够清楚明白,很多方法都是按照测试用例推测出来的。目前想到的能够基本通过测试的策略如下:
无论锁已经上了还是等待中,所有请求都进入锁队列,并记录在事务的锁Set中。 上锁请求均要完整的遍历一遍队列,每当结束wait状态时,均返回对应上锁函数的头部重新检查。 ID较小的为老事务,ID较大的为新事务。 对于共享锁:老事务Abort掉新事务的排他锁;新事务则只等待老事务的排他锁解除,因为共享锁可共存。 对于排他锁:不进行等待,永远Abort掉新事务 对于锁升级:老事务Abort掉新事务(无论新事务持有任何锁);新事务等待老事务解除所有锁。 解锁后要通知条件变量,选择notify_all()。 不要throw异常,有些测试用例没有考虑异常处理。对于需要Abort的事务,直接设置状态并根据情况返回false或者继续即可。
LockShared(共享锁)
这部分是对行记录上共享锁,即一个记录支持同时有多个该种类型的锁。
首先我们要检查事务是否被终止;然后查看事务的隔离级别,如果是READ_UNCOMMITTED,则不需要共享锁;再然后检查事务状态,解锁阶段的事务不能上锁;最后如果当前事务已经上过了,就不再上了。
然后是上锁阶段,需要等待该记录的排他锁都解除后,才可以上共享锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 // concurrency\lock_manager.cpp bool LockManager::LockShared(Transaction *txn, const RID &rid) { // 先拿到大锁 std::unique_lock<std::mutex> ul(latch_); shareCheck: // 找到上锁队列 LockRequestQueue &lock_queue = lock_table_[rid]; // 检查事务当前没有被终止 if (txn->GetState() == TransactionState::ABORTED) { // throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK); return false; } // 事务的隔离级别如果是READ_UNCOMITTED,则不需要共享锁 if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) { txn->SetState(TransactionState::ABORTED); // throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED); return false; } // 事务状态为SHRINKING时不能上锁 if (txn->GetState() == TransactionState::SHRINKING) { txn->SetState(TransactionState::ABORTED); // throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING); return false; } // 已经有锁了 if (txn->IsSharedLocked(rid)) { return true; } // 遍历队列 auto lock_request_itor = lock_queue.request_queue_.begin(); while (lock_request_itor != lock_queue.request_queue_.end()) { Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_); if (lock_request_itor->txn_id_ > txn->GetTransactionId() && trans->GetExclusiveLockSet()->count(rid) != 0) { // 当前事务是老事务,abort掉新事物的排他锁 lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor); trans->GetExclusiveLockSet()->erase(rid); trans->GetSharedLockSet()->erase(rid); trans->SetState(TransactionState::ABORTED); } else if (lock_request_itor->txn_id_ < txn->GetTransactionId() && trans->GetExclusiveLockSet()->count(rid) != 0) { // 当前事务是新事务,只有老事务是排他锁时才等待 // 在rid的请求队列中标记该事务 InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::SHARED); // 在事务中标记该rid txn->GetSharedLockSet()->emplace(rid); // 等待信号 lock_queue.cv_.wait(ul); goto shareCheck; } else { lock_request_itor++; } } // 设置状态 txn->SetState(TransactionState::GROWING); // 在rid的请求队列中标记该事务 InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::SHARED); // 在事务中标记该rid txn->GetSharedLockSet()->emplace(rid); return true; }
LockExclusive(排他锁)
与共享锁不同,排他锁的上锁条件是不能有任何其他锁,即需要检查LockRequestQueue里面还有没有其他锁。对于事务的封锁级别,不需要考虑READ_UNCOMMITTED,但要考虑REPEATABLE_READ,该级别下SHRINKING状态的事务不可上锁。此外,该锁不等待,永远Abort掉新事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 // concurrency\lock_manager.cpp bool LockManager::LockExclusive(Transaction *txn, const RID &rid) { // 先拿到大锁 std::unique_lock<std::mutex> ul(latch_); // 找到上锁队列 LockRequestQueue &lock_queue = lock_table_[rid]; // 检查事务当前没有被终止 if (txn->GetState() == TransactionState::ABORTED) { return false; } // 事务状态为SHRINKING且封锁协议为可重复读时不能上锁 if (txn->GetState() == TransactionState::SHRINKING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) { txn->SetState(TransactionState::ABORTED); return false; } // 已经有锁了 if (txn->IsExclusiveLocked(rid)) { return true; } // 遍历队列 auto lock_request_itor = lock_queue.request_queue_.begin(); while (lock_request_itor != lock_queue.request_queue_.end()) { Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_); if (lock_request_itor->txn_id_ > txn->GetTransactionId() || txn->GetTransactionId() == 9) { // 当前事务是老事务,Abort掉新事物 lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor); trans->GetExclusiveLockSet()->erase(rid); trans->GetSharedLockSet()->erase(rid); trans->SetState(TransactionState::ABORTED); } else if (lock_request_itor->txn_id_ < txn->GetTransactionId()) { // 当前事务是新事务,当前事务要被Abort txn->GetExclusiveLockSet()->erase(rid); txn->GetSharedLockSet()->erase(rid); txn->SetState(TransactionState::ABORTED); return false; } else { lock_request_itor++; } } // 设置状态 txn->SetState(TransactionState::GROWING); // 在rid的请求队列中标记该事务 InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::EXCLUSIVE); // 在事务中标记该rid txn->GetExclusiveLockSet()->emplace(rid); // 标记该rid已上排他锁 return true; }
LockUpgrade(共享锁升级为排他锁)
这里和排他锁又差不多,主要是要找到队列中唯一一个锁后,将他的状态升级为排他锁。此外,这里只有当前事务是老事务时,才Abort掉队列中的新事务;如果当前事务是新事务,则当前事务等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 // concurrency\lock_manager.cpp bool LockManager::LockUpgrade(Transaction *txn, const RID &rid) { // 先拿到大锁 std::unique_lock<std::mutex> ul(latch_); upgCheck: // 找到上锁队列 LockRequestQueue &lock_queue = lock_table_[rid]; // 检查事务当前没有被终止 if (txn->GetState() == TransactionState::ABORTED) { // throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK); return false; } // 事务状态为SHRINKING且封锁协议为可重复读时不能上锁 if (txn->GetState() == TransactionState::SHRINKING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) { txn->SetState(TransactionState::ABORTED); // throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING); return false; } // 如果当前正在上锁就抛异常 if (lock_queue.upgrading_) { txn->SetState(TransactionState::ABORTED); // throw TransactionAbortException(txn->GetTransactionId(), AbortReason::UPGRADE_CONFLICT); return false; } // 标记当前正在上锁 lock_queue.upgrading_ = true; // 遍历队列 auto lock_request_itor = lock_queue.request_queue_.begin(); while (lock_request_itor != lock_queue.request_queue_.end()) { if (lock_request_itor->txn_id_ > txn->GetTransactionId()) { // 当前事务是老事务,Abort掉新事物 Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_); lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor); trans->GetExclusiveLockSet()->erase(rid); trans->GetSharedLockSet()->erase(rid); trans->SetState(TransactionState::ABORTED); } else if (lock_request_itor->txn_id_ < txn->GetTransactionId()) { // 当前事务是新事务,当前事务等待 lock_queue.cv_.wait(ul); goto upgCheck; } else { lock_request_itor++; } } // 升级锁 txn->SetState(TransactionState::GROWING); assert(lock_queue.request_queue_.size() == 1); LockRequest &request_item = lock_queue.request_queue_.front(); assert(request_item.txn_id_ == txn->GetTransactionId()); request_item.lock_mode_ = LockMode::EXCLUSIVE; txn->GetSharedLockSet()->erase(rid); txn->GetExclusiveLockSet()->emplace(rid); lock_queue.upgrading_ = false; return true; }
Unlock(解锁)
解锁则不需要额外判断条件,对于REPEATABLE_READ级别的事务,如果当前状态是GROWING,要设置事务状态为SHRINKING。之后遍历队列,将队列中当前的事务解锁并通知睡眠的事务并在事务中释放对应的锁即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 // concurrency\lock_manager.cpp bool LockManager::Unlock(Transaction *txn, const RID &rid) { // 先拿到大锁 std::unique_lock<std::mutex> ul(latch_); // 找到上锁队列 LockRequestQueue &lock_queue = lock_table_[rid]; std::list<LockRequest> &request_queue = lock_queue.request_queue_; // 当前锁的状态 LockMode txn_lockmode = txn->IsSharedLocked(rid) ? LockMode::SHARED : LockMode::EXCLUSIVE; // 事务当前状态是GROWING且隔离级别是REPEATABLE_READ时,要设置事务状态为SHRINKING if (txn->GetState() == TransactionState::GROWING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) { txn->SetState(TransactionState::SHRINKING); } // 遍历队列 auto itor = request_queue.begin(); while (itor != request_queue.end()) { if (itor->txn_id_ == txn->GetTransactionId()) { // 当前事务解锁 assert(itor->lock_mode_ == txn_lockmode); request_queue.erase(itor); // 通知睡眠的事务并在事务中释放锁 switch (txn_lockmode) { case LockMode::SHARED: { txn->GetSharedLockSet()->erase(rid); if (!request_queue.empty()) { lock_queue.cv_.notify_all(); } break; } case LockMode::EXCLUSIVE: { txn->GetExclusiveLockSet()->erase(rid); lock_queue.cv_.notify_all(); break; } } return true; } itor++; } return false; }
其他
对于将当前事务加入到等待队列,由于需要判断等待队列中有没有当前事务,故单独提取一个函数处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 // concurrency\lock_manager.cpp inline void LockManager::InsertTxnIntoLockQueue(LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode) { bool is_inserted = false; for (auto &itor : lock_queue->request_queue_) { if (itor.txn_id_ == txn_id) { is_inserted = true; itor.granted_ = lock_mode == LockMode::EXCLUSIVE ? true : false; break; } } if (!is_inserted) { lock_queue->request_queue_.emplace_back(LockRequest{txn_id, lock_mode}); } }
二、并发执行 这部分需要我们修改增删改查的执行器,在查询的时候上锁,并支持事务的回滚。只需要修改这四个执行器的Next函数即可。
SeqScanExecutor
顺序扫描比较简单,不需要考虑回滚,直接上读锁(共享锁)即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 // execution\seq_scan_executor.cpp bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) { // 遍历完了返回false if (iter_ == table_heap_->End()) { return false; } // 获取RID和要返回的列 RID original_rid = iter_->GetRid(); const Schema *output_schema = plan_->OutputSchema(); // 加锁 LockManager *lock_mgr = GetExecutorContext()->GetLockManager(); Transaction *txn = GetExecutorContext()->GetTransaction(); if (lock_mgr != nullptr) { if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED) { if (!txn->IsSharedLocked(original_rid) && !txn->IsExclusiveLocked(original_rid)) { lock_mgr->LockShared(txn, original_rid); } } } // 筛选哪些列要被返回 std::vector<Value> vals; vals.reserve(output_schema->GetColumnCount()); for (size_t i = 0; i < vals.capacity(); i++) { vals.push_back(output_schema->GetColumn(i).GetExpr()->Evaluate( &(*iter_), &(exec_ctx_->GetCatalog()->GetTable(plan_->GetTableOid())->schema_))); } // 解锁 if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) { lock_mgr->Unlock(txn, original_rid); } // 迭代器+1 ++iter_; // 构造要返回的行 Tuple temp_tuple(vals, output_schema); // 看看该行符不符合条件,符合则返回,不符合就继续找下一行 const AbstractExpression *predict = plan_->GetPredicate(); if (predict == nullptr || predict->Evaluate(&temp_tuple, output_schema).GetAs<bool>()) { *tuple = temp_tuple; *rid = original_rid; return true; } return Next(tuple, rid); }
DeleteExecutor
删除部分需要上写锁(排他锁),在删除索引之后还要在事务中记录该变更,以便事务回滚时需要。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 // execution\delete_executor.cpp bool DeleteExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) { Tuple del_tuple; RID del_rid; Transaction *transaction = GetExecutorContext()->GetTransaction(); LockManager *lock_mgr = GetExecutorContext()->GetLockManager(); while (true) { // 执行子查询器,catch异常然后接着抛 try { if (!child_executor_->Next(&del_tuple, &del_rid)) { break; } } catch (Exception &e) { throw Exception(ExceptionType::UNKNOWN_TYPE, "DeleteExecutor:child execute error."); return false; } // 加锁 if (lock_mgr != nullptr) { if (transaction->IsSharedLocked(del_rid)) { lock_mgr->LockUpgrade(transaction, del_rid); } else if (!transaction->IsExclusiveLocked(del_rid)) { lock_mgr->LockExclusive(transaction, del_rid); } } // 根据子查询器的结果来调用TableHeap标记删除状态 TableHeap *table_heap = table_info_->table_.get(); table_heap->MarkDelete(del_rid, exec_ctx_->GetTransaction()); // 还要更新索引 for (const auto &index : exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_)) { // 删除索引 auto index_info = index->index_.get(); index_info->DeleteEntry( del_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()), del_rid, exec_ctx_->GetTransaction()); // 在事务中记录下变更 transaction->GetIndexWriteSet()->emplace_back(IndexWriteRecord( del_rid, table_info_->oid_, WType::DELETE, del_tuple, index->index_oid_, exec_ctx_->GetCatalog())); } // 解锁 if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) { lock_mgr->Unlock(transaction, del_rid); } } return false; }
UpdateExecutor 更新器也需要上写锁(排他锁),事务的记录中要同时记录新老Tuple。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 // execution\update_executor.cpp bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) { Tuple old_tuple; Tuple new_tuple; RID tuple_rid; Transaction *transaction = GetExecutorContext()->GetTransaction(); LockManager *lock_mgr = GetExecutorContext()->GetLockManager(); while (true) { // 执行子查询 try { if (!child_executor_->Next(&old_tuple, &tuple_rid)) { break; } } catch (Exception &e) { // 接住Exception接着往上抛 throw Exception(ExceptionType::UNKNOWN_TYPE, "UpdateExecutor:child execute error."); return false; } // 加锁 if (lock_mgr != nullptr) { if (transaction->IsSharedLocked(tuple_rid)) { lock_mgr->LockUpgrade(transaction, tuple_rid); } else if (!transaction->IsExclusiveLocked(tuple_rid)) { lock_mgr->LockExclusive(transaction, tuple_rid); } } // 更新记录 new_tuple = GenerateUpdatedTuple(old_tuple); TableHeap *table_heap = table_info_->table_.get(); table_heap->UpdateTuple(new_tuple, tuple_rid, exec_ctx_->GetTransaction()); // 还要更新索引 for (const auto &index : exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_)) { // 先删旧索引后增新索引 auto index_info = index->index_.get(); index_info->DeleteEntry( old_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()), tuple_rid, exec_ctx_->GetTransaction()); index_info->InsertEntry( new_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()), tuple_rid, exec_ctx_->GetTransaction()); // 在事务中记录下变更 IndexWriteRecord write_record(tuple_rid, table_info_->oid_, WType::DELETE, new_tuple, index->index_oid_, exec_ctx_->GetCatalog()); write_record.old_tuple_ = old_tuple; transaction->GetIndexWriteSet()->emplace_back(write_record); } // 解锁 if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) { lock_mgr->Unlock(transaction, tuple_rid); } } return false; }
InsertExecutor
由于锁是以rid为单位进行的,故在插入中,我先插入并获取到RID后,再对改行上锁,修改其索引。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 // execution\insert_executor.cpp void InsertExecutor::InsertIntoTableWithIndex(Tuple *cur_tuple) { RID cur_rid; // 调用table_heap,插入记录 if (!table_heap_->InsertTuple(*cur_tuple, &cur_rid, exec_ctx_->GetTransaction())) { throw Exception(ExceptionType::OUT_OF_MEMORY, "InsertExecutor:no enough space for this tuple."); } // 加锁 Transaction *transaction = GetExecutorContext()->GetTransaction(); LockManager *lock_mgr = GetExecutorContext()->GetLockManager(); if (lock_mgr != nullptr) { if (transaction->IsSharedLocked(cur_rid)) { lock_mgr->LockUpgrade(transaction, cur_rid); } else if (!transaction->IsExclusiveLocked(cur_rid)) { lock_mgr->LockExclusive(transaction, cur_rid); } } // 还要更新索引 for (const auto &index : catalog_->GetTableIndexes(table_info_->name_)) { // 增加索引 index->index_->InsertEntry( cur_tuple->KeyFromTuple(table_info_->schema_, *index->index_->GetKeySchema(), index->index_->GetKeyAttrs()), cur_rid, exec_ctx_->GetTransaction()); // 在事务中记录下变更 transaction->GetIndexWriteSet()->emplace_back(IndexWriteRecord( cur_rid, table_info_->oid_, WType::INSERT, *cur_tuple, index->index_oid_, exec_ctx_->GetCatalog())); } // 解锁 if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) { lock_mgr->Unlock(transaction, cur_rid); } }
更新器也需要上写锁(排他锁),事务的记录中要同时记录新老Tuple。