在21岁,遇到存在主义危机

广州的天气总是很闷热,希望下一场大雨。

下大雨的时候,窗外的世界在消失,白天和黑夜没有了分界。

什么都不在乎了,下大雨时那种世界末日的感觉。

如同百年孤独里,下的那场4年10个月零5天的雨,是一场自我审判。

人或多或少都会有这么些时刻的自我毁灭倾向。

存在主义危机 危机

想动笔 打了寥寥几个字 又躺下了 躺了几分钟 又爬起来写

写在前面的

文字是思想的凝结出的结晶

所以要写下来 沉淀

在有llm的时代 人写的东西 反而更有价值了

打工

奴隶制度被废除了,所以这里改为了人才市场

上班其实是奴隶市场从买断制改成了订阅制

去公司上班,感觉到累,其实也说不清楚是身体上的疲惫还是心理上的疲惫,或许二者皆有吧。

如果真的认真分析的话 其实身体上的累来源于2点 上下班通勤太累了 解决办法应该是坐万胜围的班车 下班也坐班车得了 晚点回也可以 睡在公司的可行性分析

真正干活的话 倒不是很累 把目标细化一点 然后一步一步干 记录干了什么 就行了 毕竟没有绩效考核 没有kpi 毕竟这也暂时只是实习而已 混个经历 最后也要交付一个可行的产品

luv

现在

身体上的累很好解释,早上起得早,晚上

mit6.824-interview

本文主要记录一些分布式系统的相关知识,以及mit6.824这个lab的一些内容
lab的话 网上资料太多了 主要是把课程网站给的材料都看一下 基本都能做
lab1 mapreduce 熟悉一下go语法 把rpc的消息struct定义一下就能做了 比较简单
lab2 raft 看那篇经典论文 照着论文的每一步来实现 当时也是看网上很多资料来实现的
lab3, lab4 主要是写kv存储系统了 调用raft来实现主从复制 还有shard controler 负载均衡

最悲伤的是 感觉做了之后 也全都忘记了所有细节😭

1.认识分布式系统

  • 什么是分布式系统
  • 为什么要分布式系统
  • 示例 搜索引擎(多数据中心,gfs,spanner,chubby,mapreduce,bigtable), 区块链
  • 挑战 (网络延迟, 部分失效,时钟问题)

顺序读写速度远高于随机读写

2.分布式系统模型

  • 两将军问题 (无解) 工程解就是tcp
  • 拜占庭将军问题

要设计一个分布式系统 要首先考虑实际的情况

2.1 系统模型

网络链路模型

  • reliable link 可靠链路
  • fair-loss link
  • 任意链路
    通过加密技术让任意链路变成公平损失链路

节点故障模型

  • fail-stop
  • fial-recover
  • 拜占庭故障

时间模型

  • 同步 有限时间内一定返回
  • 异步 不一定 (接近现实)

消息传递语义
如何避免多次进行同一操作

  • 幂等操作 (代价较高)
    用唯一标识符 来解决信息重复

3. 分布式数据基础

主从数据库 一个负责读 一个负责写
有些nosql 天生就是分布式的

分区

可拓展性

