From b863648faaa3d487d3cbef0e61b5c348376fa410 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Fri, 12 Apr 2024 10:24:19 +0300 Subject: [PATCH 01/14] fix: propagate body stream error to close function (#1743) --- client.go | 4 ++-- http.go | 40 ++++++++++++++++++++++++---------------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/client.go b/client.go index b5493e927..1f12d4ace 100644 --- a/client.go +++ b/client.go @@ -2975,12 +2975,12 @@ func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (ret closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose() || isConnRST if customStreamBody && resp.bodyStream != nil { rbs := resp.bodyStream - resp.bodyStream = newCloseReader(rbs, func() error { + resp.bodyStream = newCloseReaderWithError(rbs, func(wErr error) error { hc.releaseReader(br) if r, ok := rbs.(*requestStream); ok { releaseRequestStream(r) } - if closeConn || resp.ConnectionClose() { + if closeConn || resp.ConnectionClose() || wErr != nil { hc.closeConn(cc) } else { hc.releaseConn(cc) diff --git a/http.go b/http.go index 1ae8c3760..312e7f51a 100644 --- a/http.go +++ b/http.go @@ -321,26 +321,31 @@ func (resp *Response) BodyStream() io.Reader { } func (resp *Response) CloseBodyStream() error { - return resp.closeBodyStream() + return resp.closeBodyStream(nil) +} + +type ReadCloserWithError interface { + io.Reader + CloseWithError(err error) error } type closeReader struct { io.Reader - closeFunc func() error + closeFunc func(err error) error } -func newCloseReader(r io.Reader, closeFunc func() error) io.ReadCloser { +func newCloseReaderWithError(r io.Reader, closeFunc func(err error) error) ReadCloserWithError { if r == nil { panic(`BUG: reader is nil`) } return &closeReader{Reader: r, closeFunc: closeFunc} } -func (c *closeReader) Close() error { +func (c *closeReader) CloseWithError(err error) error { if c.closeFunc == nil { return nil } - return c.closeFunc() + return c.closeFunc(err) } // BodyWriter returns writer for populating request body. @@ -394,7 +399,7 @@ func (resp *Response) Body() []byte { bodyBuf := resp.bodyBuffer() bodyBuf.Reset() _, err := copyZeroAlloc(bodyBuf, resp.bodyStream) - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(err) //nolint:errcheck if err != nil { bodyBuf.SetString(err.Error()) } @@ -618,7 +623,7 @@ func (req *Request) BodyWriteTo(w io.Writer) error { func (resp *Response) BodyWriteTo(w io.Writer) error { if resp.bodyStream != nil { _, err := copyZeroAlloc(w, resp.bodyStream) - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(err) //nolint:errcheck return err } _, err := w.Write(resp.bodyBytes()) @@ -629,13 +634,13 @@ func (resp *Response) BodyWriteTo(w io.Writer) error { // // It is safe re-using p after the function returns. func (resp *Response) AppendBody(p []byte) { - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(nil) //nolint:errcheck resp.bodyBuffer().Write(p) //nolint:errcheck } // AppendBodyString appends s to response body. func (resp *Response) AppendBodyString(s string) { - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(nil) //nolint:errcheck resp.bodyBuffer().WriteString(s) //nolint:errcheck } @@ -643,7 +648,7 @@ func (resp *Response) AppendBodyString(s string) { // // It is safe re-using body argument after the function returns. func (resp *Response) SetBody(body []byte) { - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(nil) //nolint:errcheck bodyBuf := resp.bodyBuffer() bodyBuf.Reset() bodyBuf.Write(body) //nolint:errcheck @@ -651,7 +656,7 @@ func (resp *Response) SetBody(body []byte) { // SetBodyString sets response body. func (resp *Response) SetBodyString(body string) { - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(nil) //nolint:errcheck bodyBuf := resp.bodyBuffer() bodyBuf.Reset() bodyBuf.WriteString(body) //nolint:errcheck @@ -660,7 +665,7 @@ func (resp *Response) SetBodyString(body string) { // ResetBody resets response body. func (resp *Response) ResetBody() { resp.bodyRaw = nil - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(nil) //nolint:errcheck if resp.body != nil { if resp.keepBodyBuffer { resp.body.Reset() @@ -700,7 +705,7 @@ func (resp *Response) ReleaseBody(size int) { return } if cap(resp.body.B) > size { - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(nil) //nolint:errcheck resp.body = nil } } @@ -734,7 +739,7 @@ func (resp *Response) SwapBody(body []byte) []byte { if resp.bodyStream != nil { bb.Reset() _, err := copyZeroAlloc(bb, resp.bodyStream) - resp.closeBodyStream() //nolint:errcheck + resp.closeBodyStream(err) //nolint:errcheck if err != nil { bb.Reset() bb.SetString(err.Error()) @@ -2061,7 +2066,7 @@ func (resp *Response) writeBodyStream(w *bufio.Writer, sendBody bool) (err error } } } - errc := resp.closeBodyStream() + errc := resp.closeBodyStream(err) if err == nil { err = errc } @@ -2083,7 +2088,7 @@ func (req *Request) closeBodyStream() error { return err } -func (resp *Response) closeBodyStream() error { +func (resp *Response) closeBodyStream(wErr error) error { if resp.bodyStream == nil { return nil } @@ -2091,6 +2096,9 @@ func (resp *Response) closeBodyStream() error { if bsc, ok := resp.bodyStream.(io.Closer); ok { err = bsc.Close() } + if bsc, ok := resp.bodyStream.(ReadCloserWithError); ok { + err = bsc.CloseWithError(wErr) + } if bsr, ok := resp.bodyStream.(*requestStream); ok { releaseRequestStream(bsr) } From e821d30ead81bc75cff017024c2955e5e38e7d30 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 13:29:55 +0500 Subject: [PATCH 02/14] feat: add address in ErrDialTimeout --- tcpdialer.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tcpdialer.go b/tcpdialer.go index e8430cb9c..f1e6384e3 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -3,6 +3,7 @@ package fasthttp import ( "context" "errors" + "fmt" "net" "strconv" "sync" @@ -302,7 +303,7 @@ func (d *TCPDialer) dial(addr string, dualStack bool, timeout time.Duration) (ne if err == nil { return conn, nil } - if err == ErrDialTimeout { + if errors.Is(err, ErrDialTimeout) { return nil, err } idx++ @@ -316,7 +317,7 @@ func (d *TCPDialer) tryDial( ) (net.Conn, error) { timeout := time.Until(deadline) if timeout <= 0 { - return nil, ErrDialTimeout + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) } if concurrencyCh != nil { @@ -332,7 +333,7 @@ func (d *TCPDialer) tryDial( } ReleaseTimer(tc) if isTimeout { - return nil, ErrDialTimeout + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) } } defer func() { <-concurrencyCh }() @@ -347,7 +348,7 @@ func (d *TCPDialer) tryDial( defer cancelCtx() conn, err := dialer.DialContext(ctx, network, addr) if err != nil && ctx.Err() == context.DeadlineExceeded { - return nil, ErrDialTimeout + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) } return conn, err } From f57e2628e8777b88e0c5ddf8cefa78a14a560f41 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 13:39:28 +0500 Subject: [PATCH 03/14] feat: add address in any `tryDial` error --- tcpdialer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tcpdialer.go b/tcpdialer.go index f1e6384e3..2f6c694fa 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -347,10 +347,13 @@ func (d *TCPDialer) tryDial( ctx, cancelCtx := context.WithDeadline(context.Background(), deadline) defer cancelCtx() conn, err := dialer.DialContext(ctx, network, addr) - if err != nil && ctx.Err() == context.DeadlineExceeded { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + if err != nil { + if ctx.Err() == context.DeadlineExceeded { + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + } + return nil, fmt.Errorf("error when dialing %s: %w", addr, err) } - return conn, err + return conn, nil } // ErrDialTimeout is returned when TCP dialing is timed out. From 05e68b1b60154ebadbe81aa27f01736fb84f8743 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 17:43:04 +0500 Subject: [PATCH 04/14] feat: use struct to wrap error with upstream info --- tcpdialer.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/tcpdialer.go b/tcpdialer.go index 2f6c694fa..6d017e15e 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -317,7 +317,7 @@ func (d *TCPDialer) tryDial( ) (net.Conn, error) { timeout := time.Until(deadline) if timeout <= 0 { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + return nil, wrapDialWithUpstream(ErrDialTimeout, addr) } if concurrencyCh != nil { @@ -333,7 +333,7 @@ func (d *TCPDialer) tryDial( } ReleaseTimer(tc) if isTimeout { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + return nil, wrapDialWithUpstream(ErrDialTimeout, addr) } } defer func() { <-concurrencyCh }() @@ -349,9 +349,9 @@ func (d *TCPDialer) tryDial( conn, err := dialer.DialContext(ctx, network, addr) if err != nil { if ctx.Err() == context.DeadlineExceeded { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + return nil, wrapDialWithUpstream(ErrDialTimeout, addr) } - return nil, fmt.Errorf("error when dialing %s: %w", addr, err) + return nil, wrapDialWithUpstream(err, addr) } return conn, nil } @@ -359,6 +359,27 @@ func (d *TCPDialer) tryDial( // ErrDialTimeout is returned when TCP dialing is timed out. var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") +// ErrDialWithUpstream wraps dial error with upstream info +type ErrDialWithUpstream struct { + Upstream string + wrapErr error +} + +func (e *ErrDialWithUpstream) Error() string { + return fmt.Sprintf("error when dialing %s: %s", e.Upstream, e.wrapErr) +} + +func (e *ErrDialWithUpstream) Unwrap() error { + return e.wrapErr +} + +func wrapDialWithUpstream(err error, upstream string) error { + return &ErrDialWithUpstream{ + Upstream: upstream, + wrapErr: err, + } +} + // DefaultDialTimeout is timeout used by Dial and DialDualStack // for establishing TCP connections. const DefaultDialTimeout = 3 * time.Second From 3ab2f91b72b2bb1a7f5dd8e384c0aaa979d4fbe9 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 17:50:15 +0500 Subject: [PATCH 05/14] fix: lint --- tcpdialer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tcpdialer.go b/tcpdialer.go index 6d017e15e..0ab88cad6 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -359,7 +359,7 @@ func (d *TCPDialer) tryDial( // ErrDialTimeout is returned when TCP dialing is timed out. var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") -// ErrDialWithUpstream wraps dial error with upstream info +// ErrDialWithUpstream wraps dial error with upstream info. type ErrDialWithUpstream struct { Upstream string wrapErr error From 2ba2e3471198d44481547c386fd3064b8daa89f4 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 17:54:45 +0500 Subject: [PATCH 06/14] fix: wrapped Error() method --- tcpdialer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tcpdialer.go b/tcpdialer.go index 0ab88cad6..5f6081502 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -366,7 +366,7 @@ type ErrDialWithUpstream struct { } func (e *ErrDialWithUpstream) Error() string { - return fmt.Sprintf("error when dialing %s: %s", e.Upstream, e.wrapErr) + return fmt.Sprintf("error when dialing %s: %s", e.Upstream, e.wrapErr.Error()) } func (e *ErrDialWithUpstream) Unwrap() error { From 05fbdc792849b8fbfc4cbbd8b79fa853a482f0a4 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Tue, 23 Apr 2024 09:53:40 +0500 Subject: [PATCH 07/14] docs: add example to ErrDialWithUpstream --- tcpdialer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tcpdialer.go b/tcpdialer.go index 5f6081502..f6212b5d8 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -360,6 +360,14 @@ func (d *TCPDialer) tryDial( var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") // ErrDialWithUpstream wraps dial error with upstream info. +// +// Should use errors.As to get upstream information from error: +// +// var dialErr *fasthttp.ErrDialWithUpstream +// +// if errors.As(err, &dialErr) { +// upstream = dialErr.Upstream +// } type ErrDialWithUpstream struct { Upstream string wrapErr error From ba48784a965fb257c7dfc8cbac0b3fc94c54f770 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 13:29:55 +0500 Subject: [PATCH 08/14] feat: add address in ErrDialTimeout --- tcpdialer.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tcpdialer.go b/tcpdialer.go index e8430cb9c..f1e6384e3 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -3,6 +3,7 @@ package fasthttp import ( "context" "errors" + "fmt" "net" "strconv" "sync" @@ -302,7 +303,7 @@ func (d *TCPDialer) dial(addr string, dualStack bool, timeout time.Duration) (ne if err == nil { return conn, nil } - if err == ErrDialTimeout { + if errors.Is(err, ErrDialTimeout) { return nil, err } idx++ @@ -316,7 +317,7 @@ func (d *TCPDialer) tryDial( ) (net.Conn, error) { timeout := time.Until(deadline) if timeout <= 0 { - return nil, ErrDialTimeout + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) } if concurrencyCh != nil { @@ -332,7 +333,7 @@ func (d *TCPDialer) tryDial( } ReleaseTimer(tc) if isTimeout { - return nil, ErrDialTimeout + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) } } defer func() { <-concurrencyCh }() @@ -347,7 +348,7 @@ func (d *TCPDialer) tryDial( defer cancelCtx() conn, err := dialer.DialContext(ctx, network, addr) if err != nil && ctx.Err() == context.DeadlineExceeded { - return nil, ErrDialTimeout + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) } return conn, err } From 0f421eb4a871af969aafefb53d7cf4aabceba704 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 13:39:28 +0500 Subject: [PATCH 09/14] feat: add address in any `tryDial` error --- tcpdialer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tcpdialer.go b/tcpdialer.go index f1e6384e3..2f6c694fa 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -347,10 +347,13 @@ func (d *TCPDialer) tryDial( ctx, cancelCtx := context.WithDeadline(context.Background(), deadline) defer cancelCtx() conn, err := dialer.DialContext(ctx, network, addr) - if err != nil && ctx.Err() == context.DeadlineExceeded { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + if err != nil { + if ctx.Err() == context.DeadlineExceeded { + return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + } + return nil, fmt.Errorf("error when dialing %s: %w", addr, err) } - return conn, err + return conn, nil } // ErrDialTimeout is returned when TCP dialing is timed out. From b03bf567e45453603f86fc9175fd4828bc8c9239 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 17:43:04 +0500 Subject: [PATCH 10/14] feat: use struct to wrap error with upstream info --- tcpdialer.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/tcpdialer.go b/tcpdialer.go index 2f6c694fa..6d017e15e 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -317,7 +317,7 @@ func (d *TCPDialer) tryDial( ) (net.Conn, error) { timeout := time.Until(deadline) if timeout <= 0 { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + return nil, wrapDialWithUpstream(ErrDialTimeout, addr) } if concurrencyCh != nil { @@ -333,7 +333,7 @@ func (d *TCPDialer) tryDial( } ReleaseTimer(tc) if isTimeout { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + return nil, wrapDialWithUpstream(ErrDialTimeout, addr) } } defer func() { <-concurrencyCh }() @@ -349,9 +349,9 @@ func (d *TCPDialer) tryDial( conn, err := dialer.DialContext(ctx, network, addr) if err != nil { if ctx.Err() == context.DeadlineExceeded { - return nil, fmt.Errorf("error when dialing %s: %w", addr, ErrDialTimeout) + return nil, wrapDialWithUpstream(ErrDialTimeout, addr) } - return nil, fmt.Errorf("error when dialing %s: %w", addr, err) + return nil, wrapDialWithUpstream(err, addr) } return conn, nil } @@ -359,6 +359,27 @@ func (d *TCPDialer) tryDial( // ErrDialTimeout is returned when TCP dialing is timed out. var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") +// ErrDialWithUpstream wraps dial error with upstream info +type ErrDialWithUpstream struct { + Upstream string + wrapErr error +} + +func (e *ErrDialWithUpstream) Error() string { + return fmt.Sprintf("error when dialing %s: %s", e.Upstream, e.wrapErr) +} + +func (e *ErrDialWithUpstream) Unwrap() error { + return e.wrapErr +} + +func wrapDialWithUpstream(err error, upstream string) error { + return &ErrDialWithUpstream{ + Upstream: upstream, + wrapErr: err, + } +} + // DefaultDialTimeout is timeout used by Dial and DialDualStack // for establishing TCP connections. const DefaultDialTimeout = 3 * time.Second From c186a1e6f20eae4a3c1ab5c4fa088d5a40a6a9be Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 17:50:15 +0500 Subject: [PATCH 11/14] fix: lint --- tcpdialer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tcpdialer.go b/tcpdialer.go index 6d017e15e..0ab88cad6 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -359,7 +359,7 @@ func (d *TCPDialer) tryDial( // ErrDialTimeout is returned when TCP dialing is timed out. var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") -// ErrDialWithUpstream wraps dial error with upstream info +// ErrDialWithUpstream wraps dial error with upstream info. type ErrDialWithUpstream struct { Upstream string wrapErr error From c0cfa3115570b7436be8ab4c0974cabd077d8674 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Mon, 22 Apr 2024 17:54:45 +0500 Subject: [PATCH 12/14] fix: wrapped Error() method --- tcpdialer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tcpdialer.go b/tcpdialer.go index 0ab88cad6..5f6081502 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -366,7 +366,7 @@ type ErrDialWithUpstream struct { } func (e *ErrDialWithUpstream) Error() string { - return fmt.Sprintf("error when dialing %s: %s", e.Upstream, e.wrapErr) + return fmt.Sprintf("error when dialing %s: %s", e.Upstream, e.wrapErr.Error()) } func (e *ErrDialWithUpstream) Unwrap() error { From 18c9bbfff6e23ae099ed7ed70c7b3d11b0425ff0 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Tue, 23 Apr 2024 09:53:40 +0500 Subject: [PATCH 13/14] docs: add example to ErrDialWithUpstream --- tcpdialer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tcpdialer.go b/tcpdialer.go index 5f6081502..f6212b5d8 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -360,6 +360,14 @@ func (d *TCPDialer) tryDial( var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") // ErrDialWithUpstream wraps dial error with upstream info. +// +// Should use errors.As to get upstream information from error: +// +// var dialErr *fasthttp.ErrDialWithUpstream +// +// if errors.As(err, &dialErr) { +// upstream = dialErr.Upstream +// } type ErrDialWithUpstream struct { Upstream string wrapErr error From e38d6f10b8e6ea753f985ccf992b846d6366ae31 Mon Sep 17 00:00:00 2001 From: Max Denushev Date: Thu, 2 May 2024 10:06:09 +0700 Subject: [PATCH 14/14] docs: fix example for ErrDialWithUpstream --- tcpdialer.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tcpdialer.go b/tcpdialer.go index f6212b5d8..e5f06bd01 100644 --- a/tcpdialer.go +++ b/tcpdialer.go @@ -363,10 +363,12 @@ var ErrDialTimeout = errors.New("dialing to the given TCP address timed out") // // Should use errors.As to get upstream information from error: // -// var dialErr *fasthttp.ErrDialWithUpstream +// hc := fasthttp.HostClient{Addr: "foo.com,bar.com"} +// err := hc.Do(req, res) // +// var dialErr *fasthttp.ErrDialWithUpstream // if errors.As(err, &dialErr) { -// upstream = dialErr.Upstream +// upstream = dialErr.Upstream // 34.206.39.153:80 // } type ErrDialWithUpstream struct { Upstream string