leveldb1.2源码剖析--插入记录(Put)

Posted by gao-xiao-long on October 4, 2016

LevelDB插入一条记录时,采用WAL(write ahead logging)方式,先顺序写到磁盘日志中,然后写入Memtable。这种方式可以保证程序异常结束后数据不丢失,并且由于采用了顺序写+内存写的方式,写性能很高。

LevelDB支持单条写以及批量写的操作,用法分别如下:

一次插入单条记录

db->Put(leveldb::WriteOptions(), key2, value);

WriteBatch形式插入多条记录

leveldb::WriteBatch batch;
batch.Delete(key1);
batch.Put(key2, value);
s = db->Write(leveldb::WriteOptions(), &batch);

在底层实现上,Put操作也是通过将记录写入WriteBatch进行的。

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

来看下WriteBatch的定义及实现

WriteBatch将一系列的操作原子性的应用到数据库中,WriteBatch中的所有的更新按照他们被添加到WriteBatch中的顺序执行,比如,当下面的操作执行完成后,”key”的值将会为”v3”:

batch.Put("key", "v1");
batch.Delete("key");
batch.Put("key", "v2");
batch.Put("key", "v3");

WriteBatch的定义如下:

class WriteBatch {
 public:
  WriteBatch();
  ~WriteBatch();

  // Store the mapping "key->value" in the database.
  void Put(const Slice& key, const Slice& value);

  // If the database contains a mapping for "key", erase it.  Else do nothing.
  void Delete(const Slice& key);

  // Clear all updates buffered in this batch.
  void Clear();

  // Support for iterating over the contents of a batch.
  class Handler {
   public:
    virtual ~Handler();
    virtual void Put(const Slice& key, const Slice& value) = 0;
    virtual void Delete(const Slice& key) = 0;
  };
  Status Iterate(Handler* handler) const;

 private:
  friend class WriteBatchInternal;

  std::string rep_;  // See comment in write_batch.cc for the format of rep_

  // Intentionally copyable
};

从定义上看到,WriteBatch通过一个string(rep_)来保存所有的记录,WriteBatch它的结构如下:

write-batch

说明:

  1. sequence_number: 版本号,值=(全局最新版本号+1),在调用Write时填充( WriteBatchInternal::SetSequence(updates, last_sequence + 1) ),用于为WriteBatch中的所有操作生成一个版本号。WriteBatch里的每一次Put或者Delete操作的版本号都是在这个这个sequence_number基础上加1生成。当WriteBatch中的所有操作都执行完成后,全局版本号会增加count(last_sequence += WriteBatchInternal::Count(updates))
  2. count: 此WriteBatch中记录的个数
  3. key_type: 为kTypeDeletion 或 kTypeValue 中的一种。当key_type为kTypeDeletion时,value_length及value_data为空。

当调用Write()函数将WriteBatch中的一系列操作写入数据库时,会先将数据以上述的格式写入Log中,以便数据恢复。 下面看下Write函数逻辑:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

Write实现上针对多线程并发写做了优化。它没有使用一个Mutex锁住整个Write函数,而是采用了“条件变量+写队列+一个写线程代理其他写线程”方式来减小锁的粒度,提高写性能: 每次执行写操作时,LevelDB首先将待写入的WriteBatch放到writers队列中(定义:std::deque<Writer*> writers_)然后判断队列中是否存在由其它线程放入的待写入DB的WriteBatch,如果存在的话则将此写线程设置为等待条件变量激活(这时候mutex_锁已经被释放,其他写线程可以继续往writers队列中添加任务,增加写线程的并发性),如果队列中不存在其它待写入DB的WriteBatch,表示当前此DB只有一个写线程,则不需要等待条件变量,直接往下执行。写线程被激活后(或者只有一个写线程)会先判断是否需要“MakeRoomForWrite“(21行,延迟写、生成新memtable、唤醒compaction等,后面文章会详细讲解), 然后将writers队列中的WriteBatch按照一定策略(1. 避免每次写入过多数据占用磁盘带宽; 2.只合并options.sync设置相同的WriteBatch)合并到一起(BuildBatchGroup()),分别写到Log及Memtable中(第31行及40行)。最后将已经写入到DB的WriteBatch状态设置为done并激活等待条件变量的线程(第55-69行)。

写操作基本流程分析完毕。


Custom Theme JavaScript