数据分区 (劣势是多表查询可能

  • 垂直分区 (联机分析处理)
  • 水平分区 (联机事务处理)(6.824做的就是水平分区)(范围分区 哈希分区 一致性哈希(用于解决哈希分区 增添节点 需要更改哈希函数的问题) 还能用虚拟节点 来对不同的机器划分)

复制

好处 可用和安全 减少往返时间 吞吐量

  • 单主复制(同步复制 异步复制 半同步复制)(简单 仅在主节点进行写操作 保证操作顺序 可拓展)(大量写操作 难拓展 宕机时 难以立马恢复)(自动切换可能产生脑裂 手动切换比较慢)

  • 多主复制 (避免冲突 通过一个哈希函数将所有的请求路由到相同的主节点)(客户端解决冲突 像极了git merge🤣 最后写入胜利 因果关系追踪)

  • 无主复制(客户端向多个节点发写请求 向多个节点读)(读修复 反熵修复) (基于quorum机制
    w+r > n 且 w > n / 2)

cap定理

consistency availability partition tolerance
线性一致性读 读操作返回最近一次写操作的结果
一个简单的证明是 v1和v2根本连不上 写在v1 读在v2 不可能满足一致性

  • pacelc if p ac else lc
  • base 舍弃强一致性 追求最终一致性

线性一致性

给定一个执行历史,将并发操作拓展为多个顺序操作,有一个合法就行

实现线性一致性 加锁
cas操作
大多数情况可能没必要 因为cpu访问内存都没有做到

顺序一致性
同一客户端的操作顺序不变
cpu访问内存同样没有做到

因果一致性
最终一致性
以客户端为中心的一致性

隔离级别

串行化
可重复读
快照隔离
读已提交
读未提交

脏写
脏读
。。。

4.分布式共识

状态机复制 多日志副本

flp不可能定理

完全异步的系统中,即使只有一个节点故障,也不存在一个算法让系统达成共识

所以要么牺牲共识算法的一个属性,要么放宽异步网络假设

绕过flp

  • 故障屏蔽 假设坏了的能修好
  • 故障检测器(每一个故障都能被一些故障检测器 检测出来 正确的不会被怀疑)
  • 随机性算法(类似于区块链 具体不懂 )

同步系统
f < N f +1轮信息总能达成共识

paxos

只有一种共识协议,就是paxos
只说basic paxos
multi-paxos 的变种就是raft

客户端 提议者 接受者 学习者
两阶段协议
phase 1a
广播提案编号 超过半数
phase 1b
大于就回复 小于就拒绝 回复附上已经接受的提案值和编号
phase 2a
把提案值和编号广播出去
phase 2b
如果有一半接受者接受 就接受

活锁
用随机值 避免冲突

raft

选举

3个状态
heartbeat
安全性和活性 有且只有一个领导 只投一次票 随机化开始
选举过程比较简单 不说了
根据任期号来投票

日志复制

索引
任期号
命令
用index和term来唯一确定一个日志
反复发送
判断leadercommit
用prelogindex 和 prevlogterm 来填补缺失 一致性检查

rpc

AppendEntries RPC
RequestVote RPC

领导者更替

不会在一开始就执行清理操作 在正常情况下执行清理操作 (因为有些节点没恢复)
发送no-op来保证自己的权威 让其他节点应用已经commit了的日志

清理不一致日志

用nextIndex[]来为follow 补全日志

客户端协议

命令加唯一标识符

实现线性一致性

读操作当写操作 (开销大)
所以用租约 start + electionTimeout 之前一定是leader

配置变更

这个部分mit6.824没要求
大致是两阶段协议 避免脑裂
还有进行pre-vote 只有在自己有可能赢的时候 发起选举
建议一次进行一个节点的变更

日志压缩

基于内存的状态机的压缩
基于磁盘的状态机的压缩

性能优化

批处理
quorum大小优化 不一定要满足半数

拜占庭容错和PBFT算法

太复杂了
暂时没有需要写的场景

分布式事务

ACID 一致 原子 隔离 持久

原子提交

wal和日志 机械硬盘能保证512B的原子写

两阶段提交

  • 好了吗
  • 干活
  • 干完了
    协调者故障就会有问题

三阶段提交

在加一轮
如果协调者故障 就在参与者里面再选一个
延迟较长 基本不用

基于paxos的提交协议 和 基于quorum的提交协议

用paxos让协调者更能容错

还有基于quorum

saga事务

有趣的思想
例如 保险公司理赔
把大事务拆成一系列小事务 小事务要么一起完成 要么都不完成

并发控制

悲观并发控制

简单来说 就是都上锁
两阶段锁
死锁避免 银行家算法 或者使用行锁 对资源排序 一定次序上锁
死锁预防 通俗的话来讲就是 已经被上锁的资源 1个进程要访问 就退出一个 把以前的资源都释放掉
死锁检测 出现有向无环图 回退

乐观并发控制

基于检查

基于时间戳
别读到未来的
别修改未来的
难以保证精准的时间源

多版本并发控制

就是多加个版本 就能读到以前的了

仅追加

垃圾回收
如果不用了的版本 就回收掉 不同级别

GFS

单个master来处理

mapreduce

mysql innodb

![截屏2024-03-05 14.37.52](mysql-innodb/截屏2024-03-05 14.37.52.png)

![截屏2024-03-05 14.39.09](mysql-innodb/截屏2024-03-05 14.39.09.png)

MVCC

常用的进程通信方式有管道、命名管道、命名字 、 TCP/IP套接字 ,UNIX域套接字

![截屏2024-03-05 17.57.35](mysql-innodb/截屏2024-03-05 17.57.35.png)

![截屏2024-03-05 18.05.23](mysql-innodb/截屏2024-03-05 18.05.23.png)

![截屏2024-03-05 18.05.34](mysql-innodb/截屏2024-03-05 18.05.34.png)

![截屏2024-03-05 18.06.23](mysql-innodb/截屏2024-03-05 18.06.23.png)

![截屏2024-03-05 18.07.26](mysql-innodb/截屏2024-03-05 18.07.26.png)

![截屏2024-03-05 18.08.26](mysql-innodb/截屏2024-03-05 18.08.26.png)

![截屏2024-03-05 18.09.01](mysql-innodb/截屏2024-03-05 18.09.01.png)

![截屏2024-03-05 18.57.54](mysql-innodb/截屏2024-03-05 18.57.54.png)

cmu15445interview

本文目的在于记录复盘做15445这个项目的经验总结 以及应付面试

分成2个部分 lab实现 还有 这门课的一些重要知识

先直接说5个lab

lab0

用cpp实现字典树 和下文基本没关系 不细说了

lab1

实现3个东西

可拓展哈希表

在理解可扩展哈希表之前,我们需要了解几个概念。

  • Directory:是存放bucket指针的容器,可动态生长(以原大小的倍数作为增长率),容器的每个元素可用哈希值来索引。
  • Bucket:桶。存放Key/value pair的桶,数据结构层面是一个线性表。

LRU 力扣原题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class Node {
int key;
int val;
Node prev;
Node next;

Node() {
}

Node(int key, int val) {
this.key = key;
this.val = val;
}
}

class LRUCache {
private Map<Integer, Node> cache = new HashMap<>();
private Node head = new Node();
private Node tail = new Node();
private int capacity;
private int size;

public LRUCache(int capacity) {
this.capacity = capacity;
head.next = tail;
tail.prev = head;
}

public int get(int key) {
if (!cache.containsKey(key)) {
return -1;
}
Node node = cache.get(key);
moveToHead(node);
return node.val;
}

public void put(int key, int value) {
if (cache.containsKey(key)) {
Node node = cache.get(key);
node.val = value;
moveToHead(node);
} else {
Node node = new Node(key, value);
cache.put(key, node);
addToHead(node);
++size;
if (size > capacity) {
node = removeTail();
cache.remove(node.key);
--size;
}
}
}

private void moveToHead(Node node) {
removeNode(node);
addToHead(node);
}

private void removeNode(Node node) {
node.prev.next = node.next;
node.next.prev = node.prev;
}

private void addToHead(Node node) {
node.next = head.next;
node.prev = head;
head.next = node;
node.next.prev = node;
}

private Node removeTail() {
Node node = tail.prev;
removeNode(node);
return node;
}
}

/**
* Your LRUCache object will be instantiated and called as such:
* LRUCache obj = new LRUCache(capacity);
* int param_1 = obj.get(key);
* obj.put(key,value);
*/


缓冲池的管理

这个任务需要完成缓冲池的管理,前面只是完成对页框的管理,但是访问的时候是使用页号访问的,第一次用页号访问的时候,页加载到页框中。页换出的时候,页框被复用换入的页复用。这个流程对于缓冲池的管理来说应该是透明的,缓冲池管理只需要使用页号访问和修改页

NewPage

  • 后面的函数都需要加锁
  • 获取一个新页的流程是:如果空闲列表没有空闲了,需要Evict一个页→牺牲的这个页如果是脏页的情况下,需要先落盘→如果空闲列表没有空闲,需要先获取一个新页号→从空闲列表取一个页号→设置新页→RecordAccess新页→pin新页
  • 这里为什么要pin新页,因为当外部调用这个函数的时候,就为了使用这个新页,那么这个新页就一定不能马上被牺牲的

FetchPage

  • 如果该页在内存中,很简单,只需要根据需求,记录一下这个事件,然后返回该页就行。否则,就按照NewPage函数执行,区别是,页号对应的页在磁盘上,需要用ReadPage从磁盘读到内存中

UnpinPage

  • unpin之后,如果还有其他人在使用该页,那么pin_count_应该是>0的,只有当pin_count为0时,该页可以设置为可牺牲的
  • 重要的是,需要维护is_dirty_标志位,只要之前任意一次pin之后,该页变脏了,此后一直是脏的,如果该页最后是干净的,可以不用写回磁盘

FlushPage

  • 调用WritePage写盘
  • 重置脏位
  • 当前的DeallocatePage没有实现,调用后实际上不会做事

DeletePage

  • 当某个页不再要被使用的时候,就可以不再追踪
  • 需要注意,如果该页是被pin住的状态,是不能被删除的

FlushAllPages

  • 把所有内存中的页刷盘

lab2

P2 b+tree

lab的整体架构很清晰。先实现internal_page、leaf_page这两个数据结构,作为B+树的内部节点和叶子节点。然后实现B+树的插入、删除,最后支持并发。

内部节点与叶子节点
在这一部分中,我们要实现一系列的小函数。整个过程比较繁琐,但难度不大。
在我最初的版本中,所有的搜索都是线性时间复杂度的。这样做实现简单,可以确保正确性。有些地方可以改成二分查找。

其实最主要看那本数据库系统概论的伪代码就能够实现了

b+树优化

更乐观的乐观策略

螃蟹法则的乐观锁策略:

读取数据:首先,从数据库中读取需要更新的数据。
执行更新操作:对读取的数据进行修改,并准备将更新后的数据写回数据库。
检查冲突:在提交更新之前,再次读取数据库中的数据,并比较更新前后的数据是否发生了变化。如果发现其他事务已经修改了数据,就意味着存在冲突。
处理冲突:如果检测到冲突,系统可以选择执行回滚操作,放弃当前的更新;或者根据具体情况采取其他的解决方案,例如重试操作、通知用户冲突发生等。
提交更新:如果没有冲突,就将更新后的数据写回数据库,并提交事务。

乐观策略失效的原因应该是这三点:测试量小、螃蟹协议太垃圾、读写锁拖后腿。

其中比较重要的是第二点:这个实验谈“乐观策略”时,大多数人都会实现成“基于螃蟹协议的乐观策略”。我们知道读写锁存在“读写冲突”的问题,螃蟹协议又带一种“尽可能多的收集锁”的保守倾向,这两点搭在一起会让乐观策略效果不明显。同时,cmu的测试量其实很小,生成的b+树只有两层高,绝大多数情况下,线程都存在一段时间,握有两层的锁。这样一来,重试线程会被大量乐观线程阻塞,新的乐观线程又会被重试线程阻塞。

lab3

最主要要先把框架看懂
value,tuple,table,column,schema

查询过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bustub> EXPLAIN SELECT * FROM __mock_table_1;
=== BINDER ===
BoundSelect {
table=BoundBaseTableRef { table=__mock_table_1, oid=0 },
columns=[__mock_table_1.colA, __mock_table_1.colB],
groupBy=[],
having=,
where=,
limit=,
offset=,
order_by=[],
is_distinct=false,
}
=== PLANNER ===
Projection { exprs=[#0.0, #0.1] } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)
=== OPTIMIZER ===
MockScan { table=__mock_table_1 } | (__mock_table_1.colA:INTEGER, __mock_table_1.colB:INTEGER)

这段 SQL 查询的执行计划中包含了三个部分:BINDER、PLANNER 和 OPTIMIZER。

  1. BINDER:在这一阶段,查询中的各种符号被绑定到具体的表、列或其他对象。在这个查询中,__mock_table_1 被绑定到了一个基本表引用 BoundBaseTableRef,并且选择了该表的所有列 __mock_table_1.colA__mock_table_1.colB。其他部分,如分组、聚合、筛选条件等,都没有被设置。

  2. PLANNER:这一步骤中,执行计划被规划和生成。在这个查询中,使用了投影操作 Projection,表示查询结果中将会包含 __mock_table_1 的所有列。然后,使用了一个模拟扫描操作 MockScan,表示将会从 __mock_table_1 中获取数据。最后,显示了查询结果的列信息,即 __mock_table_1.colA:INTEGER__mock_table_1.colB:INTEGER

  3. OPTIMIZER:在这一步骤中,执行计划被优化以提高性能。在这个查询中,优化器没有进行额外的优化操作,因为查询已经相对简单,直接执行扫描操作即可。

综上所述,该查询执行计划会从表 __mock_table_1 中选择所有列,并将结果作为查询的输出。

AbstractExecutor
SQL拆分成一棵执行树之后,其中的节点的功能承担者,也是这次实验主要要完成的部分。

构造执行树时不会构造Executor,而是用后面的AbstractPlanNode来构造树,只有在执行这棵树的时候会初始化对应的Executor来执行。

1
2
3
4
5
6
7
8
9
10
11
// 每个执行器需要初始化的东西都不一样,一般是初始化指向表头的迭代器指针,或者自己定义的一些辅助循环的值
// 拿seq_scan来说,就需要在这里将迭代器指针指向表头。
void Init()

// 需要做到调用一次next就输出且只输出一组tuple的功能,输出是通过赋值*tuple参数,并return true
// 如果没有能输出的了,return false
void Next(Tuple *tuple, RID *rid)

// Schema相当于存储了表的表头名字,OutputSchema用来指出此节点需要输出哪些列(哪些表头需要考虑在内)
virtual const Schema *GetOutputSchema() = 0;

ExecutorContext
(AbstractExecutor构造函数的参数之一)

上下文信息,也就是说这次执行所用到的一些关键信息

1
2
3
// 这个Catalog至关重要,存储了一个数据库的全部表格信息的索引,提供了对表格的操作。
// 只有这个Catalog是我们可能会用到的,比如在seq_scan中需要利用它获取目标表
Catalog *catalog_;

AbstractPlanNode
(AbstractExecutor构造函数参数之二)

用于存储节点有关的信息,AbstractExecutor利用用里面的信息来完成任务。

1
2
3
4
5
6
7
8
// Schema是表每列的表头名字,OutputSchema用来指出此节点需要输出哪些列
Schema *OutputSchema()

// 获取孩子节点(我们实现executor的时候用不上,在执行树的时候才用得上)
// 在一些需要从子节点获取tuple的操作用得上,比如 join
AbstractPlanNode *GetChildAt(uint32_t child_idx)
std::vector<AbstractPlanNode *> &GetChildren()

例子:SeqScanPlanNode成员函数,AbstractExecutor可以在这里获取TableOid和Predicate

1
2
3
4
5
6
7
8
// 这个在ExecutionEngine里面用于判断当前节点的类型
PlanType GetType()

// Predicate:谓词,返回值全是真值的表达式,AbstractExpression就是一颗表达式树。
AbstractExpression *GetPredicate()

// 结合Catlog可以得到当前Executor需要的表格内容
table_oid_t GetTableOid()

AbstractExpression
表达式类,一颗表达式树中的节点,比如比较,聚合,或者常量,column元素。其中column表达式也作为了column类的成员之一。

不同的表达式实现的功能差距交大,这个是非常重要的一个类,每一个executor的代码都会用到。

ComparisonExpression:用于比较,实例化后做为predicate_谓词(下面例子中的predicate就是此类的实例化),或者having(在aggregation中会用到)。
返回的是一个装载Value类中的bool值,需要用value.GetAs()得到这个值,用于判断是否满足比较的条件(比较的细节:比如是>还是<,就不用我们关心了,交给Evaluate函数就行)

1
Value value = plan_->GetPredicate()->Evaluate(tuple, schema)

ColumnValueExpression:列元素的表达值,有一个很大的作用,它的Evaluate函数能返回当前tuple中哪一个value是对应这个column的。

1
2
3
4
5
6
7
8
9
 Value value = column.GetExpr()->Evaluate(tuple, schema);

// 或者判断当前join是左连接还是右链接,并根据传入的左右俩tuple返回连接对应的值
Value value = plan_->OutputSchema()->GetColumn(i).GetExpr()->EvaluateJoin(
&left_tuple_,
plan_->GetLeftPlan()->OutputSchema(),
&right_tuple_,
plan_->GetRightPlan()->OutputSchema());

ConstantValueExpression:常数表达式,返回值永远是一个常数。

AggregateValueExpression:在AggregateExecutor中用到,作用和ColumnValueExpression类似,用于找出属于当前column的这个值。(AggregateExecutor的测试函数在传入outputschme的时候,用于构造的colum不是ColumnValueExpression而是AggregateValueExpression,就是为了处理不同类型的输入参数。所以才能有以下的用法)

1
Value value = column.GetExpr()->EvaluateAggregate(temp.Key().group_bys_, temp.Val().aggregates_);

Value
(最小数据单位)

Index
索引保存的是(Tuple_key, RID)对,其中Tuple_key是根据传入的Tuple生成的。

也就是说索引是按照表中的每一行对应生成的。

在会对table造成修改的executor中会用到,通过提供的函数在修改table的时候顺便把对应的indx修改。

tuple
相当于表里面的一行,存储了一行value。长度由shema决定,每个column可以在tuple中对应一个value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 就是将数据二进制化或者反过来,用于存储
void SerializeTo(char *storage) const;
void DeserializeFrom(const char *storage);

// return RID of current tuple
inline RID GetRid()

// 返回数据指针
inline char *GetData()

// 返回tuple的长度(bits)
inline uint32_t GetLength()

// 返回指定colum_idx位置的值
Value GetValue(const Schema *schema, uint32_t column_idx)

// 其他
Tuple KeyFromTuple(const Schema &schema, const Schema &key_schema, const std::vector<uint32_t> &key_attrs);
inline bool IsNull(const Schema *schema, uint32_t column_idx)
inline bool IsAllocated() { return allocated_; }
std::string ToString(const Schema *schema) const;

TableHeap
相当于一张表本身

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool InsertTuple(const Tuple &tuple, RID *rid, Transaction *txn);

// MarkDelete标记需要删除的项,调用ApplyDelete删除
bool MarkDelete(const RID &rid, Transaction *txn);
void ApplyDelete(const RID &rid, Transaction *txn);

bool UpdateTuple(const Tuple &tuple, const RID &rid, Transaction *txn);

void RollbackDelete(const RID &rid, Transaction *txn);

// 获取Tuple
bool GetTuple(const RID &rid, Tuple *tuple, Transaction *txn);

//可以用迭代器访问Table
TableIterator Begin(Transaction *txn);
TableIterator End();

TableIterator
TableHeap的迭代器(指针),指向的是其中的Tuple,即可以当作Tuple指针来用。

Catalog
存储了一个数据库的全部表格信息的索引,提供了对表格的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
// TableInfo装载某个table的相关信息,包括schema,name,table指针,oid
TableInfo *GetTable(table_oid_t table_oid)

// IndexInfo 同理
IndexInfo *GetIndex(table_oid_t table_oid)

IndexInfo *CreateIndex(...)

IndexInfo *GetIndex(index_oid_t index_oid)

std::vector<IndexInfo *> GetTableIndexes(const std::string &table_name)


Tableinfo

1
2
3
4
5
6
7
8
Schema schema_;

std::unique_ptr<TableHeap> table_;

const table_oid_t oid_;

const std::string name_;

Indexinfo

1
2
3
4
5
6
7
8
9
10
11
12
Schema key_schema_;

std::string name_;

std::unique_ptr<Index> index_;

index_oid_t index_oid_;

std::string table_name_;

const size_t key_size_;

schema
模式。每列都有一个表头(名字),shema就是一组表头的集合,拿来指明这张表有哪些列或者某个executer需要处理哪些列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const Column &GetColumn(const uint32_t col_idx)

uint32_t GetColIdx(const std::string &col_name)

const std::vector<uint32_t> &GetUnlinedColumns()

uint32_t GetColumnCount()

uint32_t GetUnlinedColumnCount()

// 返回单个tuple的长度(不是schema的长度哦)
inline uint32_t GetLength()

// inline bool IsInlined() const { return tuple_is_inlined_; }

column
相当于一列的名字(表头),也能调用其expression完成对应的操作(前面在AbstractExpression中有说)

1
std::string GetName()

然后就是实现各种executors了
seq_scan
seq_scan_executor构造函数需要将元信息TableInfo保存起来,之后用来查询。

Init这里扫描已经是最底层的算子了,所以没有子算子,需要保存的元信息中table_的迭代器

Next需要注意的是,迭代器中获取的tuple不一定是都能用的,检验is_deleted_标志位,如果是true,那么还需要遍历下一行行tuple和rid都需要返回,其中RID,包括page_id和slow_num,其中page_id就是页号,slow_num指的是这个tuple在这个页的偏移位置,在没有varchar的时候,每个tuple都是固定大小的,这样就可以简单通过固定偏移量,计算出tuple到底放在页的哪里

遍历表时,其中的每一个tuple都要经过plan_->GetPredicate()->Evaluate()判断是否满足条件。
拿到的tuple其实是TableHeap的迭代器,看作是一个指向tuple的指针。

1
2
Value Evaluate(const Tuple *tuple, const Schema *schema)

根据Evaluate的参数,直接传入temp不行,因为temp不等同于tuple指针,只是重载了->运算符。需要如下方式传入。

1
2
Value Evaluate(const Tuple *tuple, const Schema *schema)
plan_->GetPredicate()->Evaluate(&(*temp), &table_info_->schema_)

update + delete + insert
调用子节点的next获得要更新/删除/插入点的tuple,然后执行对应的操作。

IndexScan
IndexScan其实和SeqScan非常类似,SeqScan是扫描数据表。SeqScan先获取数据表,IndexScan那就先获取索引树。IndexScanPlanNode计划节点中,包含了索引的index_id:

1
2
IndexInfo *info = exec_ctx_->GetCatalog()->GetIndex(plan_->GetIndexOid());
auto tree_ = reinterpret_cast<BPlusTreeIndexForOneIntegerColumn *>(info->index_.get());

获取树的迭代器:

1
auto table_iter_ = tree_->GetBeginIterator();

回看一下Project 2,迭代器解引用返回的是什么?是不是包含RID?但是我们要得到的是tuple,很简单,从TableHeap获取:

1
2
3
4
auto table_heap_ = exec_ctx_->GetCatalog()->GetTable(info->table_name_)->table_.get();
*rid = (*table_iter_).second;
table_heap_->GetTuple(*rid, tuple, exec_ctx_->GetTransaction());

Task #2 - Aggregation & Join Executors
1.Aggregation

在 aggregation_executor.h中提供了SimpleAggregationHashTable,其中的主要成员是std::unordered_map ht_{};
AggregationExecutor中的成员变量:

1
2
3
4
5
6
7
8
/** The aggregation plan node */
const AggregationPlanNode *plan_;
/** The child executor that produces tuples over which the aggregation is computed */
std::unique_ptr<AbstractExecutor> child_;
/** Simple aggregation hash table */
SimpleAggregationHashTable aht_;
/** Simple aggregation hash table iterator */
SimpleAggregationHashTable::Iterator aht_iterator_;

join

有两张表,我们需要把他们连接起来,可以通过内外两层循环来将满足条件的记录连接起来,即左表每读取一条记录,右表就要完成一次读循环。
HashJoin
NestedLoopJoin需要两层循环,而HashJoin通过将一张表映射到哈希表中减少了一层循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
namespace bustub {
struct AggregateKey {
std::vector<Value> group_bys_;
auto operator==(const AggregateKey &other) const -> bool {
for (uint32_t i = 0; i < other.group_bys_.size(); i++) {
if (group_bys_[i].CompareEquals(other.group_bys_[i]) != CmpBool::CmpTrue) {
return false;
}
}
return true;
}
};
}
namespace std {
/** Implements std::hash on AggregateKey */
template <>
struct hash<bustub::AggregateKey> {
auto operator()(const bustub::AggregateKey &agg_key) const -> std::size_t {
size_t curr_hash = 0;
for (const auto &key : agg_key.group_bys_) {
if (!key.IsNull()) {
curr_hash = bustub::HashUtil::CombineHashes(curr_hash, bustub::HashUtil::HashValue(&key));
}
}
return curr_hash;
}
};
} // namespace std

Task #3 - Sort + Limit Executors and Top-N Optimization
比较简单 不详细说了 top-n 用堆

lab4

DBMS中实现一个Lock Manager,然后使用他来支持并发执行。锁管理器负责跟踪向事务发出的行级锁,并支持根据隔离级别适当的上或释放共享锁和排他锁。

LM的基本思想是,它维护一个关于活动事务当前持有的锁的内部数据结构,然后事务在访问数据项之前向LM发出锁请求,LM将根据情况决定锁授予该事务、阻止该事务还是终止该事务。

一、锁管理器与死锁预防

lock_manager.h中有一个lock_table_的哈希表,其存储了每个RID对应的LockRequestQueue队列。及对于每个RID,也就是每个tuple,都有一个对应的锁请求队列LockRequestQueue。

一个请求LockRequest主要记录有发出该请求的txn_id, 请求的是哪类的锁,以及该请求是否已经被授予。一个LockRequestQueue中可能有一个或多个请求已经被授予锁。

本实验需要支持事务的三种隔离级别:

READ_UNCOMMITED只有在需要时上写锁。
READ_COMMITTED要解决脏读的问题,解决方案就是读时上读锁,读完解读锁;写时上写锁,但等到commit时才解写锁;读时上读锁,读完解读锁。这样,永远不会读到未commit的数据,因为上面有写锁。
REPEATABLE_READ进一步打造可重复读。同一事务读两次数据的中途不想被其他事务的写干扰,这就需要用到巧妙的二段封锁协议(2PL)了:事务分为两个阶段(不考虑commit/abort),上锁阶段(GROWING)只上锁,解锁阶段(SHINKING)只解锁。这样,第二次读取时,前一次读取的读锁一定还在,避免了中途被修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// include\concurrency\lock_manager.h
class LockRequestQueue {
public:
std::list<LockRequest> request_queue_;
// for notifying blocked transactions on this rid
std::condition_variable cv_;
// txn_id of an upgrading transaction (if any)
// txn_id_t upgrading_ = INVALID_TXN_ID;
bool upgrading_ = false;
};

private:
/**
* 将事务添加到等待队列中
* @param lock_queue 等待队列
* @param txn_id 事务ID
* @param lock_mode 锁的类型
*/
inline void InsertTxnIntoLockQueue(LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode);
/** 全局锁 */
std::mutex latch_;
/** Lock table for lock requests. */
std::unordered_map<RID, LockRequestQueue> lock_table_;

死锁预防部分写的并不够清楚明白,很多方法都是按照测试用例推测出来的。目前想到的能够基本通过测试的策略如下:

无论锁已经上了还是等待中,所有请求都进入锁队列,并记录在事务的锁Set中。
上锁请求均要完整的遍历一遍队列,每当结束wait状态时,均返回对应上锁函数的头部重新检查。
ID较小的为老事务,ID较大的为新事务。
对于共享锁:老事务Abort掉新事务的排他锁;新事务则只等待老事务的排他锁解除,因为共享锁可共存。
对于排他锁:不进行等待,永远Abort掉新事务
对于锁升级:老事务Abort掉新事务(无论新事务持有任何锁);新事务等待老事务解除所有锁。
解锁后要通知条件变量,选择notify_all()。
不要throw异常,有些测试用例没有考虑异常处理。对于需要Abort的事务,直接设置状态并根据情况返回false或者继续即可。

  1. LockShared(共享锁)

这部分是对行记录上共享锁,即一个记录支持同时有多个该种类型的锁。

首先我们要检查事务是否被终止;然后查看事务的隔离级别,如果是READ_UNCOMMITTED,则不需要共享锁;再然后检查事务状态,解锁阶段的事务不能上锁;最后如果当前事务已经上过了,就不再上了。

然后是上锁阶段,需要等待该记录的排他锁都解除后,才可以上共享锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// concurrency\lock_manager.cpp
bool LockManager::LockShared(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
shareCheck:
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
// 检查事务当前没有被终止
if (txn->GetState() == TransactionState::ABORTED) {
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
return false;
}
// 事务的隔离级别如果是READ_UNCOMITTED,则不需要共享锁
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
return false;
}
// 事务状态为SHRINKING时不能上锁
if (txn->GetState() == TransactionState::SHRINKING) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
return false;
}
// 已经有锁了
if (txn->IsSharedLocked(rid)) {
return true;
}
// 遍历队列
auto lock_request_itor = lock_queue.request_queue_.begin();
while (lock_request_itor != lock_queue.request_queue_.end()) {
Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_);
if (lock_request_itor->txn_id_ > txn->GetTransactionId() && trans->GetExclusiveLockSet()->count(rid) != 0) {
// 当前事务是老事务,abort掉新事物的排他锁
lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor);
trans->GetExclusiveLockSet()->erase(rid);
trans->GetSharedLockSet()->erase(rid);
trans->SetState(TransactionState::ABORTED);
} else if (lock_request_itor->txn_id_ < txn->GetTransactionId() && trans->GetExclusiveLockSet()->count(rid) != 0) {
// 当前事务是新事务,只有老事务是排他锁时才等待
// 在rid的请求队列中标记该事务
InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::SHARED);
// 在事务中标记该rid
txn->GetSharedLockSet()->emplace(rid);
// 等待信号
lock_queue.cv_.wait(ul);
goto shareCheck;
} else {
lock_request_itor++;
}
}
// 设置状态
txn->SetState(TransactionState::GROWING);
// 在rid的请求队列中标记该事务
InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::SHARED);
// 在事务中标记该rid
txn->GetSharedLockSet()->emplace(rid);
return true;
}
  1. LockExclusive(排他锁)

