Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix two bugs with insert and delete #253

Merged
merged 1 commit into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions rust-toolchain
@@ -0,0 +1 @@
nightly-2021-03-15
58 changes: 35 additions & 23 deletions src/transaction/buffer.rs
@@ -1,17 +1,21 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{BoundRange, Key, KvPair, Result, Value};
use derive_new::new;
use std::{
collections::{BTreeMap, HashMap},
collections::{btree_map::Entry, BTreeMap, HashMap},
future::Future,
};
use tikv_client_proto::kvrpcpb;
use tokio::sync::{Mutex, MutexGuard};

#[derive(Default)]
#[derive(new)]
struct InnerBuffer {
#[new(default)]
primary_key: Option<Key>,
#[new(default)]
entry_map: BTreeMap<Key, BufferEntry>,
is_pessimistic: bool,
}

impl InnerBuffer {
Expand All @@ -29,16 +33,22 @@ impl InnerBuffer {
}

/// A caching layer which buffers reads and writes in a transaction.
#[derive(Default)]
pub struct Buffer {
mutations: Mutex<InnerBuffer>,
}

impl Buffer {
pub fn new(is_pessimistic: bool) -> Buffer {
Buffer {
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
}
}

/// Get the primary key of the buffer.
pub async fn get_primary_key(&self) -> Option<Key> {
self.mutations.lock().await.primary_key.clone()
}

/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
self.mutations.lock().await.get_primary_key_or(key).clone()
Expand Down Expand Up @@ -203,28 +213,30 @@ impl Buffer {

/// Mark a value as Insert mutation into the buffer (does not write through).
pub async fn insert(&self, key: Key, value: Value) {
self.mutations
.lock()
.await
.insert(key, BufferEntry::Insert(value));
let mut mutations = self.mutations.lock().await;
let mut entry = mutations.entry_map.entry(key.clone());
match entry {
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
o.insert(BufferEntry::Put(value));
}
_ => mutations.insert(key, BufferEntry::Insert(value)),
}
}

/// Mark a value as deleted.
pub async fn delete(&self, key: Key) {
let mut mutations = self.mutations.lock().await;
let value = mutations
.entry_map
.entry(key.clone())
.or_insert(BufferEntry::Del);

let new_value: BufferEntry;
if let BufferEntry::Insert(_) = value {
new_value = BufferEntry::CheckNotExist
} else {
new_value = BufferEntry::Del
let is_pessimistic = mutations.is_pessimistic;
let mut entry = mutations.entry_map.entry(key.clone());

match entry {
Entry::Occupied(ref mut o)
if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
{
o.insert(BufferEntry::CheckNotExist);
}
_ => mutations.insert(key, BufferEntry::Del),
}

mutations.insert(key, new_value);
}

/// Converts the buffered mutations to the proto buffer version
Expand Down Expand Up @@ -378,7 +390,7 @@ mod tests {
#[tokio::test]
#[allow(unreachable_code)]
async fn set_and_get_from_buffer() {
let buffer = Buffer::default();
let buffer = Buffer::new(false);
buffer
.put(b"key1".to_vec().into(), b"value1".to_vec())
.await;
Expand Down Expand Up @@ -411,7 +423,7 @@ mod tests {
#[tokio::test]
#[allow(unreachable_code)]
async fn insert_and_get_from_buffer() {
let buffer = Buffer::default();
let buffer = Buffer::new(false);
buffer
.insert(b"key1".to_vec().into(), b"value1".to_vec())
.await;
Expand Down Expand Up @@ -453,13 +465,13 @@ mod tests {
let v2: Value = b"value2".to_vec();
let v2_ = v2.clone();

let buffer = Buffer::default();
let buffer = Buffer::new(false);
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
assert_eq!(r1.unwrap().unwrap(), v1);
assert_eq!(r2.unwrap().unwrap(), v1);

let buffer = Buffer::default();
let buffer = Buffer::new(false);
let r1 = block_on(
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))
Expand Down
44 changes: 36 additions & 8 deletions src/transaction/lowering.rs
Expand Up @@ -106,22 +106,50 @@ pub fn new_pessimistic_rollback_request(
)
}

pub trait PessimisticLock: Clone {
fn key(self) -> Key;

fn assertion(&self) -> kvrpcpb::Assertion;
}

impl PessimisticLock for Key {
fn key(self) -> Key {
self
}

fn assertion(&self) -> kvrpcpb::Assertion {
kvrpcpb::Assertion::None
}
}

impl PessimisticLock for (Key, kvrpcpb::Assertion) {
fn key(self) -> Key {
self.0
}

fn assertion(&self) -> kvrpcpb::Assertion {
self.1
}
}

pub fn new_pessimistic_lock_request(
keys: impl Iterator<Item = Key>,
locks: impl Iterator<Item = impl PessimisticLock>,
primary_lock: Key,
start_version: Timestamp,
lock_ttl: u64,
for_update_ts: Timestamp,
need_value: bool,
) -> kvrpcpb::PessimisticLockRequest {
requests::new_pessimistic_lock_request(
keys.map(|key| {
let mut mutation = kvrpcpb::Mutation::default();
mutation.set_op(kvrpcpb::Op::PessimisticLock);
mutation.set_key(key.into());
mutation
})
.collect(),
locks
.map(|pl| {
let mut mutation = kvrpcpb::Mutation::default();
mutation.set_op(kvrpcpb::Op::PessimisticLock);
mutation.set_assertion(pl.assertion());
mutation.set_key(pl.key().into());
mutation
})
.collect(),
primary_lock.into(),
start_version.version(),
lock_ttl,
Expand Down
24 changes: 17 additions & 7 deletions src/transaction/transaction.rs
Expand Up @@ -66,7 +66,7 @@ impl<PdC: PdClient> Transaction<PdC> {
Transaction {
status: Arc::new(RwLock::new(status)),
timestamp,
buffer: Default::default(),
buffer: Buffer::new(options.is_pessimistic()),
rpc,
options,
is_heartbeat_started: false,
Expand Down Expand Up @@ -401,8 +401,11 @@ impl<PdC: PdClient> Transaction<PdC> {
return Err(Error::DuplicateKeyInsertion);
}
if self.is_pessimistic() {
self.pessimistic_lock(iter::once(key.clone()), false)
.await?;
self.pessimistic_lock(
iter::once((key.clone(), kvrpcpb::Assertion::NotExist)),
false,
)
.await?;
}
self.buffer.insert(key, value.into()).await;
Ok(())
Expand Down Expand Up @@ -630,20 +633,20 @@ impl<PdC: PdClient> Transaction<PdC> {
/// Only valid for pessimistic transactions, panics if called on an optimistic transaction.
async fn pessimistic_lock(
&mut self,
keys: impl IntoIterator<Item = Key>,
keys: impl IntoIterator<Item = impl PessimisticLock>,
need_value: bool,
) -> Result<Vec<KvPair>> {
assert!(
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
"`pessimistic_lock` is only valid to use with pessimistic transactions"
);

let keys: Vec<Key> = keys.into_iter().collect();
let keys: Vec<_> = keys.into_iter().collect();
if keys.is_empty() {
return Ok(vec![]);
}

let first_key = keys[0].clone();
let first_key = keys[0].clone().key();
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
let for_update_ts = self.rpc.clone().get_timestamp().await?;
self.options.push_for_update_ts(for_update_ts.clone());
Expand All @@ -667,7 +670,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.start_auto_heartbeat().await;

for key in keys {
self.buffer.lock(key).await;
self.buffer.lock(key.key()).await;
}

pairs
Expand Down Expand Up @@ -891,6 +894,13 @@ impl TransactionOptions {
self.auto_heartbeat = false;
self
}

pub fn is_pessimistic(&self) -> bool {
match self.kind {
TransactionKind::Pessimistic(_) => true,
TransactionKind::Optimistic => false,
}
}
}

/// The default TTL of a lock in milliseconds.
Expand Down
43 changes: 43 additions & 0 deletions tests/integration_tests.rs
Expand Up @@ -596,6 +596,49 @@ async fn pessimistic_rollback() -> Result<()> {
Ok(())
}

#[tokio::test]
#[serial]
async fn pessimistic_delete() -> Result<()> {
clear_tikv().await;
let client =
TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?;

// The transaction will lock the keys and must release the locks on commit, even when values are
// not written to the DB.
let mut txn = client.begin_pessimistic().await?;
txn.put(vec![1], vec![42]).await?;
txn.delete(vec![1]).await?;
txn.insert(vec![2], vec![42]).await?;
txn.delete(vec![2]).await?;
txn.put(vec![3], vec![42]).await?;
txn.commit().await?;

// Check that the keys are not locked.
let mut txn2 = client.begin_optimistic().await?;
txn2.put(vec![1], vec![42]).await?;
txn2.put(vec![2], vec![42]).await?;
txn2.put(vec![3], vec![42]).await?;
txn2.commit().await?;

// As before, but rollback instead of commit.
let mut txn = client.begin_pessimistic().await?;
txn.put(vec![1], vec![42]).await?;
txn.delete(vec![1]).await?;
txn.delete(vec![2]).await?;
txn.insert(vec![2], vec![42]).await?;
txn.delete(vec![2]).await?;
txn.put(vec![3], vec![42]).await?;
txn.rollback().await?;

let mut txn2 = client.begin_optimistic().await?;
txn2.put(vec![1], vec![42]).await?;
txn2.put(vec![2], vec![42]).await?;
txn2.put(vec![3], vec![42]).await?;
txn2.commit().await?;

Ok(())
}

#[tokio::test]
#[serial]
async fn lock_keys() -> Result<()> {
Expand Down