Skip to content

Commit

Permalink
Fix two bugs with insert and delete
Browse files Browse the repository at this point in the history
Fixes #234

Signed-off-by: Nick Cameron <nrc@ncameron.org>
  • Loading branch information
nrc committed Apr 13, 2021
1 parent 4870985 commit 13fabea
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 38 deletions.
1 change: 1 addition & 0 deletions rust-toolchain
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
nightly-2021-04-10
58 changes: 35 additions & 23 deletions src/transaction/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{BoundRange, Key, KvPair, Result, Value};
use derive_new::new;
use std::{
collections::{BTreeMap, HashMap},
collections::{btree_map::Entry, BTreeMap, HashMap},
future::Future,
};
use tikv_client_proto::kvrpcpb;
use tokio::sync::{Mutex, MutexGuard};

#[derive(Default)]
#[derive(new)]
struct InnerBuffer {
#[new(default)]
primary_key: Option<Key>,
#[new(default)]
entry_map: BTreeMap<Key, BufferEntry>,
is_pessimistic: bool,
}

impl InnerBuffer {
Expand All @@ -29,16 +33,22 @@ impl InnerBuffer {
}

/// A caching layer which buffers reads and writes in a transaction.
#[derive(Default)]
pub struct Buffer {
mutations: Mutex<InnerBuffer>,
}

impl Buffer {
pub fn new(is_pessimistic: bool) -> Buffer {
Buffer {
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
}
}

/// Get the primary key of the buffer.
pub async fn get_primary_key(&self) -> Option<Key> {
self.mutations.lock().await.primary_key.clone()
}

/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
self.mutations.lock().await.get_primary_key_or(key).clone()
Expand Down Expand Up @@ -203,28 +213,30 @@ impl Buffer {

/// Mark a value as Insert mutation into the buffer (does not write through).
pub async fn insert(&self, key: Key, value: Value) {
self.mutations
.lock()
.await
.insert(key, BufferEntry::Insert(value));
let mut mutations = self.mutations.lock().await;
let mut entry = mutations.entry_map.entry(key.clone());
match entry {
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
o.insert(BufferEntry::Put(value));
}
_ => mutations.insert(key, BufferEntry::Insert(value)),
}
}

/// Mark a value as deleted.
pub async fn delete(&self, key: Key) {
let mut mutations = self.mutations.lock().await;
let value = mutations
.entry_map
.entry(key.clone())
.or_insert(BufferEntry::Del);

let new_value: BufferEntry;
if let BufferEntry::Insert(_) = value {
new_value = BufferEntry::CheckNotExist
} else {
new_value = BufferEntry::Del
let is_pessimistic = mutations.is_pessimistic;
let mut entry = mutations.entry_map.entry(key.clone());

match entry {
Entry::Occupied(ref mut o)
if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
{
o.insert(BufferEntry::CheckNotExist);
}
_ => mutations.insert(key, BufferEntry::Del),
}

mutations.insert(key, new_value);
}

/// Converts the buffered mutations to the proto buffer version
Expand Down Expand Up @@ -378,7 +390,7 @@ mod tests {
#[tokio::test]
#[allow(unreachable_code)]
async fn set_and_get_from_buffer() {
let buffer = Buffer::default();
let buffer = Buffer::new(false);
buffer
.put(b"key1".to_vec().into(), b"value1".to_vec())
.await;
Expand Down Expand Up @@ -411,7 +423,7 @@ mod tests {
#[tokio::test]
#[allow(unreachable_code)]
async fn insert_and_get_from_buffer() {
let buffer = Buffer::default();
let buffer = Buffer::new(false);
buffer
.insert(b"key1".to_vec().into(), b"value1".to_vec())
.await;
Expand Down Expand Up @@ -453,13 +465,13 @@ mod tests {
let v2: Value = b"value2".to_vec();
let v2_ = v2.clone();

let buffer = Buffer::default();
let buffer = Buffer::new(false);
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
assert_eq!(r1.unwrap().unwrap(), v1);
assert_eq!(r2.unwrap().unwrap(), v1);

let buffer = Buffer::default();
let buffer = Buffer::new(false);
let r1 = block_on(
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))
Expand Down
44 changes: 36 additions & 8 deletions src/transaction/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,50 @@ pub fn new_pessimistic_rollback_request(
)
}

pub trait PessimisticLock: Clone {
fn key(self) -> Key;

fn assertion(&self) -> kvrpcpb::Assertion;
}

impl PessimisticLock for Key {
fn key(self) -> Key {
self
}

fn assertion(&self) -> kvrpcpb::Assertion {
kvrpcpb::Assertion::None
}
}

impl PessimisticLock for (Key, kvrpcpb::Assertion) {
fn key(self) -> Key {
self.0
}

fn assertion(&self) -> kvrpcpb::Assertion {
self.1
}
}

pub fn new_pessimistic_lock_request(
keys: impl Iterator<Item = Key>,
locks: impl Iterator<Item = impl PessimisticLock>,
primary_lock: Key,
start_version: Timestamp,
lock_ttl: u64,
for_update_ts: Timestamp,
need_value: bool,
) -> kvrpcpb::PessimisticLockRequest {
requests::new_pessimistic_lock_request(
keys.map(|key| {
let mut mutation = kvrpcpb::Mutation::default();
mutation.set_op(kvrpcpb::Op::PessimisticLock);
mutation.set_key(key.into());
mutation
})
.collect(),
locks
.map(|pl| {
let mut mutation = kvrpcpb::Mutation::default();
mutation.set_op(kvrpcpb::Op::PessimisticLock);
mutation.set_assertion(pl.assertion());
mutation.set_key(pl.key().into());
mutation
})
.collect(),
primary_lock.into(),
start_version.version(),
lock_ttl,
Expand Down
24 changes: 17 additions & 7 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl<PdC: PdClient> Transaction<PdC> {
Transaction {
status: Arc::new(RwLock::new(status)),
timestamp,
buffer: Default::default(),
buffer: Buffer::new(options.is_pessimistic()),
rpc,
options,
is_heartbeat_started: false,
Expand Down Expand Up @@ -401,8 +401,11 @@ impl<PdC: PdClient> Transaction<PdC> {
return Err(Error::DuplicateKeyInsertion);
}
if self.is_pessimistic() {
self.pessimistic_lock(iter::once(key.clone()), false)
.await?;
self.pessimistic_lock(
iter::once((key.clone(), kvrpcpb::Assertion::NotExist)),
false,
)
.await?;
}
self.buffer.insert(key, value.into()).await;
Ok(())
Expand Down Expand Up @@ -630,20 +633,20 @@ impl<PdC: PdClient> Transaction<PdC> {
/// Only valid for pessimistic transactions, panics if called on an optimistic transaction.
async fn pessimistic_lock(
&mut self,
keys: impl IntoIterator<Item = Key>,
keys: impl IntoIterator<Item = impl PessimisticLock>,
need_value: bool,
) -> Result<Vec<KvPair>> {
assert!(
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
"`pessimistic_lock` is only valid to use with pessimistic transactions"
);

let keys: Vec<Key> = keys.into_iter().collect();
let keys: Vec<_> = keys.into_iter().collect();
if keys.is_empty() {
return Ok(vec![]);
}

let first_key = keys[0].clone();
let first_key = keys[0].clone().key();
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
let for_update_ts = self.rpc.clone().get_timestamp().await?;
self.options.push_for_update_ts(for_update_ts.clone());
Expand All @@ -667,7 +670,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.start_auto_heartbeat().await;

for key in keys {
self.buffer.lock(key).await;
self.buffer.lock(key.key()).await;
}

pairs
Expand Down Expand Up @@ -891,6 +894,13 @@ impl TransactionOptions {
self.auto_heartbeat = false;
self
}

pub fn is_pessimistic(&self) -> bool {
match self.kind {
TransactionKind::Pessimistic(_) => true,
TransactionKind::Optimistic => false,
}
}
}

/// The default TTL of a lock in milliseconds.
Expand Down
43 changes: 43 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,49 @@ async fn pessimistic_rollback() -> Result<()> {
Ok(())
}

#[tokio::test]
#[serial]
async fn pessimistic_delete() -> Result<()> {
clear_tikv().await;
let client =
TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?;

// The transaction will lock the keys and must release the locks on commit, even when values are
// not written to the DB.
let mut txn = client.begin_pessimistic().await?;
txn.put(vec![1], vec![42]).await?;
txn.delete(vec![1]).await?;
txn.insert(vec![2], vec![42]).await?;
txn.delete(vec![2]).await?;
txn.put(vec![3], vec![42]).await?;
txn.commit().await?;

// Check that the keys are not locked.
let mut txn2 = client.begin_optimistic().await?;
txn2.put(vec![1], vec![42]).await?;
txn2.put(vec![2], vec![42]).await?;
txn2.put(vec![3], vec![42]).await?;
txn2.commit().await?;

// As before, but rollback instead of commit.
let mut txn = client.begin_pessimistic().await?;
txn.put(vec![1], vec![42]).await?;
txn.delete(vec![1]).await?;
txn.delete(vec![2]).await?;
txn.insert(vec![2], vec![42]).await?;
txn.delete(vec![2]).await?;
txn.put(vec![3], vec![42]).await?;
txn.rollback().await?;

let mut txn2 = client.begin_optimistic().await?;
txn2.put(vec![1], vec![42]).await?;
txn2.put(vec![2], vec![42]).await?;
txn2.put(vec![3], vec![42]).await?;
txn2.commit().await?;

Ok(())
}

#[tokio::test]
#[serial]
async fn lock_keys() -> Result<()> {
Expand Down

0 comments on commit 13fabea

Please sign in to comment.