Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp support to CompactedDBImpl #10030

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* Add an option, `CompressionOptions::use_zstd_dict_trainer`, to indicate whether zstd dictionary trainer should be used for generating zstd compression dictionaries. The default value of this option is true for backward compatibility. When this option is set to false, zstd API `ZDICT_finalizeDictionary` is used to generate compression dictionaries.
* The Seek API, which positions every LevelIterator on the correct data block in the correct SST file, can be parallelized if the ReadOptions.async_io option is enabled.
* Add new stat number_async_seek in PerfContext that indicates number of async calls made by seek to prefetch data.
* Add support for user-defined timestamps to read-only DB.

### Bug Fixes
* RocksDB calls the FileSystem::Poll API during FilePrefetchBuffer destruction, which impacts performance because it waits for the completion of read requests that are no longer needed. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.
Expand Down
102 changes: 79 additions & 23 deletions db/db_impl/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,45 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {

// Get overload without a timestamp out-parameter. The column family handle
// argument is unnamed and unused; the call is forwarded to the overload that
// additionally takes a `std::string* timestamp`, passing nullptr for both the
// column family and the timestamp.
Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value) {
return Get(options, /*column_family*/ nullptr, key, value,
/*timestamp*/ nullptr);
}

Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value,
std::string* timestamp) {
assert(user_comparator_);
if (options.timestamp || user_comparator_->timestamp_size()) {
// TODO: support timestamp
return Status::NotSupported();
if (options.timestamp) {
const Status s =
FailIfTsSizesMismatch(DefaultColumnFamily(), *(options.timestamp));
if (!s.ok()) {
return s;
}
} else {
const Status s = FailIfCfHasTs(DefaultColumnFamily());
if (!s.ok()) {
return s;
}
}
GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
std::string* ts =
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr;
LookupKey lkey(key, kMaxSequenceNumber, options.timestamp);
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr, true, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
Status s = files_.files[FindFile(key)].fd.table_reader->Get(
options, lkey.internal_key(), &get_context, nullptr);
GetContext::kNotFound, lkey.user_key(), value, ts,
nullptr, nullptr, true, nullptr, nullptr, nullptr,
nullptr, &read_cb);

const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
if (user_comparator_->CompareWithoutTimestamp(
key, /*a_has_ts=*/false,
ExtractUserKeyAndStripTimestamp(f.smallest_key,
user_comparator_->timestamp_size()),
/*b_has_ts=*/false) < 0) {
return Status::NotFound();
}
Status s = f.fd.table_reader->Get(options, lkey.internal_key(), &get_context,
nullptr);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
Expand All @@ -60,37 +88,65 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
return Status::NotFound();
}

std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>&,
std::vector<Status> CompactedDBImpl::MultiGet(
const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// MultiGet without timestamps: the unnamed column-family vector is not
// forwarded, and nullptr is passed for the timestamps output.
// NOTE(review): the four-argument call presumably resolves to an overload
// made visible by `using DB::MultiGet;` in the header — confirm.
return MultiGet(options, keys, values, /*timestamps*/ nullptr);
}

std::vector<Status> CompactedDBImpl::MultiGet(
const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
assert(user_comparator_);
if (user_comparator_->timestamp_size() || options.timestamp) {
// TODO: support timestamp
return std::vector<Status>(keys.size(), Status::NotSupported());
size_t num_keys = keys.size();

if (options.timestamp) {
Status s =
FailIfTsSizesMismatch(DefaultColumnFamily(), *(options.timestamp));
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
}
} else {
Status s = FailIfCfHasTs(DefaultColumnFamily());
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
}
}

GetWithTimestampReadCallback read_cb(kMaxSequenceNumber);
autovector<TableReader*, 16> reader_list;
for (const auto& key : keys) {
const FdWithKeyRange& f = files_.files[FindFile(key)];
if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
LookupKey lkey(key, kMaxSequenceNumber, options.timestamp);
const FdWithKeyRange& f = files_.files[FindFile(lkey.user_key())];
if (user_comparator_->CompareWithoutTimestamp(
key, /*a_has_ts=*/false,
ExtractUserKeyAndStripTimestamp(f.smallest_key,
user_comparator_->timestamp_size()),
/*b_has_ts=*/false) < 0) {
reader_list.push_back(nullptr);
} else {
LookupKey lkey(key, kMaxSequenceNumber);
f.fd.table_reader->Prepare(lkey.internal_key());
reader_list.push_back(f.fd.table_reader);
}
}

std::vector<Status> statuses(keys.size(), Status::NotFound());
values->resize(keys.size());
std::vector<Status> statuses(num_keys, Status::NotFound());
values->resize(num_keys);
if (timestamps) {
timestamps->resize(num_keys);
}
int idx = 0;
for (auto* r : reader_list) {
if (r != nullptr) {
PinnableSlice pinnable_val;
std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &pinnable_val,
nullptr, nullptr, nullptr, true, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
LookupKey lkey(keys[idx], kMaxSequenceNumber, options.timestamp);
std::string* timestamp = timestamps ? &(*timestamps)[idx] : nullptr;
GetContext get_context(
user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound,
lkey.user_key(), &pinnable_val,
user_comparator_->timestamp_size() > 0 ? timestamp : nullptr, nullptr,
nullptr, true, nullptr, nullptr, nullptr, nullptr, &read_cb);
Status s = r->Get(options, lkey.internal_key(), &get_context, nullptr);
assert(static_cast<size_t>(idx) < statuses.size());
if (!s.ok() && !s.IsNotFound()) {
Expand Down
21 changes: 17 additions & 4 deletions db/db_impl/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,25 @@ class CompactedDBImpl : public DBImpl {
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;

Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
std::string* timestamp) override;

using DB::MultiGet;
// Note that CompactedDBImpl::MultiGet is not the optimized version of
// MultiGet to use.
// TODO: optimize CompactedDBImpl::MultiGet, see DBImpl::MultiGet for details.
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values)
override;
const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;

std::vector<Status> MultiGet(const ReadOptions& options,
jowlyzhang marked this conversation as resolved.
Show resolved Hide resolved
const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys,
std::vector<std::string>* values,
std::vector<std::string>* timestamps) override;

using DBImpl::Put;
virtual Status Put(const WriteOptions& /*options*/,
Expand Down
Loading