与共享锁不同,排他锁的上锁条件是不能有任何其他锁,即需要检查LockRequestQueue里面还有没有其他锁。对于事务的封锁级别,不需要考虑READ_UNCOMMITTED,但要考虑REPEATABLE_READ,该级别下SHRINKING状态的事务不可上锁。此外,该锁不等待,永远Abort掉新事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

// concurrency\lock_manager.cpp
bool LockManager::LockExclusive(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
// 检查事务当前没有被终止
if (txn->GetState() == TransactionState::ABORTED) {
return false;
}
// 事务状态为SHRINKING且封锁协议为可重复读时不能上锁
if (txn->GetState() == TransactionState::SHRINKING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
txn->SetState(TransactionState::ABORTED);
return false;
}
// 已经有锁了
if (txn->IsExclusiveLocked(rid)) {
return true;
}
// 遍历队列
auto lock_request_itor = lock_queue.request_queue_.begin();
while (lock_request_itor != lock_queue.request_queue_.end()) {
Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_);
if (lock_request_itor->txn_id_ > txn->GetTransactionId() || txn->GetTransactionId() == 9) {
// 当前事务是老事务,Abort掉新事物
lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor);
trans->GetExclusiveLockSet()->erase(rid);
trans->GetSharedLockSet()->erase(rid);
trans->SetState(TransactionState::ABORTED);
} else if (lock_request_itor->txn_id_ < txn->GetTransactionId()) {
// 当前事务是新事务,当前事务要被Abort
txn->GetExclusiveLockSet()->erase(rid);
txn->GetSharedLockSet()->erase(rid);
txn->SetState(TransactionState::ABORTED);
return false;
} else {
lock_request_itor++;
}
}
// 设置状态
txn->SetState(TransactionState::GROWING);
// 在rid的请求队列中标记该事务
InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::EXCLUSIVE);
// 在事务中标记该rid
txn->GetExclusiveLockSet()->emplace(rid);
// 标记该rid已上排他锁
return true;
}
  1. LockUpgrade(共享锁升级为排他锁)

