Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] Service API adjustments #1166

Merged
merged 2 commits into from Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions micro/example_package_test.go
Expand Up @@ -34,16 +34,15 @@ func Example() {

// Service handler is a function which takes Service.Request as argument.
// req.Respond or req.Error should be used to respond to the request.
incrementHandler := func(req *Request) error {
val, err := strconv.Atoi(string(req.Data))
incrementHandler := func(req *Request) {
val, err := strconv.Atoi(string(req.Data()))
if err != nil {
req.Error("400", "request data should be a number", nil)
return nil
return
}

responseData := val + 1
req.Respond([]byte(strconv.Itoa(responseData)))
return nil
}

config := Config{
Expand Down
26 changes: 12 additions & 14 deletions micro/example_test.go
Expand Up @@ -28,9 +28,8 @@ func ExampleAddService() {
}
defer nc.Close()

echoHandler := func(req *Request) error {
req.Respond(req.Data)
return nil
echoHandler := func(req *Request) {
req.Respond(req.Data())
}

config := Config{
Expand Down Expand Up @@ -73,7 +72,7 @@ func ExampleService_Info() {
Name: "EchoService",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -101,7 +100,7 @@ func ExampleService_Stats() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand All @@ -127,7 +126,7 @@ func ExampleService_Stop() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -158,7 +157,7 @@ func ExampleService_Stopped() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -187,7 +186,7 @@ func ExampleService_Reset() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -220,14 +219,14 @@ func ExampleControlSubject() {

// Output:
// $SRV.PING
// $SRV.PING.COOLSERVICE
// $SRV.PING.COOLSERVICE.123
// $SRV.PING.CoolService
// $SRV.PING.CoolService.123
}

func ExampleRequest_Respond() {
handler := func(req *Request) {
// respond to the request
if err := req.Respond(req.Data); err != nil {
if err := req.Respond(req.Data()); err != nil {
log.Fatal(err)
}
}
Expand All @@ -254,13 +253,12 @@ func ExampleRequest_RespondJSON() {
}

func ExampleRequest_Error() {
handler := func(req *Request) error {
handler := func(req *Request) {
// respond with an error
// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil {
return err
log.Fatal(err)
}
return nil
}

fmt.Printf("%T", handler)
Expand Down
83 changes: 68 additions & 15 deletions micro/request.go
Expand Up @@ -22,17 +22,19 @@ import (
)

type (
// Request represents service request available in the service handler.
// It exposes methods to respond to the request, as well as
// getting the request data and headers.
Request struct {
*nats.Msg
msg *nats.Msg
respondError error
}

// RequestHandler is a function used as a Handler for a service.
// It takes a request, which contains the data (payload and headers) of the request,
// as well as exposes methods to respond to the request.
//
// RequestHandler returns an error - if returned, the request will be accounted form in stats (in num_requests),
// and last_error will be set with the value.
RequestHandler func(*Request) error
RequestHandler func(*Request)

// Headers is a wrapper around [*nats.Header]
Headers nats.Header
)

var (
Expand All @@ -41,27 +43,37 @@ var (
ErrArgRequired = errors.New("argument required")
)

func (r *Request) Respond(response []byte) error {
if err := r.Msg.Respond(response); err != nil {
return fmt.Errorf("%w: %s", ErrRespond, err)
// RespondOpt is a
type RespondOpt func(*nats.Msg)

func (r *Request) Respond(response []byte, opts ...RespondOpt) error {
respMsg := &nats.Msg{
Data: response,
}
for _, opt := range opts {
opt(respMsg)
}

if err := r.msg.RespondMsg(respMsg); err != nil {
r.respondError = fmt.Errorf("%w: %s", ErrRespond, err)
return r.respondError
}

return nil
}

func (r *Request) RespondJSON(response interface{}) error {
func (r *Request) RespondJSON(response interface{}, opts ...RespondOpt) error {
resp, err := json.Marshal(response)
if err != nil {
return ErrMarshalResponse
}

return r.Respond(resp)
return r.Respond(resp, opts...)
}

// Error prepares and publishes error response from a handler.
// A response error should be set containing an error code and description.
// Optionally, data can be set as response payload.
func (r *Request) Error(code, description string, data []byte) error {
func (r *Request) Error(code, description string, data []byte, opts ...RespondOpt) error {
if code == "" {
return fmt.Errorf("%w: error code", ErrArgRequired)
}
Expand All @@ -74,6 +86,47 @@ func (r *Request) Error(code, description string, data []byte) error {
ErrorCodeHeader: []string{code},
},
}
for _, opt := range opts {
opt(response)
}

response.Data = data
return r.RespondMsg(response)
if err := r.msg.RespondMsg(response); err != nil {
r.respondError = err
return err
}
return nil
}

func WithHeaders(headers Headers) RespondOpt {
return func(m *nats.Msg) {
if m.Header == nil {
m.Header = nats.Header(headers)
return
}

for k, v := range headers {
m.Header[k] = v
}
}
}

func (r *Request) Data() []byte {
return r.msg.Data
}

func (r *Request) Headers() Headers {
return Headers(r.msg.Header)
}

// Get gets the first value associated with the given key.
// It is case-sensitive.
func (h Headers) Get(key string) string {
return nats.Header(h).Get(key)
}

// Values returns all values associated with the given key.
// It is case-sensitive.
func (h Headers) Values(key string) []string {
return nats.Header(h).Values(key)
}
44 changes: 24 additions & 20 deletions micro/service.go
Expand Up @@ -18,7 +18,6 @@ import (
"errors"
"fmt"
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -252,7 +251,7 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
var err error

svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) {
svc.reqHandler(&Request{Msg: m})
svc.reqHandler(&Request{msg: m})
})
if err != nil {
svc.asyncDispatcher.close()
Expand All @@ -261,48 +260,44 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {

ping := Ping(svcIdentity)

infoHandler := func(req *Request) error {
infoHandler := func(req *Request) {
response, _ := json.Marshal(svc.Info())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling INFO request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

pingHandler := func(req *Request) error {
pingHandler := func(req *Request) {
response, _ := json.Marshal(ping)
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling PING request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

statsHandler := func(req *Request) error {
statsHandler := func(req *Request) {
response, _ := json.Marshal(svc.Stats())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling STATS request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

schema := SchemaResp{
ServiceIdentity: svcIdentity,
Schema: config.Schema,
}
schemaHandler := func(req *Request) error {
schemaHandler := func(req *Request) {
response, _ := json.Marshal(schema)
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

if err := svc.verbHandlers(nc, InfoVerb, infoHandler); err != nil {
Expand Down Expand Up @@ -368,6 +363,8 @@ func (e *Endpoint) valid() error {
}

func (svc *service) setupAsyncCallbacks() {
svc.m.Lock()
defer svc.m.Unlock()
svc.natsHandlers.closed = svc.conn.ClosedHandler()
if svc.natsHandlers.closed != nil {
svc.conn.SetClosedHandler(func(c *nats.Conn) {
Expand All @@ -392,6 +389,10 @@ func (svc *service) setupAsyncCallbacks() {
Description: err.Error(),
})
}
svc.m.Lock()
svc.stats.NumErrors++
svc.stats.LastError = err.Error()
svc.m.Unlock()
svc.Stop()
svc.natsHandlers.asyncErr(c, s, err)
})
Expand All @@ -406,6 +407,10 @@ func (svc *service) setupAsyncCallbacks() {
Description: err.Error(),
})
}
svc.m.Lock()
svc.stats.NumErrors++
svc.stats.LastError = err.Error()
svc.m.Unlock()
svc.Stop()
})
}
Expand Down Expand Up @@ -448,7 +453,7 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st
}

s.verbSubs[name], err = nc.Subscribe(subj, func(msg *nats.Msg) {
handler(&Request{Msg: msg})
handler(&Request{msg: msg})
})
if err != nil {
s.Stop()
Expand All @@ -457,19 +462,19 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st
return nil
}

// reqHandler itself
// reqHandller invokes the service request handler and modifies service stats
func (s *service) reqHandler(req *Request) {
start := time.Now()
err := s.Endpoint.Handler(req)
s.Endpoint.Handler(req)
s.m.Lock()
s.stats.NumRequests++
s.stats.ProcessingTime += time.Since(start)
avgProcessingTime := s.stats.ProcessingTime.Nanoseconds() / int64(s.stats.NumRequests)
s.stats.AverageProcessingTime = time.Duration(avgProcessingTime)

if err != nil {
if req.respondError != nil {
s.stats.NumErrors++
s.stats.LastError = err.Error()
s.stats.LastError = req.respondError.Error()
}
s.m.Unlock()
}
Expand Down Expand Up @@ -577,7 +582,6 @@ func ControlSubject(verb Verb, name, id string) (string, error) {
if name == "" && id != "" {
return "", ErrServiceNameRequired
}
name = strings.ToUpper(name)
if name == "" && id == "" {
return fmt.Sprintf("%s.%s", APIPrefix, verbStr), nil
}
Expand Down