Skip to content

Commit

Permalink
Revert open logic changes in #9634 (#9906)
Browse files Browse the repository at this point in the history
Summary:
Left HISTORY.md and unit tests.
Added a new unit test to repro the corruption scenario that this PR fixes, and HISTORY.md line for that.

Pull Request resolved: #9906

Reviewed By: riversand963

Differential Revision: D35940093

Pulled By: ajkr

fbshipit-source-id: 9816f99e1ce405ba36f316beb4f6378c37c8c86b
  • Loading branch information
ajkr authored and akankshamahajan15 committed Apr 26, 2022
1 parent 34681a1 commit 3620e69
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 240 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## Unreleased

### Bug Fixes
* Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen.
* RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.

## 7.2.0 (04/15/2022)
Expand Down
62 changes: 62 additions & 0 deletions db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,68 @@ TEST_F(CorruptionTest, Recovery) {
Check(36, 36);
}

TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
// Repro for bug where WALs following the point-in-time recovery were not
// retained leading to the next recovery failing.
CloseDb();

options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;

const std::string test_cf_name = "test_cf";
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());

uint64_t log_num;
{
options_.create_missing_column_families = true;
std::vector<ColumnFamilyHandle*> cfhs;
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));

ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
std::vector<uint64_t> file_nums;
GetSortedWalFiles(file_nums);
log_num = file_nums.back();
for (auto* cfh : cfhs) {
delete cfh;
}
CloseDb();
}

CorruptFileWithTruncation(FileType::kWalFile, log_num,
/*bytes_to_truncate=*/1);

{
// Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation.
options_.avoid_flush_during_recovery = true;
std::vector<ColumnFamilyHandle*> cfhs;
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
// Flush one but not both CFs and write some data so there's a seqno gap
// between the PITR corruption and the next DB session's first WAL.
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));

for (auto* cfh : cfhs) {
delete cfh;
}
CloseDb();
}

// With the bug, this DB open would remove the WALs following the PITR
// corruption. Then, the next recovery would fail.
for (int i = 0; i < 2; ++i) {
std::vector<ColumnFamilyHandle*> cfhs;
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));

for (auto* cfh : cfhs) {
delete cfh;
}
CloseDb();
}
}

TEST_F(CorruptionTest, RecoverWriteError) {
env_->writable_file_error_ = true;
Status s = TryReopen();
Expand Down
69 changes: 7 additions & 62 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1240,43 +1240,6 @@ class DBImpl : public DB {

std::atomic<bool> shutting_down_;

// RecoveryContext struct stores the context about version edits along
// with corresponding column_family_data and column_family_options.
class RecoveryContext {
public:
~RecoveryContext() {
for (auto& edit_list : edit_lists_) {
for (auto* edit : edit_list) {
delete edit;
}
edit_list.clear();
}
cfds_.clear();
mutable_cf_opts_.clear();
edit_lists_.clear();
files_to_delete_.clear();
}

void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
if (map_.find(cfd->GetID()) == map_.end()) {
uint32_t size = static_cast<uint32_t>(map_.size());
map_.emplace(cfd->GetID(), size);
cfds_.emplace_back(cfd);
mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions());
edit_lists_.emplace_back(autovector<VersionEdit*>());
}
uint32_t i = map_[cfd->GetID()];
edit_lists_[i].emplace_back(new VersionEdit(edit));
}

std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index;
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// files_to_delete_ contains sst files
std::set<std::string> files_to_delete_;
};

// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
Expand Down Expand Up @@ -1393,19 +1356,16 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit.
// recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
// skipped.
// recovery_ctx stores the context about version edits and all those
// edits are persisted to new Manifest after successfully syncing the new WAL.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_wal_file_exists = false,
bool error_if_data_exists_in_wals = false,
uint64_t* recovered_seq = nullptr,
RecoveryContext* recovery_ctx = nullptr);
uint64_t* recovered_seq = nullptr);

virtual bool OwnTablesAndLogs() const { return true; }

// Set DB identity file, and write DB ID to manifest if necessary.
Status SetDBId(bool read_only, RecoveryContext* recovery_ctx);
Status SetDBId(bool read_only);

