Skip to content

Commit

Permalink
Wait for the response of pipelineWork in background and return it to …
Browse files Browse the repository at this point in the history
…pool (#1436)
  • Loading branch information
panjf2000 committed Nov 17, 2022
1 parent c367454 commit 8f43443
Showing 1 changed file with 40 additions and 40 deletions.
80 changes: 40 additions & 40 deletions client.go
Expand Up @@ -2376,7 +2376,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t
req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
}

w := acquirePipelineWork(&c.workPool, timeout)
w := c.acquirePipelineWork(timeout)
w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
w.req = &w.reqCopy
w.resp = &w.respCopy
Expand All @@ -2394,7 +2394,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t
select {
case c.chW <- w:
case <-w.t.C:
releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)
return ErrTimeout
}
}
Expand All @@ -2408,14 +2408,48 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t
swapResponseBody(resp, &w.respCopy)
}
err = w.err
releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)
case <-w.t.C:
err = ErrTimeout
}

return err
}

func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
v := c.workPool.Get()
if v != nil {
w = v.(*pipelineWork)
} else {
w = &pipelineWork{
done: make(chan struct{}, 1),
}
}
if timeout > 0 {
if w.t == nil {
w.t = time.NewTimer(timeout)
} else {
w.t.Reset(timeout)
}
w.deadline = time.Now().Add(timeout)
} else {
w.deadline = zeroTime
}
return w
}

func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
if w.t != nil {
w.t.Stop()
}
w.reqCopy.Reset()
w.respCopy.Reset()
w.req = nil
w.resp = nil
w.err = nil
c.workPool.Put(w)
}

// Do performs the given http request and sets the corresponding response.
//
// Request must contain at least non-zero RequestURI with full url (including
Expand Down Expand Up @@ -2443,7 +2477,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
}

w := acquirePipelineWork(&c.workPool, 0)
w := c.acquirePipelineWork(0)
w.req = req
if resp != nil {
resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
Expand All @@ -2466,7 +2500,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
select {
case c.chW <- w:
default:
releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)
return ErrPipelineOverflow
}
}
Expand All @@ -2475,7 +2509,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
<-w.done
err := w.err

releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)

return err
}
Expand Down Expand Up @@ -2852,37 +2886,3 @@ func (c *pipelineConnClient) getClientName() []byte {
}

var errPipelineConnStopped = errors.New("pipeline connection has been stopped")

func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) (w *pipelineWork) {
v := pool.Get()
if v != nil {
w = v.(*pipelineWork)
} else {
w = &pipelineWork{
done: make(chan struct{}, 1),
}
}
if timeout > 0 {
if w.t == nil {
w.t = time.NewTimer(timeout)
} else {
w.t.Reset(timeout)
}
w.deadline = time.Now().Add(timeout)
} else {
w.deadline = zeroTime
}
return w
}

func releasePipelineWork(pool *sync.Pool, w *pipelineWork) {
if w.t != nil {
w.t.Stop()
}
w.reqCopy.Reset()
w.respCopy.Reset()
w.req = nil
w.resp = nil
w.err = nil
pool.Put(w)
}

0 comments on commit 8f43443

Please sign in to comment.