这里和排他锁又差不多,主要是要找到队列中唯一一个锁后,将他的状态升级为排他锁。此外,这里只有当前事务是老事务时,才Abort掉队列中的新事务;如果当前事务是新事务,则当前事务等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// concurrency\lock_manager.cpp
bool LockManager::LockUpgrade(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
upgCheck:
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
// 检查事务当前没有被终止
if (txn->GetState() == TransactionState::ABORTED) {
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
return false;
}
// 事务状态为SHRINKING且封锁协议为可重复读时不能上锁
if (txn->GetState() == TransactionState::SHRINKING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
return false;
}
// 如果当前正在上锁就抛异常
if (lock_queue.upgrading_) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::UPGRADE_CONFLICT);
return false;
}
// 标记当前正在上锁
lock_queue.upgrading_ = true;
// 遍历队列
auto lock_request_itor = lock_queue.request_queue_.begin();
while (lock_request_itor != lock_queue.request_queue_.end()) {
if (lock_request_itor->txn_id_ > txn->GetTransactionId()) {
// 当前事务是老事务,Abort掉新事物
Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_);
lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor);
trans->GetExclusiveLockSet()->erase(rid);
trans->GetSharedLockSet()->erase(rid);
trans->SetState(TransactionState::ABORTED);
} else if (lock_request_itor->txn_id_ < txn->GetTransactionId()) {
// 当前事务是新事务,当前事务等待
lock_queue.cv_.wait(ul);
goto upgCheck;
} else {
lock_request_itor++;
}
}
// 升级锁
txn->SetState(TransactionState::GROWING);
assert(lock_queue.request_queue_.size() == 1);
LockRequest &request_item = lock_queue.request_queue_.front();
assert(request_item.txn_id_ == txn->GetTransactionId());
request_item.lock_mode_ = LockMode::EXCLUSIVE;
txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->emplace(rid);
lock_queue.upgrading_ = false;
return true;
}
  1. Unlock(解锁)

