Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ws reconnection #628

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -2,3 +2,4 @@ target
Cargo.lock
*.swp
.idea/
db
85 changes: 85 additions & 0 deletions examples/contract_log_filter_loss_connection.rs
@@ -0,0 +1,85 @@
use std::time::Duration;

use ethabi::Address;
use futures::{future, TryStreamExt};
use hex_literal::hex;
use web3::{
api::BaseFilter,
contract::{Contract, Options},
transports::WebSocket,
types::{Filter, FilterBuilder, Log},
Web3,
};

#[tokio::main]
async fn main() -> web3::contract::Result<()> {
let _ = env_logger::try_init();
let transport = web3::transports::WebSocket::new("ws://localhost:8545").await?;
let web3 = web3::Web3::new(transport);

println!("Calling accounts.");
let accounts = web3.eth().accounts().await?;

let bytecode = include_str!("./res/SimpleEvent.bin");
let contract = Contract::deploy(web3.eth(), include_bytes!("./res/SimpleEvent.abi"))?
.confirmations(1)
.poll_interval(Duration::from_secs(10))
.options(Options::with(|opt| opt.gas = Some(3_000_000u64.into())))
.execute(bytecode, (), accounts[0])
.await
.unwrap();

println!("contract deployed at: {}", contract.address());

tokio::spawn(interval_contract_call(contract.clone(), accounts[0]));

// Filter for Hello event in our contract
let filter = FilterBuilder::default()
.address(vec![contract.address()])
.topics(
Some(vec![hex!(
"d282f389399565f3671145f5916e51652b60eee8e5c759293a2f5771b8ddfd2e"
)
.into()]),
None,
None,
None,
)
.build();

loop {
let filter = get_filter(web3.clone(), &filter).await;
let logs_stream = filter.stream(Duration::from_secs(2));
let res = logs_stream
.try_for_each(|log| {
println!("Get log: {:?}", log);
future::ready(Ok(()))
})
.await;

if let Err(e) = res {
println!("Log Filter Error: {}", e);
}
}
}

