Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add low-level runtime upgrade API #657

Merged
merged 12 commits into from
Sep 22, 2022
3 changes: 3 additions & 0 deletions subxt/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub use offline_client::{
OfflineClientT,
};
pub use online_client::{
ClientRuntimeUpdater,
OnlineClient,
OnlineClientT,
RuntimeUpdaterStream,
UpgradeError,
};
136 changes: 118 additions & 18 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
Rpc,
RpcClientT,
RuntimeVersion,
Subscription,
},
storage::StorageClient,
tx::TxClient,
Expand All @@ -33,7 +34,7 @@ pub trait OnlineClientT<T: Config>: OfflineClientT<T> {
}

/// A client that can be used to perform API calls (that is, either those
/// requiriing an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
/// requiring an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
pub struct OnlineClient<T: Config> {
Expand Down Expand Up @@ -102,7 +103,7 @@ impl<T: Config> OnlineClient<T> {
})
}

/// Create an object which can be used to keep the runtime uptodate
/// Create an object which can be used to keep the runtime up to date
/// in a separate thread.
///
/// # Example
Expand All @@ -114,10 +115,33 @@ impl<T: Config> OnlineClient<T> {
///
/// let client = OnlineClient::<PolkadotConfig>::new().await.unwrap();
///
/// // high level API.
///
/// let update_task = client.subscribe_to_updates();
/// tokio::spawn(async move {
/// update_task.perform_runtime_updates().await;
/// });
///
///
/// // low level API.
///
/// let updater = client.subscribe_to_updates();
/// tokio::spawn(async move {
/// let mut update_stream = updater.runtime_updates().await.unwrap();
///
/// while let Some(Ok(update)) = update_stream.next().await {
/// let version = update.runtime_version().spec_version;
///
/// match updater.apply_update(update) {
/// Ok(()) => {
/// println!("Upgrade to version: {} successful", version)
/// }
/// Err(e) => {
/// println!("Upgrade to version {} failed {:?}", version, e);
/// }
/// };
/// }
/// });
/// # }
/// ```
pub fn subscribe_to_updates(&self) -> ClientRuntimeUpdater<T> {
Expand Down Expand Up @@ -209,34 +233,110 @@ impl<T: Config> ClientRuntimeUpdater<T> {
&curr.runtime_version != new
}

fn do_update(&self, update: Update) {
let mut writable = self.0.inner.write();
writable.metadata = update.metadata;
writable.runtime_version = update.runtime_version;
}

/// Tries to apply a new update.
pub fn apply_update(&self, update: Update) -> Result<(), UpgradeError> {
if !self.is_runtime_version_different(&update.runtime_version) {
return Err(UpgradeError::SameVersion)
}

self.do_update(update);

Ok(())
}

/// Performs runtime updates indefinitely unless encountering an error.
///
/// *Note:* This will run indefinitely until it errors, so the typical usage
/// would be to run it in a separate background task.
jsdw marked this conversation as resolved.
Show resolved Hide resolved
pub async fn perform_runtime_updates(&self) -> Result<(), Error> {
// Obtain an update subscription to further detect changes in the runtime version of the node.
let mut update_subscription = self.0.rpc.subscribe_runtime_version().await?;

while let Some(new_runtime_version) = update_subscription.next().await {
// The Runtime Version obtained via subscription.
let new_runtime_version = new_runtime_version?;
let mut runtime_version_stream = self.runtime_updates().await?;

// Ignore this update if there is no difference.
if !self.is_runtime_version_different(&new_runtime_version) {
continue
}
while let Some(update) = runtime_version_stream.next().await {
let update = update?;

// Fetch new metadata.
let new_metadata = self.0.rpc.metadata().await?;

// Do the update.
let mut writable = self.0.inner.write();
writable.metadata = new_metadata;
writable.runtime_version = new_runtime_version;
// This only fails if received the runtime version is the same the current runtime version
// which might occur because that runtime subscriptions in substrate sends out the initial
// value when they created and not only when runtime upgrades occurs.
// Thus, fine to ignore here as it strictly speaking isn't really an error
let _ = self.apply_update(update);
}

Ok(())
}

/// Low-level API to get runtime updates as a stream but it's doesn't check if the
/// runtime version is newer or updates the runtime.
///
/// Instead that's up to the user of this API to decide when to update and
/// to perform the actual updating.
pub async fn runtime_updates(&self) -> Result<RuntimeUpdaterStream<T>, Error> {
let stream = self.0.rpc().subscribe_runtime_version().await?;
Ok(RuntimeUpdaterStream {
stream,
client: self.0.clone(),
})
}
}

/// Stream to perform runtime upgrades.
pub struct RuntimeUpdaterStream<T: Config> {
stream: Subscription<RuntimeVersion>,
client: OnlineClient<T>,
}

impl<T: Config> RuntimeUpdaterStream<T> {
/// Get the next element of the stream.
pub async fn next(&mut self) -> Option<Result<Update, Error>> {
let maybe_runtime_version = self.stream.next().await?;

let runtime_version = match maybe_runtime_version {
Ok(runtime_version) => runtime_version,
Err(err) => return Some(Err(err)),
};

let metadata = match self.client.rpc().metadata().await {
Ok(metadata) => metadata,
Err(err) => return Some(Err(err)),
};

Some(Ok(Update {
metadata,
runtime_version,
}))
}
}

/// Error that can occur during upgrade.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum UpgradeError {
/// The version is the same as the current version.
SameVersion,
}

/// Represents the state when a runtime upgrade occurred.
pub struct Update {
runtime_version: RuntimeVersion,
metadata: Metadata,
}

impl Update {
/// Get the runtime version.
pub fn runtime_version(&self) -> &RuntimeVersion {
&self.runtime_version
}

/// Get the metadata.
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
}

// helpers for a jsonrpsee specific OnlineClient.
Expand Down