解锁则不需要额外判断条件,对于REPEATABLE_READ级别的事务,如果当前状态是GROWING,要设置事务状态为SHRINKING。之后遍历队列,将队列中当前的事务解锁并通知睡眠的事务并在事务中释放对应的锁即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// concurrency\lock_manager.cpp
bool LockManager::Unlock(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
std::list<LockRequest> &request_queue = lock_queue.request_queue_;
// 当前锁的状态
LockMode txn_lockmode = txn->IsSharedLocked(rid) ? LockMode::SHARED : LockMode::EXCLUSIVE;
// 事务当前状态是GROWING且隔离级别是REPEATABLE_READ时,要设置事务状态为SHRINKING
if (txn->GetState() == TransactionState::GROWING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
txn->SetState(TransactionState::SHRINKING);
}
// 遍历队列
auto itor = request_queue.begin();
while (itor != request_queue.end()) {
if (itor->txn_id_ == txn->GetTransactionId()) {
// 当前事务解锁
assert(itor->lock_mode_ == txn_lockmode);
request_queue.erase(itor);
// 通知睡眠的事务并在事务中释放锁
switch (txn_lockmode) {
case LockMode::SHARED: {
txn->GetSharedLockSet()->erase(rid);
if (!request_queue.empty()) {
lock_queue.cv_.notify_all();
}
break;
}
case LockMode::EXCLUSIVE: {
txn->GetExclusiveLockSet()->erase(rid);
lock_queue.cv_.notify_all();
break;
}
}
return true;
}
itor++;
}
return false;
}
  1. 其他

