leveldb Source Code Analysis and Implementation

The source code analyzed here is based on leveldb versions after 1.22.

Features

leveldb is a key-value storage library. Keys are kept in sorted order, and clients can supply a custom key comparator to override the default ordering. Multiple updates can also be batched together and applied as a single atomic operation.

Building

mkdir -p build && cd build
cmake -DCMAKE_BUILD_TYPE=Release .. && cmake --build .

If Ubuntu reports No CMAKE_CXX_COMPILER could be found:

sudo apt-get update && sudo apt-get install build-essential

Opening a Database

Options

// Options to control the behavior of a database (passed to DB::Open)
struct LEVELDB_EXPORT Options {
  // Create an Options object with default values for all fields.
  Options();

  // -------------------
  // Parameters that affect behavior

  // Comparator used to define the order of keys in the table.
  // Default: a comparator that uses lexicographic byte-wise ordering
  //
  // REQUIRES: The client must ensure that the comparator supplied
  // here has the same name and orders keys *exactly* the same as the
  // comparator provided to previous open calls on the same DB.
  const Comparator* comparator;

  // If true, the database will be created if it is missing.
  bool create_if_missing = false;

  // If true, an error is raised if the database already exists.
  bool error_if_exists = false;

  // If true, the implementation will do aggressive checking of the
  // data it is processing and will stop early if it detects any
  // errors.  This may have unforeseen ramifications: for example, a
  // corruption of one DB entry may cause a large number of entries to
  // become unreadable or for the entire DB to become unopenable.
  bool paranoid_checks = false;

  // Use the specified object to interact with the environment,
  // e.g. to read/write files, schedule background work, etc.
  // Default: Env::Default()
  Env* env;

  // Any internal progress/error information generated by the db will
  // be written to info_log if it is non-null, or to a file stored
  // in the same directory as the DB contents if info_log is null.
  Logger* info_log = nullptr;

  // -------------------
  // Parameters that affect performance

  // Amount of data to build up in memory (backed by an unsorted log
  // on disk) before converting to a sorted on-disk file.
  //
  // Larger values increase performance, especially during bulk loads.
  // Up to two write buffers may be held in memory at the same time,
  // so you may wish to adjust this parameter to control memory usage.
  // Also, a larger write buffer will result in a longer recovery time
  // the next time the database is opened.
  size_t write_buffer_size = 4 * 1024 * 1024;

  // Number of open files that can be used by the DB.  You may need to
  // increase this if your database has a large working set (budget
  // one open file per 2MB of working set).
  int max_open_files = 1000;

  // Control over blocks (user data is stored in a set of blocks, and
  // a block is the unit of reading from disk).

  // If non-null, use the specified cache for blocks.
  // If null, leveldb will automatically create and use an 8MB internal cache.
  Cache* block_cache = nullptr;

  // Approximate size of user data packed per block.  Note that the
  // block size specified here corresponds to uncompressed data.  The
  // actual size of the unit read from disk may be smaller if
  // compression is enabled.  This parameter can be changed dynamically.
  size_t block_size = 4 * 1024;

  // Number of keys between restart points for delta encoding of keys.
  // This parameter can be changed dynamically.  Most clients should
  // leave this parameter alone.
  int block_restart_interval = 16;

  // Leveldb will write up to this amount of bytes to a file before
  // switching to a new one.
  // Most clients should leave this parameter alone.  However if your
  // filesystem is more efficient with larger files, you could
  // consider increasing the value.  The downside will be longer
  // compactions and hence longer latency/performance hiccups.
  // Another reason to increase this parameter might be when you are
  // initially populating a large database.
  size_t max_file_size = 2 * 1024 * 1024;

  // Compress blocks using the specified compression algorithm.  This
  // parameter can be changed dynamically.
  //
  // Default: kSnappyCompression, which gives lightweight but fast
  // compression.
  //
  // Typical speeds of kSnappyCompression on an Intel(R) Core(TM)2 2.4GHz:
  //    ~200-500MB/s compression
  //    ~400-800MB/s decompression
  // Note that these speeds are significantly faster than most
  // persistent storage speeds, and therefore it is typically never
  // worth switching to kNoCompression.  Even if the input data is
  // incompressible, the kSnappyCompression implementation will
  // efficiently detect that and will switch to uncompressed mode.
  CompressionType compression = kSnappyCompression;

  // EXPERIMENTAL: If true, append to existing MANIFEST and log files
  // when a database is opened.  This can significantly speed up open.
  //
  // Default: currently false, but may become true later.
  bool reuse_logs = false;

  // If non-null, use the specified filter policy to reduce disk reads.
  // Many applications will benefit from passing the result of
  // NewBloomFilterPolicy() here.
  const FilterPolicy* filter_policy = nullptr;
};

VersionEdit

class VersionEdit {
 private:
  friend class VersionSet;

  typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;

  std::string comparator_;
  uint64_t log_number_;
  uint64_t prev_log_number_;
  uint64_t next_file_number_;
  SequenceNumber last_sequence_;
  bool has_comparator_;
  bool has_log_number_;
  bool has_prev_log_number_;
  bool has_next_file_number_;
  bool has_last_sequence_;

