首页
Preview

在1000行代码内从头构建一个NoSQL数据库

照片由Pixabay提供

第一章

我是个个人项目的爱好者。多年来,我写过多篇与数据库相关的博客/GitHub项目。现在,我决定要让一个项目统领它们全部!

我已经用Go实现了我的数据库,并决定通过写一篇博客来分享我的知识,描述所有步骤。这篇博客相当长,所以我最初计划将其分为7部分系列。最终,我决定将其保持为一个长的博客文章,分成7个原始部分。这样,你可以停下来,然后轻松地回到每一章。

这个数据库故意设计得简单和最小,因此最重要的功能将会被包含在内,但代码仍然很短。

代码是用Go编写的,但它并不复杂,不熟悉它的程序员也能够理解。如果你知道Go,我鼓励你跟着一起工作!

整个代码库在GitHub上,当你完成后还有大约50个测试可以运行。在第二个存储库中,代码被分成了7个逻辑部分,每个部分在不同的文件夹中。

我们将在Go中创建一个简单的NoSQL数据库。我将介绍数据库的概念以及如何使用它们在Go中从头开始创建一个NoSQL键值数据库。我们将回答以下问题:

  • 什么是NoSQL?
  • 如何在磁盘上存储数据?
  • 磁盘数据库和内存数据库之间的区别
  • 如何制作索引?
  • 什么是ACID,事务如何工作?
  • 如何为最佳性能设计数据库?

第一部分将从我们将在数据库中使用的概念的概述开始,然后实现一个基本的向磁盘写入机制。

SQL vs NoSQL

数据库可以分为不同的类别。对我们来说相关的是关系型数据库(SQL)、键值存储和文档存储(这些被视为NoSQL)。最显着的区别是数据库使用的数据模型。

关系型数据库将数据组织成表格(或“关系”),由列和行组成,每行都有一个唯一的键来标识。行代表实体的实例(例如“商店”),列代表分配给该实例的值(例如“收入”或“支出”)。

在关系型数据库中,业务逻辑可能分布在整个数据库中。换句话说,单个对象的部分可能出现在数据库的不同表格中。我们可能有不同的表格来存储收入和支出,因此要从数据库中检索整个商店实体,我们必须查询两个表格。

键值和文档存储是不同的。单个实体的所有信息都集中保存在集合/桶中。以前面的例子为例,商店实体在单个实例中包含收入和支出,并驻留在商店集合中。

文档存储是键值存储的子类。在键值存储中,数据被认为对数据库是内在的,而文档导向的系统则依赖于文档的内部结构。例如,在文档存储中,可以通过内部字段(例如收入)查询所有商店,而键值只能通过其ID获取商店。

这些是基本的区别,但实际上,有更多的数据库类型和更多的原因来优选其中之一。

我们的数据库将是一个键值存储(不是文档存储),因为它的实现最简单和最直接。

基于磁盘的存储

数据库将其数据(集合、文档等)组织在数据库页面中。页面是数据库和磁盘之间交换的最小数据单元。拥有一个固定大小的工作单元很方便。此外,将相关数据放在相邻的位置上也是有意义的,这样可以一次性获取所有数据。

数据库页面在磁盘上连续存储,以最小化磁盘寻道。继续前面的例子,考虑一个包含8个商店的集合,其中单个页面由2个商店占用。在磁盘上,它看起来像下面这样:

MySQL的默认页面大小为16Kb,Postgres的页面大小为8Kb,而Apache Derby的页面大小为4Kb。

较大的页面大小会导致更好的性能,但它们有可能出现“撕裂页面”的情况。这是指在单个写事务的多个数据库页面的写入过程中系统崩溃的情况。

在选择实际数据库的页面大小时,会考虑这些因素。这些考虑因素对我们的数据库不相关,因此我们将任意选择我们的数据库页面大小为4KB。

底层数据结构

如前所述,数据库将其内部数据存储在许多页面上,这些页面存储在磁盘上。数据库使用数据结构来管理页面的数量。

数据库使用不同的数据结构来组织磁盘上的页面,主要有B/B+树和哈希桶。每个数据结构都有其自身的优点和优势:读/写性能、对更复杂的查询(如排序或范围扫描)的支持、易于实现等等。

我们将使用B树,因为它易于实现,但其原则与实际数据库中使用的原则相近。

我们的数据库

我们的数据库将是一个键值存储。其底层数据结构是B-Tree,每个页面的大小为4KB。它将具有以下架构:

数据库管理我们的程序,负责编排事务——即读写操作的序列。它还面向使用数据库的程序员,并为他们提供服务。

**数据访问层(DAL)**处理所有磁盘操作和数据在磁盘上的组织方式。它负责管理底层数据结构,将数据库页面写入磁盘,并回收空闲页面以避免碎片化。

开始编码

我们将从**数据访问层(DAL)**组件开始,从底层构建我们的数据库。我们将通过创建一个文件来开始编码。我们将拥有一个结构体、一个构造函数和一个关闭方法。

type dal struct {
	file *os.File
}

func newDal(path string) (*dal, error) {
	dal := &dal{}
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}
	dal.file = file
	return dal, nil
}

func (d *dal) close() error {
	if d.file != nil {
		err := d.file.Close()
		if err != nil {
			return fmt.Errorf("could not close file: %s", err)
		}
		d.file = nil
	}
	return nil
}

数据库页面

DAL将管理读取和写入数据库页面。我们将在dal.go中添加一个page类型的结构体。它将包含一个数字,该数字用作唯一ID,但它的作用更大。该数字用于通过指针算术访问页面,例如(PageSize*pageNum)。我们还将在dal中添加PageSize属性。

type pgnum uint64

type page struct {
	num  pgnum
	data []byte
}

type dal struct {
	file     *os.File
	pageSize int
}

func newDal(path string, pageSize int) (*dal, error) {
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}
	dal := &dal{
		file,
		pageSize,
	}
	return dal, nil
}

最后,我们将添加一个构造函数和它的读写操作。请注意,在第12和25行中,给定页面大小和页面编号,计算了文件中的正确偏移量。

func (d *dal) allocateEmptyPage() *page {
	return &page{
		data: make([]byte, d.pageSize),
	}
}

func (d *dal) readPage(pageNum pgnum) (*page, error) {
	p := d.allocateEmptyPage()

	// Notice how the correct offset calculation is performed
	// using the page number and page size 
	offset := int(pageNum) * d.pageSize
	
	// Then we read the data at the correct offset
	_, err := d.file.ReadAt(p.data, int64(offset))
	if err != nil {
		return nil, err
	}
	return p, err
}

func (d *dal) writePage(p *page) error {
	// Likewise, we calculate the correct offset
	// and write at the correct position
	offset := int64(p.num) * int64(d.pageSize)
	_, err := d.file.WriteAt(p.data, offset)
	return err
}

空闲列表

管理页面是一项复杂的任务。我们需要知道哪些页面是空闲的,哪些是占用的。如果页面变为空,页面也可以被释放,因此我们需要回收它们以供未来使用,以避免_碎片化_。

所有这些逻辑都由freelist管理。该组件是DAL的一部分。它具有一个名为maxPage的计数器,该计数器保持到目前为止分配的最高页面编号,并且具有一个名为releasedPages的属性,用于跟踪释放的页面。

在分配新页面时,首先要评估releasedPages以获取空闲页面。如果列表为空,则增加计数器,并提供新页面,从而增加文件大小。

灰色为占用页面。空闲页面为白色。

我们将从创建文件freelist.go开始,定义freelist类型并添加一个构造函数newFreeList。然后,我们需要添加getNextPagereleasePage。我们的数据库使用第一页来存储元数据(将在下一章节中解释),因此我们只能分配大于0的页面编号的新页面。

type freelist struct {
	maxPage       pgnum   // Holds the maximum page allocated. maxPage*PageSize = fileSize
	releasedPages []pgnum // Pages that were previouslly allocated but are now free
}

func newFreelist() *freelist {
	return &freelist{
		maxPage:       initialPage,
		releasedPages: []pgnum{},
	}
}

func (fr *freelist) getNextPage() pgnum {
	// If possible, fetch pages first from the released pages.
	// Else, increase the maximum page
	if len(fr.releasedPages) != 0 {
		pageID := fr.releasedPages[len(fr.releasedPages)-1]
		fr.releasedPages = fr.releasedPages[:len(fr.releasedPages)-1]
		return pageID
	}
	fr.maxPage += 1
	return fr.maxPage
}

func (fr *freelist) releasePage(page pgnum) {
	fr.releasedPages = append(fr.releasedPages, page)
}

最后,我们将在打开数据库时将freelist嵌入我们的dal结构中并创建一个实例。

type dal struct {
	file     *os.File
	pageSize int

	*freelist
}
func newDal(path string, pageSize int) (*dal, error) {
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}
	
	dal := &dal{
		file,
		pageSize,
		newFreelist(),
	}
	return dal, nil
}

第1章结论

到目前为止,我们的数据库支持读取和写入页面。要测试我们的工作,我们可以创建一个主函数,写入一个页面并查看数据的存储方式。

package main

import "os"

func main() {
	// initialize db
	dal, _ := newDal("db.db", os.Getpagesize())

	// create a new page
	p := dal.allocateEmptyPage()
	p.num = dal.getNextPage()
	copy(p.data[:], "data")

	// commit it
	_ = dal.writePage(p)
}

