Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add low-level runtime upgrade API #657

Merged
merged 12 commits into from
Sep 22, 2022
3 changes: 3 additions & 0 deletions subxt/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub use offline_client::{
OfflineClientT,
};
pub use online_client::{
ClientRuntimeUpdater,
OnlineClient,
OnlineClientT,
RuntimeUpdaterStream,
UpgradeError,
};
158 changes: 120 additions & 38 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,15 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::{
OfflineClient,
OfflineClientT,
};
use super::{OfflineClient, OfflineClientT};
use crate::{
constants::ConstantsClient,
error::Error,
events::EventsClient,
rpc::{
Rpc,
RpcClientT,
RuntimeVersion,
},
rpc::{Rpc, RpcClientT, RuntimeVersion, Subscription},
storage::StorageClient,
tx::TxClient,
Config,
Metadata,
Config, Metadata,
};
use derivative::Derivative;
use futures::future;
Expand All @@ -33,7 +25,7 @@ pub trait OnlineClientT<T: Config>: OfflineClientT<T> {
}

/// A client that can be used to perform API calls (that is, either those
/// requiriing an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
/// requiring an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
pub struct OnlineClient<T: Config> {
Expand Down Expand Up @@ -102,7 +94,7 @@ impl<T: Config> OnlineClient<T> {
})
}

/// Create an object which can be used to keep the runtime uptodate
/// Create an object which can be used to keep the runtime up to date
/// in a separate thread.
///
/// # Example
Expand All @@ -114,10 +106,34 @@ impl<T: Config> OnlineClient<T> {
///
/// let client = OnlineClient::<PolkadotConfig>::new().await.unwrap();
///
/// // high level API.
/// // no possible to know when an upgrade occurs just that it occurs.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
///
/// let update_task = client.subscribe_to_updates();
/// tokio::spawn(async move {
/// update_task.perform_runtime_updates().await;
/// });
///
///
/// // low level API.
///
/// let updater = client.subscribe_to_updates();
/// tokio::spawn(async move {
/// let mut update_stream = updater.runtime_updates().await.unwrap();
///
/// while let Some(Ok(update)) = update_stream.next().await {
/// let version = update.runtime_version().spec_version;
///
/// match updater.apply_update(update) {
/// Ok(()) => {
/// println!("Upgrade to version: {} successful", version)
/// }
/// Err(e) => {
/// println!("Upgrade to version {} failed {:?}", version, e);
/// }
/// };
/// }
/// });
/// # }
/// ```
pub fn subscribe_to_updates(&self) -> ClientRuntimeUpdater<T> {
Expand Down Expand Up @@ -209,52 +225,118 @@ impl<T: Config> ClientRuntimeUpdater<T> {
&curr.runtime_version != new
}

fn do_update(&self, update: Update) {
let mut writable = self.0.inner.write();
writable.metadata = update.metadata;
writable.runtime_version = update.runtime_version;
}

/// Tries to apply a new update.
pub fn apply_update(&self, update: Update) -> Result<(), UpgradeError> {
if !self.is_runtime_version_different(&update.runtime_version) {
return Err(UpgradeError::SameVersion);
}

self.do_update(update);

Ok(())
}

/// Performs runtime updates indefinitely unless encountering an error.
///
/// *Note:* This will run indefinitely until it errors, so the typical usage
/// would be to run it in a separate background task.
jsdw marked this conversation as resolved.
Show resolved Hide resolved
pub async fn perform_runtime_updates(&self) -> Result<(), Error> {
// Obtain an update subscription to further detect changes in the runtime version of the node.
let mut update_subscription = self.0.rpc.subscribe_runtime_version().await?;

while let Some(new_runtime_version) = update_subscription.next().await {
// The Runtime Version obtained via subscription.
let new_runtime_version = new_runtime_version?;
let mut runtime_version_stream = self.runtime_updates().await?;

// Ignore this update if there is no difference.
if !self.is_runtime_version_different(&new_runtime_version) {
continue
}
while let Some(update) = runtime_version_stream.next().await {
let update = update?;

// Fetch new metadata.
let new_metadata = self.0.rpc.metadata().await?;

// Do the update.
let mut writable = self.0.inner.write();
writable.metadata = new_metadata;
writable.runtime_version = new_runtime_version;
// Ignore if the update fails.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let _ = self.apply_update(update);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(())
}

/// Low-level API to get runtime updates as a stream but it's doesn't check if the
/// runtime version is newer or updates the runtime.
///
/// Instead that's up to the user of this API to decide when to update and
/// to perform the actual updating.
pub async fn runtime_updates(&self) -> Result<RuntimeUpdaterStream<T>, Error> {
let stream = self.0.rpc().subscribe_runtime_version().await?;
Ok(RuntimeUpdaterStream {
stream,
client: self.0.clone(),
})
}
}

/// Stream to perform runtime upgrades.
pub struct RuntimeUpdaterStream<T: Config> {
stream: Subscription<RuntimeVersion>,
client: OnlineClient<T>,
}

impl<T: Config> RuntimeUpdaterStream<T> {
/// Get the next element of the stream.
pub async fn next(&mut self) -> Option<Result<Update, Error>> {
let maybe_runtime_version = self.stream.next().await?;

let runtime_version = match maybe_runtime_version {
Ok(runtime_version) => runtime_version,
Err(err) => return Some(Err(err)),
};

let metadata = match self.client.rpc().metadata().await {
Ok(metadata) => metadata,
Err(err) => return Some(Err(err)),
};

Some(Ok(Update {
metadata,
runtime_version,
}))
}
}

/// Error that can occur during upgrade.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum UpgradeError {
/// The version is the same as the current version.
SameVersion,
}

/// Represents the state when a runtime upgrade occurred.
pub struct Update {
runtime_version: RuntimeVersion,
metadata: Metadata,
}

impl Update {
/// Get the runtime version.
pub fn runtime_version(&self) -> &RuntimeVersion {
&self.runtime_version
}

/// Get the metadata.
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
}

// helpers for a jsonrpsee specific OnlineClient.
#[cfg(feature = "jsonrpsee")]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::ws::{
InvalidUri,
Receiver,
Sender,
Uri,
WsTransportClientBuilder,
InvalidUri, Receiver, Sender, Uri, WsTransportClientBuilder,
},
core::{
client::{
Client,
ClientBuilder,
},
client::{Client, ClientBuilder},
Error,
},
};
Expand Down