Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): Add option to make receive callback blocking #771

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
124 changes: 108 additions & 16 deletions test/integration/http/blocking_test.go
Expand Up @@ -8,11 +8,12 @@ package http
import (
"context"
"fmt"
"go.uber.org/atomic"
"sync"
"testing"
"time"

"go.uber.org/atomic"

"github.com/cloudevents/sdk-go/v2/client"

"github.com/google/uuid"
Expand All @@ -32,12 +33,19 @@ type BlockingSenderReceiverTest struct {
now time.Time
event *cloudevents.Event
receiverWait time.Duration
timeout time.Duration
want int
}

type BlockingSenderReceiverTestOutput struct {
duration time.Duration
got int
}

type BlockingSenderReceiverTestCases map[string]BlockingSenderReceiverTest

func TestBlockingSenderReceiver(t *testing.T) {
func TestNonBlockingSenderReceiver(t *testing.T) {
t.Parallel()
now := time.Now()

testCases := BlockingSenderReceiverTestCases{
Expand All @@ -54,6 +62,7 @@ func TestBlockingSenderReceiver(t *testing.T) {
},
receiverWait: 1 * time.Second,
want: 10,
timeout: 5 * time.Second,
},
"50 at 5 second": {
now: now,
Expand All @@ -68,6 +77,7 @@ func TestBlockingSenderReceiver(t *testing.T) {
},
receiverWait: 5 * time.Second,
want: 50,
timeout: 15 * time.Second,
},
"100 at 10 seconds": {
now: now,
Expand All @@ -82,6 +92,66 @@ func TestBlockingSenderReceiver(t *testing.T) {
},
receiverWait: 10 * time.Second,
want: 100,
timeout: 30 * time.Second,
},
}

for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ReceiverNonBlocking(t, tc)
})
}
}

func TestBlockingSenderReceiver(t *testing.T) {
t.Parallel()
now := time.Now()

testCases := BlockingSenderReceiverTestCases{
"10 at 100 milisecond": {
now: now,
event: &cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: "unit.test.client.sent.10.1",
Source: *cloudevents.ParseURIRef("/unit/test/client"),
Subject: strptr("resource"),
DataContentType: cloudevents.StringOfApplicationJSON(),
}.AsV1(),
DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}),
},
receiverWait: 100 * time.Millisecond,
want: 10,
timeout: 5 * time.Second,
},
"50 at 20 milisecond": {
now: now,
event: &cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: "unit.test.client.sent.50.5",
Source: *cloudevents.ParseURIRef("/unit/test/client"),
Subject: strptr("resource"),
DataContentType: cloudevents.StringOfApplicationJSON(),
}.AsV1(),
DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}),
},
receiverWait: 20 * time.Millisecond,
want: 50,
timeout: 5 * time.Second,
},
"100 at 10 milisecond": {
now: now,
event: &cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: "unit.test.client.sent.100.10",
Source: *cloudevents.ParseURIRef("/unit/test/client"),
Subject: strptr("resource"),
DataContentType: cloudevents.StringOfApplicationJSON(),
}.AsV1(),
DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}),
},
receiverWait: 10 * time.Millisecond,
want: 100,
timeout: 5 * time.Second,
},
}

Expand All @@ -94,7 +164,37 @@ func TestBlockingSenderReceiver(t *testing.T) {

const verbose = false

func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...client.Option) {
func ReceiverNonBlocking(t *testing.T, tc BlockingSenderReceiverTest) {
output := receive(t, tc, client.WithPollGoroutines(1))

if tc.want != output.got {
t.Errorf("expected %d, got %d", tc.want, output)
}

// Look at how long the test took.
dm := output.duration.Milliseconds()
tw := tc.receiverWait.Milliseconds() * 110 / 100 // 110% budget.
if dm > tw {
t.Errorf("expected test duration to be around ~%d ms, actual %d ms", tc.receiverWait.Milliseconds(), dm)
}
}

func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest) {
output := receive(t, tc, client.WithPollGoroutines(1), client.WithBlockingCallback())

if tc.want != output.got {
t.Errorf("expected %d, got %d", tc.want, output)
}

// Look at how long the test took.
dm := output.duration.Milliseconds()
tw := tc.receiverWait.Milliseconds() * int64(tc.want) // no concurrent processing
if dm < tw {
t.Errorf("expected test duration to be over %d ms, actual %d ms", tw, dm)
}
}

func receive(t *testing.T, tc BlockingSenderReceiverTest, copts ...client.Option) *BlockingSenderReceiverTestOutput {
opts := make([]cehttp.Option, 0)
opts = append(opts, cloudevents.WithPort(0)) // random port

Expand All @@ -113,7 +213,7 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie
testID := uuid.New().String()
tc.event.SetExtension(unitTestIDKey, testID)

recvCtx, recvCancel := context.WithTimeout(context.Background(), tc.receiverWait*3)
recvCtx, recvCancel := context.WithTimeout(context.Background(), tc.timeout)
defer recvCancel()

wg := new(sync.WaitGroup)
Expand Down Expand Up @@ -141,7 +241,7 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie

then := time.Now()

sendCtx, sendCancel := context.WithTimeout(context.Background(), tc.receiverWait*2)
sendCtx, sendCancel := context.WithTimeout(context.Background(), tc.timeout)
defer sendCancel()
sendCtx = cloudevents.ContextWithTarget(sendCtx, fmt.Sprintf("http://localhost:%d", protocol.GetListeningPort()))

Expand All @@ -159,16 +259,8 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie

time.Sleep(tc.receiverWait) // cool off just in case we have some more sleepers.

if int32(tc.want) != got.Load() {
t.Errorf("expected %d, got %d", tc.want, got.Load())
}

// Look at how long the test took.

dm := duration.Milliseconds()
tw := tc.receiverWait.Milliseconds() * 110 / 100 // 110% budget.

if dm > tw {
t.Errorf("expected test duration to be ~%d ms, actual %d ms", tc.receiverWait.Milliseconds(), dm)
return &BlockingSenderReceiverTestOutput{
duration: duration,
got: int(got.Load()),
}
}
19 changes: 14 additions & 5 deletions v2/client/client.go
Expand Up @@ -97,6 +97,7 @@ type ceClient struct {
receiverMu sync.Mutex
eventDefaulterFns []EventDefaulter
pollGoroutines int
blockingCallback bool
}

func (c *ceClient) applyOptions(opts ...Option) error {
Expand Down Expand Up @@ -248,14 +249,22 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
continue
}

// Do not block on the invoker.
wg.Add(1)
go func() {
callback := func() {
if err := c.invoker.Invoke(ctx, msg, respFn); err != nil {
cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)
}
wg.Done()
}()
}

if c.blockingCallback {
callback()
} else {
// Do not block on the invoker.
wg.Add(1)
go func() {
defer wg.Done()
callback()
}()
}
}
}()
}
Expand Down
13 changes: 13 additions & 0 deletions v2/client/options.go
Expand Up @@ -8,6 +8,7 @@ package client
import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/binding"
)

Expand Down Expand Up @@ -113,3 +114,15 @@ func WithInboundContextDecorator(dec func(context.Context, binding.Message) cont
return nil
}
}

// WithBlockingCallback makes the callback passed into StartReceiver is executed as a blocking call,
// i.e. in each poll go routine, the next event will not be received until the callback on current event completes.
// To make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1)
func WithBlockingCallback() Option {
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.blockingCallback = true
}
return nil
}
}