Skip to content

Commit

Permalink
Merge pull request #252 from ekexium/batch-get-for-update
Browse files Browse the repository at this point in the history
Fix batch_get_for_update
  • Loading branch information
nrc committed Apr 9, 2021
2 parents 9320198 + c61551c commit 4870985
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 115 deletions.
5 changes: 3 additions & 2 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tikv_client_store::{HasError, Request};

pub use self::{
plan::{
Collect, CollectError, DefaultProcessor, Dispatch, ExtractError, Merge, MergeResponse,
MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion,
Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError,
HasKeys, Merge, MergeResponse, MultiRegion, Plan, PreserveKey, Process, ProcessResponse,
ResolveLock, RetryRegion,
},
plan_builder::{PlanBuilder, SingleKey},
shard::Shardable,
Expand Down
124 changes: 118 additions & 6 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use crate::{
request::{KvRequest, Shardable},
stats::tikv_stats,
transaction::{resolve_locks, HasLocks},
Error, Result,
util::iter::FlatMapOkIterExt,
Error, Key, KvPair, Result, Value,
};
use async_trait::async_trait;
use futures::{prelude::*, stream::StreamExt};
use std::{marker::PhantomData, sync::Arc};
use tikv_client_proto::kvrpcpb;
use tikv_client_store::{HasError, HasRegionError, KvClient};

/// A plan for how to execute a request. A user builds up a plan with various
Expand Down Expand Up @@ -55,6 +57,12 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
}
}

impl<Req: KvRequest + HasKeys> HasKeys for Dispatch<Req> {
fn get_keys(&self) -> Vec<Key> {
self.request.get_keys()
}
}

pub struct MultiRegion<P: Plan, PdC: PdClient> {
pub(super) inner: P,
pub pd_client: Arc<PdC>,
Expand Down Expand Up @@ -123,6 +131,12 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
#[derive(Clone, Copy)]
pub struct Collect;

/// A merge strategy to be used with
/// [`preserve_keys`](super::plan_builder::PlanBuilder::preserve_keys).
/// It matches the keys preserved before and the values returned in the response.
#[derive(Clone, Debug)]
pub struct CollectAndMatchKey;

/// A merge strategy which returns an error if any response is an error and
/// otherwise returns a Vec of the results.
#[derive(Clone, Copy)]
Expand Down Expand Up @@ -256,6 +270,17 @@ where
}
}

impl<P: Plan + HasKeys, PdC: PdClient> HasKeys for ResolveLock<P, PdC> {
fn get_keys(&self) -> Vec<Key> {
self.inner.get_keys()
}
}

/// When executed, the plan extracts errors from its inner plan, and
/// returns an `Err` wrapping the error.
///
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
/// where `response` contains unresolved errors (`error` and `region_error`).
pub struct ExtractError<P: Plan> {
pub inner: P,
}
Expand All @@ -268,11 +293,6 @@ impl<P: Plan> Clone for ExtractError<P> {
}
}

/// When executed, the plan extracts errors from its inner plan, and
/// returns an `Err` wrapping the error.
///
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
/// where `response` contains unresolved errors (`error` and `region_error`).
#[async_trait]
impl<P: Plan> Plan for ExtractError<P>
where
Expand All @@ -292,6 +312,98 @@ where
}
}

/// When executed, the plan clones the keys and execute its inner plan, then
/// returns `(keys, response)`.
///
/// It's useful when the information of keys are lost in the response but needed
/// for processing.
pub struct PreserveKey<P: Plan + HasKeys> {
pub inner: P,
}

impl<P: Plan + HasKeys> Clone for PreserveKey<P> {
fn clone(&self) -> Self {
PreserveKey {
inner: self.inner.clone(),
}
}
}

#[async_trait]
impl<P> Plan for PreserveKey<P>
where
P: Plan + HasKeys,
{
type Result = ResponseAndKeys<P::Result>;

async fn execute(&self) -> Result<Self::Result> {
let keys = self.inner.get_keys();
let res = self.inner.execute().await?;
Ok(ResponseAndKeys(res, keys))
}
}

pub trait HasKeys {
fn get_keys(&self) -> Vec<Key>;
}

// contains a response and the corresponding keys
// currently only used for matching keys and values in pessimistic lock requests
#[derive(Debug, Clone)]
pub struct ResponseAndKeys<Resp>(Resp, Vec<Key>);

