Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support passing headers and timeout in jrpc calls #621

Merged
merged 8 commits into from Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 2 additions & 4 deletions fhttp/http_client.go
Expand Up @@ -32,9 +32,9 @@ import (
"time"

"fortio.org/fortio/fnet"
"fortio.org/fortio/jrpc"
"fortio.org/fortio/log"
"fortio.org/fortio/stats"
"fortio.org/fortio/version"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -151,8 +151,6 @@ func (h *HTTPOptions) URLSchemeCheck() {
}
}

var userAgent = "fortio.org/fortio-" + version.Short()

const (
retcodeOffset = len("HTTP/1.X ")
// HTTPReqTimeOutDefaultValue is the default timeout value. 3s.
Expand Down Expand Up @@ -199,7 +197,7 @@ func (h *HTTPOptions) ResetHeaders() {
// InitHeaders initialize and/or resets the default headers (ie just User-Agent).
func (h *HTTPOptions) InitHeaders() {
h.ResetHeaders()
h.extraHeaders.Add("User-Agent", userAgent)
h.extraHeaders.Set(jrpc.UserAgentHeader, jrpc.UserAgent)
// No other headers should be added here based on options content as this is called only once
// before command line option -H are parsed/set.
}
Expand Down
5 changes: 3 additions & 2 deletions fhttp/http_forwarder.go
Expand Up @@ -29,6 +29,7 @@ import (
"sync"

"fortio.org/fortio/fnet"
"fortio.org/fortio/jrpc"
"fortio.org/fortio/log"
)

Expand Down Expand Up @@ -97,9 +98,9 @@ func MakeSimpleRequest(url string, r *http.Request, copyAllHeaders bool) *http.R
// Copy only trace headers or all of them:
CopyHeaders(req, r, copyAllHeaders)
if copyAllHeaders {
req.Header.Add("X-Proxy-Agent", userAgent)
req.Header.Add("X-Proxy-Agent", jrpc.UserAgent)
} else {
req.Header.Add("User-Agent", userAgent)
req.Header.Set(jrpc.UserAgentHeader, jrpc.UserAgent)
}
return req
}
Expand Down
9 changes: 5 additions & 4 deletions fhttp/http_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"unicode/utf8"

"fortio.org/fortio/fnet"
"fortio.org/fortio/jrpc"
"fortio.org/fortio/log"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -889,18 +890,18 @@ func TestPayloadForFastClient(t *testing.T) {
"application/json",
[]byte("{\"test\" : \"test\"}"),
fmt.Sprintf("POST / HTTP/1.1\r\nHost: www.google.com\r\nContent-Length: 17\r\nContent-Type: "+
"application/json\r\nUser-Agent: %s\r\n\r\n{\"test\" : \"test\"}", userAgent),
"application/json\r\nUser-Agent: %s\r\n\r\n{\"test\" : \"test\"}", jrpc.UserAgent),
},
{
"application/xml",
[]byte("<test test=\"test\">"),
fmt.Sprintf("POST / HTTP/1.1\r\nHost: www.google.com\r\nContent-Length: 18\r\nContent-Type: "+
"application/xml\r\nUser-Agent: %s\r\n\r\n<test test=\"test\">", userAgent),
"application/xml\r\nUser-Agent: %s\r\n\r\n<test test=\"test\">", jrpc.UserAgent),
},
{
"",
nil,
fmt.Sprintf("GET / HTTP/1.1\r\nHost: www.google.com\r\nUser-Agent: %s\r\n\r\n", userAgent),
fmt.Sprintf("GET / HTTP/1.1\r\nHost: www.google.com\r\nUser-Agent: %s\r\n\r\n", jrpc.UserAgent),
},
}
for _, test := range tests {
Expand Down Expand Up @@ -1133,7 +1134,7 @@ func TestDebugHandlerSortedHeaders(t *testing.T) {
"Ccc: ccc\n"+
"User-Agent: %s\n"+
"Zzz: zzz\n\n"+
"body:\n\n\n", a.Port, userAgent)
"body:\n\n\n", a.Port, jrpc.UserAgent)
if body != expected {
t.Errorf("Get body: %s not as expected: %s", body, expected)
}
Expand Down
89 changes: 76 additions & 13 deletions jrpc/jrpcClient.go
Expand Up @@ -16,8 +16,8 @@
// using generics to serialize/deserialize any type.
package jrpc // import "fortio.org/fortio/jrpc"