命令hexdump-C标志一起用于显示二进制文件的内容。

前4096个字节(左侧的十六进制数为1000)保留用于元数据,而我们的数据紧随其后。

只要程序在运行,我们的数据库就能正常工作。如果程序被杀死,我们就无法知道哪些页面为空,哪些包含实际数据。首先,这意味着需要将freelist持久化到磁盘。这将是我们下一章的主题。

你可以在这里查看本章节完整的代码版本。

第2章

我们继续构建我们的数据库。在上一章中,我们查看了我们数据库的总体架构。

然后我们实现了处理磁盘操作的数据访问层。到目前为止,它支持:打开和关闭数据库文件,将数据写入磁盘页面并从中读取数据,以及使用freelist管理空和占用页面。

在本章中,我们将实现磁盘持久化。将数据库状态保存到磁盘上以便在重启之间正确运行是至关重要的。DAL将负责此任务。

我们将从将freelist的状态提交到磁盘开始。

开始编码

要保存freelist的状态,我们要将其作为数据库页面写入磁盘。在数据库中,通常有一个页面包含有关数据库的元数据。

它可以是必要页面的页面编号——在我们的情况下是freelist。它还可以包含魔术数字——用于识别文件格式的常量数字或文本值(我们将在最后一章中遇到它)。在我们的情况下,它将是两者兼而有之。

为了跟踪freelist页面编号,我们将添加一个名为meta的新页面。它是一个特殊页面,因为它是固定的,并位于编号0上。这样,在重启时,我们可以从页面编号0读取meta页面,并跟随它以加载freelist内容。

元页面持久化

在我们的数据库中,定义serializedeserialize方法以将实体转换为适合单个页面的原始数据是一种典型的模式。并使用读取和写入方法将其包装以返回实体本身。

我们将创建一个名为meta.go的新文件,定义一个新类型meta和一个构造函数。

我们需要将meta序列化为[]byte,以便我们可以将其写入磁盘。不幸的是,Go不支持像C那样轻松地将结构体转换为二进制,而不使用Unsafe包(由于明显的原因,我们不会使用它),因此我们必须自己实现它。

在每个方法中,我们将具有一个位置变量(pos),用于跟踪缓冲区中的位置。它类似于文件中的光标。

const (
	metaPageNum = 0
)

type meta struct {
	freelistPage pgnum
}

func newEmptyMeta() *meta {
	return &meta{}
}

func (m *meta) serialize(buf []byte) {
	pos := 0

	binary.LittleEndian.PutUint64(buf[pos:], uint64(m.freelistPage))
	pos += pageNumSize
}

func (m *meta) deserialize(buf []byte) {
	pos := 0
	m.freelistPage = pgnum(binary.LittleEndian.Uint64(buf[pos:]))
	pos += pageNumSize
}

我们还将创建一个名为const.go的新文件,用于保存大小并声明一个新的常量pageNumSize。它包含页面编号的大小(以字节为单位)。

const (
	pageNumSize = 8
)

const.go文件回答了存储每个变量需要多少字节的问题,而serialize和deserialize则负责解决将其存储在页面内的位置问题。

最后,我们将使用先前的元数据序列化/反序列化和读取/写入页面方法添加writeMetareadMeta方法。

func (d *dal) writeMeta(meta *meta) (*page, error) {
	p := d.allocateEmptyPage()
	p.num = metaPageNum
	meta.serialize(p.data)

	err := d.writePage(p)
	if err != nil {
		return nil, err
	}
	return p, nil
}

func (d *dal) readMeta() (*meta, error) {
	p, err := d.readPage(metaPageNum)
	if err != nil {
		return nil, err
	}

	meta := newEmptyMeta()
	meta.deserialize(p.data)
	return meta, nil
}

空闲列表持久化

我们已经定义了元页面,因此我们知道在哪里找到freelist数据。现在,我们需要添加读取/写入freelist本身的方法。与之前一样,我们将添加序列化和反序列化的方法。

const pageNumSize = 8

func (fr *freelist) serialize(buf []byte) []byte {
	pos := 0

	binary.LittleEndian.PutUint16(buf[pos:], uint16(fr.maxPage))
	pos += 2

	// released pages count
	binary.LittleEndian.PutUint16(buf[pos:], uint16(len(fr.releasedPages)))
	pos += 2

	for _, page := range fr.releasedPages {
		binary.LittleEndian.PutUint64(buf[pos:], uint64(page))
		pos += pageNumSize

	}
	return buf
}

func (fr *freelist) deserialize(buf []byte) {
	pos := 0
	fr.maxPage = pgnum(binary.LittleEndian.Uint16(buf[pos:]))
	pos += 2

	// released pages count
	releasedPagesCount := int(binary.LittleEndian.Uint16(buf[pos:]))
	pos += 2

	for i := 0; i < releasedPagesCount; i++ {
		fr.releasedPages = append(fr.releasedPages, pgnum(binary.LittleEndian.Uint64(buf[pos:])))
		pos += pageNumSize
	}
}

接下来,我们将在dal结构体中嵌入meta

type dal struct {
	file     *os.File
	pageSize int

	*meta
	*freelist
}

添加使用meta结构体内部的页面号码读取和写入freelist页面的方法。

func (d *dal) writeFreelist() (*page, error) {
	p := d.allocateEmptyPage()
	p.num = d.freelistPage
	d.freelist.serialize(p.data)

	err := d.writePage(p)
	if err != nil {
		return nil, err
	}
	d.freelistPage = p.num
	return p, nil
}

最后,更新dal构造函数以检查数据库是否存在。如果不存在,则创建新的meta文件,并初始化freelistmeta页面。如果存在,则读取meta页面以查找freelist页面号码并对其进行反序列化。

func newDal(path string) (*dal, error) {
	dal := &dal{
		meta:           newEmptyMeta(),
	}

	if _, err := os.Stat(path); err == nil {
		dal.file, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
		if err != nil {
			_ = dal.close()
			return nil, err
		}

		meta, err := dal.readMeta()
		if err != nil {
			return nil, err
		}
		dal.meta = meta

		freelist, err := dal.readFreelist()
		if err != nil {
			return nil, err
		}
		dal.freelist = freelist
	} else if errors.Is(err, os.ErrNotExist) {
		dal.file, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
		if err != nil {
			_ = dal.close()
			return nil, err
		}

		dal.freelist = newFreelist()
		dal.freelistPage = dal.getNextPage()
		_, err := dal.writeFreelist()
		if err != nil {
			return nil, err
		}
		_, err = dal.writeMeta(dal.meta)
	} else {
		return nil, err
	}
	return dal, nil
}

第2章结论

好的!我们已经完成了持久化。我们可以通过创建一个新数据库,关闭它,然后再次打开它来进行测试。

package main

func main() {
	// initialize db
	dal, _ := newDal("db.db")

	// create a new page
	p := dal.allocateEmptyPage()
	p.num = dal.getNextPage()
	copy(p.data[:], "data")

	// commit it
	_ = dal.writePage(p)
	_, _ = dal.writeFreelist()

	// Close the db
	_ = dal.close()

	// We expect the freelist state was saved, so we write to
	// page number 3 and not overwrite the one at number 2
	dal, _ = newDal("db.db")
	p = dal.allocateEmptyPage()
	p.num = dal.getNextPage()
	copy(p.data[:], "data2")
	_ = dal.writePage(p)

	// Create a page and free it so the released pages will be updated
	pageNum := dal.getNextPage()
	dal.releasePage(pageNum)

	// commit it
	_, _ = dal.writeFreelist()
}

请注意页面0上的元页面(从第1行开始),其中包含freelist页面的编号1。

下面是freelist(第三行)。我们有4,1,4。第一个4表示最大分配的页面编号。1表示已释放页面列表的长度,最后一个4表示唯一释放的页面。

最后,我们有我们的数据,并且如你在右侧栏中看到的那样,在数据库重新启动后它表现正确。

在下一篇文章中,我们将开始探索B-Tree,这是数据库的主要特色。我们将花费几篇文章来完全介绍它。我们将讨论它们如何帮助我们组织数据以及如何在磁盘上存储它们。

你可以在此处检查本部分代码的完整版本。

第3章

在过去的两章中,我们完成了将数据库持久化到磁盘上。这意味着我们的数据库可以在重启之间恢复其先前的状态。它还支持将页面读取和写入磁盘,但我们必须注意数据的记账。换句话说,当从数据库中获取项目时,我们必须记住它插入的位置。

今天,我们将讨论B-Tree,这是数据库的主要特色。我们将看到它们如何帮助我们组织数据,以便用户不必跟踪磁盘上每个对象的位置。

二叉搜索树

二叉搜索树(也称为BST)是一种用于按顺序存储数据的数据结构。树中的每个节点由一个键,与该键关联的值和两个指向子节点的指针(因此名称为二进制)标识。不变式规定,左子节点必须小于其直接父节点,右子节点必须大于其直接父节点。

二叉搜索树

这样,我们可以通过查看每个节点的值并相应地下降轻松找到元素。如果搜索的值小于当前值,则下降到左子树;如果大于,则下降到右子树。

二叉搜索树允许快速查找、添加和删除数据项,因为BST中的节点被布置得使得每次比较都跳过剩余树的约一半。

动机

我们为什么需要树?一种简单的方法是创建一个文本文件并按顺序写入数据。写入很快,因为新数据附加到文件末尾。另一方面,读取特定项需要搜索整个文件。如果我们的文件变大,读取将需要很长时间。