async fn interval_contract_call(contract: Contract<WebSocket>, account: Address) {
loop {
match contract.call("hello", (), account, Options::default()).await {
Ok(tx) => println!("got tx: {:?}", tx),
Err(e) => println!("get tx failed: {}", e),
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
}

pub async fn get_filter(web3: Web3<WebSocket>, filter: &Filter) -> BaseFilter<WebSocket, Log> {
loop {
match web3.eth_filter().create_logs_filter(filter.clone()).await {
Err(e) => println!("get filter failed: {}", e),
Ok(filter) => return filter,
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
2 changes: 1 addition & 1 deletion examples/readme.md
@@ -1,6 +1,6 @@
First, run ganache

ganache-cli -b 3 -m "hamster coin cup brief quote trick stove draft hobby strong caught unable"
ganache-cli -b 3 -m "hamster coin cup brief quote trick stove draft hobby strong caught unable" --db ./db

Using this mnemonic makes the static account addresses in the example line up

Expand Down
28 changes: 28 additions & 0 deletions examples/transport_ws_loss_connection.rs
@@ -0,0 +1,28 @@
use std::time::Duration;

use ethabi::Address;
use web3::{api::Eth, transports::WebSocket};

#[tokio::main]
async fn main() -> web3::Result<()> {
let _ = env_logger::try_init();
let transport = web3::transports::WebSocket::new("ws://localhost:8545").await?;
let web3 = web3::Web3::new(transport);

println!("Calling accounts.");
let accounts = web3.eth().accounts().await?;

interval_balance(&web3.eth(), accounts[0]).await;

Ok(())
}

async fn interval_balance(eth: &Eth<WebSocket>, account: Address) {
loop {
match eth.balance(account, None).await {
Ok(balance) => println!("Balance of {:?}: {}", account, balance),
Err(e) => println!("Get balance failed: {}", e),
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
4 changes: 2 additions & 2 deletions src/api/eth_subscribe.rs
Expand Up @@ -52,7 +52,7 @@ pub struct SubscriptionStream<T: DuplexTransport, I> {
id: SubscriptionId,
#[pin]
rx: T::NotificationStream,
_marker: PhantomData<I>,
_marker: PhantomData<error::Result<I>>,
}

impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
Expand Down Expand Up @@ -90,7 +90,7 @@ where
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let x = ready!(this.rx.poll_next(ctx));
Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
Poll::Ready(x.map(|result| result.and_then(|v| serde_json::from_value(v).map_err(Into::into))))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Expand Up @@ -75,7 +75,7 @@ pub trait BatchTransport: Transport {
/// A transport implementation supporting pub sub subscriptions.
pub trait DuplexTransport: Transport {
/// The type of stream this transport returns
type NotificationStream: futures::Stream<Item = rpc::Value>;
type NotificationStream: futures::Stream<Item = error::Result<rpc::Value>>;

/// Add a subscription to this transport
fn subscribe(&self, id: api::SubscriptionId) -> error::Result<Self::NotificationStream>;
Expand Down
6 changes: 3 additions & 3 deletions src/transports/either.rs
Expand Up @@ -72,10 +72,10 @@ where
B: DuplexTransport<NotificationStream = BStream>,
A::Out: 'static + Send,
B::Out: 'static + Send,
AStream: futures::Stream<Item = rpc::Value> + 'static + Send,
BStream: futures::Stream<Item = rpc::Value> + 'static + Send,
AStream: futures::Stream<Item = error::Result<rpc::Value>> + 'static + Send,
BStream: futures::Stream<Item = error::Result<rpc::Value>> + 'static + Send,
{
type NotificationStream = BoxStream<'static, rpc::Value>;
type NotificationStream = BoxStream<'static, error::Result<rpc::Value>>;

fn subscribe(&self, id: api::SubscriptionId) -> error::Result<Self::NotificationStream> {
Ok(match *self {
Expand Down
13 changes: 7 additions & 6 deletions src/transports/ipc.rs
@@ -1,8 +1,9 @@
//! IPC transport

use crate::{
api::SubscriptionId, error::TransportError, helpers, BatchTransport, DuplexTransport, Error, RequestId, Result,
Transport,
api::SubscriptionId,
error::{self, TransportError},
helpers, BatchTransport, DuplexTransport, Error, RequestId, Result, Transport,
};
use futures::{
future::{join_all, JoinAll},
Expand Down Expand Up @@ -99,7 +100,7 @@ impl BatchTransport for Ipc {
}

impl DuplexTransport for Ipc {
type NotificationStream = UnboundedReceiverStream<rpc::Value>;
type NotificationStream = UnboundedReceiverStream<crate::error::Result<rpc::Value>>;

fn subscribe(&self, id: SubscriptionId) -> Result<Self::NotificationStream> {
let (tx, rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -158,7 +159,7 @@ type TransportRequest = (RequestId, rpc::Call, oneshot::Sender<rpc::Output>);
enum TransportMessage {
Single(TransportRequest),
Batch(Vec<TransportRequest>),
Subscribe(SubscriptionId, mpsc::UnboundedSender<rpc::Value>),
Subscribe(SubscriptionId, mpsc::UnboundedSender<error::Result<rpc::Value>>),
Unsubscribe(SubscriptionId),
}

Expand Down Expand Up @@ -262,7 +263,7 @@ async fn run_server(unix_stream: UnixStream, messages_rx: UnboundedReceiverStrea
}

fn notify(
subscription_txs: &mut BTreeMap<SubscriptionId, mpsc::UnboundedSender<rpc::Value>>,
subscription_txs: &mut BTreeMap<SubscriptionId, mpsc::UnboundedSender<error::Result<rpc::Value>>>,
notification: rpc::Notification,
) -> std::result::Result<(), ()> {
if let rpc::Params::Map(params) = notification.params {
Expand All @@ -272,7 +273,7 @@ fn notify(
if let (Some(&rpc::Value::String(ref id)), Some(result)) = (id, result) {
let id: SubscriptionId = id.clone().into();
if let Some(tx) = subscription_txs.get(&id) {
if let Err(e) = tx.send(result.clone()) {
if let Err(e) = tx.send(Ok(result.clone())) {
log::error!("Error sending notification: {:?} (id: {:?}", e, id);
}
} else {
Expand Down