From decd01704a5e8b5d0cede3f11977c67257cf87f2 Mon Sep 17 00:00:00 2001 From: Dan <5727701+dan-j@users.noreply.github.com> Date: Wed, 6 Apr 2022 17:30:39 +0100 Subject: [PATCH] feat(gochan): implement SendCloser on SendReceiver protocol (#768) Signed-off-by: dan-j <5727701+dan-j@users.noreply.github.com> --- v2/protocol/gochan/protocol.go | 7 ++- v2/protocol/gochan/protocol_test.go | 73 +++++++++++++++++++++++++++-- 2 files changed, 76 insertions(+), 4 deletions(-) diff --git a/v2/protocol/gochan/protocol.go b/v2/protocol/gochan/protocol.go index 3d2926f71..847d5ce81 100644 --- a/v2/protocol/gochan/protocol.go +++ b/v2/protocol/gochan/protocol.go @@ -7,6 +7,7 @@ package gochan import ( "context" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/protocol" ) @@ -18,7 +19,7 @@ const ( // SendReceiver is a reference implementation for using the CloudEvents binding // integration. type SendReceiver struct { - sender protocol.Sender + sender protocol.SendCloser receiver protocol.Receiver } @@ -38,3 +39,7 @@ func (sr *SendReceiver) Send(ctx context.Context, in binding.Message, transforme func (sr *SendReceiver) Receive(ctx context.Context) (binding.Message, error) { return sr.receiver.Receive(ctx) } + +func (sr *SendReceiver) Close(ctx context.Context) error { + return sr.sender.Close(ctx) +} diff --git a/v2/protocol/gochan/protocol_test.go b/v2/protocol/gochan/protocol_test.go index ec7092565..8892073ca 100644 --- a/v2/protocol/gochan/protocol_test.go +++ b/v2/protocol/gochan/protocol_test.go @@ -7,13 +7,14 @@ package gochan import ( "context" + "io" + "testing" + "time" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/event" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" - "io" - "testing" - "time" ) func TestNew(t *testing.T) { @@ -128,6 +129,72 @@ func TestSendReceive(t *testing.T) { } } +func TestSendCloser(t *testing.T) { + testCases := map[string]struct { + numSend int + numReceivePreClose int + numClose int // defaults to 1 + wantErr bool + }{ + "closes none pending": { + numSend: 1, + numReceivePreClose: 1, + }, + "closes still delivers pending": { + numSend: 2, + numReceivePreClose: 1, + }, + "errors on double close": { + numClose: 2, + wantErr: true, + }, + } + for n, tc := range testCases { + for _, p := range protocols(t) { + t.Run(n, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + for i := 0; i < tc.numSend; i++ { + e := event.New() + if err := p.Send(ctx, binding.ToMessage(&e)); err != nil { + t.Fatalf("failed to send to protocol: %v", err) + } + } + + for i := 0; i < tc.numReceivePreClose; i++ { + _, err := p.Receive(ctx) + if err != nil { + t.Fatalf("failed to receive from protocol: %v", err) + } + } + + if tc.numClose == 0 { + tc.numClose = 1 + } + + var err error + for i := 0; i < tc.numClose; i++ { + err = p.Close(ctx) + } + if tc.wantErr != (err != nil) { + t.Fatalf("failed to close channel, wantErr = %v, got = %v", tc.wantErr, err) + } + + for i := 0; i < tc.numSend-tc.numReceivePreClose; i++ { + _, err := p.Receive(ctx) + if err != nil { + t.Fatalf("failed to receive from protocol: %v", err) + } + } + + if _, err = p.Receive(ctx); err != io.EOF { + t.Fatalf("expected protocol to be closed but got err = %v", err) + } + }) + } + } +} + func ReceiveTest(t *testing.T, p *SendReceiver, ctx context.Context, want binding.Message, wantErr string) { if ctx != nil { var done context.CancelFunc