From bb187b83abe1d16240bc0af6cda054a46de14575 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 16 Nov 2022 14:31:17 +0800 Subject: [PATCH] Wait for the response of pipelineWork in background and return it to pool --- client.go | 80 +++++++++++++++++++++++++++---------------------------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/client.go b/client.go index e4450919fe..c058f36004 100644 --- a/client.go +++ b/client.go @@ -2376,7 +2376,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...) } - w := acquirePipelineWork(&c.workPool, timeout) + w := c.acquirePipelineWork(timeout) w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing w.req = &w.reqCopy w.resp = &w.respCopy @@ -2394,7 +2394,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t select { case c.chW <- w: case <-w.t.C: - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) return ErrTimeout } } @@ -2408,7 +2408,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t swapResponseBody(resp, &w.respCopy) } err = w.err - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) case <-w.t.C: err = ErrTimeout } @@ -2416,6 +2416,40 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t return err } +func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) { + v := c.workPool.Get() + if v != nil { + w = v.(*pipelineWork) + } else { + w = &pipelineWork{ + done: make(chan struct{}, 1), + } + } + if timeout > 0 { + if w.t == nil { + w.t = time.NewTimer(timeout) + } else { + w.t.Reset(timeout) + } + w.deadline = time.Now().Add(timeout) + } else { + w.deadline = zeroTime + } + return w +} + +func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) { + if w.t != nil { + w.t.Stop() + } + w.reqCopy.Reset() + w.respCopy.Reset() + w.req = nil + w.resp = nil + w.err = nil + c.workPool.Put(w) +} + // Do performs the given http request and sets the corresponding response. // // Request must contain at least non-zero RequestURI with full url (including @@ -2443,7 +2477,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error { req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...) } - w := acquirePipelineWork(&c.workPool, 0) + w := c.acquirePipelineWork(0) w.req = req if resp != nil { resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing @@ -2466,7 +2500,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error { select { case c.chW <- w: default: - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) return ErrPipelineOverflow } } @@ -2475,7 +2509,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error { <-w.done err := w.err - releasePipelineWork(&c.workPool, w) + c.releasePipelineWork(w) return err } @@ -2852,37 +2886,3 @@ func (c *pipelineConnClient) getClientName() []byte { } var errPipelineConnStopped = errors.New("pipeline connection has been stopped") - -func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) (w *pipelineWork) { - v := pool.Get() - if v != nil { - w = v.(*pipelineWork) - } else { - w = &pipelineWork{ - done: make(chan struct{}, 1), - } - } - if timeout > 0 { - if w.t == nil { - w.t = time.NewTimer(timeout) - } else { - w.t.Reset(timeout) - } - w.deadline = time.Now().Add(timeout) - } else { - w.deadline = zeroTime - } - return w -} - -func releasePipelineWork(pool *sync.Pool, w *pipelineWork) { - if w.t != nil { - w.t.Stop() - } - w.reqCopy.Reset() - w.respCopy.Reset() - w.req = nil - w.resp = nil - w.err = nil - pool.Put(w) -}