session_test.go 29 KB


  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "reflect"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "testing"
  14. "time"
  15. )
  16. type logCapture struct{ bytes.Buffer }
  17. func (l *logCapture) logs() []string {
  18. return strings.Split(strings.TrimSpace(l.String()), "\n")
  19. }
  20. func (l *logCapture) match(expect []string) bool {
  21. return reflect.DeepEqual(l.logs(), expect)
  22. }
  23. func captureLogs(s *Session) *logCapture {
  24. buf := new(logCapture)
  25. s.logger = log.New(buf, "", 0)
  26. return buf
  27. }
  28. type pipeConn struct {
  29. reader *io.PipeReader
  30. writer *io.PipeWriter
  31. writeBlocker sync.Mutex
  32. }
  33. func (p *pipeConn) Read(b []byte) (int, error) {
  34. return p.reader.Read(b)
  35. }
  36. func (p *pipeConn) Write(b []byte) (int, error) {
  37. p.writeBlocker.Lock()
  38. defer p.writeBlocker.Unlock()
  39. return p.writer.Write(b)
  40. }
  41. func (p *pipeConn) Close() error {
  42. p.reader.Close()
  43. return p.writer.Close()
  44. }
  45. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  46. read1, write1 := io.Pipe()
  47. read2, write2 := io.Pipe()
  48. conn1 := &pipeConn{reader: read1, writer: write2}
  49. conn2 := &pipeConn{reader: read2, writer: write1}
  50. return conn1, conn2
  51. }
  52. func testConf() *Config {
  53. conf := DefaultConfig()
  54. conf.AcceptBacklog = 64
  55. conf.KeepAliveInterval = 100 * time.Millisecond
  56. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  57. return conf
  58. }
  59. func testConfNoKeepAlive() *Config {
  60. conf := testConf()
  61. conf.EnableKeepAlive = false
  62. return conf
  63. }
  64. func testClientServer() (*Session, *Session) {
  65. return testClientServerConfig(testConf())
  66. }
  67. func testClientServerConfig(conf *Config) (*Session, *Session) {
  68. conn1, conn2 := testConn()
  69. client, _ := Client(conn1, conf)
  70. server, _ := Server(conn2, conf)
  71. return client, server
  72. }
  73. func TestPing(t *testing.T) {
  74. client, server := testClientServer()
  75. defer client.Close()
  76. defer server.Close()
  77. rtt, err := client.Ping()
  78. if err != nil {
  79. t.Fatalf("err: %v", err)
  80. }
  81. if rtt == 0 {
  82. t.Fatalf("bad: %v", rtt)
  83. }
  84. rtt, err = server.Ping()
  85. if err != nil {
  86. t.Fatalf("err: %v", err)
  87. }
  88. if rtt == 0 {
  89. t.Fatalf("bad: %v", rtt)
  90. }
  91. }
  92. func TestPing_Timeout(t *testing.T) {
  93. client, server := testClientServerConfig(testConfNoKeepAlive())
  94. defer client.Close()
  95. defer server.Close()
  96. // Prevent the client from responding
  97. clientConn := client.conn.(*pipeConn)
  98. clientConn.writeBlocker.Lock()
  99. errCh := make(chan error, 1)
  100. go func() {
  101. _, err := server.Ping() // Ping via the server session
  102. errCh <- err
  103. }()
  104. select {
  105. case err := <-errCh:
  106. if err != ErrTimeout {
  107. t.Fatalf("err: %v", err)
  108. }
  109. case <-time.After(client.config.ConnectionWriteTimeout * 2):
  110. t.Fatalf("failed to timeout within expected %v", client.config.ConnectionWriteTimeout)
  111. }
  112. // Verify that we recover, even if we gave up
  113. clientConn.writeBlocker.Unlock()
  114. go func() {
  115. _, err := server.Ping() // Ping via the server session
  116. errCh <- err
  117. }()
  118. select {
  119. case err := <-errCh:
  120. if err != nil {
  121. t.Fatalf("err: %v", err)
  122. }
  123. case <-time.After(client.config.ConnectionWriteTimeout):
  124. t.Fatalf("timeout")
  125. }
  126. }
  127. func TestCloseBeforeAck(t *testing.T) {
  128. cfg := testConf()
  129. cfg.AcceptBacklog = 8
  130. client, server := testClientServerConfig(cfg)
  131. defer client.Close()
  132. defer server.Close()
  133. for i := 0; i < 8; i++ {
  134. s, err := client.OpenStream()
  135. if err != nil {
  136. t.Fatal(err)
  137. }
  138. s.Close()
  139. }
  140. for i := 0; i < 8; i++ {
  141. s, err := server.AcceptStream()
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. s.Close()
  146. }
  147. done := make(chan struct{})
  148. go func() {
  149. defer close(done)
  150. s, err := client.OpenStream()
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. s.Close()
  155. }()
  156. select {
  157. case <-done:
  158. case <-time.After(time.Second * 5):
  159. t.Fatal("timed out trying to open stream")
  160. }
  161. }
  162. func TestAccept(t *testing.T) {
  163. client, server := testClientServer()
  164. defer client.Close()
  165. defer server.Close()
  166. if client.NumStreams() != 0 {
  167. t.Fatalf("bad")
  168. }
  169. if server.NumStreams() != 0 {
  170. t.Fatalf("bad")
  171. }
  172. wg := &sync.WaitGroup{}
  173. wg.Add(4)
  174. go func() {
  175. defer wg.Done()
  176. stream, err := server.AcceptStream()
  177. if err != nil {
  178. t.Fatalf("err: %v", err)
  179. }
  180. if id := stream.StreamID(); id != 1 {
  181. t.Fatalf("bad: %v", id)
  182. }
  183. if err := stream.Close(); err != nil {
  184. t.Fatalf("err: %v", err)
  185. }
  186. }()
  187. go func() {
  188. defer wg.Done()
  189. stream, err := client.AcceptStream()
  190. if err != nil {
  191. t.Fatalf("err: %v", err)
  192. }
  193. if id := stream.StreamID(); id != 2 {
  194. t.Fatalf("bad: %v", id)
  195. }
  196. if err := stream.Close(); err != nil {
  197. t.Fatalf("err: %v", err)
  198. }
  199. }()
  200. go func() {
  201. defer wg.Done()
  202. stream, err := server.OpenStream()
  203. if err != nil {
  204. t.Fatalf("err: %v", err)
  205. }
  206. if id := stream.StreamID(); id != 2 {
  207. t.Fatalf("bad: %v", id)
  208. }
  209. if err := stream.Close(); err != nil {
  210. t.Fatalf("err: %v", err)
  211. }
  212. }()
  213. go func() {
  214. defer wg.Done()
  215. stream, err := client.OpenStream()
  216. if err != nil {
  217. t.Fatalf("err: %v", err)
  218. }
  219. if id := stream.StreamID(); id != 1 {
  220. t.Fatalf("bad: %v", id)
  221. }
  222. if err := stream.Close(); err != nil {
  223. t.Fatalf("err: %v", err)
  224. }
  225. }()
  226. doneCh := make(chan struct{})
  227. go func() {
  228. wg.Wait()
  229. close(doneCh)
  230. }()
  231. select {
  232. case <-doneCh:
  233. case <-time.After(time.Second):
  234. panic("timeout")
  235. }
  236. }
  237. func TestOpenStreamTimeout(t *testing.T) {
  238. const timeout = 25 * time.Millisecond
  239. cfg := testConf()
  240. cfg.StreamOpenTimeout = timeout
  241. client, server := testClientServerConfig(cfg)
  242. defer client.Close()
  243. defer server.Close()
  244. clientLogs := captureLogs(client)
  245. // Open a single stream without a server to acknowledge it.
  246. s, err := client.OpenStream()
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. // Sleep for longer than the stream open timeout.
  251. // Since no ACKs are received, the stream and session should be closed.
  252. time.Sleep(timeout * 5)
  253. if !clientLogs.match([]string{"[ERR] yamux: aborted stream open (destination=yamux:remote): i/o deadline reached"}) {
  254. t.Fatalf("server log incorect: %v", clientLogs.logs())
  255. }
  256. if s.state != streamClosed {
  257. t.Fatalf("stream should have been closed")
  258. }
  259. if !client.IsClosed() {
  260. t.Fatalf("session should have been closed")
  261. }
  262. }
  263. func TestClose_closeTimeout(t *testing.T) {
  264. conf := testConf()
  265. conf.StreamCloseTimeout = 10 * time.Millisecond
  266. client, server := testClientServerConfig(conf)
  267. defer client.Close()
  268. defer server.Close()
  269. if client.NumStreams() != 0 {
  270. t.Fatalf("bad")
  271. }
  272. if server.NumStreams() != 0 {
  273. t.Fatalf("bad")
  274. }
  275. wg := &sync.WaitGroup{}
  276. wg.Add(2)
  277. // Open a stream on the client but only close it on the server.
  278. // We want to see if the stream ever gets cleaned up on the client.
  279. var clientStream *Stream
  280. go func() {
  281. defer wg.Done()
  282. var err error
  283. clientStream, err = client.OpenStream()
  284. if err != nil {
  285. t.Fatalf("err: %v", err)
  286. }
  287. }()
  288. go func() {
  289. defer wg.Done()
  290. stream, err := server.AcceptStream()
  291. if err != nil {
  292. t.Fatalf("err: %v", err)
  293. }
  294. if err := stream.Close(); err != nil {
  295. t.Fatalf("err: %v", err)
  296. }
  297. }()
  298. doneCh := make(chan struct{})
  299. go func() {
  300. wg.Wait()
  301. close(doneCh)
  302. }()
  303. select {
  304. case <-doneCh:
  305. case <-time.After(time.Second):
  306. panic("timeout")
  307. }
  308. // We should have zero streams after our timeout period
  309. time.Sleep(100 * time.Millisecond)
  310. if v := server.NumStreams(); v > 0 {
  311. t.Fatalf("should have zero streams: %d", v)
  312. }
  313. if v := client.NumStreams(); v > 0 {
  314. t.Fatalf("should have zero streams: %d", v)
  315. }
  316. if _, err := clientStream.Write([]byte("hello")); err == nil {
  317. t.Fatal("should error on write")
  318. } else if err.Error() != "connection reset" {
  319. t.Fatalf("expected connection reset, got %q", err)
  320. }
  321. }
  322. func TestNonNilInterface(t *testing.T) {
  323. _, server := testClientServer()
  324. server.Close()
  325. conn, err := server.Accept()
  326. if err != nil && conn != nil {
  327. t.Error("bad: accept should return a connection of nil value")
  328. }
  329. conn, err = server.Open()
  330. if err != nil && conn != nil {
  331. t.Error("bad: open should return a connection of nil value")
  332. }
  333. }
  334. func TestSendData_Small(t *testing.T) {
  335. client, server := testClientServer()
  336. defer client.Close()
  337. defer server.Close()
  338. wg := &sync.WaitGroup{}
  339. wg.Add(2)
  340. go func() {
  341. defer wg.Done()
  342. stream, err := server.AcceptStream()
  343. if err != nil {
  344. t.Fatalf("err: %v", err)
  345. }
  346. if server.NumStreams() != 1 {
  347. t.Fatalf("bad")
  348. }
  349. buf := make([]byte, 4)
  350. for i := 0; i < 1000; i++ {
  351. n, err := stream.Read(buf)
  352. if err != nil {
  353. t.Fatalf("err: %v", err)
  354. }
  355. if n != 4 {
  356. t.Fatalf("short read: %d", n)
  357. }
  358. if string(buf) != "test" {
  359. t.Fatalf("bad: %s", buf)
  360. }
  361. }
  362. if err := stream.Close(); err != nil {
  363. t.Fatalf("err: %v", err)
  364. }
  365. }()
  366. go func() {
  367. defer wg.Done()
  368. stream, err := client.Open()
  369. if err != nil {
  370. t.Fatalf("err: %v", err)
  371. }
  372. if client.NumStreams() != 1 {
  373. t.Fatalf("bad")
  374. }
  375. for i := 0; i < 1000; i++ {
  376. n, err := stream.Write([]byte("test"))
  377. if err != nil {
  378. t.Fatalf("err: %v", err)
  379. }
  380. if n != 4 {
  381. t.Fatalf("short write %d", n)
  382. }
  383. }
  384. if err := stream.Close(); err != nil {
  385. t.Fatalf("err: %v", err)
  386. }
  387. }()
  388. doneCh := make(chan struct{})
  389. go func() {
  390. wg.Wait()
  391. close(doneCh)
  392. }()
  393. select {
  394. case <-doneCh:
  395. case <-time.After(time.Second):
  396. panic("timeout")
  397. }
  398. if client.NumStreams() != 0 {
  399. t.Fatalf("bad")
  400. }
  401. if server.NumStreams() != 0 {
  402. t.Fatalf("bad")
  403. }
  404. }
  405. func TestSendData_Large(t *testing.T) {
  406. client, server := testClientServer()
  407. defer client.Close()
  408. defer server.Close()
  409. const (
  410. sendSize = 250 * 1024 * 1024
  411. recvSize = 4 * 1024
  412. )
  413. data := make([]byte, sendSize)
  414. for idx := range data {
  415. data[idx] = byte(idx % 256)
  416. }
  417. wg := &sync.WaitGroup{}
  418. wg.Add(2)
  419. go func() {
  420. defer wg.Done()
  421. stream, err := server.AcceptStream()
  422. if err != nil {
  423. t.Fatalf("err: %v", err)
  424. }
  425. var sz int
  426. buf := make([]byte, recvSize)
  427. for i := 0; i < sendSize/recvSize; i++ {
  428. n, err := stream.Read(buf)
  429. if err != nil {
  430. t.Fatalf("err: %v", err)
  431. }
  432. if n != recvSize {
  433. t.Fatalf("short read: %d", n)
  434. }
  435. sz += n
  436. for idx := range buf {
  437. if buf[idx] != byte(idx%256) {
  438. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  439. }
  440. }
  441. }
  442. if err := stream.Close(); err != nil {
  443. t.Fatalf("err: %v", err)
  444. }
  445. t.Logf("cap=%d, n=%d\n", stream.recvBuf.Cap(), sz)
  446. }()
  447. go func() {
  448. defer wg.Done()
  449. stream, err := client.Open()
  450. if err != nil {
  451. t.Fatalf("err: %v", err)
  452. }
  453. n, err := stream.Write(data)
  454. if err != nil {
  455. t.Fatalf("err: %v", err)
  456. }
  457. if n != len(data) {
  458. t.Fatalf("short write %d", n)
  459. }
  460. if err := stream.Close(); err != nil {
  461. t.Fatalf("err: %v", err)
  462. }
  463. }()
  464. doneCh := make(chan struct{})
  465. go func() {
  466. wg.Wait()
  467. close(doneCh)
  468. }()
  469. select {
  470. case <-doneCh:
  471. case <-time.After(5 * time.Second):
  472. panic("timeout")
  473. }
  474. }
  475. func TestGoAway(t *testing.T) {
  476. client, server := testClientServer()
  477. defer client.Close()
  478. defer server.Close()
  479. if err := server.GoAway(); err != nil {
  480. t.Fatalf("err: %v", err)
  481. }
  482. _, err := client.Open()
  483. if err != ErrRemoteGoAway {
  484. t.Fatalf("err: %v", err)
  485. }
  486. }
  487. func TestManyStreams(t *testing.T) {
  488. client, server := testClientServer()
  489. defer client.Close()
  490. defer server.Close()
  491. wg := &sync.WaitGroup{}
  492. acceptor := func(i int) {
  493. defer wg.Done()
  494. stream, err := server.AcceptStream()
  495. if err != nil {
  496. t.Fatalf("err: %v", err)
  497. }
  498. defer stream.Close()
  499. buf := make([]byte, 512)
  500. for {
  501. n, err := stream.Read(buf)
  502. if err == io.EOF {
  503. return
  504. }
  505. if err != nil {
  506. t.Fatalf("err: %v", err)
  507. }
  508. if n == 0 {
  509. t.Fatalf("err: %v", err)
  510. }
  511. }
  512. }
  513. sender := func(i int) {
  514. defer wg.Done()
  515. stream, err := client.Open()
  516. if err != nil {
  517. t.Fatalf("err: %v", err)
  518. }
  519. defer stream.Close()
  520. msg := fmt.Sprintf("%08d", i)
  521. for i := 0; i < 1000; i++ {
  522. n, err := stream.Write([]byte(msg))
  523. if err != nil {
  524. t.Fatalf("err: %v", err)
  525. }
  526. if n != len(msg) {
  527. t.Fatalf("short write %d", n)
  528. }
  529. }
  530. }
  531. for i := 0; i < 50; i++ {
  532. wg.Add(2)
  533. go acceptor(i)
  534. go sender(i)
  535. }
  536. wg.Wait()
  537. }
  538. func TestManyStreams_PingPong(t *testing.T) {
  539. client, server := testClientServer()
  540. defer client.Close()
  541. defer server.Close()
  542. wg := &sync.WaitGroup{}
  543. ping := []byte("ping")
  544. pong := []byte("pong")
  545. acceptor := func(i int) {
  546. defer wg.Done()
  547. stream, err := server.AcceptStream()
  548. if err != nil {
  549. t.Fatalf("err: %v", err)
  550. }
  551. defer stream.Close()
  552. buf := make([]byte, 4)
  553. for {
  554. // Read the 'ping'
  555. n, err := stream.Read(buf)
  556. if err == io.EOF {
  557. return
  558. }
  559. if err != nil {
  560. t.Fatalf("err: %v", err)
  561. }
  562. if n != 4 {
  563. t.Fatalf("err: %v", err)
  564. }
  565. if !bytes.Equal(buf, ping) {
  566. t.Fatalf("bad: %s", buf)
  567. }
  568. // Shrink the internal buffer!
  569. stream.Shrink()
  570. // Write out the 'pong'
  571. n, err = stream.Write(pong)
  572. if err != nil {
  573. t.Fatalf("err: %v", err)
  574. }
  575. if n != 4 {
  576. t.Fatalf("err: %v", err)
  577. }
  578. }
  579. }
  580. sender := func(i int) {
  581. defer wg.Done()
  582. stream, err := client.OpenStream()
  583. if err != nil {
  584. t.Fatalf("err: %v", err)
  585. }
  586. defer stream.Close()
  587. buf := make([]byte, 4)
  588. for i := 0; i < 1000; i++ {
  589. // Send the 'ping'
  590. n, err := stream.Write(ping)
  591. if err != nil {
  592. t.Fatalf("err: %v", err)
  593. }
  594. if n != 4 {
  595. t.Fatalf("short write %d", n)
  596. }
  597. // Read the 'pong'
  598. n, err = stream.Read(buf)
  599. if err != nil {
  600. t.Fatalf("err: %v", err)
  601. }
  602. if n != 4 {
  603. t.Fatalf("err: %v", err)
  604. }
  605. if !bytes.Equal(buf, pong) {
  606. t.Fatalf("bad: %s", buf)
  607. }
  608. // Shrink the buffer
  609. stream.Shrink()
  610. }
  611. }
  612. for i := 0; i < 50; i++ {
  613. wg.Add(2)
  614. go acceptor(i)
  615. go sender(i)
  616. }
  617. wg.Wait()
  618. }
  619. func TestHalfClose(t *testing.T) {
  620. client, server := testClientServer()
  621. defer client.Close()
  622. defer server.Close()
  623. stream, err := client.Open()
  624. if err != nil {
  625. t.Fatalf("err: %v", err)
  626. }
  627. if _, err = stream.Write([]byte("a")); err != nil {
  628. t.Fatalf("err: %v", err)
  629. }
  630. stream2, err := server.Accept()
  631. if err != nil {
  632. t.Fatalf("err: %v", err)
  633. }
  634. stream2.Close() // Half close
  635. buf := make([]byte, 4)
  636. n, err := stream2.Read(buf)
  637. if err != nil {
  638. t.Fatalf("err: %v", err)
  639. }
  640. if n != 1 {
  641. t.Fatalf("bad: %v", n)
  642. }
  643. // Send more
  644. if _, err = stream.Write([]byte("bcd")); err != nil {
  645. t.Fatalf("err: %v", err)
  646. }
  647. stream.Close()
  648. // Read after close
  649. n, err = stream2.Read(buf)
  650. if err != nil {
  651. t.Fatalf("err: %v", err)
  652. }
  653. if n != 3 {
  654. t.Fatalf("bad: %v", n)
  655. }
  656. // EOF after close
  657. n, err = stream2.Read(buf)
  658. if err != io.EOF {
  659. t.Fatalf("err: %v", err)
  660. }
  661. if n != 0 {
  662. t.Fatalf("bad: %v", n)
  663. }
  664. }
  665. func TestReadDeadline(t *testing.T) {
  666. client, server := testClientServer()
  667. defer client.Close()
  668. defer server.Close()
  669. stream, err := client.Open()
  670. if err != nil {
  671. t.Fatalf("err: %v", err)
  672. }
  673. defer stream.Close()
  674. stream2, err := server.Accept()
  675. if err != nil {
  676. t.Fatalf("err: %v", err)
  677. }
  678. defer stream2.Close()
  679. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  680. t.Fatalf("err: %v", err)
  681. }
  682. buf := make([]byte, 4)
  683. _, err = stream.Read(buf)
  684. if err != ErrTimeout {
  685. t.Fatalf("err: %v", err)
  686. }
  687. // See https://github.com/hashicorp/yamux/issues/90
  688. // The standard library's http server package will read from connections in
  689. // the background to detect if they are alive.
  690. //
  691. // It sets a read deadline on connections and detect if the returned error
  692. // is a network timeout error which implements net.Error.
  693. //
  694. // The HTTP server will cancel all server requests if it isn't timeout error
  695. // from the connection.
  696. //
  697. // We assert that we return an error meeting the interface to avoid
  698. // accidently breaking yamux session compatability with the standard
  699. // library's http server implementation.
  700. if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
  701. t.Fatalf("reading timeout error is expected to implement net.Error and return true when calling Timeout()")
  702. }
  703. }
  704. func TestReadDeadline_BlockedRead(t *testing.T) {
  705. client, server := testClientServer()
  706. defer client.Close()
  707. defer server.Close()
  708. stream, err := client.Open()
  709. if err != nil {
  710. t.Fatalf("err: %v", err)
  711. }
  712. defer stream.Close()
  713. stream2, err := server.Accept()
  714. if err != nil {
  715. t.Fatalf("err: %v", err)
  716. }
  717. defer stream2.Close()
  718. // Start a read that will block
  719. errCh := make(chan error, 1)
  720. go func() {
  721. buf := make([]byte, 4)
  722. _, err := stream.Read(buf)
  723. errCh <- err
  724. close(errCh)
  725. }()
  726. // Wait to ensure the read has started.
  727. time.Sleep(5 * time.Millisecond)
  728. // Update the read deadline
  729. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  730. t.Fatalf("err: %v", err)
  731. }
  732. select {
  733. case <-time.After(100 * time.Millisecond):
  734. t.Fatal("expected read timeout")
  735. case err := <-errCh:
  736. if err != ErrTimeout {
  737. t.Fatalf("expected ErrTimeout; got %v", err)
  738. }
  739. }
  740. }
  741. func TestWriteDeadline(t *testing.T) {
  742. client, server := testClientServer()
  743. defer client.Close()
  744. defer server.Close()
  745. stream, err := client.Open()
  746. if err != nil {
  747. t.Fatalf("err: %v", err)
  748. }
  749. defer stream.Close()
  750. stream2, err := server.Accept()
  751. if err != nil {
  752. t.Fatalf("err: %v", err)
  753. }
  754. defer stream2.Close()
  755. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  756. t.Fatalf("err: %v", err)
  757. }
  758. buf := make([]byte, 512)
  759. for i := 0; i < int(initialStreamWindow); i++ {
  760. _, err := stream.Write(buf)
  761. if err != nil && err == ErrTimeout {
  762. return
  763. } else if err != nil {
  764. t.Fatalf("err: %v", err)
  765. }
  766. }
  767. t.Fatalf("Expected timeout")
  768. }
  769. func TestWriteDeadline_BlockedWrite(t *testing.T) {
  770. client, server := testClientServer()
  771. defer client.Close()
  772. defer server.Close()
  773. stream, err := client.Open()
  774. if err != nil {
  775. t.Fatalf("err: %v", err)
  776. }
  777. defer stream.Close()
  778. stream2, err := server.Accept()
  779. if err != nil {
  780. t.Fatalf("err: %v", err)
  781. }
  782. defer stream2.Close()
  783. // Start a goroutine making writes that will block
  784. errCh := make(chan error, 1)
  785. go func() {
  786. buf := make([]byte, 512)
  787. for i := 0; i < int(initialStreamWindow); i++ {
  788. _, err := stream.Write(buf)
  789. if err == nil {
  790. continue
  791. }
  792. errCh <- err
  793. close(errCh)
  794. return
  795. }
  796. close(errCh)
  797. }()
  798. // Wait to ensure the write has started.
  799. time.Sleep(5 * time.Millisecond)
  800. // Update the write deadline
  801. if err := stream.SetWriteDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  802. t.Fatalf("err: %v", err)
  803. }
  804. select {
  805. case <-time.After(1 * time.Second):
  806. t.Fatal("expected write timeout")
  807. case err := <-errCh:
  808. if err != ErrTimeout {
  809. t.Fatalf("expected ErrTimeout; got %v", err)
  810. }
  811. }
  812. }
  813. func TestBacklogExceeded(t *testing.T) {
  814. client, server := testClientServer()
  815. defer client.Close()
  816. defer server.Close()
  817. // Fill the backlog
  818. max := client.config.AcceptBacklog
  819. for i := 0; i < max; i++ {
  820. stream, err := client.Open()
  821. if err != nil {
  822. t.Fatalf("err: %v", err)
  823. }
  824. defer stream.Close()
  825. if _, err := stream.Write([]byte("foo")); err != nil {
  826. t.Fatalf("err: %v", err)
  827. }
  828. }
  829. // Attempt to open a new stream
  830. errCh := make(chan error, 1)
  831. go func() {
  832. _, err := client.Open()
  833. errCh <- err
  834. }()
  835. // Shutdown the server
  836. go func() {
  837. time.Sleep(10 * time.Millisecond)
  838. server.Close()
  839. }()
  840. select {
  841. case err := <-errCh:
  842. if err == nil {
  843. t.Fatalf("open should fail")
  844. }
  845. case <-time.After(time.Second):
  846. t.Fatalf("timeout")
  847. }
  848. }
  849. func TestKeepAlive(t *testing.T) {
  850. client, server := testClientServer()
  851. defer client.Close()
  852. defer server.Close()
  853. time.Sleep(200 * time.Millisecond)
  854. // Ping value should increase
  855. client.pingLock.Lock()
  856. defer client.pingLock.Unlock()
  857. if client.pingID == 0 {
  858. t.Fatalf("should ping")
  859. }
  860. server.pingLock.Lock()
  861. defer server.pingLock.Unlock()
  862. if server.pingID == 0 {
  863. t.Fatalf("should ping")
  864. }
  865. }
  866. func TestKeepAlive_Timeout(t *testing.T) {
  867. conn1, conn2 := testConn()
  868. clientConf := testConf()
  869. clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
  870. clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
  871. client, _ := Client(conn1, clientConf)
  872. defer client.Close()
  873. server, _ := Server(conn2, testConf())
  874. defer server.Close()
  875. _ = captureLogs(client) // Client logs aren't part of the test
  876. serverLogs := captureLogs(server)
  877. errCh := make(chan error, 1)
  878. go func() {
  879. _, err := server.Accept() // Wait until server closes
  880. errCh <- err
  881. }()
  882. // Prevent the client from responding
  883. clientConn := client.conn.(*pipeConn)
  884. clientConn.writeBlocker.Lock()
  885. select {
  886. case err := <-errCh:
  887. if err != ErrKeepAliveTimeout {
  888. t.Fatalf("unexpected error: %v", err)
  889. }
  890. case <-time.After(1 * time.Second):
  891. t.Fatalf("timeout waiting for timeout")
  892. }
  893. if !server.IsClosed() {
  894. t.Fatalf("server should have closed")
  895. }
  896. if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
  897. t.Fatalf("server log incorect: %v", serverLogs.logs())
  898. }
  899. }
  900. func TestLargeWindow(t *testing.T) {
  901. conf := DefaultConfig()
  902. conf.MaxStreamWindowSize *= 2
  903. client, server := testClientServerConfig(conf)
  904. defer client.Close()
  905. defer server.Close()
  906. stream, err := client.Open()
  907. if err != nil {
  908. t.Fatalf("err: %v", err)
  909. }
  910. defer stream.Close()
  911. stream2, err := server.Accept()
  912. if err != nil {
  913. t.Fatalf("err: %v", err)
  914. }
  915. defer stream2.Close()
  916. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  917. buf := make([]byte, conf.MaxStreamWindowSize)
  918. n, err := stream.Write(buf)
  919. if err != nil {
  920. t.Fatalf("err: %v", err)
  921. }
  922. if n != len(buf) {
  923. t.Fatalf("short write: %d", n)
  924. }
  925. }
  926. type UnlimitedReader struct{}
  927. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  928. runtime.Gosched()
  929. return len(p), nil
  930. }
  931. func TestSendData_VeryLarge(t *testing.T) {
  932. client, server := testClientServer()
  933. defer client.Close()
  934. defer server.Close()
  935. var n int64 = 1 * 1024 * 1024 * 1024
  936. var workers int = 16
  937. wg := &sync.WaitGroup{}
  938. wg.Add(workers * 2)
  939. for i := 0; i < workers; i++ {
  940. go func() {
  941. defer wg.Done()
  942. stream, err := server.AcceptStream()
  943. if err != nil {
  944. t.Fatalf("err: %v", err)
  945. }
  946. defer stream.Close()
  947. buf := make([]byte, 4)
  948. _, err = stream.Read(buf)
  949. if err != nil {
  950. t.Fatalf("err: %v", err)
  951. }
  952. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  953. t.Fatalf("bad header")
  954. }
  955. recv, err := io.Copy(ioutil.Discard, stream)
  956. if err != nil {
  957. t.Fatalf("err: %v", err)
  958. }
  959. if recv != n {
  960. t.Fatalf("bad: %v", recv)
  961. }
  962. }()
  963. }
  964. for i := 0; i < workers; i++ {
  965. go func() {
  966. defer wg.Done()
  967. stream, err := client.Open()
  968. if err != nil {
  969. t.Fatalf("err: %v", err)
  970. }
  971. defer stream.Close()
  972. _, err = stream.Write([]byte{0, 1, 2, 3})
  973. if err != nil {
  974. t.Fatalf("err: %v", err)
  975. }
  976. unlimited := &UnlimitedReader{}
  977. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  978. if err != nil {
  979. t.Fatalf("err: %v", err)
  980. }
  981. if sent != n {
  982. t.Fatalf("bad: %v", sent)
  983. }
  984. }()
  985. }
  986. doneCh := make(chan struct{})
  987. go func() {
  988. wg.Wait()
  989. close(doneCh)
  990. }()
  991. select {
  992. case <-doneCh:
  993. case <-time.After(20 * time.Second):
  994. panic("timeout")
  995. }
  996. }
  997. func TestBacklogExceeded_Accept(t *testing.T) {
  998. client, server := testClientServer()
  999. defer client.Close()
  1000. defer server.Close()
  1001. max := 5 * client.config.AcceptBacklog
  1002. go func() {
  1003. for i := 0; i < max; i++ {
  1004. stream, err := server.Accept()
  1005. if err != nil {
  1006. t.Fatalf("err: %v", err)
  1007. }
  1008. defer stream.Close()
  1009. }
  1010. }()
  1011. // Fill the backlog
  1012. for i := 0; i < max; i++ {
  1013. stream, err := client.Open()
  1014. if err != nil {
  1015. t.Fatalf("err: %v", err)
  1016. }
  1017. defer stream.Close()
  1018. if _, err := stream.Write([]byte("foo")); err != nil {
  1019. t.Fatalf("err: %v", err)
  1020. }
  1021. }
  1022. }
  1023. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  1024. client, server := testClientServerConfig(testConfNoKeepAlive())
  1025. defer client.Close()
  1026. defer server.Close()
  1027. var wg sync.WaitGroup
  1028. wg.Add(2)
  1029. // Choose a huge flood size that we know will result in a window update.
  1030. flood := int64(client.config.MaxStreamWindowSize) - 1
  1031. // The server will accept a new stream and then flood data to it.
  1032. go func() {
  1033. defer wg.Done()
  1034. stream, err := server.AcceptStream()
  1035. if err != nil {
  1036. t.Fatalf("err: %v", err)
  1037. }
  1038. defer stream.Close()
  1039. n, err := stream.Write(make([]byte, flood))
  1040. if err != nil {
  1041. t.Fatalf("err: %v", err)
  1042. }
  1043. if int64(n) != flood {
  1044. t.Fatalf("short write: %d", n)
  1045. }
  1046. }()
  1047. // The client will open a stream, block outbound writes, and then
  1048. // listen to the flood from the server, which should time out since
  1049. // it won't be able to send the window update.
  1050. go func() {
  1051. defer wg.Done()
  1052. stream, err := client.OpenStream()
  1053. if err != nil {
  1054. t.Fatalf("err: %v", err)
  1055. }
  1056. defer stream.Close()
  1057. conn := client.conn.(*pipeConn)
  1058. conn.writeBlocker.Lock()
  1059. _, err = stream.Read(make([]byte, flood))
  1060. if err != ErrConnectionWriteTimeout {
  1061. t.Fatalf("err: %v", err)
  1062. }
  1063. }()
  1064. wg.Wait()
  1065. }
  1066. func TestSession_PartialReadWindowUpdate(t *testing.T) {
  1067. client, server := testClientServerConfig(testConfNoKeepAlive())
  1068. defer client.Close()
  1069. defer server.Close()
  1070. var wg sync.WaitGroup
  1071. wg.Add(1)
  1072. // Choose a huge flood size that we know will result in a window update.
  1073. flood := int64(client.config.MaxStreamWindowSize)
  1074. var wr *Stream
  1075. // The server will accept a new stream and then flood data to it.
  1076. go func() {
  1077. defer wg.Done()
  1078. var err error
  1079. wr, err = server.AcceptStream()
  1080. if err != nil {
  1081. t.Fatalf("err: %v", err)
  1082. }
  1083. defer wr.Close()
  1084. if wr.sendWindow != client.config.MaxStreamWindowSize {
  1085. t.Fatalf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, wr.sendWindow)
  1086. }
  1087. n, err := wr.Write(make([]byte, flood))
  1088. if err != nil {
  1089. t.Fatalf("err: %v", err)
  1090. }
  1091. if int64(n) != flood {
  1092. t.Fatalf("short write: %d", n)
  1093. }
  1094. if wr.sendWindow != 0 {
  1095. t.Fatalf("sendWindow: exp=%d, got=%d", 0, wr.sendWindow)
  1096. }
  1097. }()
  1098. stream, err := client.OpenStream()
  1099. if err != nil {
  1100. t.Fatalf("err: %v", err)
  1101. }
  1102. defer stream.Close()
  1103. wg.Wait()
  1104. _, err = stream.Read(make([]byte, flood/2+1))
  1105. if exp := uint32(flood/2 + 1); wr.sendWindow != exp {
  1106. t.Errorf("sendWindow: exp=%d, got=%d", exp, wr.sendWindow)
  1107. }
  1108. }
  1109. func TestSession_sendNoWait_Timeout(t *testing.T) {
  1110. client, server := testClientServerConfig(testConfNoKeepAlive())
  1111. defer client.Close()
  1112. defer server.Close()
  1113. var wg sync.WaitGroup
  1114. wg.Add(2)
  1115. go func() {
  1116. defer wg.Done()
  1117. stream, err := server.AcceptStream()
  1118. if err != nil {
  1119. t.Fatalf("err: %v", err)
  1120. }
  1121. defer stream.Close()
  1122. }()
  1123. // The client will open the stream and then block outbound writes, we'll
  1124. // probe sendNoWait once it gets into that state.
  1125. go func() {
  1126. defer wg.Done()
  1127. stream, err := client.OpenStream()
  1128. if err != nil {
  1129. t.Fatalf("err: %v", err)
  1130. }
  1131. defer stream.Close()
  1132. conn := client.conn.(*pipeConn)
  1133. conn.writeBlocker.Lock()
  1134. hdr := header(make([]byte, headerSize))
  1135. hdr.encode(typePing, flagACK, 0, 0)
  1136. for {
  1137. err = client.sendNoWait(hdr)
  1138. if err == nil {
  1139. continue
  1140. } else if err == ErrConnectionWriteTimeout {
  1141. break
  1142. } else {
  1143. t.Fatalf("err: %v", err)
  1144. }
  1145. }
  1146. }()
  1147. wg.Wait()
  1148. }
  1149. func TestSession_PingOfDeath(t *testing.T) {
  1150. client, server := testClientServerConfig(testConfNoKeepAlive())
  1151. defer client.Close()
  1152. defer server.Close()
  1153. var wg sync.WaitGroup
  1154. wg.Add(2)
  1155. var doPingOfDeath sync.Mutex
  1156. doPingOfDeath.Lock()
  1157. // This is used later to block outbound writes.
  1158. conn := server.conn.(*pipeConn)
  1159. // The server will accept a stream, block outbound writes, and then
  1160. // flood its send channel so that no more headers can be queued.
  1161. go func() {
  1162. defer wg.Done()
  1163. stream, err := server.AcceptStream()
  1164. if err != nil {
  1165. t.Fatalf("err: %v", err)
  1166. }
  1167. defer stream.Close()
  1168. conn.writeBlocker.Lock()
  1169. for {
  1170. hdr := header(make([]byte, headerSize))
  1171. hdr.encode(typePing, 0, 0, 0)
  1172. err = server.sendNoWait(hdr)
  1173. if err == nil {
  1174. continue
  1175. } else if err == ErrConnectionWriteTimeout {
  1176. break
  1177. } else {
  1178. t.Fatalf("err: %v", err)
  1179. }
  1180. }
  1181. doPingOfDeath.Unlock()
  1182. }()
  1183. // The client will open a stream and then send the server a ping once it
  1184. // can no longer write. This makes sure the server doesn't deadlock reads
  1185. // while trying to reply to the ping with no ability to write.
  1186. go func() {
  1187. defer wg.Done()
  1188. stream, err := client.OpenStream()
  1189. if err != nil {
  1190. t.Fatalf("err: %v", err)
  1191. }
  1192. defer stream.Close()
  1193. // This ping will never unblock because the ping id will never
  1194. // show up in a response.
  1195. doPingOfDeath.Lock()
  1196. go func() { client.Ping() }()
  1197. // Wait for a while to make sure the previous ping times out,
  1198. // then turn writes back on and make sure a ping works again.
  1199. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  1200. conn.writeBlocker.Unlock()
  1201. if _, err = client.Ping(); err != nil {
  1202. t.Fatalf("err: %v", err)
  1203. }
  1204. }()
  1205. wg.Wait()
  1206. }
  1207. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  1208. client, server := testClientServerConfig(testConfNoKeepAlive())
  1209. defer client.Close()
  1210. defer server.Close()
  1211. var wg sync.WaitGroup
  1212. wg.Add(2)
  1213. go func() {
  1214. defer wg.Done()
  1215. stream, err := server.AcceptStream()
  1216. if err != nil {
  1217. t.Fatalf("err: %v", err)
  1218. }
  1219. defer stream.Close()
  1220. }()
  1221. // The client will open the stream and then block outbound writes, we'll
  1222. // tee up a write and make sure it eventually times out.
  1223. go func() {
  1224. defer wg.Done()
  1225. stream, err := client.OpenStream()
  1226. if err != nil {
  1227. t.Fatalf("err: %v", err)
  1228. }
  1229. defer stream.Close()
  1230. conn := client.conn.(*pipeConn)
  1231. conn.writeBlocker.Lock()
  1232. // Since the write goroutine is blocked then this will return a
  1233. // timeout since it can't get feedback about whether the write
  1234. // worked.
  1235. n, err := stream.Write([]byte("hello"))
  1236. if err != ErrConnectionWriteTimeout {
  1237. t.Fatalf("err: %v", err)
  1238. }
  1239. if n != 0 {
  1240. t.Fatalf("lied about writes: %d", n)
  1241. }
  1242. }()
  1243. wg.Wait()
  1244. }