|
@@ -2,6 +2,7 @@ package yamux
|
|
|
|
|
|
import (
|
|
import (
|
|
"bytes"
|
|
"bytes"
|
|
|
|
+ "errors"
|
|
"io"
|
|
"io"
|
|
"sync"
|
|
"sync"
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
@@ -200,6 +201,10 @@ START:
|
|
// Send the header
|
|
// Send the header
|
|
s.sendHdr.encode(typeData, flags, s.id, max)
|
|
s.sendHdr.encode(typeData, flags, s.id, max)
|
|
if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
|
|
if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
|
|
|
|
+ if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) {
|
|
|
|
+ // Message left in ready queue, header re-use is unsafe.
|
|
|
|
+ s.sendHdr = header(make([]byte, headerSize))
|
|
|
|
+ }
|
|
return 0, err
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
|
|
@@ -273,6 +278,10 @@ func (s *Stream) sendWindowUpdate() error {
|
|
// Send the header
|
|
// Send the header
|
|
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
|
|
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
|
|
if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
|
|
if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
|
|
|
|
+ if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) {
|
|
|
|
+ // Message left in ready queue, header re-use is unsafe.
|
|
|
|
+ s.controlHdr = header(make([]byte, headerSize))
|
|
|
|
+ }
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
@@ -287,6 +296,10 @@ func (s *Stream) sendClose() error {
|
|
flags |= flagFIN
|
|
flags |= flagFIN
|
|
s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0)
|
|
s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0)
|
|
if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
|
|
if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
|
|
|
|
+ if errors.Is(err, ErrSessionShutdown) || errors.Is(err, ErrConnectionWriteTimeout) {
|
|
|
|
+ // Message left in ready queue, header re-use is unsafe.
|
|
|
|
+ s.controlHdr = header(make([]byte, headerSize))
|
|
|
|
+ }
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
@@ -362,8 +375,9 @@ func (s *Stream) closeTimeout() {
|
|
// Send a RST so the remote side closes too.
|
|
// Send a RST so the remote side closes too.
|
|
s.sendLock.Lock()
|
|
s.sendLock.Lock()
|
|
defer s.sendLock.Unlock()
|
|
defer s.sendLock.Unlock()
|
|
- s.sendHdr.encode(typeWindowUpdate, flagRST, s.id, 0)
|
|
|
|
- s.session.sendNoWait(s.sendHdr)
|
|
|
|
|
|
+ hdr := header(make([]byte, headerSize))
|
|
|
|
+ hdr.encode(typeWindowUpdate, flagRST, s.id, 0)
|
|
|
|
+ s.session.sendNoWait(hdr)
|
|
}
|
|
}
|
|
|
|
|
|
// forceClose is used for when the session is exiting
|
|
// forceClose is used for when the session is exiting
|