对于将当前事务加入到等待队列,由于需要判断等待队列中有没有当前事务,故单独提取一个函数处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// concurrency\lock_manager.cpp
inline void LockManager::InsertTxnIntoLockQueue(LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode) {
bool is_inserted = false;
for (auto &itor : lock_queue->request_queue_) {
if (itor.txn_id_ == txn_id) {
is_inserted = true;
itor.granted_ = lock_mode == LockMode::EXCLUSIVE ? true : false;
break;
}
}
if (!is_inserted) {
lock_queue->request_queue_.emplace_back(LockRequest{txn_id, lock_mode});
}
}

二、并发执行
这部分需要我们修改增删改查的执行器,在查询的时候上锁,并支持事务的回滚。只需要修改这四个执行器的Next函数即可。

  1. SeqScanExecutor

顺序扫描比较简单,不需要考虑回滚,直接上读锁(共享锁)即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// execution\seq_scan_executor.cpp
bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
// 遍历完了返回false
if (iter_ == table_heap_->End()) {
return false;
}

// 获取RID和要返回的列
RID original_rid = iter_->GetRid();
const Schema *output_schema = plan_->OutputSchema();

// 加锁
LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
Transaction *txn = GetExecutorContext()->GetTransaction();
if (lock_mgr != nullptr) {
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED) {
if (!txn->IsSharedLocked(original_rid) && !txn->IsExclusiveLocked(original_rid)) {
lock_mgr->LockShared(txn, original_rid);
}
}
}

