Browse Source

Merge pull request #82 from hashicorp/b-deadlines

Deadline updates impact blocked reads and writes
Alex Dadgar 4 years ago
parent
commit
aecfd211c9
2 changed files with 101 additions and 2 deletions
  1. 97 0
      session_test.go
  2. 4 2
      stream.go

+ 97 - 0
session_test.go

@@ -698,6 +698,50 @@ func TestReadDeadline(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestReadDeadline_BlockedRead(t *testing.T) {
+	client, server := testClientServer()
+	defer client.Close()
+	defer server.Close()
+
+	stream, err := client.Open()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream.Close()
+
+	stream2, err := server.Accept()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream2.Close()
+
+	// Start a read that will block
+	errCh := make(chan error, 1)
+	go func() {
+		buf := make([]byte, 4)
+		_, err := stream.Read(buf)
+		errCh <- err
+		close(errCh)
+	}()
+
+	// Wait to ensure the read has started.
+	time.Sleep(5 * time.Millisecond)
+
+	// Update the read deadline
+	if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	select {
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("expected read timeout")
+	case err := <-errCh:
+		if err != ErrTimeout {
+			t.Fatalf("expected ErrTimeout; got %v", err)
+		}
+	}
+}
+
 func TestWriteDeadline(t *testing.T) {
 func TestWriteDeadline(t *testing.T) {
 	client, server := testClientServer()
 	client, server := testClientServer()
 	defer client.Close()
 	defer client.Close()
@@ -731,6 +775,59 @@ func TestWriteDeadline(t *testing.T) {
 	t.Fatalf("Expected timeout")
 	t.Fatalf("Expected timeout")
 }
 }
 
 
+func TestWriteDeadline_BlockedWrite(t *testing.T) {
+	client, server := testClientServer()
+	defer client.Close()
+	defer server.Close()
+
+	stream, err := client.Open()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream.Close()
+
+	stream2, err := server.Accept()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream2.Close()
+
+	// Start a goroutine making writes that will block
+	errCh := make(chan error, 1)
+	go func() {
+		buf := make([]byte, 512)
+		for i := 0; i < int(initialStreamWindow); i++ {
+			_, err := stream.Write(buf)
+			if err == nil {
+				continue
+			}
+
+			errCh <- err
+			close(errCh)
+			return
+		}
+
+		close(errCh)
+	}()
+
+	// Wait to ensure the write has started.
+	time.Sleep(5 * time.Millisecond)
+
+	// Update the write deadline
+	if err := stream.SetWriteDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	select {
+	case <-time.After(1 * time.Second):
+		t.Fatal("expected write timeout")
+	case err := <-errCh:
+		if err != ErrTimeout {
+			t.Fatalf("expected ErrTimeout; got %v", err)
+		}
+	}
+}
+
 func TestBacklogExceeded(t *testing.T) {
 func TestBacklogExceeded(t *testing.T) {
 	client, server := testClientServer()
 	client, server := testClientServer()
 	defer client.Close()
 	defer client.Close()

+ 4 - 2
stream.go

@@ -446,15 +446,17 @@ func (s *Stream) SetDeadline(t time.Time) error {
 	return nil
 	return nil
 }
 }
 
 
-// SetReadDeadline sets the deadline for future Read calls.
+// SetReadDeadline sets the deadline for blocked and future Read calls.
 func (s *Stream) SetReadDeadline(t time.Time) error {
 func (s *Stream) SetReadDeadline(t time.Time) error {
 	s.readDeadline.Store(t)
 	s.readDeadline.Store(t)
+	asyncNotify(s.recvNotifyCh)
 	return nil
 	return nil
 }
 }
 
 
-// SetWriteDeadline sets the deadline for future Write calls
+// SetWriteDeadline sets the deadline for blocked and future Write calls
 func (s *Stream) SetWriteDeadline(t time.Time) error {
 func (s *Stream) SetWriteDeadline(t time.Time) error {
 	s.writeDeadline.Store(t)
 	s.writeDeadline.Store(t)
+	asyncNotify(s.sendNotifyCh)
 	return nil
 	return nil
 }
 }