Skip to content

Commit

Permalink
feat: add low-level runtime upgrade API (#657)
Browse files Browse the repository at this point in the history
* feat: add low-level runtime upgrade API

* grumbles

* cargo fmt

* dont use apply_update in perform_runtime_upgrades

* fix nit

* remove metadata check

* fix doc tests

* fix final comments

* Update subxt/src/client/online_client.rs

* cargo fmt

* Update subxt/src/client/online_client.rs
  • Loading branch information
niklasad1 committed Sep 22, 2022
1 parent 93cc55f commit e233dd6
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 18 deletions.
3 changes: 3 additions & 0 deletions subxt/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub use offline_client::{
OfflineClientT,
};
pub use online_client::{
ClientRuntimeUpdater,
OnlineClient,
OnlineClientT,
RuntimeUpdaterStream,
UpgradeError,
};
136 changes: 118 additions & 18 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
Rpc,
RpcClientT,
RuntimeVersion,
Subscription,
},
storage::StorageClient,
tx::TxClient,
Expand All @@ -33,7 +34,7 @@ pub trait OnlineClientT<T: Config>: OfflineClientT<T> {
}

/// A client that can be used to perform API calls (that is, either those
/// requiriing an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
/// requiring an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
pub struct OnlineClient<T: Config> {
Expand Down Expand Up @@ -102,7 +103,7 @@ impl<T: Config> OnlineClient<T> {
})
}

/// Create an object which can be used to keep the runtime uptodate
/// Create an object which can be used to keep the runtime up to date
/// in a separate thread.
///
/// # Example
Expand All @@ -114,10 +115,33 @@ impl<T: Config> OnlineClient<T> {
///
/// let client = OnlineClient::<PolkadotConfig>::new().await.unwrap();
///
/// // high level API.
///
/// let update_task = client.subscribe_to_updates();
/// tokio::spawn(async move {
/// update_task.perform_runtime_updates().await;
/// });
///
///
/// // low level API.
///
/// let updater = client.subscribe_to_updates();
/// tokio::spawn(async move {
/// let mut update_stream = updater.runtime_updates().await.unwrap();
///
/// while let Some(Ok(update)) = update_stream.next().await {
/// let version = update.runtime_version().spec_version;
///
/// match updater.apply_update(update) {
/// Ok(()) => {
/// println!("Upgrade to version: {} successful", version)
/// }
/// Err(e) => {
/// println!("Upgrade to version {} failed {:?}", version, e);
/// }
/// };
/// }
/// });
/// # }
/// ```
pub fn subscribe_to_updates(&self) -> ClientRuntimeUpdater<T> {
Expand Down Expand Up @@ -209,34 +233,110 @@ impl<T: Config> ClientRuntimeUpdater<T> {
&curr.runtime_version != new
}

fn do_update(&self, update: Update) {
let mut writable = self.0.inner.write();
writable.metadata = update.metadata;
writable.runtime_version = update.runtime_version;
}

/// Tries to apply a new update.
pub fn apply_update(&self, update: Update) -> Result<(), UpgradeError> {
if !self.is_runtime_version_different(&update.runtime_version) {
return Err(UpgradeError::SameVersion)
}

self.do_update(update);

Ok(())
}

/// Performs runtime updates indefinitely unless encountering an error.
///
/// *Note:* This will run indefinitely until it errors, so the typical usage
/// would be to run it in a separate background task.
pub async fn perform_runtime_updates(&self) -> Result<(), Error> {
// Obtain an update subscription to further detect changes in the runtime version of the node.
let mut update_subscription = self.0.rpc.subscribe_runtime_version().await?;

while let Some(new_runtime_version) = update_subscription.next().await {
// The Runtime Version obtained via subscription.
let new_runtime_version = new_runtime_version?;
let mut runtime_version_stream = self.runtime_updates().await?;

// Ignore this update if there is no difference.
if !self.is_runtime_version_different(&new_runtime_version) {
continue
}
while let Some(update) = runtime_version_stream.next().await {
let update = update?;

// Fetch new metadata.
let new_metadata = self.0.rpc.metadata().await?;

// Do the update.
let mut writable = self.0.inner.write();
writable.metadata = new_metadata;
writable.runtime_version = new_runtime_version;
// This only fails if received the runtime version is the same the current runtime version
// which might occur because that runtime subscriptions in substrate sends out the initial
// value when they created and not only when runtime upgrades occurs.
// Thus, fine to ignore here as it strictly speaking isn't really an error
let _ = self.apply_update(update);
}

Ok(())
}

/// Low-level API to get runtime updates as a stream but it's doesn't check if the
/// runtime version is newer or updates the runtime.
///
/// Instead that's up to the user of this API to decide when to update and
/// to perform the actual updating.
pub async fn runtime_updates(&self) -> Result<RuntimeUpdaterStream<T>, Error> {
let stream = self.0.rpc().subscribe_runtime_version().await?;
Ok(RuntimeUpdaterStream {
stream,
client: self.0.clone(),
})
}
}

/// Stream to perform runtime upgrades.
pub struct RuntimeUpdaterStream<T: Config> {
stream: Subscription<RuntimeVersion>,
client: OnlineClient<T>,
}

impl<T: Config> RuntimeUpdaterStream<T> {
/// Get the next element of the stream.
pub async fn next(&mut self) -> Option<Result<Update, Error>> {
let maybe_runtime_version = self.stream.next().await?;

let runtime_version = match maybe_runtime_version {
Ok(runtime_version) => runtime_version,
Err(err) => return Some(Err(err)),
};

let metadata = match self.client.rpc().metadata().await {
Ok(metadata) => metadata,
Err(err) => return Some(Err(err)),
};

Some(Ok(Update {
metadata,
runtime_version,
}))
}
}

/// Error that can occur during upgrade.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum UpgradeError {
/// The version is the same as the current version.
SameVersion,
}

/// Represents the state when a runtime upgrade occurred.
pub struct Update {
runtime_version: RuntimeVersion,
metadata: Metadata,
}

impl Update {
/// Get the runtime version.
pub fn runtime_version(&self) -> &RuntimeVersion {
&self.runtime_version
}

/// Get the metadata.
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
}

// helpers for a jsonrpsee specific OnlineClient.
Expand Down

0 comments on commit e233dd6

Please sign in to comment.