Skip to content

Commit

Permalink
feat: Add .Response() and .Err() method for all subscriber (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
GoudanWoo committed Mar 22, 2024
1 parent 242d699 commit a8900f8
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 0 deletions.
17 changes: 17 additions & 0 deletions rpc/ws/accountSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ func (sw *AccountSubscription) Recv() (*AccountResult, error) {
}
}

func (sw *AccountSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *AccountSubscription) Response() <-chan *AccountResult {
typedChan := make(chan *AccountResult, 1)
go func(ch chan *AccountResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*AccountResult)
}(typedChan)
return typedChan
}

func (sw *AccountSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
17 changes: 17 additions & 0 deletions rpc/ws/blockSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ func (sw *BlockSubscription) Recv() (*BlockResult, error) {
}
}

func (sw *BlockSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *BlockSubscription) Response() <-chan *BlockResult {
typedChan := make(chan *BlockResult, 1)
go func(ch chan *BlockResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*BlockResult)
}(typedChan)
return typedChan
}

func (sw *BlockSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
17 changes: 17 additions & 0 deletions rpc/ws/logsSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,23 @@ func (sw *LogSubscription) Recv() (*LogResult, error) {
}
}

func (sw *LogSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *LogSubscription) Response() <-chan *LogResult {
typedChan := make(chan *LogResult, 1)
go func(ch chan *LogResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*LogResult)
}(typedChan)
return typedChan
}

func (sw *LogSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
17 changes: 17 additions & 0 deletions rpc/ws/programSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ func (sw *ProgramSubscription) Recv() (*ProgramResult, error) {
}
}

func (sw *ProgramSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *ProgramSubscription) Response() <-chan *ProgramResult {
typedChan := make(chan *ProgramResult, 1)
go func(ch chan *ProgramResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*ProgramResult)
}(typedChan)
return typedChan
}

func (sw *ProgramSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
17 changes: 17 additions & 0 deletions rpc/ws/rootSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ func (sw *RootSubscription) Recv() (*RootResult, error) {
}
}

func (sw *RootSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *RootSubscription) Response() <-chan *RootResult {
typedChan := make(chan *RootResult, 1)
go func(ch chan *RootResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*RootResult)
}(typedChan)
return typedChan
}

func (sw *RootSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
17 changes: 17 additions & 0 deletions rpc/ws/slotSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ func (sw *SlotSubscription) Recv() (*SlotResult, error) {
}
}

func (sw *SlotSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *SlotSubscription) Response() <-chan *SlotResult {
typedChan := make(chan *SlotResult, 1)
go func(ch chan *SlotResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*SlotResult)
}(typedChan)
return typedChan
}

func (sw *SlotSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
17 changes: 17 additions & 0 deletions rpc/ws/slotsUpdatesSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ func (sw *SlotsUpdatesSubscription) Recv() (*SlotsUpdatesResult, error) {
}
}

func (sw *SlotsUpdatesSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *SlotsUpdatesSubscription) Response() <-chan *SlotsUpdatesResult {
typedChan := make(chan *SlotsUpdatesResult, 1)
go func(ch chan *SlotsUpdatesResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*SlotsUpdatesResult)
}(typedChan)
return typedChan
}

func (sw *SlotsUpdatesSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}
17 changes: 17 additions & 0 deletions rpc/ws/voteSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ func (sw *VoteSubscription) Recv() (*VoteResult, error) {
}
}

func (sw *VoteSubscription) Err() <-chan error {
return sw.sub.err
}

func (sw *VoteSubscription) Response() <-chan *VoteResult {
typedChan := make(chan *VoteResult, 1)
go func(ch chan *VoteResult) {
// TODO: will this subscription yield more than one result?
d, ok := <-sw.sub.stream
if !ok {
return
}
ch <- d.(*VoteResult)
}(typedChan)
return typedChan
}

func (sw *VoteSubscription) Unsubscribe() {
sw.sub.Unsubscribe()
}

0 comments on commit a8900f8

Please sign in to comment.