显然,它不适用于许多现实应用程序。与前面的方法相比,BST在读取方面提供了更好的性能,但写入方面较慢。这种平衡的方法非常适合通用数据库。

另一个考虑因素是磁盘访问。它是昂贵的,因此在设计驻留在磁盘上的数据结构时必须将其最小化。

二叉树节点有2个子节点,因此例如具有2⁵=32个项将导致具有5个级别的树。在树中搜索项目将需要在最坏情况下下降5个级别,因此每个级别需要5个磁盘访问。 5并不多,但我们能做得更好吗?

B-Tree

B-Tree是二叉搜索树的一般化,允许具有多个子节点的节点。每个节点具有_k_到_2k_个键值对和_k+1_到2_k+1_个子指针,除根节点外最少有2个子节点。

通过在每个节点中具有更多的子节点,树的高度变小,因此需要更少的磁盘访问。

K-V表示键值对。P表示子指针

编码

我们将从创建一个名为node.go的新文件开始,为B-Tree节点定义新类型Node和表示键值对的Item

我们还将添加isLeafNode以及Item的构造函数。

节点页面结构

我们希望将Node内容存储在磁盘上。一个微不足道的方法是“键值-子指针”三元组的串联,但它有一个重大的缺点。

type Item struct {
	key   []byte
	value []byte
}

type Node struct {
	*dal

	pageNum    pgnum
	items      []*Item
	childNodes []pgnum
}

func NewEmptyNode() *Node {
	return &Node{}
}

func newItem(key []byte, value []byte) *Item {
	return &Item{
		key:   key,
		value: value,
	}
}

func (n *Node) isLeaf() bool {
	return len(n.childNodes) == 0
}

项包含不同大小的键和值。这意味着迭代键值对变成了一个非平凡的任务,因为我们不知道每次迭代光标应该向前移动多少字节。

页面由p表示,键由k表示,值由v表示

为了解决这个问题,我们将使用一种称为“槽位页面”的技术。页面分为两个内存区域。页面末尾是键和值,而开头是记录的固定大小偏移量。我们还将添加一个包含页面记录数的标头,以及指示页面是否为叶子节点的标志(因为叶子节点没有子指针)。

这种设计允许以最小的努力对页面进行更改。通过对单元格偏移进行排序来保持逻辑顺序,因此不需要重新定位数据。

每个节点都序列化为单个页面,但重要的是要理解两者之间的区别。

Node仅包含键值对,并且在较高级别上使用。另一方面,Page在存储级别上工作,并保存任何类型的数据,只要它的大小为4KB即可。它可以包含以 slotted page 结构组织的键值对,但也可以包含以不那么严格的形式排列的 freelistmeta

节点持久性

与前几章一样,我们将添加数据库节点的序列化和反序列化。这些方法将数据从/转换为带槽页面格式。

实现很长而且技术性强,上述细节没有什么新意。更好地关注上述通用设计而不是实现本身(尽管我鼓励你阅读代码)。

func (n *Node) serialize(buf []byte) []byte {
	leftPos := 0
	rightPos := len(buf) - 1

	// Add page header: isLeaf, key-value pairs count, node num
	// isLeaf
	isLeaf := n.isLeaf()
	var bitSetVar uint64
	if isLeaf {
		bitSetVar = 1
	}
	buf[leftPos] = byte(bitSetVar)
	leftPos += 1

	// key-value pairs count
	binary.LittleEndian.PutUint16(buf[leftPos:], uint16(len(n.items)))
	leftPos += 2

	// We use slotted pages for storing data in the page. It means the actual keys and values (the cells) are appended
	// to right of the page whereas offsets have a fixed size and are appended from the left.
	// It's easier to preserve the logical order (alphabetical in the case of b-tree) using the metadata and performing
	// pointer arithmetic. Using the data itself is harder as it varies by size.

	// Page structure is:
	// ----------------------------------------------------------------------------------
	// |  Page  | key-value /  child node    key-value 		      |    key-value		 |
	// | Header |   offset /	 pointer	  offset         .... |      data      ..... |
	// ----------------------------------------------------------------------------------

	for i := 0; i < len(n.items); i++ {
		item := n.items[i]
		if !isLeaf {
			childNode := n.childNodes[i]

			// Write the child page as a fixed size of 8 bytes
			binary.LittleEndian.PutUint64(buf[leftPos:], uint64(childNode))
			leftPos += pageNumSize
		}

		klen := len(item.key)
		vlen := len(item.value)

		// write offset
		offset := rightPos - klen - vlen - 2
		binary.LittleEndian.PutUint16(buf[leftPos:], uint16(offset))
		leftPos += 2

		rightPos -= vlen
		copy(buf[rightPos:], item.value)

		rightPos -= 1
		buf[rightPos] = byte(vlen)

		rightPos -= klen
		copy(buf[rightPos:], item.key)

		rightPos -= 1
		buf[rightPos] = byte(klen)
	}

	if !isLeaf {
		// Write the last child node
		lastChildNode := n.childNodes[len(n.childNodes)-1]
		// Write the child page as a fixed size of 8 bytes
		binary.LittleEndian.PutUint64(buf[leftPos:], uint64(lastChildNode))
	}

	return buf
}
func (n *Node) deserialize(buf []byte) {
	leftPos := 0

	// Read header
	isLeaf := uint16(buf[0])

	itemsCount := int(binary.LittleEndian.Uint16(buf[1:3]))
	leftPos += 3

	// Read body
	for i := 0; i < itemsCount; i++ {
		if isLeaf == 0 { // False
			pageNum := binary.LittleEndian.Uint64(buf[leftPos:])
			leftPos += pageNumSize

			n.childNodes = append(n.childNodes, pgnum(pageNum))
		}

		// Read offset
		offset := binary.LittleEndian.Uint16(buf[leftPos:])
		leftPos += 2

		klen := uint16(buf[int(offset)])
		offset += 1

		key := buf[offset : offset+klen]
		offset += klen

		vlen := uint16(buf[int(offset)])
		offset += 1

		value := buf[offset : offset+vlen]
		offset += vlen
		n.items = append(n.items, newItem(key, value))
	}

	if isLeaf == 0 { // False
		// Read the last child node
		pageNum := pgnum(binary.LittleEndian.Uint64(buf[leftPos:]))
		n.childNodes = append(n.childNodes, pageNum)
	}
}

我们还将添加 getNodewriteNodedeleteNode 以返回节点。前两种方法返回 node 结构本身,因此与我们在 freelistmeta 中所做的相同。

func (d *dal) getNode(pageNum pgnum) (*Node, error) {
	p, err := d.readPage(pageNum)
	if err != nil {
		return nil, err
	}
	node := NewEmptyNode()
	node.deserialize(p.data)
	node.pageNum = pageNum
	return node, nil
}

func (d *dal) writeNode(n *Node) (*Node, error) {
	p := d.allocateEmptyPage()
	if n.pageNum == 0 {
		p.num = d.getNextPage()
		n.pageNum = p.num
	} else {
		p.num = n.pageNum
	}

	p.data = n.serialize(p.data)

	err := d.writePage(p)
	if err != nil {
		return nil, err
	}
	return n, nil
}

func (d *dal) deleteNode(pageNum pgnum) {
	d.releasePage(pageNum)
}

deleteNodefreelist 发送信号,表示可以删除节点,因此可以将其页面 ID 标记为已释放。

搜索、插入和删除

在实现搜索、插入和删除算法之前,安全地假设我们将多次调用接收和写入节点的函数。我们将用自己的层包装 dal 的调用,在以后的部分中这将很方便。

func (n *Node) writeNode(node *Node) *Node {
	return n.dal.writeNode(node)
}

func (n *Node) writeNodes(nodes ...*Node) {
	for _, node := range nodes {
		n.writeNode(node)
	}
}

func (n *Node) getNode(pageNum pgnum) (*Node, error) {
	return n.dal.getNode(pageNum)
}

搜索

使用二分搜索来搜索 B-Tree。算法从根节点开始遍历所有键,直到找到第一个大于搜索值的键。然后,我们使用相应的子指针向下遍历相关子树,并重复此过程,直到找到该值。

在示例中,提供了20。在第一层中,没有键与20完全匹配,因此算法进入正确的范围-大于13的所有数字。在第二层中的最右边的节点上重复此过程。

findKeyInNode 函数在节点内搜索键,如上图所示。它将搜索键与每个级别中的键进行比较。如果找到匹配项,则该函数返回。如果该键不存在,则返回正确的范围。

// findKeyInNode iterates all the items and finds the key. If the key is found, then the item is returned. If the key
// isn't found then return the index where it should have been (the first index that key is greater than it's previous)
func (n *Node) findKeyInNode(key []byte) (bool, int) {
	for i, existingItem := range n.items {
		res := bytes.Compare(existingItem.key, key)
		if res == 0 { // Keys match
			return true, i
		}

    		// The key is bigger than the previous key, so it doesn't exist in the node, but may exist in child nodes.
		if res == 1 { 
			return false, i
		}
	}
  
  	// The key isn't bigger than any of the keys which means it's in the last index.
	return false, len(n.items)
}

现在我们想要遍历整个树,并在搜索每个节点时使用前一个函数。我们将执行类似于二进制搜索的搜索。唯一的区别是搜索使用许多子节点,而不是像上面的图像中描述的两个。

