Browse Source

Fixing shared buffers, send locking

Armon Dadgar 10 years ago
parent
commit
3919213693
1 changed files with 16 additions and 10 deletions
  1. 16 10
      stream.go

+ 16 - 10
stream.go

@@ -35,6 +35,9 @@ type Stream struct {
 	recvBuf  bytes.Buffer
 	recvLock sync.Mutex
 
+	controlHdr     header
+	controlHdrLock sync.Mutex
+
 	sendHdr  header
 	sendLock sync.Mutex
 
@@ -51,6 +54,7 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
 		id:         id,
 		session:    session,
 		state:      state,
+		controlHdr: header(make([]byte, headerSize)),
 		sendHdr:    header(make([]byte, headerSize)),
 		recvWindow: initialStreamWindow,
 		sendWindow: initialStreamWindow,
@@ -115,6 +119,8 @@ WAIT:
 
 // Write is used to write to the stream
 func (s *Stream) Write(b []byte) (n int, err error) {
+	s.sendLock.Lock()
+	defer s.sendLock.Unlock()
 	total := 0
 	for total < len(b) {
 		n, err := s.write(b[total:])
@@ -143,12 +149,8 @@ START:
 	}
 	s.stateLock.Unlock()
 
-	// Lock the send
-	s.sendLock.Lock()
-
 	// If there is no data available, block
 	if atomic.LoadUint32(&s.sendWindow) == 0 {
-		s.sendLock.Unlock()
 		goto WAIT
 	}
 
@@ -162,13 +164,11 @@ START:
 	// Send the header
 	s.sendHdr.encode(typeData, flags, s.id, max)
 	if err := s.session.waitForSend(s.sendHdr, body); err != nil {
-		s.sendLock.Unlock()
 		return 0, err
 	}
 
 	// Reduce our send window
 	atomic.AddUint32(&s.sendWindow, ^uint32(max-1))
-	s.sendLock.Unlock()
 
 	// Unlock
 	return int(max), err
@@ -208,6 +208,9 @@ func (s *Stream) sendFlags() uint16 {
 // sendWindowUpdate potentially sends a window update enabling
 // further writes to take place. Must be invoked with the lock.
 func (s *Stream) sendWindowUpdate() error {
+	s.controlHdrLock.Lock()
+	defer s.controlHdrLock.Unlock()
+
 	// Determine the delta update
 	max := s.session.config.MaxStreamWindowSize
 	delta := max - s.recvWindow
@@ -221,8 +224,8 @@ func (s *Stream) sendWindowUpdate() error {
 	}
 
 	// Send the header
-	s.sendHdr.encode(typeWindowUpdate, flags, s.id, delta)
-	if err := s.session.waitForSend(s.sendHdr, nil); err != nil {
+	s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
+	if err := s.session.waitForSend(s.controlHdr, nil); err != nil {
 		return err
 	}
 
@@ -233,10 +236,13 @@ func (s *Stream) sendWindowUpdate() error {
 
 // sendClose is used to send a FIN
 func (s *Stream) sendClose() error {
+	s.controlHdrLock.Lock()
+	defer s.controlHdrLock.Unlock()
+
 	flags := s.sendFlags()
 	flags |= flagFIN
-	s.sendHdr.encode(typeWindowUpdate, flags, s.id, 0)
-	if err := s.session.sendNoWait(s.sendHdr); err != nil {
+	s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0)
+	if err := s.session.sendNoWait(s.controlHdr); err != nil {
 		return err
 	}
 	return nil