|
@@ -377,10 +377,31 @@ func (s *Session) keepalive() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (s *Session) bodyClone(buf []byte, p *sendReady) {
|
|
|
|
+ if buf == nil {
|
|
|
|
+ return // A nil body is ignored.
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // In the event of session shutdown or connection write timeout,
|
|
|
|
+ // we need to prevent `send` from reading the body buffer after
|
|
|
|
+ // returning from this function since the caller may re-use the
|
|
|
|
+ // underlying array.
|
|
|
|
+ p.mu.Lock()
|
|
|
|
+ defer p.mu.Unlock()
|
|
|
|
+
|
|
|
|
+ if p.Body == nil {
|
|
|
|
+ return // Body was already copied in `send`.
|
|
|
|
+ }
|
|
|
|
+ newBody := make([]byte, len(buf))
|
|
|
|
+ copy(newBody, buf)
|
|
|
|
+ p.Body = newBody
|
|
|
|
+}
|
|
|
|
+
|
|
// waitForSendErr waits to send a header, checking for a potential shutdown
|
|
// waitForSendErr waits to send a header, checking for a potential shutdown
|
|
-func (s *Session) waitForSend(hdr header, body []byte) error {
|
|
|
|
|
|
+func (s *Session) waitForSend(hdr header, body []byte) (err error) {
|
|
errCh := make(chan error, 1)
|
|
errCh := make(chan error, 1)
|
|
- return s.waitForSendErr(hdr, body, errCh)
|
|
|
|
|
|
+ err = s.waitForSendErr(hdr, body, errCh)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
|
|
|
|
// waitForSendErr waits to send a header with optional data, checking for a
|
|
// waitForSendErr waits to send a header with optional data, checking for a
|
|
@@ -398,7 +419,6 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro
|
|
}
|
|
}
|
|
timerPool.Put(t)
|
|
timerPool.Put(t)
|
|
}()
|
|
}()
|
|
-
|
|
|
|
ready := &sendReady{Hdr: hdr, Body: body, Err: errCh}
|
|
ready := &sendReady{Hdr: hdr, Body: body, Err: errCh}
|
|
select {
|
|
select {
|
|
case s.sendCh <- ready:
|
|
case s.sendCh <- ready:
|
|
@@ -407,35 +427,14 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro
|
|
case <-timer.C:
|
|
case <-timer.C:
|
|
return ErrConnectionWriteTimeout
|
|
return ErrConnectionWriteTimeout
|
|
}
|
|
}
|
|
-
|
|
|
|
- bodyCopy := func() {
|
|
|
|
- if body == nil {
|
|
|
|
- return // A nil body is ignored.
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // In the event of session shutdown or connection write timeout,
|
|
|
|
- // we need to prevent `send` from reading the body buffer after
|
|
|
|
- // returning from this function since the caller may re-use the
|
|
|
|
- // underlying array.
|
|
|
|
- ready.mu.Lock()
|
|
|
|
- defer ready.mu.Unlock()
|
|
|
|
-
|
|
|
|
- if ready.Body == nil {
|
|
|
|
- return // Body was already copied in `send`.
|
|
|
|
- }
|
|
|
|
- newBody := make([]byte, len(body))
|
|
|
|
- copy(newBody, body)
|
|
|
|
- ready.Body = newBody
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
select {
|
|
select {
|
|
case err := <-errCh:
|
|
case err := <-errCh:
|
|
return err
|
|
return err
|
|
case <-s.shutdownCh:
|
|
case <-s.shutdownCh:
|
|
- bodyCopy()
|
|
|
|
|
|
+ s.bodyClone(body, ready)
|
|
return ErrSessionShutdown
|
|
return ErrSessionShutdown
|
|
case <-timer.C:
|
|
case <-timer.C:
|
|
- bodyCopy()
|
|
|
|
|
|
+ s.bodyClone(body, ready)
|
|
return ErrConnectionWriteTimeout
|
|
return ErrConnectionWriteTimeout
|
|
}
|
|
}
|
|
}
|
|
}
|