findKey 函数是 findKeyHelper 的包装器,其中执行了重要的操作。

// findKey searches for a key inside the tree. Once the key is found, the parent node and the correct index are returned
// so the key itself can be accessed in the following way parent[index].
// If the key isn't found, a falsey answer is returned.
func (n *Node) findKey(key []byte) (int, *Node ,error) {
	index, node, err := findKeyHelper(n, key)
	if err != nil {
		return -1, nil, err
	}
	return index, node, nil
}

func findKeyHelper(node *Node, key []byte) (int, *Node ,error) {
	// Search for the key inside the node
	wasFound, index := node.findKeyInNode(key)
	if wasFound {
		return index, node, nil
	}

	// If we reached a leaf node and the key wasn't found, it means it doesn't exist.
	if node.isLeaf() {
		return -1, nil, nil
	}

	// Else keep searching the tree
	nextChild, err := node.getNode(node.childNodes[index])
	if err != nil {
		return -1, nil, err
	}
	return findKeyHelper(nextChild, key)
}

如上所述,搜索是从给定节点(通常是根节点)递归进行的。每个节点都可以从其父节点到达,但由于根节点没有父节点,因此我们通过在元页面中保存其页面编号来到达它。

root 属性已添加到元结构中,并且 serializedeserialize 也已处理此字段。

// meta is the meta page of the db
type meta struct {
	root         pgnum
	freelistPage pgnum
}

func (m *meta) serialize(buf []byte) {
	pos := 0

	binary.LittleEndian.PutUint64(buf[pos:], uint64(m.root))
	pos += pageNumSize

	binary.LittleEndian.PutUint64(buf[pos:], uint64(m.freelistPage))
	pos += pageNumSize
}

func (m *meta) deserialize(buf []byte) {
	pos := 0

	m.root = pgnum(binary.LittleEndian.Uint64(buf[pos:]))
	pos += pageNumSize

	m.freelistPage = pgnum(binary.LittleEndian.Uint64(buf[pos:]))
	pos += pageNumSize
}

第3章结论

让我们测试一下我们的搜索方法。我们的 DB 在 B-Tree 中执行搜索,因此它期望数据库页面按 B-Tree 格式排列。

我们还没有办法创建一个,因此我添加了一个名为 mainTest 的模拟文件,它已经构建了这种方式。该文件包含一个键值对。键是“Key1”,值为“Value1”。

package main

import "fmt"

func main() {
	// initialize db
	dal, _ := newDal("Part 3/mainTest")

	node, _ := dal.getNode(dal.root)
	node.dal = dal
	index, containingNode, _ := node.findKey([]byte("Key1"))
	res := containingNode.items[index]

	fmt.Printf("key is: %s, value is: %s", res.key, res.value)
	// Close the db
	_ = dal.close()
}

在接下来的两章中,我们将实现从数据库中写入和删除。你可以在这里找到本部分的代码。

第4章

在上一章中,我们谈到了 B-Tree。它们是二叉搜索树的推广,允许具有多个子节点的节点。通过最小化树高,从而减少磁盘访问。

我们还谈到了槽页面。一种通过在开头定位固定大小的偏移量并在末尾定位实际数据来组织不同大小的键值对的布局。

最后,我们开始在 B-Tree 上实现基本操作:搜索、插入、删除。在上一章中,我们实现了搜索操作。我们将在本章中执行插入操作。

问题

插入新项可能会导致节点具有过多项。例如,当所有新对都插入到同一个节点中时。

每次访问节点时,它会使操作变慢,因为我们要遍历更多的项。除此之外,它还会导致更严重的问题。节点被转换为固定大小的 数据库页面,因此每个节点中的键值对数量都有限制,这取决于键值对的总大小。

删除也遇到类似的问题。从同一个节点中删除键值对可能会使节点收缩,因此其中包含非常少的键值对。此时,最好将这些对重新分配到不同的节点中。

要知道节点是否已达到最大或低占用状态,我们必须首先定义它。我们将添加上限和下限阈值,即页面大小的百分比。

在代码中,我们引入了 Options,它将由用户初始化并在我们的 DB init 函数中使用。

type Options struct {
	pageSize int

	MinFillPercent float32
	MaxFillPercent float32
}

var DefaultOptions = &Options{
	MinFillPercent: 0.5,
	MaxFillPercent: 0.95,
}