// REQUIRES: db mutex held when calling this function, but the db mutex can
// be released and re-acquired. Db mutex will be held when the function
Expand All @@ -1414,15 +1374,12 @@ class DBImpl : public DB {
// not referenced in the MANIFEST (e.g.
// 1. It's best effort recovery;
// 2. The VersionEdits referencing the SST files are appended to
// RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
// MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are
// still not synced to MANIFEST during recovery.)
// It stores the SST files to be deleted in RecoveryContext. In the
// We delete these SST files. In the
// meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
// recovery_ctx stores the context about version edits and files to be
// deleted. All those edits are persisted to new Manifest after successfully
// syncing the new WAL.
Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx);
Status DeleteUnreferencedSstFiles();

// SetDbSessionId() should be called in the constuctor DBImpl()
// to ensure that db_session_id_ gets updated every time the DB is opened
Expand All @@ -1432,11 +1389,6 @@ class DBImpl : public DB {
Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
const Slice& ts) const;

// recovery_ctx stores the context about version edits and
// LogAndApplyForRecovery persist all those edits to new Manifest after
// successfully syncing new WAL.
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);

private:
friend class DB;
friend class ErrorHandler;
Expand Down Expand Up @@ -1691,10 +1643,9 @@ class DBImpl : public DB {

// REQUIRES: log_numbers are sorted in ascending order
// corrupted_log_found is set to true if we recover from a corrupted log file.
Status RecoverLogFiles(std::vector<uint64_t>& log_numbers,
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found,
RecoveryContext* recovery_ctx);
bool* corrupted_log_found);

// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
Expand All @@ -1704,12 +1655,6 @@ class DBImpl : public DB {
Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit);

// Move all the WAL files starting from corrupted WAL found to
// max_wal_number to avoid column family inconsistency error on recovery. It
// also removes the deleted file from the vector wal_numbers.
void MoveCorruptedWalFiles(std::vector<uint64_t>& wal_numbers,
uint64_t corrupted_wal_number);

// Get the size of a log file and, if truncate is true, truncate the
// log file to its actual size, thereby freeing preallocated space.
// Return success even if truncate fails
Expand Down
36 changes: 25 additions & 11 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
return min_log_number_to_keep;
}

Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
Status DBImpl::SetDBId(bool read_only) {
Status s;
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true
// the very first time.
Expand All @@ -890,22 +890,22 @@ Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
}
s = GetDbIdentityFromIdentityFile(&db_id_);
if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
assert(!read_only);
assert(recovery_ctx != nullptr);
assert(versions_->GetColumnFamilySet() != nullptr);
VersionEdit edit;
edit.SetDBId(db_id_);
Options options;
MutableCFOptions mutable_cf_options(options);
versions_->db_id_ = db_id_;
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options, &edit, &mutex_, nullptr,
/* new_descriptor_log */ false);
}
} else if (!read_only) {
s = SetIdentityFile(env_, dbname_, db_id_);
}
return s;
}

Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
Status DBImpl::DeleteUnreferencedSstFiles() {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
Expand All @@ -925,6 +925,7 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {

uint64_t next_file_number = versions_->current_next_file_number();
uint64_t largest_file_number = next_file_number;
std::set<std::string> files_to_delete;
Status s;
for (const auto& path : paths) {
std::vector<std::string> files;
Expand All @@ -942,9 +943,8 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
const std::string normalized_fpath = path + fname;
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
recovery_ctx->files_to_delete_.find(normalized_fpath) ==
recovery_ctx->files_to_delete_.end()) {
recovery_ctx->files_to_delete_.insert(normalized_fpath);
files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
files_to_delete.insert(normalized_fpath);
}
}
}
Expand All @@ -961,7 +961,21 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
assert(versions_->GetColumnFamilySet());
ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
assert(default_cfd);
recovery_ctx->UpdateVersionEdits(default_cfd, edit);
s = versions_->LogAndApply(
default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
directories_.GetDbDir(), /*new_descriptor_log*/ false);
if (!s.ok()) {
return s;
}

mutex_.Unlock();
for (const auto& fname : files_to_delete) {
s = env_->DeleteFile(fname);
if (!s.ok()) {
break;
}
}
mutex_.Lock();
return s;
}

Expand Down
Loading

0 comments on commit 3620e69

Please sign in to comment.