瀏覽代碼

Fixing potential data races

Armon Dadgar 11 年之前
父節點
當前提交
036941d2aa
共有 3 個文件被更改,包括 15 次插入5 次删除
  1. 5 1
      session.go
  2. 5 0
      session_test.go
  3. 5 4
      stream.go

+ 5 - 1
session.go

@@ -185,7 +185,11 @@ func (s *Session) Close() error {
 // exitErr is used to handle an error that is causing the
 // session to terminate.
 func (s *Session) exitErr(err error) {
-	s.shutdownErr = err
+	s.shutdownLock.Lock()
+	if s.shutdownErr == nil {
+		s.shutdownErr = err
+	}
+	s.shutdownLock.Unlock()
 	s.Close()
 }
 

+ 5 - 0
session_test.go

@@ -600,9 +600,14 @@ func TestKeepAlive(t *testing.T) {
 	time.Sleep(200 * time.Millisecond)
 
 	// Ping value should increase
+	client.pingLock.Lock()
+	defer client.pingLock.Unlock()
 	if client.pingID == 0 {
 		t.Fatalf("should ping")
 	}
+
+	server.pingLock.Lock()
+	defer server.pingLock.Unlock()
 	if server.pingID == 0 {
 		t.Fatalf("should ping")
 	}

+ 5 - 4
stream.go

@@ -185,7 +185,8 @@ START:
 	s.stateLock.Unlock()
 
 	// If there is no data available, block
-	if atomic.LoadUint32(&s.sendWindow) == 0 {
+	window := atomic.LoadUint32(&s.sendWindow)
+	if window == 0 {
 		goto WAIT
 	}
 
@@ -193,7 +194,7 @@ START:
 	flags = s.sendFlags()
 
 	// Send up to our send window
-	max = min(s.sendWindow, uint32(len(b)))
+	max = min(window, uint32(len(b)))
 	body = bytes.NewReader(b[:max])
 
 	// Send the header
@@ -248,7 +249,7 @@ func (s *Stream) sendWindowUpdate() error {
 
 	// Determine the delta update
 	max := s.session.config.MaxStreamWindowSize
-	delta := max - s.recvWindow
+	delta := max - atomic.LoadUint32(&s.recvWindow)
 
 	// Determine the flags if any
 	flags := s.sendFlags()
@@ -265,7 +266,7 @@ func (s *Stream) sendWindowUpdate() error {
 	}
 
 	// Update our window
-	s.recvWindow += delta
+	atomic.AddUint32(&s.recvWindow, delta)
 	return nil
 }