type dal struct {
	pageSize       int
	minFillPercent float32
	maxFillPercent float32
	file *os.File

	*meta
	*freelist
}
func newDal(path string, options *Options) (*dal, error) {
	dal := &dal{
		meta:           newEmptyMeta(),
		pageSize:       options.pageSize,
		minFillPercent: options.MinFillPercent,
		maxFillPercent: options.MaxFillPercent,
    
		    .
		    .  (Rest of the function)
		    .
	}

我们还将添加新方法以dal计算node节点大小(以字节为单位)并将其与阈值进行比较。elementSize计算给定索引处单个键值子三元组的大小。nodeSize将节点标头大小以及所有键、值和子节点的大小相加。


// elementSize returns the size of a key-value-childNode triplet at a given index.
// If the node is a leaf, then the size of a key-value pair is returned. 
// It's assumed i <= len(n.items)
func (n *Node) elementSize(i int) int {
	size := 0
	size += len(n.items[i].key)
	size += len(n.items[i].value)
	size += pageNumSize // 8 is the pgnum size
	return size
}

// nodeSize returns the node's size in bytes
func (n *Node) nodeSize() int {
	size := 0
	size += nodeHeaderSize

	for i := range n.items {
		size += n.elementSize(i)
	}

	// Add last page
	size += pageNumSize // 8 is the pgnum size
	return size
}

解决方案

当树中的一个节点承载了过多的键值对时,解决方案是将其拆分成两个节点。

拆分是通过分配一个新节点,将一半的键值对转移到新节点,并将其第一个键和指向该节点的指针添加到父节点中完成的。如果它是非叶节点,则在拆分点之后的所有子指针也将被移动。

第一个键和相应的指针被移动到父节点,从而增加了它的占用。该过程可能会导致溢出,因此必须递归地向上重复平衡。

插入 4 和 7 时进行的拆分。图片来自维基百科。

插入

插入项的算法如下:

  • 搜索树以找到应该插入新元素的叶节点并将其添加。
  • 如果节点超过最大容量,则按上述方式重新平衡树:a. 找到分裂索引b. 将小于分隔值的值留在现有节点中,将大于分隔值的值放入新创建的右节点中。c. 将分隔值插入节点的父节点。
  • 从插入节点到根重复步骤 2。

辅助方法

我们将从算法的第 1 步开始。addItem 方法在给定的索引处添加键值对,然后移动所有的键值对和子节点。这个简单的方法可以很容易地与标准库一起使用。

func (n *Node) addItem(item *Item, insertionIndex int) int {
	if len(n.items) == insertionIndex { // nil or empty slice or after last element
		n.items = append(n.items, item)
		return insertionIndex
	}

	n.items = append(n.items[:insertionIndex+1], n.items[insertionIndex:]...)
	n.items[insertionIndex] = item
	return insertionIndex
}

对于步骤 2,我们将添加四种方法来确定一个节点是否达到了最大容量。

func (d *dal) maxThreshold() float32 {
	return d.maxFillPercent * float32(d.pageSize)
}

func (d *dal) isOverPopulated(node *Node) bool {
	return float32(node.nodeSize()) > d.maxThreshold()
}

func (d *dal) minThreshold() float32 {
	return d.minFillPercent * float32(d.pageSize)
}

func (d *dal) isUnderPopulated(node *Node) bool {
	return float32(node.nodeSize()) < d.minThreshold()
}

我们将为解耦添加相应的 node 方法,这将在以后有所帮助。

// isOverPopulated checks if the node size is bigger than the size of a page.
func (n *Node) isOverPopulated() bool {
	return n.dal.isOverPopulated(n)
}

// isUnderPopulated checks if the node size is smaller than the size of a page.
func (n *Node) isUnderPopulated() bool {
	return n.dal.isUnderPopulated(n)
}

现在我们知道了是否必须进行重新平衡,我们将编写重新平衡本身。重点是 2.a,getSplitIndex 根据元素大小和最小节点大小计算节点应该拆分的索引。它将以字节为单位计算键值对的大小。一旦累积了足够的量,就会返回索引。

// getSplitIndex should be called when performing rebalance after an item is removed. It checks if a node can spare an
// element, and if it does then it returns the index when there the split should happen. Otherwise -1 is returned.
func (d *dal) getSplitIndex(node *Node) int {
	size := 0
	size += nodeHeaderSize

	for i := range node.items {
		size += node.elementSize(i)

		// if we have a big enough page size (more than minimum), and didn't reach the last node, which means we can
		// spare an element
		if float32(size) > d.minThreshold() && i < len(node.items) - 1 {
			return i + 1
		}
	}

	return -1
}

最后,我们将添加 split 来处理步骤 2.b 和 2.c。它在一个节点上被调用,并接收其子节点和 childNodes 列表中的索引。

如前所述,首先确定拆分索引。拆分索引左侧的对不受影响,而右侧的对将移动到新节点。如果存在子指针,则第二半部分也将被移动。

将拆分索引中的键值对移动到父节点,并在父节点的正确位置添加指向新创建节点的指针。

// split rebalances the tree after adding. After insertion the modified node has to be checked to make sure it
// didn't exceed the maximum number of elements. If it did, then it has to be split and rebalanced. The transformation
// is depicted in the graph below. If it's not a leaf node, then the children has to be moved as well as shown.
// This may leave the parent unbalanced by having too many items so rebalancing has to be checked for all the ancestors.
// The split is performed in a for loop to support splitting a node more than once. (Though in practice used only once).
// 	           n                                        n
//                 3                                       3,6
//	      /        \           ------>       /          |          \
//	   a           modifiedNode            a       modifiedNode     newNode
//   1,2                 4,5,6,7,8            1,2          4,5         7,8
func (n *Node) split(nodeToSplit *Node, nodeToSplitIndex int) {
	// The first index where min amount of bytes to populate a page is achieved. Then add 1 so it will be split one
	// index after.
	splitIndex := nodeToSplit.dal.getSplitIndex(nodeToSplit)

	middleItem := nodeToSplit.items[splitIndex]
	var newNode *Node

	if nodeToSplit.isLeaf() {
		newNode, _ = n.writeNode(n.dal.newNode(nodeToSplit.items[splitIndex+1:], []pgnum{}))
		nodeToSplit.items = nodeToSplit.items[:splitIndex]
	} else {
		newNode, _ = n.writeNode(n.dal.newNode(nodeToSplit.items[splitIndex+1:], nodeToSplit.childNodes[splitIndex+1:]))
		nodeToSplit.items = nodeToSplit.items[:splitIndex]
		nodeToSplit.childNodes = nodeToSplit.childNodes[:splitIndex+1]
	}
	n.addItem(middleItem, nodeToSplitIndex)
	if len(n.childNodes) == nodeToSplitIndex+1 { // If middle of list, then move items forward
		n.childNodes = append(n.childNodes, newNode.pageNum)
	} else {
		n.childNodes = append(n.childNodes[:nodeToSplitIndex+1], n.childNodes[nodeToSplitIndex:]...)
		n.childNodes[nodeToSplitIndex+1] = newNode.pageNum
	}

	n.writeNodes(n, nodeToSplit)
}

将所有方法组合在一起

在将上述方法组合成插入函数之前,我们将介绍一个“集合”的概念。一个“集合”是键值对的分组。一个“集合”相当于一个 RDBMS 表。集合中的键值对具有相似的目的。

我们将在以后的文章中回到集合的概念。现在,我们将添加一个名为 Collection 的结构和 FindPut 方法。

集合本身就是 B-树。因此,每个集合都有一个名称和一个标记树根节点的根页面。

type Collection struct {
	name []byte
	root pgnum

	dal *dal
}

func newCollection(name []byte, root pgnum) *Collection {
	return &Collection{
		name: name,
		root: root,
	}
}

Find 方法加载集合根节点并在该节点上执行 findKey 来定位键。如果没有找到键,则返回 nil

// Find Returns an item according based on the given key by performing a binary search.
func (c *Collection) Find(key []byte) (*Item, error) {
	n, err := c.dal.getNode(c.root)
	if err != nil {
		return nil, err
	}

	index, containingNode, _, err := n.findKey(key, true)
	if err != nil {
		return nil, err
	}
	if index == -1 {
		return nil, nil
	}
	return containingNode.items[index], nil
}

Put 将我们在本章中所做的所有工作加起来。它从用户那里接收一个键值对,并将其插入到集合的 B-树 中。

在第 9-23 行,该方法检查是否已经初始化了根节点或者是第一次插入,因此必须创建一个新的根节点。

在第 26-36 行,我们搜索插入应该发生的树。findKey 返回节点和插入应该发生的索引。如果需要重新平衡树,则它还返回路径中的节点(作为索引,每个节点都可以从其父 childNodes 数组中检索)。

在第 31-41 行,我们执行实际的插入。我们首先检查键是否已经存在。如果是,我们只使用新值覆盖它。如果没有,则插入它。由于 B-树的工作方式,新项目只能添加到叶节点中。这意味着我们不需要关心子节点。

最后,在第 49-56 和 59-71 行,如果需要,我们进行重新平衡。第一个块重新平衡插入路径,但不包括根。第二个块重新平衡根并更新集合的根指针。

// Put adds a key to the tree. It finds the correct node and the insertion index and adds the item. When performing the
// search, the ancestors are returned as well. This way we can iterate over them to check which nodes were modified and
// rebalance by splitting them accordingly. If the root has too many items, then a new root of a new layer is
// created and the created nodes from the split are added as children.
func (c *Collection) Put(key []byte, value []byte) error {
	i := newItem(key, value)

	// On first insertion the root node does not exist, so it should be created
	var root *Node
	var err error
	if c.root == 0 {
		root, err = c.dal.writeNode(c.dal.newNode([]*Item{i}, []pgnum{}))
		if err != nil {
			return nil
		}
		c.root = root.pageNum
		return nil
	} else {
		root, err = c.dal.getNode(c.root)
		if err != nil {
			return err
		}
	}

	// Find the path to the node where the insertion should happen
	insertionIndex, nodeToInsertIn, ancestorsIndexes, err := root.findKey(i.key, false)
	if err != nil {
		return err
	}

	// If key already exists
	if nodeToInsertIn.items != nil && bytes.Compare(nodeToInsertIn.items[insertionIndex].key, key) == 0 {
		nodeToInsertIn.items[insertionIndex] = i
	} else {
		// Add item to the leaf node
		nodeToInsertIn.addItem(i, insertionIndex)
	}
	_, err  = c.dal.writeNode(nodeToInsertIn)
	if err != nil {
		return err
	}

	ancestors, err := c.getNodes(ancestorsIndexes)
	if err != nil {
		return err
	}

	// Rebalance the nodes all the way up. Start From one node before the last and go all the way up. Exclude root.
	for i := len(ancestors) - 2; i >= 0; i-- {
		pnode := ancestors[i]
		node := ancestors[i+1]
		nodeIndex := ancestorsIndexes[i+1]
		if node.isOverPopulated() {
			pnode.split(node, nodeIndex)
		}
	}

	// Handle root
	rootNode := ancestors[0]
	if rootNode.isOverPopulated() {
		newRoot := c.dal.newNode([]*Item{}, []pgnum{rootNode.pageNum})
		newRoot.split(rootNode, 0)

		// commit newly created root
		newRoot, err = c.dal.writeNode(newRoot)
		if err != nil {
			return err
		}

		c.root = newRoot.pageNum
	}

	return nil
}

第四章结论

本部分侧重于向数据库插入数据。现在,我们可以像用户一样插入和查找数据库中的项。

在下面的代码中,我们创建了一个新的数据库,其中 MinFillPercentMaxFillPercent 会在节点具有六个或更多项时导致节点拆分。我们创建了一个新的集合,将 DB 根页面指定为集合根页面。最后,我们模拟了一个实际的用户操作,向集合中添加新项并搜索这些项。

package main

import (
	"fmt"
	"os"
)

func main() {
	options := &Options{
		pageSize: os.Getpagesize(),
		MinFillPercent: 0.0125,
		MaxFillPercent: 0.025,
	}
	dal, _ := newDal("./mainTest", options)

	c := newCollection([]byte("collection1"), dal.root)
	c.dal = dal

	_ = c.Put([]byte("Key1"), []byte("Value1"))
	_ = c.Put([]byte("Key2"), []byte("Value2"))
	_ = c.Put([]byte("Key3"), []byte("Value3"))
	_ = c.Put([]byte("Key4"), []byte("Value4"))
	_ = c.Put([]byte("Key5"), []byte("Value5"))
	_ = c.Put([]byte("Key6"), []byte("Value6"))
	item, _ := c.Find([]byte("Key1"))

	fmt.Printf("key is: %s, value is: %s\n", item.key, item.value)
	_ = dal.close()
}

在文件上运行 hexdump,我们可以观察到已发生拆分。我们有一个包含 4 的根节点。然后我们有两个子节点,一个带有键 1-3,另一个带有键 5-6。

另外,由于节点被设计为插槽页面,因此观察每个页面上的上下地址以及它们之间的空间。

在下一部分中,我们将重点关注删除。它将比本部分少一些函数,因为我们稍后将使用本部分添加的许多函数。

你可以在这里查看本部分的代码。# 第五章

在前两部分中,我们讨论了B-树并实现了查找和插入方法。在这一部分中,我们将实现删除操作并完成对B-树的讨论。

删除

删除项的算法如下:

  • 定位要删除的项。
  • 删除该项:a如果该值在叶节点中,只需从节点中删除即可。b否则(该值在内部节点中),选择一个新的分隔符——左子树中最大的元素,在叶节点中将其删除,并用新的分隔符替换要删除的元素。
  • 从叶节点或选择分隔符的节点开始重新平衡树,根据上面的情况。

删除实现

我们将重点关注第2点。

我们需要处理两种情况。第一种(2.a)是从叶节点中删除节点。这种情况很简单,因为我们只需要从节点中删除它。

// removeItemFromLeaf removes an item from a leaf node. It means there is no handling of child nodes.
func (n *Node) removeItemFromLeaf(index int) {
	n.items = append(n.items[:index], n.items[index+1:]...)
	n.writeNode(n)
}

另一方面,如果我们遇到一个内部节点(2.b),我们需要找到其前驱并使其成为新的分隔符。在第15行,我们选择左子树并在第20-27行下降到最右边的子节点,从而得到左子树中最大的元素的节点。

在第30-32行,我们用其前驱替换要删除的项-左子树中最右侧的一对,并将其从它所在的叶节点中删除。

func (n *Node) removeItemFromInternal(index int) ([]int, error) {
	// Take element before inorder (The biggest element from the left branch), put it in the removed index and remove
	// it from the original node. Track in affectedNodes any nodes in the path leading to that node. It will be used
	// in case the tree needs to be rebalanced.
	//          p
	//       /
	//     ..
	//  /     \
	// ..      a

	affectedNodes := make([]int, 0)
	affectedNodes = append(affectedNodes, index)

	// Starting from its left child, descend to the rightmost descendant.
	aNode, err := n.getNode(n.childNodes[index])
	if err != nil {
		return nil, err
	}
	
	for !aNode.isLeaf() {
		traversingIndex := len(n.childNodes) - 1
		aNode, err = aNode.getNode(aNode.childNodes[traversingIndex])
		if err != nil {
			return nil, err
		}
		affectedNodes = append(affectedNodes, traversingIndex)
	}

	// Replace the item that should be removed with the item before inorder which we just found.
	n.items[index] = aNode.items[len(aNode.items)-1]
	aNode.items = aNode.items[:len(aNode.items)-1]
	n.writeNodes(n, aNode)

	return affectedNodes, nil
}

重新平衡

我们将重点关注第3点。

将新项输入节点时,可能会导致节点中的一个节点溢出,从而导致树失衡。删除可能会导致下溢,因此我们必须立即重新平衡树。

与插入不同,重新平衡仅通过拆分节点来完成,删除操作后的重新平衡可以是合并节点或旋转节点。

旋转可以向右或向左进行,具体取决于哪个节点有多余的项。例如,右旋是通过将分隔键从父节点复制到下溢节点的开头并用其左兄弟的最后一个元素替换分隔键来完成的。旋转非叶节点时,将左兄弟的最后一个子指针移动到不平衡节点的开头。执行左旋时采用类似的方法。

在下面的示例中,删除15会导致右子节点具有过少的项。这导致右旋。

右旋

如果没有兄弟可以节约元素,则必须将不足的节点与兄弟合并。合并会导致父项失去分隔符元素,因此父项可能会变得不足并需要重新平衡。重新平衡可能会一直持续到根。

在这种情况下,删除15。不能执行右旋,因为它会使左子节点处于赤字状态。因此,解决方案是将节点合并为单个节点。

合并

与上一章类似,我们将开始添加一些辅助函数,并将它们组合成单个函数在collection结构下。

我们的前两个函数是右旋和左旋。这两个函数几乎相同,因此让我们检查向右旋转。

如注释所述,旋转是按照以下方式完成的。给定节点A,我们删除其最后一个项。我们用父节点中正确索引处的项替换它,并将原始父项放在B节点中。

正确的索引是父节点中这两个节点之间的项的索引。如果存在任何子指针,则它们也将移动。

两个旋转的代码类似。区别在于受影响的节点和移动的键值对。

func rotateRight(aNode, pNode, bNode *Node, bNodeIndex int) {
	// 	           p                                    p
	//                 4                                    3
	//	      /        \           ------>         /          \
	//	   a           b (unbalanced)            a        b (unbalanced)
	//      1,2,3             5                     1,2            4,5

	// Get last item and remove it
	aNodeItem := aNode.items[len(aNode.items)-1]
	aNode.items = aNode.items[:len(aNode.items)-1]

	// Get item from parent node and assign the aNodeItem item instead
	pNodeItemIndex := bNodeIndex - 1
	if isFirst(bNodeIndex) {
		pNodeItemIndex = 0
	}
	pNodeItem := pNode.items[pNodeItemIndex]
	pNode.items[pNodeItemIndex] = aNodeItem

	// Assign parent item to b and make it first
	bNode.items = append([]*Item{pNodeItem}, bNode.items...)

	// If it's an inner leaf then move children as well.
	if !aNode.isLeaf() {
		childNodeToShift := aNode.childNodes[len(aNode.childNodes)-1]
		aNode.childNodes = aNode.childNodes[:len(aNode.childNodes)-1]
		bNode.childNodes = append([]pgnum{childNodeToShift}, bNode.childNodes...)
	}
}

func rotateLeft(aNode, pNode, bNode *Node, bNodeIndex int) {
	// 	           p                                     p
	//                 2                                     3
	//	      /        \           ------>         /          \
	//  a(unbalanced)       b                 a(unbalanced)        b
	//   1                3,4,5                   1,2             4,5

	// Get first item and remove it
	bNodeItem := bNode.items[0]
	bNode.items = bNode.items[1:]

	// Get item from parent node and assign the bNodeItem item instead
	pNodeItemIndex := bNodeIndex
	if isLast(bNodeIndex, pNode) {
		pNodeItemIndex = len(pNode.items) - 1
	}
	pNodeItem := pNode.items[pNodeItemIndex]
	pNode.items[pNodeItemIndex] = bNodeItem

	// Assign parent item to a and make it last
	aNode.items = append(aNode.items, pNodeItem)

	// If it's an inner leaf then move children as well.
	if !bNode.isLeaf() {
		childNodeToShift := bNode.childNodes[0]
		bNode.childNodes = bNode.childNodes[1:]
		aNode.childNodes = append(aNode.childNodes, childNodeToShift)
	}
}

接下来是merge。合并函数接收一个节点及其索引。然后,它将该节点与其左兄弟合并,转移其键值对和子指针。函数完成后,它会删除该节点。

func (n *Node) merge(bNode *Node, bNodeIndex int) error {
	// 	               p                                     p
	//                    3,5                                    5
	//	      /        |       \       ------>         /          \
	//          a          b        c                     a            c
	//         1,2         4        6,7                 1,2,3,4         6,7
	aNode, err := n.getNode(n.childNodes[bNodeIndex-1])

	// Take the item from the parent, remove it and add it to the unbalanced node
	pNodeItem := n.items[bNodeIndex-1]
	n.items = append(n.items[:bNodeIndex-1], n.items[bNodeIndex:]...)
	aNode.items = append(aNode.items, pNodeItem)

	aNode.items = append(aNode.items, bNode.items...)
	n.childNodes = append(n.childNodes[:bNodeIndex], n.childNodes[bNodeIndex+1:]...)
	if !aNode.isLeaf() {
		aNode.childNodes = append(aNode.childNodes, bNode.childNodes...)
	}

	n.writeNodes(aNode, n)
	n.dal.deleteNode(bNode.pageNum)
	return nil
}

最后,我们将编写reblanceRemove,以确定树在失衡时应该应用哪个算法。

在每个旋转(第9-19行和第22-32行)中,我们确保不平衡节点不在边缘,因此它具有左/右兄弟。我们还确保兄弟可以节省元素,这意味着在操作之后它的兄弟将不会下溢。如果两个条件都成立,则选择旋转。

如果两个条件都不成立,则执行合并(第36-45行)。合并函数将一个节点与其左兄弟合并。如果我们在最左边的兄弟上,则使用右边的一个节点,因此在第36行上有一个“if”条件。

// rebalanceRemove rebalances the tree after a remove operation. This can be either by rotating to the right, to the
// left or by merging. First, the sibling nodes are checked to see if they have enough items for rebalancing
// (>= minItems+1). If they don't have enough items, then merging with one of the sibling nodes occurs. This may leave
// the parent unbalanced by having too little items so rebalancing has to be checked for all the ancestors.
func (n *Node) rebalanceRemove(unbalancedNode *Node, unbalancedNodeIndex int) error {
	pNode := n

	// Right rotate
	if unbalancedNodeIndex != 0 {
		leftNode, err := n.getNode(pNode.childNodes[unbalancedNodeIndex-1])
		if err != nil {
			return err
		}
		if leftNode.canSpareAnElement() {
			rotateRight(leftNode, pNode, unbalancedNode, unbalancedNodeIndex)
			n.writeNodes(leftNode, pNode, unbalancedNode)
			return nil
		}
	}

	// Left Balance
	if unbalancedNodeIndex != len(pNode.childNodes)-1 {
		rightNode, err := n.getNode(pNode.childNodes[unbalancedNodeIndex+1])
		if err != nil {
			return err
		}
		if rightNode.canSpareAnElement() {
			rotateLeft(unbalancedNode, pNode, rightNode, unbalancedNodeIndex)
			n.writeNodes(unbalancedNode, pNode, rightNode)
			return nil
		}
	}
	// The merge function merges a given node with its node to the right. So by default, we merge an unbalanced node
	// with its right sibling. In the case where the unbalanced node is the leftmost, we have to replace the merge
	// parameters, so the unbalanced node right sibling, will be merged into the unbalanced node.
	if unbalancedNodeIndex == 0 {
		rightNode, err := n.getNode(n.childNodes[unbalancedNodeIndex+1])
		if err != nil {
			return err
		}

		return pNode.merge(rightNode, unbalancedNodeIndex+1)
	}

	return pNode.merge(unbalancedNode, unbalancedNodeIndex)
}

把所有东西放在一起

我们将在集合中添加一个新方法Remove。在第8-11行中,加载根节点,然后在第13-16行中执行算法的第1步,定位要删除的项。如果不存在,则在第18-20行返回。

在第22-30行中,我们根据算法的第2步删除该项。我们扩展我们的祖先列表以跟踪可能受到叶节点删除影响的任何节点。

最后,在第32-47行中,我们按需要跟随ancestorsIndexes列表中可能受影响的节点并重新平衡它们。

最后,在第51-53行中,如果根节点为空(例如,它最初只有一个项,并且已被删除),则我们忽略它并将其子项分配为新根。

// Remove removes a key from the tree. It finds the correct node and the index to remove the item from and removes it.
// When performing the search, the ancestors are returned as well. This way we can iterate over them to check which
// nodes were modified and rebalance by rotating or merging the unbalanced nodes. Rotation is done first. If the
// siblings don't have enough items, then merging occurs. If the root is without items after a split, then the root is
// removed and the tree is one level shorter.
func (c *Collection) Remove(key []byte) error {
	// Find the path to the node where the deletion should happen
	rootNode, err := c.dal.getNode(c.root)
	if err != nil {
		return err
	}

	removeItemIndex, nodeToRemoveFrom, ancestorsIndexes, err := rootNode.findKey(key, true)
	if err != nil {
		return err
	}

	if removeItemIndex == -1 {
		return nil
	}

	if nodeToRemoveFrom.isLeaf() {
		nodeToRemoveFrom.removeItemFromLeaf(removeItemIndex)
	} else {
		affectedNodes, err := nodeToRemoveFrom.removeItemFromInternal(removeItemIndex)
		if err != nil {
			return err
		}
		ancestorsIndexes = append(ancestorsIndexes, affectedNodes...)
	}

	ancestors, err := c.getNodes(ancestorsIndexes)
	if err != nil {
		return err
	}

	// Rebalance the nodes all the way up. Start From one node before the last and go all the way up. Exclude root.
	for i := len(ancestors) - 2; i >= 0; i-- {
		pnode := ancestors[i]
		node := ancestors[i+1]
		if node.isUnderPopulated() {
			err = pnode.rebalanceRemove(node, ancestorsIndexes[i+1])
			if err != nil {
				return err
			}
		}
	}

	rootNode = ancestors[0]
	// If the root has no items after rebalancing, there's no need to save it because we ignore it.
	if len(rootNode.items) == 0 && len(rootNode.childNodes) > 0 {
		c.root = ancestors[1].pageNum
	}

	return nil
}

结论

在下面的代码中,我们通过第29行的删除操作扩展了我们从上一章的程序。

package main

import (
	"fmt"
	"os"
)

func main() {
	options := &Options{
		pageSize: os.Getpagesize(),
		MinFillPercent: 0.0125,
		MaxFillPercent: 0.025,
	}
	dal, _ := newDal("./mainTest", options)

	c := newCollection([]byte("collection1"), dal.root)
	c.dal = dal

	_ = c.Put([]byte("Key1"), []byte("Value1"))
	_ = c.Put([]byte("Key2"), []byte("Value2"))
	_ = c.Put([]byte("Key3"), []byte("Value3"))
	_ = c.Put([]byte("Key4"), []byte("Value4"))
	_ = c.Put([]byte("Key5"), []byte("Value5"))
	_ = c.Put([]byte("Key6"), []byte("Value6"))
	item, _ := c.Find([]byte("Key1"))

	fmt.Printf("key is: %s, value is: %s\n", item.key, item.value)

	_ = c.Remove([]byte("Key1"))
	item, _ = c.Find([]byte("Key1"))

	dal.writeFreelist()
	fmt.Printf("item is: %+v\n", item)
	_ = dal.close()
}

运行后,我们得到了预期的删除后的nil

请记住,在插入6之后发生了拆分。你可以在下面图像的左侧查看其状态。

删除键1导致合并

删除操作后,树将失衡。右侧兄弟只有两个不足以执行左旋的项。因此,选择合并平衡策略。合并操作后的树在右侧。运行hexdump命令,我们在第3页(从00002000开始)得到了预期的结果。所有的配对都在同一个节点上。

需要注意的是,过程并不会在此结束。请记住,在合并过程中,节点会被删除。该节点会被freelist标记为空闲。当更新后的freelist提交时,它会被写入到第5页,这是Insert操作之后5和6所在的位置(请查看上一部分结束时文件的状态)。

提交freelist很棘手,但是在一系列读/写操作之后进行其他操作是必要的。

在下一章中,我们将介绍数据库事务的概念。一个工作单元包括在数据库上执行的一系列操作。它们将处理在一系列操作开始和结束之前需要执行的工作等等。

第6章

在最近的三部分中,我们谈到了B树。我们实现了B-Tree方法,如Find、Insert和Delete。这些方法都接近磁盘级别,因为它们定义了应该在磁盘上保存的页面布局。

事务

今天,我们介绍事务的概念。事务是一个更高级别的概念。一个事务是在数据库上执行的一系列操作的工作单元。

例如,从帐户A转移50美元到B帐户包括:

  • 读取帐户A的余额。
  • 从该金额中扣除50美元,并将其写回到帐户A中。
  • 读取帐户B的余额。
  • 将50美元添加到该金额中,并将其写回到帐户B中。

所有这些操作都是单个事务。

ACID

ACID代表原子性、一致性、隔离性和持久性。这是数据库的一组属性,保证事务可靠地处理。

  • 原子性表示要么所有事务成功,要么所有事务失败。它确保数据库不会处于未定义状态。我们的数据库不是原子的,因为如果事务在中途失败(如果数据库崩溃),则部分事务已提交,部分事务未提交。我构建数据库时使其易于扩展以支持它,但这也涉及更多的代码,超出了我们数据库的范围。
  • 一致性确保写入数据库的数据在事务完成后必须根据所有定义的规则有效。例如,银行中帐户的余额必须为正数。
  • 隔离性是一种事务如何隔离于其他事务的程度的度量。换句话说,事务对其他事务的影响程度。我将在下一节中更加关注此点。
  • 持久性表示一旦提交事务,数据将在系统故障的情况下保留。我们的数据库是持久的,因为数据库保存在磁盘上。与内存(RAM)数据库不同,在崩溃后数据仍然保存在磁盘上。

隔离性

首先,我们将描述多个事务同时运行时可能发生的三种异常。

脏读

当允许事务从尚未提交的另一个正在运行的事务修改的行中读取数据时,就会发生_脏读_。

不可重复读

当在事务过程中两次检索行,并且行内的值在读取之间不同时,就会发生_不可重复读_。

幻读

当在事务过程中,另一个事务通过插入或删除查询向正在读取的记录中添加或删除新行时,就会发生_幻读_。

_脏读_从另一个事务中读取未提交的数据。_不可重复读_从另一个事务的更新查询中读取已提交的数据。_幻读_从另一个事务的插入删除查询中读取已提交的数据。_幻读_是_不可重复读_的更复杂的变体。_幻读_涉及范围扫描,因此问题不在单个文档中,而在多个文档中。

隔离级别

SQL标准根据现象定义了四个隔离级别,如下所示:

摘自维基百科

在我们的数据库中,我们将拥有一个_串行化_隔离级别。每个事务在下一个事务开始之前执行完成。

我们将通过在事务开始之前获取RW锁并在结束时释放它来实现它。一个RW锁可以由任意数量的读取器或单个写入器持有。

这意味着我们的数据库可以支持单个写入器或多个读取器同时进行,但不能同时进行。

开始编码

DB

我们将创建一个名为db.go的新文件。我们将添加一个名为DB的新结构和方法OpenCloseDB包含dal和一个RW锁。它管理对磁盘的访问,并确保并发事务不会导致上述异常。

DB面向用户,因此我们将使用公共方法包装dal层。

type DB struct {
	rwlock   sync.RWMutex
	*dal
}

func Open(path string, options *Options) (*DB, error) {
	var err error

	options.pageSize = os.Getpagesize()
	dal, err := newDal(path, options)
	if err != nil {
		return nil, err
	}

	db := &DB{
		sync.RWMutex{},
		dal,
	}

	return db, nil
}

func (db *DB) Close() error {
	return db.close()
}

我们还将添加两个方法ReadTxWriteTx,用于打开事务并获取锁。

func (db *DB) ReadTx() *tx {
	db.rwlock.RLock()
	return newTx(db, false)
}

func (db *DB) WriteTx() *tx {
	db.rwlock.Lock()
	return newTx(db, true)
}

事务

我们将添加一个名为tx.go的新文件。我们将添加一个新的tx结构和构造函数。

type tx struct {
	dirtyNodes    map[pgnum]*Node
	pagesToDelete []pgnum

	// new pages allocated during the transaction. They will be released if rollback is called.
	allocatedPageNums []pgnum

	write bool

	db   *DB
}

func newTx(db *DB, write bool) *tx {
	return &tx{
		map[pgnum]*Node{},
		make([]pgnum, 0),
		make([]pgnum, 0),
		write,
		db,
	}
}

write标志指示事务是否为只读事务。当只读事务尝试进行更改时,我们将在const.go中添加一个新的错误。

var writeInsideReadTxErr = errors.New("can't perform a write operation inside a read transaction")

然后在先前的剧集中的PutRemove方法中使用它。

func (c *Collection) Put(key []byte, value []byte) error {
	if !c.tx.write {
		return writeInsideReadTxErr
	}
  .
  .
  .
}

func (c *Collection) Remove(key []byte) error {
	if !c.tx.write {
		return writeInsideReadTxErr
	}
  .
  .
  .
}

一旦事务开始,它就要么提交并保存所有修改的数据,要么回滚并忽略所有更改。提交更改很容易,就像我们在之前的部分中所做的那样,但回滚更加复杂。在执行回滚时,与其撤销更改,一个更简单的方法是将修改的节点暂时存储在 RAM 中,并一次性将其传播到磁盘上。然后,要执行回滚,我们什么也不需要做,因为数据没有被提交。

为此,dirtyNodes 保存已修改的节点,等待提交。pagesToDelete 保存等待删除的页面。而 allocatedPageNums 保存由事务期间的 freelist 分配的任何新的 pageNums

如果事务是读取,则执行回滚只需释放互斥锁即可。如果是写入事务,则必须释放由 freelist 给出的页面,将片段标记为 nil 并释放互斥锁。

func (tx *tx) Rollback() {
	if !tx.write {
		tx.db.rwlock.RUnlock()
		return
	}

	tx.dirtyNodes = nil
	tx.pagesToDelete = nil
	for _, pageNum := range tx.allocatedPageNums {
		tx.db.freelist.releasePage(pageNum)
	}
	tx.allocatedPageNums = nil
	tx.db.rwlock.Unlock()
}

执行 Commit 也很简单。在读取事务中,只需释放互斥锁。在写入事务中,保存已修改的节点,删除已删除的节点,并标记任何已获取的页面。最后,释放互斥锁。

func (tx *tx) Commit() error {
	if !tx.write {
		tx.db.rwlock.RUnlock()
		return nil
	}

	for _, node := range tx.dirtyNodes {
		_, err := tx.db.writeNode(node)
		if err != nil {
			return err
		}
	}

	for _, pageNum := range tx.pagesToDelete {
		tx.db.deleteNode(pageNum)
	}
	_, err := tx.db.writeFreelist()
	if err != nil {
		return err
	}

	tx.dirtyNodes = nil
	tx.pagesToDelete = nil
	tx.allocatedPageNums = nil
	tx.db.rwlock.Unlock()
	return nil
}

现在,我们想要将我们在以前的章节中添加的读取/写入/删除操作重定向到 dirtyNodespagesToDelete 中,而不是磁盘。这意味着我们将不得不检查代码并将对 dal 的访问替换为对 tx 的访问。

例如,进行以下更改:

func (n *Node) getNode(pageNum pgnum) (*Node, error) {
	return n.dal.getNode(pageNum)
}
func (n *Node) getNode(pageNum pgnum) (*Node, error) {
	return n.tx.getNode(pageNum)
}

或者在集合文件中执行以下操作:

func (c *Collection) Put(key []byte, value []byte) error {
  .
  .
  .
  
  newRoot, err = c.dal.writeNode(newRoot)
      if err != nil {
        return err
      } 
      
   .
   .
   .
  
}
func (c *Collection) Put(key []byte, value []byte) error {
  .
  .
  .
  
  newRoot = c.tx.writeNode(newRoot)
      
   .
   .
   .
  
}

这个过程很漫长,但你可以在 Github 上检查所有更改。

重要的是要注意,一旦我们写入一个节点,它实际上并没有写入磁盘,而是写入了一个临时缓冲区。如果系统在中途崩溃,我们将无法从崩溃点继续事务。

为解决这个问题,可以使用 WAL 跟踪更改。它代表 write-ahead logging,用于提供原子性和耐久性。它通常实现为一个追加只记录数据库更改的日志文件。

然后,每次调用写节点方法(或删除节点)时,我们可以在单独的日志文件中记录更改(而不是在 B-Tree 中!)。如果系统崩溃,我们可以从上次调用恢复数据库。

这也超出了我们数据库的范围,但我们的数据库构建得非常容易扩展以实现这个改进。

第六章总结

在这一部分中,我们不会展示我们的新功能,但在下一部分中,我们将从底层开始拥有一个可工作的数据库。我们将添加用于处理集合的新方法,这将包括我们数据库的所有关键特性。

第七章

在这一部分中,我们将完成数据库。我们将更深入地研究我们已经看到的集合。这一部分也将是我们的最后一集。

集合

我们已经介绍了集合的概念。它们存储具有类似目的的键值对,相当于 RDBMS 表。

我们讨论了在 给定 集合中添加/删除/搜索,但我们没有讨论如何 创建 它们。

我们的想法是拥有一个保存其他集合的主集合(Root Collection)。它将包含键值对,其中 是集合名称。

在 B 树中,每个节点都是子 B-Tree 的根,因此每个集合本身都将是 B 树,并且是整个 B 树的子 B 树。 将指向作为集合子树根的页面编号。

请记住,meta 页面保存 B 树根的页面编号。root 字段将指向 Root Collection 的根页面。

在数据库中搜索键值对的实际流程如下:

  • 首先检索要执行搜索的所需桶。数据库将在 Root Collection 中搜索给定的集合。
  • 获取集合子树根页面
  • 从根页面开始,在集合子树中查找所需元素。第 1 步和第 3 步都使用我们在第 4 部分中实现的 Find 方法。不同之处在于搜索的集合。下面的图像显示了每个箭头对应于过程中的一步。

开始编码

我们将从实现检索根集合的方法开始。关键部分是将其根页面分配与整个 DB 的根页面相同。

然后,我们将添加上面讨论的 GetCollection。在 Root Collection 中搜索集合名称,反序列化内容并返回。

func (tx *tx) getRootCollection() *Collection {
	rootCollection := newEmptyCollection()
	rootCollection.root = tx.db.root
	rootCollection.tx = tx
	return rootCollection
}

func (tx *tx) GetCollection(name []byte) (*Collection, error) {
	rootCollection := tx.getRootCollection()
	item, err := rootCollection.Find(name)
	if err != nil {
		return nil, err
	}

	if item == nil {
		return nil, nil
	}

	collection := newEmptyCollection()
	collection.deserialize(item)
	collection.tx = tx
	return collection, nil
}

创建和删除集合是类似的。在 Root Collection 上应用 Put/Remove。还要确保调用的事务是写入事务。

神奇数字

我们将添加一个最后的好功能。魔数是用于标识文件格式的常量数字或文本值。程序在前几个字节中存储一个标识符,例如 CA FE BA BE(例如 Java 类文件)。当程序加载时,它将比较第一个字节与预期的签名,并确保它们相等。

我们将定义我们的魔数为 D00DB00D(但它可以是任何你选择的)。在将元页面写入磁盘时,我们将包括魔数。在启动数据库时从磁盘加载它时,我们将比较文件内容与预期签名。

const (
	magicNumber uint32 = 0xD00DB00D
	metaPageNum = 0
)

func (m *meta) serialize(buf []byte) {
	pos := 0
	binary.LittleEndian.PutUint32(buf[pos:], magicNumber)
	pos += magicNumberSize
  .
  .
  .
}

func (m *meta) deserialize(buf []byte) {
	pos := 0
	magicNumberRes := binary.LittleEndian.Uint32(buf[pos:])
	pos += magicNumberSize

	if magicNumberRes != magicNumber {
		panic("The file is not a libra db file")
	}
  .
  .
  .
}

第七章总结

这是一个漫长的旅程,但现在我们的数据库已经完成了。如果你跟随到这个点,请给自己点个赞。

要测试我们的数据库,你可以使用 Github 中已完成的测试之一,或编写你自己的场景。任何并发或非并发的读/写操作混合都可以。

总结和最后的话

第一章,我们介绍了广泛的术语,如 SQL vs. NoSQL、数据结构和基于磁盘的存储。我们谈到了我们的数据库,特别是 dal 层及其组件,如 freelistdatabase pages

第二章 是关于持久化到磁盘的。我们介绍了一个位于文件第一页的特殊页面,称为 meta。它包含我们的数据库启动所需的所有信息。我们还添加了写入磁盘的方法,例如:serialize/deserializemetafreelist 页面的读取/写入。每次向数据库添加新实体时,这些方法都会重新出现。第三、四、五章讲述了B树。在第三章中,我们介绍了二叉搜索树,并讨论了在数据库中使用B树的原理。我们决定使用“分槽页面”技术,通过在页面末尾存储数据并在开头存储固定大小的指针来组织磁盘上的数据。

然后,我们开始编写B树,并在其中添加了搜索操作。

在第四章中,我们使用B树编写了插入到数据库的代码。我们首先看到了保持B树平衡的重要性,并添加了确定树是否不平衡的方法-如果节点中有太多/太少元素。然后,我们编写了“分裂”方法来处理重新平衡和插入方法本身。我们还第一次看到了“collection”实体。

在第五章中,我们通过实现删除和适当的平衡方法-左/右旋转和合并-完成了B树的编写。

第六章与之前的部分不同,因为我们没有处理磁盘操作。我们提出了事务的主题-这是由对数据库执行的一系列操作组成的工作单元。

我们谈论了ACID-一组用于可靠地处理事务的属性。我们还仔细研究了隔离属性,并使用锁来确保并发事务不会互相干扰。

最后,在第七章中,我们添加了集合机制,这是数据库功能中剩余的最后一块,以及完全封装它的魔数。

下面是我们数据库的总体架构。

我开始编写数据库的时间已经一年了,直到文章完成。我希望你们读我的文章的乐趣不亚于我写给你们的乐趣。一定要查看名为LibraDB的完整数据库,它可以在GitHub上找到。

译自:https://betterprogramming.pub/build-a-nosql-database-from-the-scratch-in-1000-lines-of-code-8ed1c15ed924

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论