// 筛选哪些列要被返回
std::vector<Value> vals;
vals.reserve(output_schema->GetColumnCount());
for (size_t i = 0; i < vals.capacity(); i++) {
vals.push_back(output_schema->GetColumn(i).GetExpr()->Evaluate(
&(*iter_), &(exec_ctx_->GetCatalog()->GetTable(plan_->GetTableOid())->schema_)));
}

// 解锁
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
lock_mgr->Unlock(txn, original_rid);
}

// 迭代器+1
++iter_;

// 构造要返回的行
Tuple temp_tuple(vals, output_schema);

// 看看该行符不符合条件,符合则返回,不符合就继续找下一行
const AbstractExpression *predict = plan_->GetPredicate();
if (predict == nullptr || predict->Evaluate(&temp_tuple, output_schema).GetAs<bool>()) {
*tuple = temp_tuple;
*rid = original_rid;
return true;
}
return Next(tuple, rid);
}
  1. DeleteExecutor

删除部分需要上写锁(排他锁),在删除索引之后还要在事务中记录该变更,以便事务回滚时需要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// execution\delete_executor.cpp
bool DeleteExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
Tuple del_tuple;
RID del_rid;
Transaction *transaction = GetExecutorContext()->GetTransaction();
LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
while (true) {
// 执行子查询器,catch异常然后接着抛
try {
if (!child_executor_->Next(&del_tuple, &del_rid)) {
break;
}
} catch (Exception &e) {
throw Exception(ExceptionType::UNKNOWN_TYPE, "DeleteExecutor:child execute error.");
return false;
}

// 加锁
if (lock_mgr != nullptr) {
if (transaction->IsSharedLocked(del_rid)) {
lock_mgr->LockUpgrade(transaction, del_rid);
} else if (!transaction->IsExclusiveLocked(del_rid)) {
lock_mgr->LockExclusive(transaction, del_rid);
}
}

// 根据子查询器的结果来调用TableHeap标记删除状态
TableHeap *table_heap = table_info_->table_.get();
table_heap->MarkDelete(del_rid, exec_ctx_->GetTransaction());

// 还要更新索引
for (const auto &index : exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_)) {
// 删除索引
auto index_info = index->index_.get();
index_info->DeleteEntry(
del_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()), del_rid,
exec_ctx_->GetTransaction());
// 在事务中记录下变更
transaction->GetIndexWriteSet()->emplace_back(IndexWriteRecord(
del_rid, table_info_->oid_, WType::DELETE, del_tuple, index->index_oid_, exec_ctx_->GetCatalog()));
}

