Skip to content

Commit

Permalink
feat(rpc module): stream API for SubscriptionSink (#639)
Browse files Browse the repository at this point in the history
* feat(rpc module): add_stream to subscription sink

* fix some nits

* unify parameters to rpc methods

* Update core/src/server/rpc_module.rs

* Update tests/tests/integration_tests.rs

Co-authored-by: David <dvdplm@gmail.com>

* address grumbles

* fix subscription tests

* new type for `SubscriptionCallback` and glue code

* remove unsed code

* remove todo

* add missing feature tokio/macros

* make `add_stream` cancel-safe

* rename add_stream and return status

* fix nits

* rename stream API -> streamify

* Update core/src/server/rpc_module.rs

* provide proper close reason

* spelling

* consume_and_streamify + docs

* fmt

* rename API pipe_from_stream

* improve logging; indicate which subscription method that failed

Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
niklasad1 and dvdplm committed Jan 21, 2022
1 parent 429c196 commit 9bd2127
Show file tree
Hide file tree
Showing 9 changed files with 468 additions and 200 deletions.
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow = "1"
arrayvec = "0.7.1"
async-trait = "0.1"
beef = { version = "0.5.1", features = ["impl_serde"] }
async-channel = { version = "1.6", optional = true }
thiserror = "1"
futures-channel = { version = "0.3.14", default-features = false }
futures-util = { version = "0.3.14", default-features = false, optional = true }
Expand All @@ -29,6 +30,7 @@ tokio = { version = "1.8", features = ["rt"], optional = true }
default = []
http-helpers = ["futures-util"]
server = [
"async-channel",
"futures-util",
"rustc-hash",
"tracing",
Expand Down
2 changes: 1 addition & 1 deletion core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::io;

use crate::{to_json_raw_value, Error};
use futures_channel::mpsc;
use futures_util::stream::StreamExt;
use futures_util::StreamExt;
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorResponse, CALL_EXECUTION_FAILED_CODE, OVERSIZED_RESPONSE_CODE,
OVERSIZED_RESPONSE_MSG, UNKNOWN_ERROR_CODE,
Expand Down

0 comments on commit 9bd2127

Please sign in to comment.