Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Disable default reply timeout #45

Merged
merged 2 commits into from Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/api.go
Expand Up @@ -86,8 +86,9 @@ type Channel interface {
// buffer is full, the notifications will not be delivered into it.
SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error)

// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
// from VPP before returning an error.
// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the client waits for a reply
// from VPP before returning a timeout error. Setting the reply timeout to 0 disables it. The initial reply timeout is
//set to the value of core.DefaultReplyTimeout.
SetReplyTimeout(timeout time.Duration)

// CheckCompatibility checks the compatiblity for the given messages.
Expand Down
12 changes: 9 additions & 3 deletions core/channel.go
Expand Up @@ -259,6 +259,8 @@ func (sub *subscriptionCtx) Unsubscribe() error {
return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())
}

const maxInt64 = 1<<63 - 1

// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
if msg == nil {
Expand All @@ -276,7 +278,11 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
}
}

timer := time.NewTimer(ch.replyTimeout)
timeout := ch.replyTimeout
if timeout <= 0 {
timeout = maxInt64
}
timer := time.NewTimer(timeout)
for {
select {
// blocks until a reply comes to ReplyChan or until timeout expires
Expand All @@ -295,8 +301,8 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
log.WithFields(logrus.Fields{
"expSeqNum": expSeqNum,
"channel": ch.id,
}).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
}).Debugf("timeout (%v) waiting for reply: %s", timeout, msg.GetMessageName())
err = fmt.Errorf("no reply received within the timeout period %s", timeout)
return false, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/connection.go
Expand Up @@ -47,7 +47,7 @@ var (
HealthCheckProbeInterval = time.Second // default health check probe interval
HealthCheckReplyTimeout = time.Millisecond * 250 // timeout for reply to a health check probe
HealthCheckThreshold = 2 // number of failed health checks until the error is reported
DefaultReplyTimeout = time.Second // default timeout for replies from VPP
DefaultReplyTimeout = time.Duration(0) // default timeout for replies from VPP is disabled
)

// ConnectionState represents the current state of the connection to VPP.
Expand Down