From b28542726a3f4fdfccdc947f3ed2dbad785a5232 Mon Sep 17 00:00:00 2001 From: Rinat Baygildin Date: Fri, 21 Oct 2022 13:46:25 +0400 Subject: [PATCH] Fix endless Stream's reply waiting on vpp disconnect Draining of underlaying channel makes waiting endless. Now stream uses timeout to prevent it. Signed-off-by: Rinat Baygildin Change-Id: I9a48bf06a2073cb3e6efbeb530c267bd40ec086b #76 --- core/stream.go | 9 ++++++- core/stream_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 core/stream_test.go diff --git a/core/stream.go b/core/stream.go index 86bb99e2..fe3d1f77 100644 --- a/core/stream.go +++ b/core/stream.go @@ -160,6 +160,11 @@ func (s *Stream) recvReply() (*vppReply, error) { if s.conn == nil { return nil, errors.New("stream closed") } + timeout := s.replyTimeout + if timeout <= 0 { + timeout = maxInt64 + } + timeoutTimer := time.NewTimer(timeout) select { case reply, ok := <-s.channel.replyChan: if !ok { @@ -172,7 +177,9 @@ func (s *Stream) recvReply() (*vppReply, error) { return nil, reply.err } return reply, nil - + case <-timeoutTimer.C: + err := fmt.Errorf("no reply received within the timeout period %s", timeout) + return nil, err case <-s.ctx.Done(): return nil, s.ctx.Err() } diff --git a/core/stream_test.go b/core/stream_test.go new file mode 100644 index 00000000..01b56997 --- /dev/null +++ b/core/stream_test.go @@ -0,0 +1,63 @@ +package core + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/gomega" + "go.fd.io/govpp/adapter/mock" +) + +type streamCtx struct { + mockVpp *mock.VppAdapter + conn *Connection + stream *Stream +} + +func setupStreamTest(t *testing.T) *streamCtx { + RegisterTestingT(t) + + ctx := &streamCtx{ + mockVpp: mock.NewVppAdapter(), + } + + var err error + ctx.conn, err = Connect(ctx.mockVpp) + Expect(err).ShouldNot(HaveOccurred()) + + stream, err := ctx.conn.NewStream(context.TODO()) + Expect(err).ShouldNot(HaveOccurred()) + + ctx.stream = stream.(*Stream) + return ctx +} + +func (ctx *streamCtx) teardownTest() { + err := ctx.stream.Close() + Expect(err).ShouldNot(HaveOccurred()) + ctx.conn.Disconnect() +} + +func TestStreamReply(t *testing.T) { + ctx := setupStreamTest(t) + defer ctx.teardownTest() + + ctx.stream.replyTimeout = time.Millisecond + + // mock reply + ctx.mockVpp.MockReply(&ControlPingReply{}) + + // first one request should work + err := ctx.stream.SendMsg(&ControlPing{}) + Expect(err).ShouldNot(HaveOccurred()) + _, err = ctx.stream.RecvMsg() + Expect(err).ShouldNot(HaveOccurred()) + + // no other reply ready - expect timeout + err = ctx.stream.SendMsg(&ControlPing{}) + Expect(err).ShouldNot(HaveOccurred()) + _, err = ctx.stream.RecvMsg() + Expect(err).Should(HaveOccurred()) + Expect(err.Error()).To(HavePrefix("no reply received within the timeout period")) +}