Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Service api improvements #1160

Merged
merged 14 commits into from Dec 20, 2022
4 changes: 1 addition & 3 deletions js.go
Expand Up @@ -1528,13 +1528,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
if stream == _EMPTY_ {
stream, err = js.StreamNameBySubject(subj)
if err != nil {
return nil, err
}
} else {
stream = o.stream
}

// With an explicit durable name, we can lookup the consumer first
Expand Down
84 changes: 84 additions & 0 deletions micro/example_package_test.go
@@ -0,0 +1,84 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package micro

import (
"fmt"
"log"
"strconv"
"time"

"github.com/nats-io/nats.go"
)

func Example() {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
log.Fatal(err)
}
defer nc.Close()

// Service handler is a function which takes Service.Request as argument.
// req.Respond or req.Error should be used to respond to the request.
incrementHandler := func(req *Request) error {
val, err := strconv.Atoi(string(req.Data))
if err != nil {
req.Error("400", "request data should be a number", nil)
return nil
}

responseData := val + 1
req.Respond([]byte(strconv.Itoa(responseData)))
return nil
}

config := Config{
Name: "IncrementService",
Version: "0.1.0",
Description: "Increment numbers",
Endpoint: Endpoint{
// service handler
Handler: incrementHandler,
// a unique subject serving as a service endpoint
Subject: "numbers.increment",
},
}
// Multiple instances of the servcice with the same name can be created.
// Requests to a service with the same name will be load-balanced.
for i := 0; i < 5; i++ {
svc, err := AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer svc.Stop()
}

// send a request to a service
resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second)
if err != nil {
log.Fatal(err)
}
responseVal, err := strconv.Atoi(string(resp.Data))
if err != nil {
log.Fatal(err)
}
fmt.Println(responseVal)

//
// Output: 4
//
}
266 changes: 266 additions & 0 deletions micro/example_test.go
@@ -0,0 +1,266 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package micro

import (
"fmt"
"log"

"github.com/nats-io/nats.go"
)

func ExampleAddService() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req *Request) error {
req.Respond(req.Data)
return nil
}

config := Config{
Name: "EchoService",
Version: "v1.0.0",
Description: "Send back what you receive",
Endpoint: Endpoint{
Subject: "echo",
Handler: echoHandler,
},

// DoneHandler can be set to customize behavior on stopping a service.
DoneHandler: func(srv Service) {
info := srv.Info()
fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
},

// ErrorHandler can be used to customize behavior on service execution error.
ErrorHandler: func(srv Service, err *NATSError) {
info := srv.Info()
fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
},
}

srv, err := AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer srv.Stop()
}

func ExampleService_Info() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
},
}

srv, _ := AddService(nc, config)

// service info
info := srv.Info()

fmt.Println(info.ID)
fmt.Println(info.Name)
fmt.Println(info.Description)
fmt.Println(info.Version)
fmt.Println(info.Subject)
}

func ExampleService_Stats() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
},
}

srv, _ := AddService(nc, config)

// stats of a service instance
stats := srv.Stats()

fmt.Println(stats.AverageProcessingTime)
fmt.Println(stats.ProcessingTime)

}

func ExampleService_Stop() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
},
}

srv, _ := AddService(nc, config)

// stop a service
err = srv.Stop()
if err != nil {
log.Fatal(err)
}

// stop is idempotent so multiple executions will not return an error
err = srv.Stop()
if err != nil {
log.Fatal(err)
}
}

func ExampleService_Stopped() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
},
}

srv, _ := AddService(nc, config)

// stop a service
err = srv.Stop()
if err != nil {
log.Fatal(err)
}

if srv.Stopped() {
fmt.Println("service stopped")
}
}

func ExampleService_Reset() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
},
}

srv, _ := AddService(nc, config)

// reset endpoint stats on this service
srv.Reset()

empty := Stats{
ServiceIdentity: srv.Info().ServiceIdentity,
}
if srv.Stats() != empty {
log.Fatal("Expected endpoint stats to be empty")
}
}

func ExampleControlSubject() {

// subject used to get PING from all services
subjectPINGAll, _ := ControlSubject(PingVerb, "", "")
fmt.Println(subjectPINGAll)

// subject used to get PING from services with provided name
subjectPINGName, _ := ControlSubject(PingVerb, "CoolService", "")
fmt.Println(subjectPINGName)

// subject used to get PING from a service with provided name and ID
subjectPINGInstance, _ := ControlSubject(PingVerb, "CoolService", "123")
fmt.Println(subjectPINGInstance)

// Output:
// $SRV.PING
// $SRV.PING.COOLSERVICE
// $SRV.PING.COOLSERVICE.123
}

func ExampleRequest_Respond() {
handler := func(req *Request) {
// respond to the request
if err := req.Respond(req.Data); err != nil {
log.Fatal(err)
}
}

fmt.Printf("%T", handler)
}

func ExampleRequest_RespondJSON() {
type Point struct {
X int `json:"x"`
Y int `json:"y"`
}

handler := func(req *Request) {
resp := Point{5, 10}
// respond to the request
// response will be serialized to {"x":5,"y":10}
if err := req.RespondJSON(resp); err != nil {
log.Fatal(err)
}
}

fmt.Printf("%T", handler)
}

func ExampleRequest_Error() {
handler := func(req *Request) error {
// respond with an error
// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil {
return err
}
return nil
}

fmt.Printf("%T", handler)
}