Skip to content

Commit

Permalink
Add error handling to WebSocket handleMessage function, resolves gagl…
Browse files Browse the repository at this point in the history
  • Loading branch information
theghostmac committed May 8, 2024
1 parent 7c6835c commit 384322f
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 26 deletions.
8 changes: 4 additions & 4 deletions rpc/ws/accountSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type AccountResult struct {

// AccountSubscribe subscribes to an account to receive notifications
// when the lamports or data for a given account public key changes.
func (cl *Client) AccountSubscribe(
func (c *Client) AccountSubscribe(
account solana.PublicKey,
commitment rpc.CommitmentType,
) (*AccountSubscription, error) {
return cl.AccountSubscribeWithOpts(
return c.AccountSubscribeWithOpts(
account,
commitment,
"",
Expand All @@ -43,7 +43,7 @@ func (cl *Client) AccountSubscribe(

// AccountSubscribe subscribes to an account to receive notifications
// when the lamports or data for a given account public key changes.
func (cl *Client) AccountSubscribeWithOpts(
func (c *Client) AccountSubscribeWithOpts(
account solana.PublicKey,
commitment rpc.CommitmentType,
encoding solana.EncodingType,
Expand All @@ -60,7 +60,7 @@ func (cl *Client) AccountSubscribeWithOpts(
conf["encoding"] = encoding
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"accountSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/blockSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type BlockSubscribeOpts struct {
// **This subscription is unstable and only available if the validator was started
// with the `--rpc-pubsub-enable-block-subscription` flag. The format of this
// subscription may change in the future**
func (cl *Client) BlockSubscribe(
func (c *Client) BlockSubscribe(
filter BlockSubscribeFilter,
opts *BlockSubscribeOpts,
) (*BlockSubscription, error) {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (cl *Client) BlockSubscribe(
params = append(params, obj)
}
}
genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
nil,
"blockSubscribe",
Expand Down
20 changes: 20 additions & 0 deletions rpc/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,37 @@ func (c *Client) handleMessage(message []byte) {
// when receiving message with id. the result will be a subscription number.
// that number will be associated to all future message destine to this request

// Check for an error in the message.
if errorCode, errMsg, ok := getJsonRpcError(message); ok {
fmt.Printf("Error received in websocket message: Code: %d, Message: %s\n", errorCode, errMsg)
return
}

// Handle message with ID: this is a subscription response.
requestID, ok := getUint64WithOk(message, "id")
if ok {
subID, _ := getUint64WithOk(message, "result")
c.handleNewSubscriptionMessage(requestID, subID)
return
}

// Handle message associated with a subscription ID.
subID, _ := getUint64WithOk(message, "params", "subscription")
c.handleSubscriptionMessage(subID, message)
}

// getJsonRpcError checks if the message contains a JSON-RPC error.
// Returns the error code, error message, and a boolean indicating if an error was present.
func getJsonRpcError(message []byte) (errorCode int64, errMsg string, ok bool) {
if val, dataType, _, err := jsonparser.Get(message, "error"); err == nil && dataType == jsonparser.Object {
code, _ := jsonparser.GetInt(val, "code")
msg, _ := jsonparser.GetString(val, "message")
return code, msg, true
}

return 0, "", false
}

func (c *Client) handleNewSubscriptionMessage(requestID, subID uint64) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down
12 changes: 6 additions & 6 deletions rpc/ws/logsSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@ const (
)

// LogsSubscribe subscribes to transaction logging.
func (cl *Client) LogsSubscribe(
func (c *Client) LogsSubscribe(
// Filter criteria for the logs to receive results by account type.
filter LogsSubscribeFilterType,
commitment rpc.CommitmentType, // (optional)
) (*LogSubscription, error) {
return cl.logsSubscribe(
return c.logsSubscribe(
filter,
commitment,
)
}

// LogsSubscribe subscribes to all transactions that mention the provided Pubkey.
func (cl *Client) LogsSubscribeMentions(
func (c *Client) LogsSubscribeMentions(
// Subscribe to all transactions that mention the provided Pubkey.
mentions solana.PublicKey,
// (optional)
commitment rpc.CommitmentType,
) (*LogSubscription, error) {
return cl.logsSubscribe(
return c.logsSubscribe(
rpc.M{
"mentions": []string{mentions.String()},
},
Expand All @@ -73,7 +73,7 @@ func (cl *Client) LogsSubscribeMentions(
}

// LogsSubscribe subscribes to transaction logging.
func (cl *Client) logsSubscribe(
func (c *Client) logsSubscribe(
filter interface{},
commitment rpc.CommitmentType,
) (*LogSubscription, error) {
Expand All @@ -84,7 +84,7 @@ func (cl *Client) logsSubscribe(
conf["commitment"] = commitment
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"logsSubscribe",
Expand Down
8 changes: 4 additions & 4 deletions rpc/ws/programSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ type ProgramResult struct {

// ProgramSubscribe subscribes to a program to receive notifications
// when the lamports or data for a given account owned by the program changes.
func (cl *Client) ProgramSubscribe(
func (c *Client) ProgramSubscribe(
programID solana.PublicKey,
commitment rpc.CommitmentType,
) (*ProgramSubscription, error) {
return cl.ProgramSubscribeWithOpts(
return c.ProgramSubscribeWithOpts(
programID,
commitment,
"",
Expand All @@ -42,7 +42,7 @@ func (cl *Client) ProgramSubscribe(

// ProgramSubscribe subscribes to a program to receive notifications
// when the lamports or data for a given account owned by the program changes.
func (cl *Client) ProgramSubscribeWithOpts(
func (c *Client) ProgramSubscribeWithOpts(
programID solana.PublicKey,
commitment rpc.CommitmentType,
encoding solana.EncodingType,
Expand All @@ -63,7 +63,7 @@ func (cl *Client) ProgramSubscribeWithOpts(
conf["filters"] = filters
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"programSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/rootSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type RootResult uint64

// SignatureSubscribe subscribes to receive notification
// anytime a new root is set by the validator.
func (cl *Client) RootSubscribe() (*RootSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) RootSubscribe() (*RootSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"rootSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/signatureSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type SignatureResult struct {
// SignatureSubscribe subscribes to a transaction signature to receive
// notification when the transaction is confirmed On signatureNotification,
// the subscription is automatically cancelled
func (cl *Client) SignatureSubscribe(
func (c *Client) SignatureSubscribe(
signature solana.Signature, // Transaction Signature.
commitment rpc.CommitmentType, // (optional)
) (*SignatureSubscription, error) {
Expand All @@ -44,7 +44,7 @@ func (cl *Client) SignatureSubscribe(
conf["commitment"] = commitment
}

genSub, err := cl.subscribe(
genSub, err := c.subscribe(
params,
conf,
"signatureSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/slotSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type SlotResult struct {
}

// SlotSubscribe subscribes to receive notification anytime a slot is processed by the validator.
func (cl *Client) SlotSubscribe() (*SlotSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) SlotSubscribe() (*SlotSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"slotSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/slotsUpdatesSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ const (
//
// This subscription is unstable; the format of this subscription
// may change in the future and it may not always be supported.
func (cl *Client) SlotsUpdatesSubscribe() (*SlotsUpdatesSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) SlotsUpdatesSubscribe() (*SlotsUpdatesSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"slotsUpdatesSubscribe",
Expand Down
4 changes: 2 additions & 2 deletions rpc/ws/voteSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type VoteResult struct {
// This subscription is unstable and only available if the validator
// was started with the --rpc-pubsub-enable-vote-subscription flag.
// The format of this subscription may change in the future.
func (cl *Client) VoteSubscribe() (*VoteSubscription, error) {
genSub, err := cl.subscribe(
func (c *Client) VoteSubscribe() (*VoteSubscription, error) {
genSub, err := c.subscribe(
nil,
nil,
"voteSubscribe",
Expand Down

0 comments on commit 384322f

Please sign in to comment.