impl<Resp: HasError> HasError for ResponseAndKeys<Resp> {
fn error(&mut self) -> Option<Error> {
self.0.error()
}
}

impl<Resp: HasLocks> HasLocks for ResponseAndKeys<Resp> {
fn take_locks(&mut self) -> Vec<tikv_client_proto::kvrpcpb::LockInfo> {
self.0.take_locks()
}
}

impl<Resp: HasRegionError> HasRegionError for ResponseAndKeys<Resp> {
fn region_error(&mut self) -> Option<Error> {
self.0.region_error()
}
}

impl Merge<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>> for CollectAndMatchKey {
type Out = Vec<KvPair>;

fn merge(
&self,
input: Vec<Result<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>>>,
) -> Result<Self::Out> {
input
.into_iter()
.flat_map_ok(|ResponseAndKeys(mut resp, keys)| {
let values = resp.take_values();
let not_founds = resp.take_not_founds();
let v: Vec<_> = if not_founds.is_empty() {
// Legacy TiKV does not distiguish not existing key and existing key
// that with empty value. We assume that key does not exist if value
// is empty.
let values: Vec<Value> = values.into_iter().filter(|v| v.is_empty()).collect();
keys.into_iter().zip(values).map(From::from).collect()
} else {
assert_eq!(values.len(), not_founds.len());
let values: Vec<Value> = values
.into_iter()
.zip(not_founds.into_iter())
.filter_map(|(v, not_found)| if not_found { None } else { Some(v) })
.collect();
keys.into_iter().zip(values).map(From::from).collect()
};
// FIXME sucks to collect and re-iterate, but the iterators have different types
v.into_iter()
})
.collect()
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
18 changes: 16 additions & 2 deletions src/request/plan_builder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use super::PreserveKey;
use crate::{
backoff::Backoff,
pd::PdClient,
request::{
DefaultProcessor, Dispatch, ExtractError, KvRequest, Merge, MergeResponse, MultiRegion,
Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable,
DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse,
MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable,
},
store::Store,
transaction::HasLocks,
Expand Down Expand Up @@ -161,6 +162,19 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
}
}

impl<PdC: PdClient, P: Plan + HasKeys> PlanBuilder<PdC, P, NoTarget>
where
P::Result: HasError,
{
pub fn preserve_keys(self) -> PlanBuilder<PdC, PreserveKey<P>, NoTarget> {
PlanBuilder {
pd_client: self.pd_client.clone(),
plan: PreserveKey { inner: self.plan },
phantom: PhantomData,
}
}
}

impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
where
P::Result: HasError,
Expand Down
47 changes: 23 additions & 24 deletions src/request/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,30 @@

use crate::{
pd::PdClient,
request::{Dispatch, KvRequest, Plan, ResolveLock, RetryRegion},
request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock, RetryRegion},
store::Store,
Result,
};
use futures::stream::BoxStream;
use std::sync::Arc;

macro_rules! impl_inner_shardable {
() => {
type Shard = P::Shard;

fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
self.inner.shards(pd_client)
}

fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.inner.apply_shard(shard, store)
}
};
}

pub trait Shardable {
type Shard: Send;

Expand Down Expand Up @@ -37,33 +54,15 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
}

impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
type Shard = P::Shard;

fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
self.inner.shards(pd_client)
}
impl_inner_shardable!();
}

fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.inner.apply_shard(shard, store)
}
impl<P: Plan + HasKeys + Shardable> Shardable for PreserveKey<P> {
impl_inner_shardable!();
}

impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegion<P, PdC> {
type Shard = P::Shard;

fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
self.inner.shards(pd_client)
}

fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
self.inner.apply_shard(shard, store)
}
impl_inner_shardable!();
}

#[macro_export]
Expand Down
13 changes: 12 additions & 1 deletion src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use crate::{
pd::PdClient,
request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey},
request::{
Collect, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable, SingleKey,
},
store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store},
timestamp::TimestampExt,
transaction::HasLocks,
Expand Down Expand Up @@ -359,6 +361,15 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
}
}

impl HasKeys for kvrpcpb::PessimisticLockRequest {
fn get_keys(&self) -> Vec<Key> {
self.mutations
.iter()
.map(|m| m.key.clone().into())
.collect()
}
}

impl Merge<kvrpcpb::PessimisticLockResponse> for Collect {
// FIXME: PessimisticLockResponse only contains values.
// We need to pair keys and values returned somewhere.
Expand Down

0 comments on commit 4870985

Please sign in to comment.