Explorar o código

Implement timeouts for OpenStream (#96)

Currently streams can be in the streamSYNSent state for an unbounded
amount of time when the remote is unable to reply and has not closed the
connection via FIN/RST.

This commit adds a deadline to receiving an ACK on calls to OpenStream.

The timeout is configurable per-session with StreamOpenTimeout, and can be
disabled by setting it to zero. When the timeout is triggered, an error 
is logged.

Note that the establishCh is buffered and we avoid closing the channel
because there are multiple call-paths that can invoke it: 
- Stream establishment
- Remote close (FIN/RST)
- Local close

Co-authored-by: Paul Banks <banks@banksco.de>
Freddy %!s(int64=3) %!d(string=hai) anos
pai
achega
26ff87cf94
Modificáronse 4 ficheiros con 72 adicións e 0 borrados
  1. 8 0
      mux.go
  2. 25 0
      session.go
  3. 33 0
      session_test.go
  4. 6 0
      stream.go

+ 8 - 0
mux.go

@@ -31,6 +31,13 @@ type Config struct {
 	// window size that we allow for a stream.
 	MaxStreamWindowSize uint32
 
+	// StreamOpenTimeout is the maximum amount of time that a stream will
+	// be allowed to remain in pending state while waiting for an ack from the peer.
+	// Once the timeout is reached the session will be gracefully closed.
+	// A zero value disables the StreamOpenTimeout allowing unbounded
+	// blocking on OpenStream calls.
+	StreamOpenTimeout time.Duration
+
 	// StreamCloseTimeout is the maximum time that a stream will allowed to
 	// be in a half-closed state when `Close` is called before forcibly
 	// closing the connection. Forcibly closed connections will empty the
@@ -56,6 +63,7 @@ func DefaultConfig() *Config {
 		ConnectionWriteTimeout: 10 * time.Second,
 		MaxStreamWindowSize:    initialStreamWindow,
 		StreamCloseTimeout:     5 * time.Minute,
+		StreamOpenTimeout:      75 * time.Second,
 		LogOutput:              os.Stderr,
 	}
 }

+ 25 - 0
session.go

@@ -184,6 +184,10 @@ GET_ID:
 	s.inflight[id] = struct{}{}
 	s.streamLock.Unlock()
 
+	if s.config.StreamOpenTimeout > 0 {
+		go s.setOpenTimeout(stream)
+	}
+
 	// Send the window update to create
 	if err := stream.sendWindowUpdate(); err != nil {
 		select {
@@ -196,6 +200,27 @@ GET_ID:
 	return stream, nil
 }
 
+// setOpenTimeout implements a timeout for streams that are opened but not established.
+// If the StreamOpenTimeout is exceeded we assume the peer is unable to ACK,
+// and close the session.
+// The number of running timers is bounded by the capacity of the synCh.
+func (s *Session) setOpenTimeout(stream *Stream) {
+	timer := time.NewTimer(s.config.StreamOpenTimeout)
+	defer timer.Stop()
+
+	select {
+	case <-stream.establishCh:
+		return
+	case <-s.shutdownCh:
+		return
+	case <-timer.C:
+		// Timeout reached while waiting for ACK.
+		// Close the session to force connection re-establishment.
+		s.logger.Printf("[ERR] yamux: aborted stream open (destination=%s): %v", s.RemoteAddr().String(), ErrTimeout.err)
+		s.Close()
+	}
+}
+
 // Accept is used to block until the next available stream
 // is ready to be accepted.
 func (s *Session) Accept() (net.Conn, error) {

+ 33 - 0
session_test.go

@@ -274,6 +274,39 @@ func TestAccept(t *testing.T) {
 	}
 }
 
+func TestOpenStreamTimeout(t *testing.T) {
+	const timeout = 25 * time.Millisecond
+
+	cfg := testConf()
+	cfg.StreamOpenTimeout = timeout
+
+	client, server := testClientServerConfig(cfg)
+	defer client.Close()
+	defer server.Close()
+
+	clientLogs := captureLogs(client)
+
+	// Open a single stream without a server to acknowledge it.
+	s, err := client.OpenStream()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Sleep for longer than the stream open timeout.
+	// Since no ACKs are received, the stream and session should be closed.
+	time.Sleep(timeout * 5)
+
+	if !clientLogs.match([]string{"[ERR] yamux: aborted stream open (destination=yamux:remote): i/o deadline reached"}) {
+		t.Fatalf("server log incorect: %v", clientLogs.logs())
+	}
+	if s.state != streamClosed {
+		t.Fatalf("stream should have been closed")
+	}
+	if !client.IsClosed() {
+		t.Fatalf("session should have been closed")
+	}
+}
+
 func TestClose_closeTimeout(t *testing.T) {
 	conf := testConf()
 	conf.StreamCloseTimeout = 10 * time.Millisecond

+ 6 - 0
stream.go

@@ -50,6 +50,9 @@ type Stream struct {
 	readDeadline  atomic.Value // time.Time
 	writeDeadline atomic.Value // time.Time
 
+	// establishCh is notified if the stream is established or being closed.
+	establishCh chan struct{}
+
 	// closeTimer is set with stateLock held to honor the StreamCloseTimeout
 	// setting on Session.
 	closeTimer *time.Timer
@@ -70,6 +73,7 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
 		sendWindow:   initialStreamWindow,
 		recvNotifyCh: make(chan struct{}, 1),
 		sendNotifyCh: make(chan struct{}, 1),
+		establishCh:  make(chan struct{}, 1),
 	}
 	s.readDeadline.Store(time.Time{})
 	s.writeDeadline.Store(time.Time{})
@@ -393,6 +397,7 @@ func (s *Stream) processFlags(flags uint16) error {
 		if s.state == streamSYNSent {
 			s.state = streamEstablished
 		}
+		asyncNotify(s.establishCh)
 		s.session.establishStream(s.id)
 	}
 	if flags&flagFIN == flagFIN {
@@ -425,6 +430,7 @@ func (s *Stream) processFlags(flags uint16) error {
 func (s *Stream) notifyWaiting() {
 	asyncNotify(s.recvNotifyCh)
 	asyncNotify(s.sendNotifyCh)
+	asyncNotify(s.establishCh)
 }
 
 // incrSendWindow updates the size of our send window