Skip to content

Commit

Permalink
WritableFileWriter to allow operation after failure when SyncWithoutF…
Browse files Browse the repository at this point in the history
…lush() is involved (#10555)

Summary:
#10489 adds an assertion in most functions in WritableFileWriter to check no previous error. However, it only works without calling SyncWithoutFlush(). The nature of SyncWithoutFlush() makes two concurrent call fails to check status code of each other and causing assertion failure. Fix the problem by skipping the check after SyncWithoutFlush() is called and not check status code in SyncWithoutFlush().

Since the original change was not officially released yet, the fix isn't added to HISTORY.md.

Pull Request resolved: #10555

Test Plan: Make sure existing tests still pass

Reviewed By: anand1976

Differential Revision: D38946208

fbshipit-source-id: 63566732d3f25c8a8342840499cf7b7d745f27c2
  • Loading branch information
siying authored and bowang committed Aug 24, 2022
1 parent 4e634e9 commit e41fa4e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 48 deletions.
81 changes: 36 additions & 45 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
namespace {
IOStatus AssertFalseAndGetStatusForPrevError() {
assert(false);
return IOStatus::IOError("Writer has previous error.");
}
} // namespace

IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
Expand All @@ -51,7 +44,7 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,

IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

Expand Down Expand Up @@ -97,7 +90,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
if (buf_.CurrentSize() > 0) {
s = Flush(op_rate_limiter_priority);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand Down Expand Up @@ -179,14 +172,14 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
filesize_.store(cur_size + data.size(), std::memory_order_release);
} else {
seen_error_ = true;
set_seen_error();
}
return s;
}

IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
assert(pad_bytes < kDefaultPageSize);
Expand All @@ -204,7 +197,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
if (left > 0) {
IOStatus s = Flush(op_rate_limiter_priority);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand All @@ -222,7 +215,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
}

IOStatus WritableFileWriter::Close() {
if (seen_error_) {
if (seen_error()) {
IOStatus interim;
if (writable_file_.get() != nullptr) {
interim = writable_file_->Close(IOOptions(), nullptr);
Expand Down Expand Up @@ -333,7 +326,7 @@ IOStatus WritableFileWriter::Close() {
checksum_finalized_ = true;
}
} else {
seen_error_ = true;
set_seen_error();
}

return s;
Expand All @@ -342,7 +335,7 @@ IOStatus WritableFileWriter::Close() {
// write out the cached data to the OS cache or storage if direct I/O
// enabled
IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

Expand Down Expand Up @@ -370,7 +363,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
}
}
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand Down Expand Up @@ -399,7 +392,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
}

if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}

Expand Down Expand Up @@ -427,7 +420,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
last_sync_size_ = offset_sync_to;
}
Expand Down Expand Up @@ -455,20 +448,20 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const {
}

IOStatus WritableFileWriter::Sync(bool use_fsync) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

IOStatus s = Flush();
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
if (!use_direct_io() && pending_sync_) {
s = SyncInternal(use_fsync);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand All @@ -478,10 +471,9 @@ IOStatus WritableFileWriter::Sync(bool use_fsync) {
}

IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

if (!writable_file_->IsSyncThreadSafe()) {
return IOStatus::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because "
Expand All @@ -491,16 +483,16 @@ IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
IOStatus s = SyncInternal(use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
if (!s.ok()) {
seen_error_ = true;
#ifndef NDEBUG
sync_without_flush_called_ = true;
#endif // NDEBUG
set_seen_error();
}
return s;
}

IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}

// Caller is supposed to check seen_error_
IOStatus s;
IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
Expand Down Expand Up @@ -536,14 +528,13 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
}
#endif
SetPerfLevel(prev_perf_level);
if (!s.ok()) {
seen_error_ = true;
}

// The caller will be responsible to call set_seen_error() if s is not OK.
return s;
}

IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

Expand All @@ -559,7 +550,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
Expand All @@ -578,7 +569,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
// limiter if available
IOStatus WritableFileWriter::WriteBuffered(
const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

Expand Down Expand Up @@ -653,7 +644,7 @@ IOStatus WritableFileWriter::WriteBuffered(
}
#endif
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand All @@ -669,14 +660,14 @@ IOStatus WritableFileWriter::WriteBuffered(
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
return s;
}

IOStatus WritableFileWriter::WriteBufferedWithChecksum(
const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

Expand Down Expand Up @@ -751,7 +742,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
// and let caller determine error handling.
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand All @@ -766,7 +757,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
flushed_size_.store(cur_size + left, std::memory_order_release);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
return s;
}
Expand Down Expand Up @@ -801,7 +792,7 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
#ifndef ROCKSDB_LITE
IOStatus WritableFileWriter::WriteDirect(
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
assert(false);

return IOStatus::IOError("Writer has previous error.");
Expand Down Expand Up @@ -873,7 +864,7 @@ IOStatus WritableFileWriter::WriteDirect(
}
if (!s.ok()) {
buf_.Size(file_advance + leftover_tail);
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand All @@ -898,14 +889,14 @@ IOStatus WritableFileWriter::WriteDirect(
// behind
next_write_offset_ += file_advance;
} else {
seen_error_ = true;
set_seen_error();
}
return s;
}

IOStatus WritableFileWriter::WriteDirectWithChecksum(
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}

Expand Down Expand Up @@ -986,7 +977,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
buf_.Size(file_advance + leftover_tail);
buffered_data_crc32c_checksum_ =
crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
seen_error_ = true;
set_seen_error();
return s;
}
}
Expand All @@ -1011,7 +1002,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
// behind
next_write_offset_ += file_advance;
} else {
seen_error_ = true;
set_seen_error();
}
return s;
}
Expand Down
24 changes: 21 additions & 3 deletions file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@ class WritableFileWriter {
uint64_t next_write_offset_;
#endif // ROCKSDB_LITE
bool pending_sync_;
bool seen_error_;
std::atomic<bool> seen_error_;
#ifndef NDEBUG
// SyncWithoutFlush() is the function that is allowed to be called
// concurrently with other function. One of the concurrent call
// could set seen_error_, and the other one would hit assertion
// in debug mode.
std::atomic<bool> sync_without_flush_called_ = false;
#endif // NDEBUG
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
Expand Down Expand Up @@ -290,10 +297,21 @@ class WritableFileWriter {

const char* GetFileChecksumFuncName() const;

bool seen_error() const { return seen_error_; }
bool seen_error() const {
return seen_error_.load(std::memory_order_relaxed);
}
// For options of relaxed consistency, users might hope to continue
// operating on the file after an error happens.
void reset_seen_error() { seen_error_ = false; }
void reset_seen_error() {
seen_error_.store(false, std::memory_order_relaxed);
}
void set_seen_error() { seen_error_.store(true, std::memory_order_relaxed); }

IOStatus AssertFalseAndGetStatusForPrevError() {
// This should only happen if SyncWithoutFlush() was called.
assert(sync_without_flush_called_);
return IOStatus::IOError("Writer has previous error.");
}

private:
// Decide the Rate Limiter priority.
Expand Down

0 comments on commit e41fa4e

Please sign in to comment.