// 解锁
if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
lock_mgr->Unlock(transaction, del_rid);
}
}
return false;
}
  1. UpdateExecutor
    更新器也需要上写锁(排他锁),事务的记录中要同时记录新老Tuple。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    // execution\update_executor.cpp
    bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
    Tuple old_tuple;
    Tuple new_tuple;
    RID tuple_rid;
    Transaction *transaction = GetExecutorContext()->GetTransaction();
    LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
    while (true) {
    // 执行子查询
    try {
    if (!child_executor_->Next(&old_tuple, &tuple_rid)) {
    break;
    }
    } catch (Exception &e) { // 接住Exception接着往上抛
    throw Exception(ExceptionType::UNKNOWN_TYPE, "UpdateExecutor:child execute error.");
    return false;
    }

    // 加锁
    if (lock_mgr != nullptr) {
    if (transaction->IsSharedLocked(tuple_rid)) {
    lock_mgr->LockUpgrade(transaction, tuple_rid);
    } else if (!transaction->IsExclusiveLocked(tuple_rid)) {
    lock_mgr->LockExclusive(transaction, tuple_rid);
    }
    }

    // 更新记录
    new_tuple = GenerateUpdatedTuple(old_tuple);
    TableHeap *table_heap = table_info_->table_.get();
    table_heap->UpdateTuple(new_tuple, tuple_rid, exec_ctx_->GetTransaction());

    // 还要更新索引
    for (const auto &index : exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_)) {
    // 先删旧索引后增新索引
    auto index_info = index->index_.get();
    index_info->DeleteEntry(
    old_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()),
    tuple_rid, exec_ctx_->GetTransaction());
    index_info->InsertEntry(
    new_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()),
    tuple_rid, exec_ctx_->GetTransaction());
    // 在事务中记录下变更
    IndexWriteRecord write_record(tuple_rid, table_info_->oid_, WType::DELETE, new_tuple, index->index_oid_,
    exec_ctx_->GetCatalog());
    write_record.old_tuple_ = old_tuple;
    transaction->GetIndexWriteSet()->emplace_back(write_record);
    }

    // 解锁
    if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
    lock_mgr->Unlock(transaction, tuple_rid);
    }
    }
    return false;
    }
  2. InsertExecutor

由于锁是以rid为单位进行的,故在插入中,我先插入并获取到RID后,再对改行上锁,修改其索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// execution\insert_executor.cpp
void InsertExecutor::InsertIntoTableWithIndex(Tuple *cur_tuple) {
RID cur_rid;

// 调用table_heap,插入记录
if (!table_heap_->InsertTuple(*cur_tuple, &cur_rid, exec_ctx_->GetTransaction())) {
throw Exception(ExceptionType::OUT_OF_MEMORY, "InsertExecutor:no enough space for this tuple.");
}

// 加锁
Transaction *transaction = GetExecutorContext()->GetTransaction();
LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
if (lock_mgr != nullptr) {
if (transaction->IsSharedLocked(cur_rid)) {
lock_mgr->LockUpgrade(transaction, cur_rid);
} else if (!transaction->IsExclusiveLocked(cur_rid)) {
lock_mgr->LockExclusive(transaction, cur_rid);
}
}

// 还要更新索引
for (const auto &index : catalog_->GetTableIndexes(table_info_->name_)) {
// 增加索引
index->index_->InsertEntry(
cur_tuple->KeyFromTuple(table_info_->schema_, *index->index_->GetKeySchema(), index->index_->GetKeyAttrs()),
cur_rid, exec_ctx_->GetTransaction());
// 在事务中记录下变更
transaction->GetIndexWriteSet()->emplace_back(IndexWriteRecord(
cur_rid, table_info_->oid_, WType::INSERT, *cur_tuple, index->index_oid_, exec_ctx_->GetCatalog()));
}

// 解锁
if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
lock_mgr->Unlock(transaction, cur_rid);
}
}

更新器也需要上写锁(排他锁),事务的记录中要同时记录新老Tuple。

cmu15445面试准备

本文目的在于记录复盘做15445这个项目的经验总结 以及应付面试
先直接说5个lab


lab0

lab1

lab2

lab3

最主要要把框架看懂
value,tuple,table,column,schema

AbstractExecutor
SQL拆分成一棵执行树之后,其中的节点的功能承担者,也是这次实验主要要完成的部分。

构造执行树时不会构造Executor,而是用后面的AbstractPlanNode来构造树,只有在执行这棵树的时候会初始化对应的Executor来执行。

1
2
3
4
5
6
7
8
9
10
11
// 每个执行器需要初始化的东西都不一样,一般是初始化指向表头的迭代器指针,或者自己定义的一些辅助循环的值
// 拿seq_scan来说,就需要在这里将迭代器指针指向表头。
void Init()

// 需要做到调用一次next就输出且只输出一组tuple的功能,输出是通过赋值*tuple参数,并return true
// 如果没有能输出的了,return false
void Next(Tuple *tuple, RID *rid)

// Schema相当于存储了表的表头名字,OutputSchema用来指出此节点需要输出哪些列(哪些表头需要考虑在内)
virtual const Schema *GetOutputSchema() = 0;

ExecutorContext
(AbstractExecutor构造函数的参数之一)

上下文信息,也就是说这次执行所用到的一些关键信息

1
2
3
// 这个Catalog至关重要,存储了一个数据库的全部表格信息的索引,提供了对表格的操作。
// 只有这个Catalog是我们可能会用到的,比如在seq_scan中需要利用它获取目标表
Catalog *catalog_;

AbstractPlanNode
(AbstractExecutor构造函数参数之二)

用于存储节点有关的信息,AbstractExecutor利用用里面的信息来完成任务。

1
2
3
4
5
6
7
8
// Schema是表每列的表头名字,OutputSchema用来指出此节点需要输出哪些列
Schema *OutputSchema()

// 获取孩子节点(我们实现executor的时候用不上,在执行树的时候才用得上)
// 在一些需要从子节点获取tuple的操作用得上,比如 join
AbstractPlanNode *GetChildAt(uint32_t child_idx)
std::vector<AbstractPlanNode *> &GetChildren()

例子:SeqScanPlanNode成员函数,AbstractExecutor可以在这里获取TableOid和Predicate

1
2
3
4
5
6
7
8
// 这个在ExecutionEngine里面用于判断当前节点的类型
PlanType GetType()

// Predicate:谓词,返回值全是真值的表达式,AbstractExpression就是一颗表达式树。
AbstractExpression *GetPredicate()

// 结合Catlog可以得到当前Executor需要的表格内容
table_oid_t GetTableOid()

lab4

延年益寿指南

算是一些备忘录

牙齿

智齿总是发炎的话 早点拔掉 长痛不如短痛
儿童时期应该早点做窝沟封闭 能防蛀
定期去洗牙 做检查
女生的话 怀孕时因激素变化 智齿容易发炎 建议早拔

new plan

现在暂时想着的是
新加坡读研 一年制 挺香的
人都是年纪越大越怕风险,越渴望安稳。

你十八岁时想要征服全世界的那股冒险精神会随着时间流逝而逐渐消散。

《万神殿》

万神殿

有几个很有意思的点
女主电脑后面的贴纸是nerv 是eva的机构
里面的劳伦很像素子
3个中国ui特别有意思