// This package is a true self contained library, doesn't rely on our logger nor other packages in fortio/.
// Client side and common code.
// This package is a true self contained library, doesn't rely on our logger nor other packages
ldemailly marked this conversation as resolved.
Show resolved Hide resolved
// in fortio/ outside of version/ (which now also doesn't rely on logger or any other package).
import (
"bytes"
"context"
Expand All @@ -26,13 +26,27 @@ import (
"io"
"net/http"
"time"

"fortio.org/fortio/version"
)

// Client side and common code.

const (
UserAgentHeader = "User-Agent"
)

// Default timeout for Call.
var timeout = 60 * time.Second

// UserAgent is the User-Agent header used by client calls (also used in fhttp/).
var UserAgent = "fortio.org/fortio-" + version.Short()

// SetCallTimeout changes the timeout for further Call calls, returns
// the previous value (default in 60s).
// the previous value (default in 60s). Value is used when a timeout
ldemailly marked this conversation as resolved.
Show resolved Hide resolved
// isn't passed in the options. Note this is not thread safe,
// use Destination.Timeout for changing values outside of main/single
// thread.
func SetCallTimeout(t time.Duration) time.Duration {
previous := timeout
timeout = t
Expand All @@ -50,6 +64,13 @@ type FetchError struct {
Bytes []byte
}

// Destination is the URL and optional additional headers.
type Destination struct {
URL string
Headers *http.Header
Timeout time.Duration
}

func (fe *FetchError) Error() string {
return fmt.Sprintf("%s, code %d: %v (raw reply: %s)", fe.Message, fe.Code, fe.Err, DebugSummary(fe.Bytes, 256))
}
Expand All @@ -61,7 +82,7 @@ func (fe *FetchError) Unwrap() error {
// Call calls the url endpoint, POSTing a serialized as json optional payload
// (pass nil for a GET http request) and returns the result, deserializing
// json into type Q. T can be inferred so we declare Response Q first.
func Call[Q any, T any](url string, payload *T) (*Q, error) {
func Call[Q any, T any](url *Destination, payload *T) (*Q, error) {
var bytes []byte
var err error
if payload != nil {
Expand All @@ -73,23 +94,35 @@ func Call[Q any, T any](url string, payload *T) (*Q, error) {
return CallWithPayload[Q](url, bytes)
}

// CallURL is Call without any options/non default headers, timeout etc and just the URL.
func CallURL[Q any, T any](url string, payload *T) (*Q, error) {
return Call[Q](NewDestination(url), payload)
}

// CallNoPayload is for an API call without json payload.
func CallNoPayload[Q any](url string) (*Q, error) {
func CallNoPayload[Q any](url *Destination) (*Q, error) {
return CallWithPayload[Q](url, []byte{})
}

// CallNoPayloadURL short cut for CallNoPayload with url as a string (default Send()/Destination options).
func CallNoPayloadURL[Q any](url string) (*Q, error) {
return CallWithPayload[Q](NewDestination(url), []byte{})
}

// Serialize serializes the object as json.
func Serialize(obj interface{}) ([]byte, error) {
return json.Marshal(obj)
}

// Deserialize deserializes json as a new object of desired type.
func Deserialize[Q any](bytes []byte) (*Q, error) {
var result Q
err := json.Unmarshal(bytes, &result)
return &result, err // Will return zero object, not nil upon error
}

// CallWithPayload is for cases where the payload is already serialized (or empty).
func CallWithPayload[Q any](url string, bytes []byte) (*Q, error) {
func CallWithPayload[Q any](url *Destination, bytes []byte) (*Q, error) {
code, bytes, err := Send(url, bytes) // returns -1 on other errors
if err != nil {
return nil, err
Expand All @@ -110,24 +143,43 @@ func CallWithPayload[Q any](url string, bytes []byte) (*Q, error) {
return result, nil
}

// SetHeaderIfMissing utility function to not overwrite nor append to existing headers.
func SetHeaderIfMissing(headers http.Header, name, value string) {
if headers.Get(name) != "" {
return
}
headers.Set(name, value)
}

// Send fetches the result from url and sends optional payload as a POST, GET if missing.
// Returns the http code (if no other error before then, -1 if there are errors),
// the bytes from the reply and error if any.
func Send(url string, jsonPayload []byte) (int, []byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func Send(dest *Destination, jsonPayload []byte) (int, []byte, error) {
curTimeout := dest.Timeout
if curTimeout == 0 {
curTimeout = timeout
}
ctx, cancel := context.WithTimeout(context.Background(), curTimeout)
defer cancel()
var req *http.Request
var err error
var res []byte
if len(jsonPayload) > 0 {
req, err = http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(jsonPayload))
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req, err = http.NewRequestWithContext(ctx, http.MethodPost, dest.URL, bytes.NewReader(jsonPayload))
} else {
req, err = http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
req, err = http.NewRequestWithContext(ctx, http.MethodGet, dest.URL, nil)
}
if err != nil {
return -1, res, err
}
if dest.Headers != nil {
req.Header = dest.Headers.Clone()
}
if len(jsonPayload) > 0 {
SetHeaderIfMissing(req.Header, "Content-Type", "application/json; charset=utf-8")
}
SetHeaderIfMissing(req.Header, "Accept", "application/json")
ldemailly marked this conversation as resolved.
Show resolved Hide resolved
SetHeaderIfMissing(req.Header, UserAgentHeader, UserAgent)
ldemailly marked this conversation as resolved.
Show resolved Hide resolved
var resp *http.Response
resp, err = http.DefaultClient.Do(req)
if err != nil {
Expand All @@ -138,8 +190,19 @@ func Send(url string, jsonPayload []byte) (int, []byte, error) {
return resp.StatusCode, res, err
}

// Fetch is Send without a payload.
func Fetch(url string) (int, []byte, error) {
// NewDestination returns a Destination object set for the given url
// (and default/nil replacement headers and default global timeout).
func NewDestination(url string) *Destination {
return &Destination{URL: url}
}

// FetchURL is Send without a payload and no additional options (default timeout and headers).
func FetchURL(url string) (int, []byte, error) {
return Send(NewDestination(url), []byte{})
}

// Fetch is Send without a payload (so will be a GET request).
func Fetch(url *Destination) (int, []byte, error) {
return Send(url, []byte{})
}

Expand Down
20 changes: 19 additions & 1 deletion jrpc/jrpcServer.go
Expand Up @@ -14,18 +14,21 @@

package jrpc // import "fortio.org/fortio/jrpc"

// Server side additional code (compared to restClient.go).
// Server side additional code (compared to jrpcClient.go).
import (
"io"
"net/http"
)

// ServerReply is used to reply errors but can also be the base for Ok replies,
// see the unit tests `Response` and ../rapi for examples of use.
type ServerReply struct {
Error bool `json:"error,omitempty"` // Success if false/omitted, Error/Failure when true
Message string `json:"message,omitempty"`
Exception string `json:"exception,omitempty"`
}

// NewErrorReply creates a new error reply with the message and err error.
func NewErrorReply(message string, err error) *ServerReply {
res := ServerReply{Error: true, Message: message}
if err != nil {
Expand Down Expand Up @@ -53,26 +56,41 @@ func Reply[T any](w http.ResponseWriter, code int, data *T) error {
return err
}

// ReplyNoPayload is a short cut for Reply() with empty (nil) payload.
func ReplyNoPayload(w http.ResponseWriter, code int) error {
return Reply[int](w, code, nil)
}

// ReplyOk is a short cut for Reply() with http.StatusOK as the result code.
func ReplyOk[T any](w http.ResponseWriter, data *T) error {
return Reply(w, http.StatusOK, data)
}

// ReplyClientError is a short cut for Reply() with http.StatusBadRequest as the result code.
func ReplyClientError[T any](w http.ResponseWriter, data *T) error {
return Reply(w, http.StatusBadRequest, data)
}

// ReplyServerError is a short cut for Reply() with http.StatusServiceUnavailable as the result code.
func ReplyServerError[T any](w http.ResponseWriter, data *T) error {
return Reply(w, http.StatusServiceUnavailable, data)
}

// ReplyError is to send back a client error with exception details.
func ReplyError(w http.ResponseWriter, extraMsg string, err error) error {
return ReplyClientError(w, NewErrorReply(extraMsg, err))
}

// HandleCall deserializes the expected type from the request body.
// Sample usage code:
// ```
//
// req, err := jrpc.HandleCall[Request](w, r)
// if err != nil {
// _ = jrpc.ReplyError(w, "request error", err)
// }
//
// ```.
func HandleCall[Q any](w http.ResponseWriter, r *http.Request) (*Q, error) {
data, err := io.ReadAll(r.Body)
if err != nil {
Expand Down