Преглед на файлове

Merge pull request #80 from hashicorp/f-accept-ctx

Add ability to cancel Accept by passing a context
Evan Phoenix преди 2 години
родител
ревизия
574fd304fd
променени са 2 файла, в които са добавени 45 реда и са изтрити 0 реда
  1. 17 0
      session.go
  2. 28 0
      session_test.go

+ 17 - 0
session.go

@@ -3,6 +3,7 @@ package yamux
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -250,6 +251,22 @@ func (s *Session) AcceptStream() (*Stream, error) {
 	}
 }
 
+// AcceptStream is used to block until the next available stream
+// is ready to be accepted.
+func (s *Session) AcceptStreamWithContext(ctx context.Context) (*Stream, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case stream := <-s.acceptCh:
+		if err := stream.sendWindowUpdate(); err != nil {
+			return nil, err
+		}
+		return stream, nil
+	case <-s.shutdownCh:
+		return nil, s.shutdownErr
+	}
+}
+
 // Close is used to close the session and all streams.
 // Attempts to send a GoAway before closing the connection.
 func (s *Session) Close() error {

+ 28 - 0
session_test.go

@@ -2,6 +2,7 @@ package yamux
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -1534,3 +1535,30 @@ func TestSession_ConnectionWriteTimeout(t *testing.T) {
 
 	wg.Wait()
 }
+
+func TestCancelAccept(t *testing.T) {
+	_, server := testClientServer()
+	defer server.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		stream, err := server.AcceptStreamWithContext(ctx)
+		if err != context.Canceled {
+			t.Fatalf("err: %v", err)
+		}
+
+		if stream != nil {
+			defer stream.Close()
+		}
+	}()
+
+	cancel()
+
+	wg.Wait()
+}