session_test.go 22 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "reflect"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. )
  15. type logCapture struct{ bytes.Buffer }
  16. func (l *logCapture) logs() []string {
  17. return strings.Split(strings.TrimSpace(l.String()), "\n")
  18. }
  19. func (l *logCapture) match(expect []string) bool {
  20. return reflect.DeepEqual(l.logs(), expect)
  21. }
  22. func captureLogs(s *Session) *logCapture {
  23. buf := new(logCapture)
  24. s.logger = log.New(buf, "", 0)
  25. return buf
  26. }
  27. type pipeConn struct {
  28. reader *io.PipeReader
  29. writer *io.PipeWriter
  30. writeBlocker sync.Mutex
  31. }
  32. func (p *pipeConn) Read(b []byte) (int, error) {
  33. return p.reader.Read(b)
  34. }
  35. func (p *pipeConn) Write(b []byte) (int, error) {
  36. p.writeBlocker.Lock()
  37. defer p.writeBlocker.Unlock()
  38. return p.writer.Write(b)
  39. }
  40. func (p *pipeConn) Close() error {
  41. p.reader.Close()
  42. return p.writer.Close()
  43. }
  44. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  45. read1, write1 := io.Pipe()
  46. read2, write2 := io.Pipe()
  47. conn1 := &pipeConn{reader: read1, writer: write2}
  48. conn2 := &pipeConn{reader: read2, writer: write1}
  49. return conn1, conn2
  50. }
  51. func testConf() *Config {
  52. conf := DefaultConfig()
  53. conf.AcceptBacklog = 64
  54. conf.KeepAliveInterval = 100 * time.Millisecond
  55. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  56. return conf
  57. }
  58. func testConfNoKeepAlive() *Config {
  59. conf := testConf()
  60. conf.EnableKeepAlive = false
  61. return conf
  62. }
  63. func testClientServer() (*Session, *Session) {
  64. return testClientServerConfig(testConf())
  65. }
  66. func testClientServerConfig(conf *Config) (*Session, *Session) {
  67. conn1, conn2 := testConn()
  68. client, _ := Client(conn1, conf)
  69. server, _ := Server(conn2, conf)
  70. return client, server
  71. }
  72. func TestPing(t *testing.T) {
  73. client, server := testClientServer()
  74. defer client.Close()
  75. defer server.Close()
  76. rtt, err := client.Ping()
  77. if err != nil {
  78. t.Fatalf("err: %v", err)
  79. }
  80. if rtt == 0 {
  81. t.Fatalf("bad: %v", rtt)
  82. }
  83. rtt, err = server.Ping()
  84. if err != nil {
  85. t.Fatalf("err: %v", err)
  86. }
  87. if rtt == 0 {
  88. t.Fatalf("bad: %v", rtt)
  89. }
  90. }
  91. func TestPing_Timeout(t *testing.T) {
  92. client, server := testClientServerConfig(testConfNoKeepAlive())
  93. defer client.Close()
  94. defer server.Close()
  95. // Prevent the client from responding
  96. clientConn := client.conn.(*pipeConn)
  97. clientConn.writeBlocker.Lock()
  98. errCh := make(chan error, 1)
  99. go func() {
  100. _, err := server.Ping() // Ping via the server session
  101. errCh <- err
  102. }()
  103. select {
  104. case err := <-errCh:
  105. if err != ErrTimeout {
  106. t.Fatalf("err: %v", err)
  107. }
  108. case <-time.After(client.config.ConnectionWriteTimeout * 2):
  109. t.Fatalf("failed to timeout within expected %v", client.config.ConnectionWriteTimeout)
  110. }
  111. // Verify that we recover, even if we gave up
  112. clientConn.writeBlocker.Unlock()
  113. go func() {
  114. _, err := server.Ping() // Ping via the server session
  115. errCh <- err
  116. }()
  117. select {
  118. case err := <-errCh:
  119. if err != nil {
  120. t.Fatalf("err: %v", err)
  121. }
  122. case <-time.After(client.config.ConnectionWriteTimeout):
  123. t.Fatalf("timeout")
  124. }
  125. }
  126. func TestAccept(t *testing.T) {
  127. client, server := testClientServer()
  128. defer client.Close()
  129. defer server.Close()
  130. if client.NumStreams() != 0 {
  131. t.Fatalf("bad")
  132. }
  133. if server.NumStreams() != 0 {
  134. t.Fatalf("bad")
  135. }
  136. wg := &sync.WaitGroup{}
  137. wg.Add(4)
  138. go func() {
  139. defer wg.Done()
  140. stream, err := server.AcceptStream()
  141. if err != nil {
  142. t.Fatalf("err: %v", err)
  143. }
  144. if id := stream.StreamID(); id != 1 {
  145. t.Fatalf("bad: %v", id)
  146. }
  147. if err := stream.Close(); err != nil {
  148. t.Fatalf("err: %v", err)
  149. }
  150. }()
  151. go func() {
  152. defer wg.Done()
  153. stream, err := client.AcceptStream()
  154. if err != nil {
  155. t.Fatalf("err: %v", err)
  156. }
  157. if id := stream.StreamID(); id != 2 {
  158. t.Fatalf("bad: %v", id)
  159. }
  160. if err := stream.Close(); err != nil {
  161. t.Fatalf("err: %v", err)
  162. }
  163. }()
  164. go func() {
  165. defer wg.Done()
  166. stream, err := server.OpenStream()
  167. if err != nil {
  168. t.Fatalf("err: %v", err)
  169. }
  170. if id := stream.StreamID(); id != 2 {
  171. t.Fatalf("bad: %v", id)
  172. }
  173. if err := stream.Close(); err != nil {
  174. t.Fatalf("err: %v", err)
  175. }
  176. }()
  177. go func() {
  178. defer wg.Done()
  179. stream, err := client.OpenStream()
  180. if err != nil {
  181. t.Fatalf("err: %v", err)
  182. }
  183. if id := stream.StreamID(); id != 1 {
  184. t.Fatalf("bad: %v", id)
  185. }
  186. if err := stream.Close(); err != nil {
  187. t.Fatalf("err: %v", err)
  188. }
  189. }()
  190. doneCh := make(chan struct{})
  191. go func() {
  192. wg.Wait()
  193. close(doneCh)
  194. }()
  195. select {
  196. case <-doneCh:
  197. case <-time.After(time.Second):
  198. panic("timeout")
  199. }
  200. }
  201. func TestNonNilInterface(t *testing.T) {
  202. _, server := testClientServer()
  203. server.Close()
  204. conn, err := server.Accept()
  205. if err != nil && conn != nil {
  206. t.Error("bad: accept should return a connection of nil value")
  207. }
  208. conn, err = server.Open()
  209. if err != nil && conn != nil {
  210. t.Error("bad: open should return a connection of nil value")
  211. }
  212. }
  213. func TestSendData_Small(t *testing.T) {
  214. client, server := testClientServer()
  215. defer client.Close()
  216. defer server.Close()
  217. wg := &sync.WaitGroup{}
  218. wg.Add(2)
  219. go func() {
  220. defer wg.Done()
  221. stream, err := server.AcceptStream()
  222. if err != nil {
  223. t.Fatalf("err: %v", err)
  224. }
  225. if server.NumStreams() != 1 {
  226. t.Fatalf("bad")
  227. }
  228. buf := make([]byte, 4)
  229. for i := 0; i < 1000; i++ {
  230. n, err := stream.Read(buf)
  231. if err != nil {
  232. t.Fatalf("err: %v", err)
  233. }
  234. if n != 4 {
  235. t.Fatalf("short read: %d", n)
  236. }
  237. if string(buf) != "test" {
  238. t.Fatalf("bad: %s", buf)
  239. }
  240. }
  241. if err := stream.Close(); err != nil {
  242. t.Fatalf("err: %v", err)
  243. }
  244. }()
  245. go func() {
  246. defer wg.Done()
  247. stream, err := client.Open()
  248. if err != nil {
  249. t.Fatalf("err: %v", err)
  250. }
  251. if client.NumStreams() != 1 {
  252. t.Fatalf("bad")
  253. }
  254. for i := 0; i < 1000; i++ {
  255. n, err := stream.Write([]byte("test"))
  256. if err != nil {
  257. t.Fatalf("err: %v", err)
  258. }
  259. if n != 4 {
  260. t.Fatalf("short write %d", n)
  261. }
  262. }
  263. if err := stream.Close(); err != nil {
  264. t.Fatalf("err: %v", err)
  265. }
  266. }()
  267. doneCh := make(chan struct{})
  268. go func() {
  269. wg.Wait()
  270. close(doneCh)
  271. }()
  272. select {
  273. case <-doneCh:
  274. case <-time.After(time.Second):
  275. panic("timeout")
  276. }
  277. if client.NumStreams() != 0 {
  278. t.Fatalf("bad")
  279. }
  280. if server.NumStreams() != 0 {
  281. t.Fatalf("bad")
  282. }
  283. }
  284. func TestSendData_Large(t *testing.T) {
  285. client, server := testClientServer()
  286. defer client.Close()
  287. defer server.Close()
  288. data := make([]byte, 512*1024)
  289. for idx := range data {
  290. data[idx] = byte(idx % 256)
  291. }
  292. wg := &sync.WaitGroup{}
  293. wg.Add(2)
  294. go func() {
  295. defer wg.Done()
  296. stream, err := server.AcceptStream()
  297. if err != nil {
  298. t.Fatalf("err: %v", err)
  299. }
  300. buf := make([]byte, 4*1024)
  301. for i := 0; i < 128; i++ {
  302. n, err := stream.Read(buf)
  303. if err != nil {
  304. t.Fatalf("err: %v", err)
  305. }
  306. if n != 4*1024 {
  307. t.Fatalf("short read: %d", n)
  308. }
  309. for idx := range buf {
  310. if buf[idx] != byte(idx%256) {
  311. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  312. }
  313. }
  314. }
  315. if err := stream.Close(); err != nil {
  316. t.Fatalf("err: %v", err)
  317. }
  318. }()
  319. go func() {
  320. defer wg.Done()
  321. stream, err := client.Open()
  322. if err != nil {
  323. t.Fatalf("err: %v", err)
  324. }
  325. n, err := stream.Write(data)
  326. if err != nil {
  327. t.Fatalf("err: %v", err)
  328. }
  329. if n != len(data) {
  330. t.Fatalf("short write %d", n)
  331. }
  332. if err := stream.Close(); err != nil {
  333. t.Fatalf("err: %v", err)
  334. }
  335. }()
  336. doneCh := make(chan struct{})
  337. go func() {
  338. wg.Wait()
  339. close(doneCh)
  340. }()
  341. select {
  342. case <-doneCh:
  343. case <-time.After(time.Second):
  344. panic("timeout")
  345. }
  346. }
  347. func TestGoAway(t *testing.T) {
  348. client, server := testClientServer()
  349. defer client.Close()
  350. defer server.Close()
  351. if err := server.GoAway(); err != nil {
  352. t.Fatalf("err: %v", err)
  353. }
  354. _, err := client.Open()
  355. if err != ErrRemoteGoAway {
  356. t.Fatalf("err: %v", err)
  357. }
  358. }
  359. func TestManyStreams(t *testing.T) {
  360. client, server := testClientServer()
  361. defer client.Close()
  362. defer server.Close()
  363. wg := &sync.WaitGroup{}
  364. acceptor := func(i int) {
  365. defer wg.Done()
  366. stream, err := server.AcceptStream()
  367. if err != nil {
  368. t.Fatalf("err: %v", err)
  369. }
  370. defer stream.Close()
  371. buf := make([]byte, 512)
  372. for {
  373. n, err := stream.Read(buf)
  374. if err == io.EOF {
  375. return
  376. }
  377. if err != nil {
  378. t.Fatalf("err: %v", err)
  379. }
  380. if n == 0 {
  381. t.Fatalf("err: %v", err)
  382. }
  383. }
  384. }
  385. sender := func(i int) {
  386. defer wg.Done()
  387. stream, err := client.Open()
  388. if err != nil {
  389. t.Fatalf("err: %v", err)
  390. }
  391. defer stream.Close()
  392. msg := fmt.Sprintf("%08d", i)
  393. for i := 0; i < 1000; i++ {
  394. n, err := stream.Write([]byte(msg))
  395. if err != nil {
  396. t.Fatalf("err: %v", err)
  397. }
  398. if n != len(msg) {
  399. t.Fatalf("short write %d", n)
  400. }
  401. }
  402. }
  403. for i := 0; i < 50; i++ {
  404. wg.Add(2)
  405. go acceptor(i)
  406. go sender(i)
  407. }
  408. wg.Wait()
  409. }
  410. func TestManyStreams_PingPong(t *testing.T) {
  411. client, server := testClientServer()
  412. defer client.Close()
  413. defer server.Close()
  414. wg := &sync.WaitGroup{}
  415. ping := []byte("ping")
  416. pong := []byte("pong")
  417. acceptor := func(i int) {
  418. defer wg.Done()
  419. stream, err := server.AcceptStream()
  420. if err != nil {
  421. t.Fatalf("err: %v", err)
  422. }
  423. defer stream.Close()
  424. buf := make([]byte, 4)
  425. for {
  426. // Read the 'ping'
  427. n, err := stream.Read(buf)
  428. if err == io.EOF {
  429. return
  430. }
  431. if err != nil {
  432. t.Fatalf("err: %v", err)
  433. }
  434. if n != 4 {
  435. t.Fatalf("err: %v", err)
  436. }
  437. if !bytes.Equal(buf, ping) {
  438. t.Fatalf("bad: %s", buf)
  439. }
  440. // Shrink the internal buffer!
  441. stream.Shrink()
  442. // Write out the 'pong'
  443. n, err = stream.Write(pong)
  444. if err != nil {
  445. t.Fatalf("err: %v", err)
  446. }
  447. if n != 4 {
  448. t.Fatalf("err: %v", err)
  449. }
  450. }
  451. }
  452. sender := func(i int) {
  453. defer wg.Done()
  454. stream, err := client.OpenStream()
  455. if err != nil {
  456. t.Fatalf("err: %v", err)
  457. }
  458. defer stream.Close()
  459. buf := make([]byte, 4)
  460. for i := 0; i < 1000; i++ {
  461. // Send the 'ping'
  462. n, err := stream.Write(ping)
  463. if err != nil {
  464. t.Fatalf("err: %v", err)
  465. }
  466. if n != 4 {
  467. t.Fatalf("short write %d", n)
  468. }
  469. // Read the 'pong'
  470. n, err = stream.Read(buf)
  471. if err != nil {
  472. t.Fatalf("err: %v", err)
  473. }
  474. if n != 4 {
  475. t.Fatalf("err: %v", err)
  476. }
  477. if !bytes.Equal(buf, pong) {
  478. t.Fatalf("bad: %s", buf)
  479. }
  480. // Shrink the buffer
  481. stream.Shrink()
  482. }
  483. }
  484. for i := 0; i < 50; i++ {
  485. wg.Add(2)
  486. go acceptor(i)
  487. go sender(i)
  488. }
  489. wg.Wait()
  490. }
  491. func TestHalfClose(t *testing.T) {
  492. client, server := testClientServer()
  493. defer client.Close()
  494. defer server.Close()
  495. stream, err := client.Open()
  496. if err != nil {
  497. t.Fatalf("err: %v", err)
  498. }
  499. if _, err := stream.Write([]byte("a")); err != nil {
  500. t.Fatalf("err: %v", err)
  501. }
  502. stream2, err := server.Accept()
  503. if err != nil {
  504. t.Fatalf("err: %v", err)
  505. }
  506. stream2.Close() // Half close
  507. buf := make([]byte, 4)
  508. n, err := stream2.Read(buf)
  509. if err != nil {
  510. t.Fatalf("err: %v", err)
  511. }
  512. if n != 1 {
  513. t.Fatalf("bad: %v", n)
  514. }
  515. // Send more
  516. if _, err := stream.Write([]byte("bcd")); err != nil {
  517. t.Fatalf("err: %v", err)
  518. }
  519. stream.Close()
  520. // Read after close
  521. n, err = stream2.Read(buf)
  522. if err != nil {
  523. t.Fatalf("err: %v", err)
  524. }
  525. if n != 3 {
  526. t.Fatalf("bad: %v", n)
  527. }
  528. // EOF after close
  529. n, err = stream2.Read(buf)
  530. if err != io.EOF {
  531. t.Fatalf("err: %v", err)
  532. }
  533. if n != 0 {
  534. t.Fatalf("bad: %v", n)
  535. }
  536. }
  537. func TestReadDeadline(t *testing.T) {
  538. client, server := testClientServer()
  539. defer client.Close()
  540. defer server.Close()
  541. stream, err := client.Open()
  542. if err != nil {
  543. t.Fatalf("err: %v", err)
  544. }
  545. defer stream.Close()
  546. stream2, err := server.Accept()
  547. if err != nil {
  548. t.Fatalf("err: %v", err)
  549. }
  550. defer stream2.Close()
  551. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  552. t.Fatalf("err: %v", err)
  553. }
  554. buf := make([]byte, 4)
  555. if _, err := stream.Read(buf); err != ErrTimeout {
  556. t.Fatalf("err: %v", err)
  557. }
  558. }
  559. func TestWriteDeadline(t *testing.T) {
  560. client, server := testClientServer()
  561. defer client.Close()
  562. defer server.Close()
  563. stream, err := client.Open()
  564. if err != nil {
  565. t.Fatalf("err: %v", err)
  566. }
  567. defer stream.Close()
  568. stream2, err := server.Accept()
  569. if err != nil {
  570. t.Fatalf("err: %v", err)
  571. }
  572. defer stream2.Close()
  573. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  574. t.Fatalf("err: %v", err)
  575. }
  576. buf := make([]byte, 512)
  577. for i := 0; i < int(initialStreamWindow); i++ {
  578. _, err := stream.Write(buf)
  579. if err != nil && err == ErrTimeout {
  580. return
  581. } else if err != nil {
  582. t.Fatalf("err: %v", err)
  583. }
  584. }
  585. t.Fatalf("Expected timeout")
  586. }
  587. func TestBacklogExceeded(t *testing.T) {
  588. client, server := testClientServer()
  589. defer client.Close()
  590. defer server.Close()
  591. // Fill the backlog
  592. max := client.config.AcceptBacklog
  593. for i := 0; i < max; i++ {
  594. stream, err := client.Open()
  595. if err != nil {
  596. t.Fatalf("err: %v", err)
  597. }
  598. defer stream.Close()
  599. if _, err := stream.Write([]byte("foo")); err != nil {
  600. t.Fatalf("err: %v", err)
  601. }
  602. }
  603. // Attempt to open a new stream
  604. errCh := make(chan error, 1)
  605. go func() {
  606. _, err := client.Open()
  607. errCh <- err
  608. }()
  609. // Shutdown the server
  610. go func() {
  611. time.Sleep(10 * time.Millisecond)
  612. server.Close()
  613. }()
  614. select {
  615. case err := <-errCh:
  616. if err == nil {
  617. t.Fatalf("open should fail")
  618. }
  619. case <-time.After(time.Second):
  620. t.Fatalf("timeout")
  621. }
  622. }
  623. func TestKeepAlive(t *testing.T) {
  624. client, server := testClientServer()
  625. defer client.Close()
  626. defer server.Close()
  627. time.Sleep(200 * time.Millisecond)
  628. // Ping value should increase
  629. client.pingLock.Lock()
  630. defer client.pingLock.Unlock()
  631. if client.pingID == 0 {
  632. t.Fatalf("should ping")
  633. }
  634. server.pingLock.Lock()
  635. defer server.pingLock.Unlock()
  636. if server.pingID == 0 {
  637. t.Fatalf("should ping")
  638. }
  639. }
  640. func TestKeepAlive_Timeout(t *testing.T) {
  641. conn1, conn2 := testConn()
  642. clientConf := testConf()
  643. clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
  644. clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
  645. client, _ := Client(conn1, clientConf)
  646. defer client.Close()
  647. server, _ := Server(conn2, testConf())
  648. defer server.Close()
  649. _ = captureLogs(client) // Client logs aren't part of the test
  650. serverLogs := captureLogs(server)
  651. errCh := make(chan error, 1)
  652. go func() {
  653. _, err := server.Accept() // Wait until server closes
  654. errCh <- err
  655. }()
  656. // Prevent the client from responding
  657. clientConn := client.conn.(*pipeConn)
  658. clientConn.writeBlocker.Lock()
  659. select {
  660. case err := <-errCh:
  661. if err != ErrKeepAliveTimeout {
  662. t.Fatalf("unexpected error: %v", err)
  663. }
  664. case <-time.After(1 * time.Second):
  665. t.Fatalf("timeout waiting for timeout")
  666. }
  667. if !server.IsClosed() {
  668. t.Fatalf("server should have closed")
  669. }
  670. if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
  671. t.Fatalf("server log incorect: %v", serverLogs.logs())
  672. }
  673. }
  674. func TestLargeWindow(t *testing.T) {
  675. conf := DefaultConfig()
  676. conf.MaxStreamWindowSize *= 2
  677. client, server := testClientServerConfig(conf)
  678. defer client.Close()
  679. defer server.Close()
  680. stream, err := client.Open()
  681. if err != nil {
  682. t.Fatalf("err: %v", err)
  683. }
  684. defer stream.Close()
  685. stream2, err := server.Accept()
  686. if err != nil {
  687. t.Fatalf("err: %v", err)
  688. }
  689. defer stream2.Close()
  690. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  691. buf := make([]byte, conf.MaxStreamWindowSize)
  692. n, err := stream.Write(buf)
  693. if err != nil {
  694. t.Fatalf("err: %v", err)
  695. }
  696. if n != len(buf) {
  697. t.Fatalf("short write: %d", n)
  698. }
  699. }
  700. type UnlimitedReader struct{}
  701. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  702. runtime.Gosched()
  703. return len(p), nil
  704. }
  705. func TestSendData_VeryLarge(t *testing.T) {
  706. client, server := testClientServer()
  707. defer client.Close()
  708. defer server.Close()
  709. var n int64 = 1 * 1024 * 1024 * 1024
  710. var workers int = 16
  711. wg := &sync.WaitGroup{}
  712. wg.Add(workers * 2)
  713. for i := 0; i < workers; i++ {
  714. go func() {
  715. defer wg.Done()
  716. stream, err := server.AcceptStream()
  717. if err != nil {
  718. t.Fatalf("err: %v", err)
  719. }
  720. defer stream.Close()
  721. buf := make([]byte, 4)
  722. _, err = stream.Read(buf)
  723. if err != nil {
  724. t.Fatalf("err: %v", err)
  725. }
  726. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  727. t.Fatalf("bad header")
  728. }
  729. recv, err := io.Copy(ioutil.Discard, stream)
  730. if err != nil {
  731. t.Fatalf("err: %v", err)
  732. }
  733. if recv != n {
  734. t.Fatalf("bad: %v", recv)
  735. }
  736. }()
  737. }
  738. for i := 0; i < workers; i++ {
  739. go func() {
  740. defer wg.Done()
  741. stream, err := client.Open()
  742. if err != nil {
  743. t.Fatalf("err: %v", err)
  744. }
  745. defer stream.Close()
  746. _, err = stream.Write([]byte{0, 1, 2, 3})
  747. if err != nil {
  748. t.Fatalf("err: %v", err)
  749. }
  750. unlimited := &UnlimitedReader{}
  751. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  752. if err != nil {
  753. t.Fatalf("err: %v", err)
  754. }
  755. if sent != n {
  756. t.Fatalf("bad: %v", sent)
  757. }
  758. }()
  759. }
  760. doneCh := make(chan struct{})
  761. go func() {
  762. wg.Wait()
  763. close(doneCh)
  764. }()
  765. select {
  766. case <-doneCh:
  767. case <-time.After(20 * time.Second):
  768. panic("timeout")
  769. }
  770. }
  771. func TestBacklogExceeded_Accept(t *testing.T) {
  772. client, server := testClientServer()
  773. defer client.Close()
  774. defer server.Close()
  775. max := 5 * client.config.AcceptBacklog
  776. go func() {
  777. for i := 0; i < max; i++ {
  778. stream, err := server.Accept()
  779. if err != nil {
  780. t.Fatalf("err: %v", err)
  781. }
  782. defer stream.Close()
  783. }
  784. }()
  785. // Fill the backlog
  786. for i := 0; i < max; i++ {
  787. stream, err := client.Open()
  788. if err != nil {
  789. t.Fatalf("err: %v", err)
  790. }
  791. defer stream.Close()
  792. if _, err := stream.Write([]byte("foo")); err != nil {
  793. t.Fatalf("err: %v", err)
  794. }
  795. }
  796. }
  797. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  798. client, server := testClientServerConfig(testConfNoKeepAlive())
  799. defer client.Close()
  800. defer server.Close()
  801. var wg sync.WaitGroup
  802. wg.Add(2)
  803. // Choose a huge flood size that we know will result in a window update.
  804. flood := int64(client.config.MaxStreamWindowSize) - 1
  805. // The server will accept a new stream and then flood data to it.
  806. go func() {
  807. defer wg.Done()
  808. stream, err := server.AcceptStream()
  809. if err != nil {
  810. t.Fatalf("err: %v", err)
  811. }
  812. defer stream.Close()
  813. n, err := stream.Write(make([]byte, flood))
  814. if err != nil {
  815. t.Fatalf("err: %v", err)
  816. }
  817. if int64(n) != flood {
  818. t.Fatalf("short write: %d", n)
  819. }
  820. }()
  821. // The client will open a stream, block outbound writes, and then
  822. // listen to the flood from the server, which should time out since
  823. // it won't be able to send the window update.
  824. go func() {
  825. defer wg.Done()
  826. stream, err := client.OpenStream()
  827. if err != nil {
  828. t.Fatalf("err: %v", err)
  829. }
  830. defer stream.Close()
  831. conn := client.conn.(*pipeConn)
  832. conn.writeBlocker.Lock()
  833. _, err = stream.Read(make([]byte, flood))
  834. if err != ErrConnectionWriteTimeout {
  835. t.Fatalf("err: %v", err)
  836. }
  837. }()
  838. wg.Wait()
  839. }
  840. func TestSession_sendNoWait_Timeout(t *testing.T) {
  841. client, server := testClientServerConfig(testConfNoKeepAlive())
  842. defer client.Close()
  843. defer server.Close()
  844. var wg sync.WaitGroup
  845. wg.Add(2)
  846. go func() {
  847. defer wg.Done()
  848. stream, err := server.AcceptStream()
  849. if err != nil {
  850. t.Fatalf("err: %v", err)
  851. }
  852. defer stream.Close()
  853. }()
  854. // The client will open the stream and then block outbound writes, we'll
  855. // probe sendNoWait once it gets into that state.
  856. go func() {
  857. defer wg.Done()
  858. stream, err := client.OpenStream()
  859. if err != nil {
  860. t.Fatalf("err: %v", err)
  861. }
  862. defer stream.Close()
  863. conn := client.conn.(*pipeConn)
  864. conn.writeBlocker.Lock()
  865. hdr := header(make([]byte, headerSize))
  866. hdr.encode(typePing, flagACK, 0, 0)
  867. for {
  868. err = client.sendNoWait(hdr)
  869. if err == nil {
  870. continue
  871. } else if err == ErrConnectionWriteTimeout {
  872. break
  873. } else {
  874. t.Fatalf("err: %v", err)
  875. }
  876. }
  877. }()
  878. wg.Wait()
  879. }
  880. func TestSession_PingOfDeath(t *testing.T) {
  881. client, server := testClientServerConfig(testConfNoKeepAlive())
  882. defer client.Close()
  883. defer server.Close()
  884. var wg sync.WaitGroup
  885. wg.Add(2)
  886. var doPingOfDeath sync.Mutex
  887. doPingOfDeath.Lock()
  888. // This is used later to block outbound writes.
  889. conn := server.conn.(*pipeConn)
  890. // The server will accept a stream, block outbound writes, and then
  891. // flood its send channel so that no more headers can be queued.
  892. go func() {
  893. defer wg.Done()
  894. stream, err := server.AcceptStream()
  895. if err != nil {
  896. t.Fatalf("err: %v", err)
  897. }
  898. defer stream.Close()
  899. conn.writeBlocker.Lock()
  900. for {
  901. hdr := header(make([]byte, headerSize))
  902. hdr.encode(typePing, 0, 0, 0)
  903. err = server.sendNoWait(hdr)
  904. if err == nil {
  905. continue
  906. } else if err == ErrConnectionWriteTimeout {
  907. break
  908. } else {
  909. t.Fatalf("err: %v", err)
  910. }
  911. }
  912. doPingOfDeath.Unlock()
  913. }()
  914. // The client will open a stream and then send the server a ping once it
  915. // can no longer write. This makes sure the server doesn't deadlock reads
  916. // while trying to reply to the ping with no ability to write.
  917. go func() {
  918. defer wg.Done()
  919. stream, err := client.OpenStream()
  920. if err != nil {
  921. t.Fatalf("err: %v", err)
  922. }
  923. defer stream.Close()
  924. // This ping will never unblock because the ping id will never
  925. // show up in a response.
  926. doPingOfDeath.Lock()
  927. go func() { client.Ping() }()
  928. // Wait for a while to make sure the previous ping times out,
  929. // then turn writes back on and make sure a ping works again.
  930. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  931. conn.writeBlocker.Unlock()
  932. if _, err = client.Ping(); err != nil {
  933. t.Fatalf("err: %v", err)
  934. }
  935. }()
  936. wg.Wait()
  937. }
  938. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  939. client, server := testClientServerConfig(testConfNoKeepAlive())
  940. defer client.Close()
  941. defer server.Close()
  942. var wg sync.WaitGroup
  943. wg.Add(2)
  944. go func() {
  945. defer wg.Done()
  946. stream, err := server.AcceptStream()
  947. if err != nil {
  948. t.Fatalf("err: %v", err)
  949. }
  950. defer stream.Close()
  951. }()
  952. // The client will open the stream and then block outbound writes, we'll
  953. // tee up a write and make sure it eventually times out.
  954. go func() {
  955. defer wg.Done()
  956. stream, err := client.OpenStream()
  957. if err != nil {
  958. t.Fatalf("err: %v", err)
  959. }
  960. defer stream.Close()
  961. conn := client.conn.(*pipeConn)
  962. conn.writeBlocker.Lock()
  963. // Since the write goroutine is blocked then this will return a
  964. // timeout since it can't get feedback about whether the write
  965. // worked.
  966. n, err := stream.Write([]byte("hello"))
  967. if err != ErrConnectionWriteTimeout {
  968. t.Fatalf("err: %v", err)
  969. }
  970. if n != 0 {
  971. t.Fatalf("lied about writes: %d", n)
  972. }
  973. }()
  974. wg.Wait()
  975. }