Sfoglia il codice sorgente

Deadline updates impact blocked reads and writes

This change causes new deadlines to impact currently blocked reads and
writes. This is inline with the documentation of deadlines on a
net.Conn.
Alex Dadgar 4 anni fa
parent
commit
1651c274e4
2 ha cambiato i file con 101 aggiunte e 2 eliminazioni
  1. 97 0
      session_test.go
  2. 4 2
      stream.go

+ 97 - 0
session_test.go

@@ -698,6 +698,50 @@ func TestReadDeadline(t *testing.T) {
 	}
 }
 
+func TestReadDeadline_BlockedRead(t *testing.T) {
+	client, server := testClientServer()
+	defer client.Close()
+	defer server.Close()
+
+	stream, err := client.Open()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream.Close()
+
+	stream2, err := server.Accept()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream2.Close()
+
+	// Start a read that will block
+	errCh := make(chan error, 1)
+	go func() {
+		buf := make([]byte, 4)
+		_, err := stream.Read(buf)
+		errCh <- err
+		close(errCh)
+	}()
+
+	// Wait to ensure the read has started.
+	time.Sleep(5 * time.Millisecond)
+
+	// Update the read deadline
+	if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	select {
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("expected read timeout")
+	case err := <-errCh:
+		if err != ErrTimeout {
+			t.Fatalf("expected ErrTimeout; got %v", err)
+		}
+	}
+}
+
 func TestWriteDeadline(t *testing.T) {
 	client, server := testClientServer()
 	defer client.Close()
@@ -731,6 +775,59 @@ func TestWriteDeadline(t *testing.T) {
 	t.Fatalf("Expected timeout")
 }
 
+func TestWriteDeadline_BlockedWrite(t *testing.T) {
+	client, server := testClientServer()
+	defer client.Close()
+	defer server.Close()
+
+	stream, err := client.Open()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream.Close()
+
+	stream2, err := server.Accept()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream2.Close()
+
+	// Start a goroutine making writes that will block
+	errCh := make(chan error, 1)
+	go func() {
+		buf := make([]byte, 512)
+		for i := 0; i < int(initialStreamWindow); i++ {
+			_, err := stream.Write(buf)
+			if err == nil {
+				continue
+			}
+
+			errCh <- err
+			close(errCh)
+			return
+		}
+
+		close(errCh)
+	}()
+
+	// Wait to ensure the write has started.
+	time.Sleep(5 * time.Millisecond)
+
+	// Update the write deadline
+	if err := stream.SetWriteDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	select {
+	case <-time.After(1 * time.Second):
+		t.Fatal("expected write timeout")
+	case err := <-errCh:
+		if err != ErrTimeout {
+			t.Fatalf("expected ErrTimeout; got %v", err)
+		}
+	}
+}
+
 func TestBacklogExceeded(t *testing.T) {
 	client, server := testClientServer()
 	defer client.Close()

+ 4 - 2
stream.go

@@ -446,15 +446,17 @@ func (s *Stream) SetDeadline(t time.Time) error {
 	return nil
 }
 
-// SetReadDeadline sets the deadline for future Read calls.
+// SetReadDeadline sets the deadline for blocked and future Read calls.
 func (s *Stream) SetReadDeadline(t time.Time) error {
 	s.readDeadline.Store(t)
+	asyncNotify(s.recvNotifyCh)
 	return nil
 }
 
-// SetWriteDeadline sets the deadline for future Write calls
+// SetWriteDeadline sets the deadline for blocked and future Write calls
 func (s *Stream) SetWriteDeadline(t time.Time) error {
 	s.writeDeadline.Store(t)
+	asyncNotify(s.sendNotifyCh)
 	return nil
 }