|
@@ -64,7 +64,7 @@ type Session struct {
|
|
|
|
|
|
// sendCh is used to mark a stream as ready to send,
|
|
// sendCh is used to mark a stream as ready to send,
|
|
// or to send a header out directly.
|
|
// or to send a header out directly.
|
|
- sendCh chan sendReady
|
|
|
|
|
|
+ sendCh chan *sendReady
|
|
|
|
|
|
// recvDoneCh is closed when recv() exits to avoid a race
|
|
// recvDoneCh is closed when recv() exits to avoid a race
|
|
// between stream registration and stream shutdown
|
|
// between stream registration and stream shutdown
|
|
@@ -81,7 +81,7 @@ type Session struct {
|
|
// or to directly send a header
|
|
// or to directly send a header
|
|
type sendReady struct {
|
|
type sendReady struct {
|
|
Hdr []byte
|
|
Hdr []byte
|
|
- mu *sync.Mutex // Protects Body from unsafe reads.
|
|
|
|
|
|
+ mu sync.Mutex // Protects Body from unsafe reads.
|
|
Body []byte
|
|
Body []byte
|
|
Err chan error
|
|
Err chan error
|
|
}
|
|
}
|
|
@@ -103,7 +103,7 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
|
|
inflight: make(map[uint32]struct{}),
|
|
inflight: make(map[uint32]struct{}),
|
|
synCh: make(chan struct{}, config.AcceptBacklog),
|
|
synCh: make(chan struct{}, config.AcceptBacklog),
|
|
acceptCh: make(chan *Stream, config.AcceptBacklog),
|
|
acceptCh: make(chan *Stream, config.AcceptBacklog),
|
|
- sendCh: make(chan sendReady, 64),
|
|
|
|
|
|
+ sendCh: make(chan *sendReady, 64),
|
|
recvDoneCh: make(chan struct{}),
|
|
recvDoneCh: make(chan struct{}),
|
|
shutdownCh: make(chan struct{}),
|
|
shutdownCh: make(chan struct{}),
|
|
}
|
|
}
|
|
@@ -375,7 +375,7 @@ func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) erro
|
|
timerPool.Put(t)
|
|
timerPool.Put(t)
|
|
}()
|
|
}()
|
|
|
|
|
|
- ready := sendReady{Hdr: hdr, mu: &sync.Mutex{}, Body: body, Err: errCh}
|
|
|
|
|
|
+ ready := &sendReady{Hdr: hdr, Body: body, Err: errCh}
|
|
select {
|
|
select {
|
|
case s.sendCh <- ready:
|
|
case s.sendCh <- ready:
|
|
case <-s.shutdownCh:
|
|
case <-s.shutdownCh:
|
|
@@ -433,7 +433,7 @@ func (s *Session) sendNoWait(hdr header) error {
|
|
}()
|
|
}()
|
|
|
|
|
|
select {
|
|
select {
|
|
- case s.sendCh <- sendReady{Hdr: hdr}:
|
|
|
|
|
|
+ case s.sendCh <- &sendReady{Hdr: hdr}:
|
|
return nil
|
|
return nil
|
|
case <-s.shutdownCh:
|
|
case <-s.shutdownCh:
|
|
return ErrSessionShutdown
|
|
return ErrSessionShutdown
|
|
@@ -461,24 +461,22 @@ func (s *Session) send() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if ready.mu != nil {
|
|
|
|
- ready.mu.Lock()
|
|
|
|
- if ready.Body != nil {
|
|
|
|
- // Copy the body into the buffer to avoid
|
|
|
|
- // holding a mutex lock during the write.
|
|
|
|
- _, err := bodyBuf.Write(ready.Body)
|
|
|
|
- if err != nil {
|
|
|
|
- ready.Body = nil
|
|
|
|
- ready.mu.Unlock()
|
|
|
|
- s.logger.Printf("[ERR] yamux: Failed to copy body into buffer: %v", err)
|
|
|
|
- asyncSendErr(ready.Err, err)
|
|
|
|
- s.exitErr(err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ ready.mu.Lock()
|
|
|
|
+ if ready.Body != nil {
|
|
|
|
+ // Copy the body into the buffer to avoid
|
|
|
|
+ // holding a mutex lock during the write.
|
|
|
|
+ _, err := bodyBuf.Write(ready.Body)
|
|
|
|
+ if err != nil {
|
|
ready.Body = nil
|
|
ready.Body = nil
|
|
|
|
+ ready.mu.Unlock()
|
|
|
|
+ s.logger.Printf("[ERR] yamux: Failed to copy body into buffer: %v", err)
|
|
|
|
+ asyncSendErr(ready.Err, err)
|
|
|
|
+ s.exitErr(err)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
- ready.mu.Unlock()
|
|
|
|
|
|
+ ready.Body = nil
|
|
}
|
|
}
|
|
|
|
+ ready.mu.Unlock()
|
|
|
|
|
|
if bodyBuf.Len() > 0 {
|
|
if bodyBuf.Len() > 0 {
|
|
// Send data from a body if given
|
|
// Send data from a body if given
|