  std::vector<std::pair<int, InternalKey>> compact_pointers_;
  DeletedFileSet deleted_files_;
  std::vector<std::pair<int, FileMetaData>> new_files_;
};

Put

Slice

The underlying data structure of Slice:

class LEVELDB_EXPORT Slice {
 private:
  const char* data_;
  size_t size_;
};

Where the data is actually stored

// db_impl.h
MemTable* mem_;                     // active memtable, receives new writes
MemTable* imm_ GUARDED_BY(mutex_);  // immutable memtable being compacted to disk

Write order

// First, append the record to the log file
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
  status = logfile_->Sync();
  if (!status.ok()) {
    sync_error = true;
  }
}

// If the log write succeeded
if (status.ok()) {
  // then apply the batch to the memtable
  status = WriteBatchInternal::InsertInto(write_batch, mem_);
}

Data is first written sequentially to the log file on disk; only if that write succeeds is it then inserted into the MemTable.

Writing the log file

Batched records accumulate in WriteBatch's std::string rep_ member; the code below shows what gets appended for each Put:

void WriteBatch::Put(const Slice& key, const Slice& value) {
  // ...
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

// coding.cc
void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
  PutVarint32(dst, value.size());
  dst->append(value.data(), value.size());
}

When a record is written to the log, a CRC is computed over the record type and payload. The CRC fills the first 4 bytes of the buf header array, and the remaining 3 bytes are filled in as follows:

buf[4] = static_cast<char>(length & 0xff);
buf[5] = static_cast<char>(length >> 8);
buf[6] = static_cast<char>(t);

buf holds the record's header, while ptr points to the record's actual payload:

// log_writer.cc
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
  s = dest_->Append(Slice(ptr, length));
  if (s.ok()) {
    s = dest_->Flush();
  }
}

The header is 7 bytes long:

// log_format.h
// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

Append internally calls std::memcpy.

Writing to the in-memory MemTable

void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                   const Slice& value) {
  // Allocate the entry with the Arena allocator
  char* buf = arena_.Allocate(encoded_len);
  char* p = EncodeVarint32(buf, internal_key_size);

  // Copy the key into buf, then advance past it
  std::memcpy(p, key.data(), key_size);
  p += key_size;

  // Append the (sequence << 8 | type) tag and the value length
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);

  // Copy the value into buf
  std::memcpy(p, value.data(), val_size);

  // Insert the entry into the table
  table_.Insert(buf);
}

The signature of std::memcpy:

// Copies count bytes from the object pointed to by src to the object pointed to by dest.
void* memcpy( void* dest, const void* src, std::size_t count );

In the table_.Insert(buf) call on the last line above, Table is in fact a skip list:

// memtable.h
typedef SkipList<const char*, KeyComparator> Table;

So the buf allocated in MemTable::Add ultimately ends up stored in the skip list.

Delete

Under the hood, Delete looks almost exactly like Put:

// write_batch.cc
void Delete(const Slice& key) override {
  mem_->Add(sequence_, kTypeDeletion, key, Slice());
  sequence_++;
}

The only difference is that the key is tagged with kTypeDeletion instead of carrying a value.

Snapshots

Creating a snapshot

GetSnapshot reads the last sequence number (LastSequence) from the VersionSet and creates a snapshot from it:

// db_impl.cc
const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

where versions_ is defined as:

VersionSet* const versions_ GUARDED_BY(mutex_);

Database Files

All database files

The directory named after the database contains the following kinds of files:

// filename.h
enum FileType {
  kLogFile, // 000025.log
  kDBLockFile, // LOCK
  kTableFile, // xxxxxx.ldb, *.sst
  kDescriptorFile, // MANIFEST-000023
  kCurrentFile, // CURRENT
  kTempFile, // *.dbtmp
  kInfoLogFile  // LOG, LOG.old 
};

An example directory layout (file numbers are illustrative):

000025.log
000027.ldb
CURRENT
LOCK
LOG
LOG.old
MANIFEST-000023

LOG, LOG.old

Sample file contents:

2020/10/25-15:37:29.918667 140486263023424 Recovering log #25
2020/10/25-15:37:29.918906 140486263023424 Level-0 table #27: started
2020/10/25-15:37:29.923419 140486263023424 Level-0 table #27: 133 bytes OK
2020/10/25-15:37:29.930591 140486263023424 Delete type=0 #25
2020/10/25-15:37:29.930639 140486263023424 Delete type=3 #23
2020/10/25-15:37:29.930944 140486263019264 Compacting [email protected] + [email protected] files
2020/10/25-15:37:29.931234 140486263019264 compacted to: files[ 4 1 0 0 0 0 0 ]

These lines are written via the Log function:

Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));

*.log

The log file is the file pointed to by logfile_ and log_ in db_impl.h:

// db_impl.h
WritableFile* logfile_;
log::Writer* log_;

The log file's contents are written via AddRecord:

status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));


LOCK

// Lock over the persistent DB state.  Non-null iff successfully acquired.
FileLock* db_lock_;

Compaction

When the in-memory MemTable reaches a certain size, a compaction persists its contents to disk.

Version Management

Circular doubly linked list

The AppendVersion process turns:

[prev] <-> [dummy]

into:

[prev] <-> [current] <-> [dummy]
