Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Disable default reply timeout #45

Merged
merged 2 commits into from Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/api.go
Expand Up @@ -86,8 +86,9 @@ type Channel interface {
// buffer is full, the notifications will not be delivered into it.
SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error)

// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply
// from VPP before returning an error.
// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the client waits for a reply
// from VPP before returning a timeout error. Setting the reply timeout to 0 disables it. The initial reply timeout is
//set to the value of core.DefaultReplyTimeout.
SetReplyTimeout(timeout time.Duration)

// CheckCompatibility checks the compatiblity for the given messages.
Expand Down
23 changes: 18 additions & 5 deletions core/channel.go
Expand Up @@ -259,6 +259,8 @@ func (sub *subscriptionCtx) Unsubscribe() error {
return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())
}

const maxInt64 = 1<<63 - 1

// receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
if msg == nil {
Expand All @@ -276,7 +278,13 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
}
}

timer := time.NewTimer(ch.replyTimeout)
slowReplyDur := WarnSlowReplyDuration
timeout := ch.replyTimeout
if timeout <= 0 {
timeout = maxInt64
}
timeoutTimer := time.NewTimer(timeout)
slowTimer := time.NewTimer(slowReplyDur)
for {
select {
// blocks until a reply comes to ReplyChan or until timeout expires
Expand All @@ -290,13 +298,18 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last
continue
}
return lastReplyReceived, err

case <-timer.C:
case <-slowTimer.C:
log.WithFields(logrus.Fields{
"expSeqNum": expSeqNum,
"channel": ch.id,
}).Warnf("reply is taking too long (>%v): %v ", slowReplyDur, msg.GetMessageName())
continue
case <-timeoutTimer.C:
log.WithFields(logrus.Fields{
"expSeqNum": expSeqNum,
"channel": ch.id,
}).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
}).Debugf("timeout (%v) waiting for reply: %s", timeout, msg.GetMessageName())
err = fmt.Errorf("no reply received within the timeout period %s", timeout)
return false, err
}
}
Expand Down
9 changes: 6 additions & 3 deletions core/connection.go
Expand Up @@ -25,11 +25,10 @@ import (

logger "github.com/sirupsen/logrus"

"go.fd.io/govpp/core/genericpool"

"go.fd.io/govpp/adapter"
"go.fd.io/govpp/api"
"go.fd.io/govpp/codec"
"go.fd.io/govpp/core/genericpool"
)

const (
Expand All @@ -47,7 +46,11 @@ var (
HealthCheckProbeInterval = time.Second // default health check probe interval
HealthCheckReplyTimeout = time.Millisecond * 250 // timeout for reply to a health check probe
HealthCheckThreshold = 2 // number of failed health checks until the error is reported
DefaultReplyTimeout = time.Second // default timeout for replies from VPP
)

var (
DefaultReplyTimeout = time.Duration(0) // default timeout for replies from VPP is disabled
WarnSlowReplyDuration = time.Second * 1 // duration of slow replies after which a warning is printed
)

// ConnectionState represents the current state of the connection to VPP.
Expand Down