Skip to content

Commit

Permalink
Add SST partitioner interface (rust-rocksdb#512)
Browse files Browse the repository at this point in the history
Add interface for `SstPartitioner`. The interface is to support implementing "compaction guard" to split SST by custom boundaries.

Signed-off-by: Yi Wu <yiwu@pingcap.com>
  • Loading branch information
yiwu-arbug committed Aug 28, 2020
1 parent ee6c08a commit e9b74b8
Show file tree
Hide file tree
Showing 9 changed files with 1,064 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ librocksdb_sys = { path = "librocksdb_sys" }

[dev-dependencies]
crc = "1.8"
lazy_static = "1.4.0"
rand = "0.7"
tempfile = "3.1"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ all: format build test

format:
@cargo fmt --all
@librocksdb_sys/crocksdb/format-diff.sh > /dev/null || true
@scripts/format-diff.sh

build:
@cargo build
Expand Down
285 changes: 285 additions & 0 deletions librocksdb_sys/crocksdb/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/sst_dump_tool.h"
#include "rocksdb/sst_file_reader.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
Expand Down Expand Up @@ -115,6 +116,8 @@ using rocksdb::NewEncryptedEnv;
using rocksdb::NewGenericRateLimiter;
using rocksdb::NewLRUCache;
using rocksdb::Options;
using rocksdb::PartitionerRequest;
using rocksdb::PartitionerResult;
using rocksdb::PinnableSlice;
using rocksdb::RandomAccessFile;
using rocksdb::Range;
Expand All @@ -130,6 +133,8 @@ using rocksdb::SliceTransform;
using rocksdb::Snapshot;
using rocksdb::SstFileReader;
using rocksdb::SstFileWriter;
using rocksdb::SstPartitioner;
using rocksdb::SstPartitionerFactory;
using rocksdb::Status;
using rocksdb::TableProperties;
using rocksdb::TablePropertiesCollection;
Expand Down Expand Up @@ -637,6 +642,24 @@ struct crocksdb_encryption_key_manager_t {
};
#endif

struct crocksdb_sst_partitioner_t {
std::unique_ptr<SstPartitioner> rep;
};

struct crocksdb_sst_partitioner_request_t {
PartitionerRequest* rep;
Slice prev_user_key;
Slice current_user_key;
};

struct crocksdb_sst_partitioner_context_t {
SstPartitioner::Context* rep;
};

struct crocksdb_sst_partitioner_factory_t {
std::shared_ptr<SstPartitionerFactory> rep;
};

static bool SaveError(char** errptr, const Status& s) {
assert(errptr != nullptr);
if (s.ok()) {
Expand Down Expand Up @@ -2490,6 +2513,19 @@ void crocksdb_options_set_max_bytes_for_level_multiplier_additional(
}
}

crocksdb_sst_partitioner_factory_t*
crocksdb_options_get_sst_partitioner_factory(crocksdb_options_t* opt) {
crocksdb_sst_partitioner_factory_t* factory =
new crocksdb_sst_partitioner_factory_t;
factory->rep = opt->rep.sst_partitioner_factory;
return factory;
}

void crocksdb_options_set_sst_partitioner_factory(
crocksdb_options_t* opt, crocksdb_sst_partitioner_factory_t* factory) {
opt->rep.sst_partitioner_factory = factory->rep;
}

void crocksdb_options_enable_statistics(crocksdb_options_t* opt,
unsigned char v) {
if (v) {
Expand Down Expand Up @@ -5631,6 +5667,255 @@ uint64_t crocksdb_iostats_context_logger_nanos(
return ctx->rep.logger_nanos;
}

crocksdb_sst_partitioner_request_t* crocksdb_sst_partitioner_request_create() {
auto* req = new crocksdb_sst_partitioner_request_t;
req->rep =
new PartitionerRequest(req->prev_user_key, req->current_user_key, 0);
return req;
}

void crocksdb_sst_partitioner_request_destroy(
crocksdb_sst_partitioner_request_t* req) {
delete req->rep;
delete req;
}

const char* crocksdb_sst_partitioner_request_prev_user_key(
crocksdb_sst_partitioner_request_t* req, size_t* len) {
const Slice* prev_key = req->rep->prev_user_key;
*len = prev_key->size();
return prev_key->data();
}

const char* crocksdb_sst_partitioner_request_current_user_key(
crocksdb_sst_partitioner_request_t* req, size_t* len) {
const Slice* current_key = req->rep->current_user_key;
*len = current_key->size();
return current_key->data();
}

uint64_t crocksdb_sst_partitioner_request_current_output_file_size(
crocksdb_sst_partitioner_request_t* req) {
return req->rep->current_output_file_size;
}

void crocksdb_sst_partitioner_request_set_prev_user_key(
crocksdb_sst_partitioner_request_t* req, const char* key, size_t len) {
req->prev_user_key = Slice(key, len);
req->rep->prev_user_key = &req->prev_user_key;
}

void crocksdb_sst_partitioner_request_set_current_user_key(
crocksdb_sst_partitioner_request_t* req, const char* key, size_t len) {
req->current_user_key = Slice(key, len);
req->rep->current_user_key = &req->current_user_key;
}

void crocksdb_sst_partitioner_request_set_current_output_file_size(
crocksdb_sst_partitioner_request_t* req,
uint64_t current_output_file_size) {
req->rep->current_output_file_size = current_output_file_size;
}

struct crocksdb_sst_partitioner_impl_t : public SstPartitioner {
void* underlying;
void (*destructor)(void*);
crocksdb_sst_partitioner_should_partition_cb should_partition_cb;
crocksdb_sst_partitioner_can_do_trivial_move_cb can_do_trivial_move_cb;

virtual ~crocksdb_sst_partitioner_impl_t() { destructor(underlying); }

const char* Name() const override { return "crocksdb_sst_partitioner_impl"; }

PartitionerResult ShouldPartition(
const PartitionerRequest& request) override {
crocksdb_sst_partitioner_request_t req;
req.rep = const_cast<PartitionerRequest*>(&request);
return static_cast<PartitionerResult>(
should_partition_cb(underlying, &req));
}

bool CanDoTrivialMove(const Slice& smallest_user_key,
const Slice& largest_user_key) override {
return can_do_trivial_move_cb(
underlying, smallest_user_key.data(), smallest_user_key.size(),
largest_user_key.data(), largest_user_key.size());
}
};

crocksdb_sst_partitioner_t* crocksdb_sst_partitioner_create(
void* underlying, void (*destructor)(void*),
crocksdb_sst_partitioner_should_partition_cb should_partition_cb,
crocksdb_sst_partitioner_can_do_trivial_move_cb can_do_trivial_move_cb) {
crocksdb_sst_partitioner_impl_t* sst_partitioner_impl =
new crocksdb_sst_partitioner_impl_t;
sst_partitioner_impl->underlying = underlying;
sst_partitioner_impl->destructor = destructor;
sst_partitioner_impl->should_partition_cb = should_partition_cb;
sst_partitioner_impl->can_do_trivial_move_cb = can_do_trivial_move_cb;
crocksdb_sst_partitioner_t* sst_partitioner = new crocksdb_sst_partitioner_t;
sst_partitioner->rep.reset(sst_partitioner_impl);
return sst_partitioner;
}

void crocksdb_sst_partitioner_destroy(crocksdb_sst_partitioner_t* partitioner) {
delete partitioner;
}

crocksdb_sst_partitioner_result_t crocksdb_sst_partitioner_should_partition(
crocksdb_sst_partitioner_t* partitioner,
crocksdb_sst_partitioner_request_t* req) {
return static_cast<crocksdb_sst_partitioner_result_t>(
partitioner->rep->ShouldPartition(*req->rep));
}

unsigned char crocksdb_sst_partitioner_can_do_trivial_move(
crocksdb_sst_partitioner_t* partitioner, const char* smallest_user_key,
size_t smallest_user_key_len, const char* largest_user_key,
size_t largest_user_key_len) {
Slice smallest_key(smallest_user_key, smallest_user_key_len);
Slice largest_key(largest_user_key, largest_user_key_len);
return partitioner->rep->CanDoTrivialMove(smallest_key, largest_key);
}

crocksdb_sst_partitioner_context_t* crocksdb_sst_partitioner_context_create() {
auto* rep = new SstPartitioner::Context;
auto* context = new crocksdb_sst_partitioner_context_t;
context->rep = rep;
return context;
}

void crocksdb_sst_partitioner_context_destroy(
crocksdb_sst_partitioner_context_t* context) {
delete context->rep;
delete context;
}

unsigned char crocksdb_sst_partitioner_context_is_full_compaction(
crocksdb_sst_partitioner_context_t* context) {
return context->rep->is_full_compaction;
}

unsigned char crocksdb_sst_partitioner_context_is_manual_compaction(
crocksdb_sst_partitioner_context_t* context) {
return context->rep->is_manual_compaction;
}

int crocksdb_sst_partitioner_context_output_level(
crocksdb_sst_partitioner_context_t* context) {
return context->rep->output_level;
}

const char* crocksdb_sst_partitioner_context_smallest_key(
crocksdb_sst_partitioner_context_t* context, size_t* key_len) {
auto& smallest_key = context->rep->smallest_user_key;
*key_len = smallest_key.size();
return smallest_key.data();
}

const char* crocksdb_sst_partitioner_context_largest_key(
crocksdb_sst_partitioner_context_t* context, size_t* key_len) {
auto& largest_key = context->rep->largest_user_key;
*key_len = largest_key.size();
return largest_key.data();
}

void crocksdb_sst_partitioner_context_set_is_full_compaction(
crocksdb_sst_partitioner_context_t* context,
unsigned char is_full_compaction) {
context->rep->is_full_compaction = is_full_compaction;
}

void crocksdb_sst_partitioner_context_set_is_manual_compaction(
crocksdb_sst_partitioner_context_t* context,
unsigned char is_manual_compaction) {
context->rep->is_manual_compaction = is_manual_compaction;
}

void crocksdb_sst_partitioner_context_set_output_level(
crocksdb_sst_partitioner_context_t* context, int output_level) {
context->rep->output_level = output_level;
}

void crocksdb_sst_partitioner_context_set_smallest_key(
crocksdb_sst_partitioner_context_t* context, const char* smallest_key,
size_t key_len) {
context->rep->smallest_user_key = Slice(smallest_key, key_len);
}

void crocksdb_sst_partitioner_context_set_largest_key(
crocksdb_sst_partitioner_context_t* context, const char* largest_key,
size_t key_len) {
context->rep->largest_user_key = Slice(largest_key, key_len);
}

struct crocksdb_sst_partitioner_factory_impl_t : public SstPartitionerFactory {
void* underlying;
void (*destructor)(void*);
crocksdb_sst_partitioner_factory_name_cb name_cb;
crocksdb_sst_partitioner_factory_create_partitioner_cb create_partitioner_cb;

virtual ~crocksdb_sst_partitioner_factory_impl_t() { destructor(underlying); }

const char* Name() const override { return name_cb(underlying); }

std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& partitioner_context) const override {
crocksdb_sst_partitioner_context_t context;
context.rep = const_cast<SstPartitioner::Context*>(&partitioner_context);
crocksdb_sst_partitioner_t* partitioner =
create_partitioner_cb(underlying, &context);
if (partitioner == nullptr) {
return nullptr;
}
std::unique_ptr<SstPartitioner> rep = std::move(partitioner->rep);
crocksdb_sst_partitioner_destroy(partitioner);
return rep;
}
};

crocksdb_sst_partitioner_factory_t* crocksdb_sst_partitioner_factory_create(
void* underlying, void (*destructor)(void*),
crocksdb_sst_partitioner_factory_name_cb name_cb,
crocksdb_sst_partitioner_factory_create_partitioner_cb
create_partitioner_cb) {
crocksdb_sst_partitioner_factory_impl_t* factory_impl =
new crocksdb_sst_partitioner_factory_impl_t;
factory_impl->underlying = underlying;
factory_impl->destructor = destructor;
factory_impl->name_cb = name_cb;
factory_impl->create_partitioner_cb = create_partitioner_cb;
crocksdb_sst_partitioner_factory_t* factory =
new crocksdb_sst_partitioner_factory_t;
factory->rep.reset(factory_impl);
return factory;
}

void crocksdb_sst_partitioner_factory_destroy(
crocksdb_sst_partitioner_factory_t* factory) {
delete factory;
}

const char* crocksdb_sst_partitioner_factory_name(
crocksdb_sst_partitioner_factory_t* factory) {
return factory->rep->Name();
}

crocksdb_sst_partitioner_t* crocksdb_sst_partitioner_factory_create_partitioner(
crocksdb_sst_partitioner_factory_t* factory,
crocksdb_sst_partitioner_context_t* context) {
std::unique_ptr<SstPartitioner> rep =
factory->rep->CreatePartitioner(*context->rep);
if (rep == nullptr) {
return nullptr;
}
crocksdb_sst_partitioner_t* partitioner = new crocksdb_sst_partitioner_t;
partitioner->rep = std::move(rep);
return partitioner;
}

/* Tools */

void crocksdb_run_ldb_tool(int argc, char** argv,
const crocksdb_options_t* opts) {
LDBTool().Run(argc, argv, opts->rep);
Expand Down

0 comments on commit e9b74b8

Please sign in to comment.