session_test.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475
  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "reflect"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "testing"
  14. "time"
  15. )
  16. type logCapture struct{ bytes.Buffer }
  17. func (l *logCapture) logs() []string {
  18. return strings.Split(strings.TrimSpace(l.String()), "\n")
  19. }
  20. func (l *logCapture) match(expect []string) bool {
  21. return reflect.DeepEqual(l.logs(), expect)
  22. }
  23. func captureLogs(s *Session) *logCapture {
  24. buf := new(logCapture)
  25. s.logger = log.New(buf, "", 0)
  26. return buf
  27. }
  28. type pipeConn struct {
  29. reader *io.PipeReader
  30. writer *io.PipeWriter
  31. writeBlocker sync.Mutex
  32. }
  33. func (p *pipeConn) Read(b []byte) (int, error) {
  34. return p.reader.Read(b)
  35. }
  36. func (p *pipeConn) Write(b []byte) (int, error) {
  37. p.writeBlocker.Lock()
  38. defer p.writeBlocker.Unlock()
  39. return p.writer.Write(b)
  40. }
  41. func (p *pipeConn) Close() error {
  42. p.reader.Close()
  43. return p.writer.Close()
  44. }
  45. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  46. read1, write1 := io.Pipe()
  47. read2, write2 := io.Pipe()
  48. conn1 := &pipeConn{reader: read1, writer: write2}
  49. conn2 := &pipeConn{reader: read2, writer: write1}
  50. return conn1, conn2
  51. }
  52. func testConf() *Config {
  53. conf := DefaultConfig()
  54. conf.AcceptBacklog = 64
  55. conf.KeepAliveInterval = 100 * time.Millisecond
  56. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  57. return conf
  58. }
  59. func testConfNoKeepAlive() *Config {
  60. conf := testConf()
  61. conf.EnableKeepAlive = false
  62. return conf
  63. }
  64. func testClientServer() (*Session, *Session) {
  65. return testClientServerConfig(testConf())
  66. }
  67. func testClientServerConfig(conf *Config) (*Session, *Session) {
  68. conn1, conn2 := testConn()
  69. client, _ := Client(conn1, conf)
  70. server, _ := Server(conn2, conf)
  71. return client, server
  72. }
  73. func TestPing(t *testing.T) {
  74. client, server := testClientServer()
  75. defer client.Close()
  76. defer server.Close()
  77. rtt, err := client.Ping()
  78. if err != nil {
  79. t.Fatalf("err: %v", err)
  80. }
  81. if rtt == 0 {
  82. t.Fatalf("bad: %v", rtt)
  83. }
  84. rtt, err = server.Ping()
  85. if err != nil {
  86. t.Fatalf("err: %v", err)
  87. }
  88. if rtt == 0 {
  89. t.Fatalf("bad: %v", rtt)
  90. }
  91. }
  92. func TestPing_Timeout(t *testing.T) {
  93. client, server := testClientServerConfig(testConfNoKeepAlive())
  94. defer client.Close()
  95. defer server.Close()
  96. // Prevent the client from responding
  97. clientConn := client.conn.(*pipeConn)
  98. clientConn.writeBlocker.Lock()
  99. errCh := make(chan error, 1)
  100. go func() {
  101. _, err := server.Ping() // Ping via the server session
  102. errCh <- err
  103. }()
  104. select {
  105. case err := <-errCh:
  106. if err != ErrTimeout {
  107. t.Fatalf("err: %v", err)
  108. }
  109. case <-time.After(client.config.ConnectionWriteTimeout * 2):
  110. t.Fatalf("failed to timeout within expected %v", client.config.ConnectionWriteTimeout)
  111. }
  112. // Verify that we recover, even if we gave up
  113. clientConn.writeBlocker.Unlock()
  114. go func() {
  115. _, err := server.Ping() // Ping via the server session
  116. errCh <- err
  117. }()
  118. select {
  119. case err := <-errCh:
  120. if err != nil {
  121. t.Fatalf("err: %v", err)
  122. }
  123. case <-time.After(client.config.ConnectionWriteTimeout):
  124. t.Fatalf("timeout")
  125. }
  126. }
  127. func TestCloseBeforeAck(t *testing.T) {
  128. cfg := testConf()
  129. cfg.AcceptBacklog = 8
  130. client, server := testClientServerConfig(cfg)
  131. defer client.Close()
  132. defer server.Close()
  133. for i := 0; i < 8; i++ {
  134. s, err := client.OpenStream()
  135. if err != nil {
  136. t.Fatal(err)
  137. }
  138. s.Close()
  139. }
  140. for i := 0; i < 8; i++ {
  141. s, err := server.AcceptStream()
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. s.Close()
  146. }
  147. done := make(chan struct{})
  148. go func() {
  149. defer close(done)
  150. s, err := client.OpenStream()
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. s.Close()
  155. }()
  156. select {
  157. case <-done:
  158. case <-time.After(time.Second * 5):
  159. t.Fatal("timed out trying to open stream")
  160. }
  161. }
  162. func TestAccept(t *testing.T) {
  163. client, server := testClientServer()
  164. defer client.Close()
  165. defer server.Close()
  166. if client.NumStreams() != 0 {
  167. t.Fatalf("bad")
  168. }
  169. if server.NumStreams() != 0 {
  170. t.Fatalf("bad")
  171. }
  172. wg := &sync.WaitGroup{}
  173. wg.Add(4)
  174. go func() {
  175. defer wg.Done()
  176. stream, err := server.AcceptStream()
  177. if err != nil {
  178. t.Fatalf("err: %v", err)
  179. }
  180. if id := stream.StreamID(); id != 1 {
  181. t.Fatalf("bad: %v", id)
  182. }
  183. if err := stream.Close(); err != nil {
  184. t.Fatalf("err: %v", err)
  185. }
  186. }()
  187. go func() {
  188. defer wg.Done()
  189. stream, err := client.AcceptStream()
  190. if err != nil {
  191. t.Fatalf("err: %v", err)
  192. }
  193. if id := stream.StreamID(); id != 2 {
  194. t.Fatalf("bad: %v", id)
  195. }
  196. if err := stream.Close(); err != nil {
  197. t.Fatalf("err: %v", err)
  198. }
  199. }()
  200. go func() {
  201. defer wg.Done()
  202. stream, err := server.OpenStream()
  203. if err != nil {
  204. t.Fatalf("err: %v", err)
  205. }
  206. if id := stream.StreamID(); id != 2 {
  207. t.Fatalf("bad: %v", id)
  208. }
  209. if err := stream.Close(); err != nil {
  210. t.Fatalf("err: %v", err)
  211. }
  212. }()
  213. go func() {
  214. defer wg.Done()
  215. stream, err := client.OpenStream()
  216. if err != nil {
  217. t.Fatalf("err: %v", err)
  218. }
  219. if id := stream.StreamID(); id != 1 {
  220. t.Fatalf("bad: %v", id)
  221. }
  222. if err := stream.Close(); err != nil {
  223. t.Fatalf("err: %v", err)
  224. }
  225. }()
  226. doneCh := make(chan struct{})
  227. go func() {
  228. wg.Wait()
  229. close(doneCh)
  230. }()
  231. select {
  232. case <-doneCh:
  233. case <-time.After(time.Second):
  234. panic("timeout")
  235. }
  236. }
  237. func TestOpenStreamTimeout(t *testing.T) {
  238. const timeout = 25 * time.Millisecond
  239. cfg := testConf()
  240. cfg.StreamOpenTimeout = timeout
  241. client, server := testClientServerConfig(cfg)
  242. defer client.Close()
  243. defer server.Close()
  244. clientLogs := captureLogs(client)
  245. // Open a single stream without a server to acknowledge it.
  246. s, err := client.OpenStream()
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. // Sleep for longer than the stream open timeout.
  251. // Since no ACKs are received, the stream and session should be closed.
  252. time.Sleep(timeout * 5)
  253. if !clientLogs.match([]string{"[ERR] yamux: aborted stream open (destination=yamux:remote): i/o deadline reached"}) {
  254. t.Fatalf("server log incorect: %v", clientLogs.logs())
  255. }
  256. if s.state != streamClosed {
  257. t.Fatalf("stream should have been closed")
  258. }
  259. if !client.IsClosed() {
  260. t.Fatalf("session should have been closed")
  261. }
  262. }
  263. func TestClose_closeTimeout(t *testing.T) {
  264. conf := testConf()
  265. conf.StreamCloseTimeout = 10 * time.Millisecond
  266. client, server := testClientServerConfig(conf)
  267. defer client.Close()
  268. defer server.Close()
  269. if client.NumStreams() != 0 {
  270. t.Fatalf("bad")
  271. }
  272. if server.NumStreams() != 0 {
  273. t.Fatalf("bad")
  274. }
  275. wg := &sync.WaitGroup{}
  276. wg.Add(2)
  277. // Open a stream on the client but only close it on the server.
  278. // We want to see if the stream ever gets cleaned up on the client.
  279. var clientStream *Stream
  280. go func() {
  281. defer wg.Done()
  282. var err error
  283. clientStream, err = client.OpenStream()
  284. if err != nil {
  285. t.Fatalf("err: %v", err)
  286. }
  287. }()
  288. go func() {
  289. defer wg.Done()
  290. stream, err := server.AcceptStream()
  291. if err != nil {
  292. t.Fatalf("err: %v", err)
  293. }
  294. if err := stream.Close(); err != nil {
  295. t.Fatalf("err: %v", err)
  296. }
  297. }()
  298. doneCh := make(chan struct{})
  299. go func() {
  300. wg.Wait()
  301. close(doneCh)
  302. }()
  303. select {
  304. case <-doneCh:
  305. case <-time.After(time.Second):
  306. panic("timeout")
  307. }
  308. // We should have zero streams after our timeout period
  309. time.Sleep(100 * time.Millisecond)
  310. if v := server.NumStreams(); v > 0 {
  311. t.Fatalf("should have zero streams: %d", v)
  312. }
  313. if v := client.NumStreams(); v > 0 {
  314. t.Fatalf("should have zero streams: %d", v)
  315. }
  316. if _, err := clientStream.Write([]byte("hello")); err == nil {
  317. t.Fatal("should error on write")
  318. } else if err.Error() != "connection reset" {
  319. t.Fatalf("expected connection reset, got %q", err)
  320. }
  321. }
  322. func TestNonNilInterface(t *testing.T) {
  323. _, server := testClientServer()
  324. server.Close()
  325. conn, err := server.Accept()
  326. if err != nil && conn != nil {
  327. t.Error("bad: accept should return a connection of nil value")
  328. }
  329. conn, err = server.Open()
  330. if err != nil && conn != nil {
  331. t.Error("bad: open should return a connection of nil value")
  332. }
  333. }
  334. func TestSendData_Small(t *testing.T) {
  335. client, server := testClientServer()
  336. defer client.Close()
  337. defer server.Close()
  338. wg := &sync.WaitGroup{}
  339. wg.Add(2)
  340. go func() {
  341. defer wg.Done()
  342. stream, err := server.AcceptStream()
  343. if err != nil {
  344. t.Fatalf("err: %v", err)
  345. }
  346. if server.NumStreams() != 1 {
  347. t.Fatalf("bad")
  348. }
  349. buf := make([]byte, 4)
  350. for i := 0; i < 1000; i++ {
  351. n, err := stream.Read(buf)
  352. if err != nil {
  353. t.Fatalf("err: %v", err)
  354. }
  355. if n != 4 {
  356. t.Fatalf("short read: %d", n)
  357. }
  358. if string(buf) != "test" {
  359. t.Fatalf("bad: %s", buf)
  360. }
  361. }
  362. if err := stream.Close(); err != nil {
  363. t.Fatalf("err: %v", err)
  364. }
  365. }()
  366. go func() {
  367. defer wg.Done()
  368. stream, err := client.Open()
  369. if err != nil {
  370. t.Fatalf("err: %v", err)
  371. }
  372. if client.NumStreams() != 1 {
  373. t.Fatalf("bad")
  374. }
  375. for i := 0; i < 1000; i++ {
  376. n, err := stream.Write([]byte("test"))
  377. if err != nil {
  378. t.Fatalf("err: %v", err)
  379. }
  380. if n != 4 {
  381. t.Fatalf("short write %d", n)
  382. }
  383. }
  384. if err := stream.Close(); err != nil {
  385. t.Fatalf("err: %v", err)
  386. }
  387. }()
  388. doneCh := make(chan struct{})
  389. go func() {
  390. wg.Wait()
  391. close(doneCh)
  392. }()
  393. select {
  394. case <-doneCh:
  395. case <-time.After(time.Second):
  396. panic("timeout")
  397. }
  398. if client.NumStreams() != 0 {
  399. t.Fatalf("bad")
  400. }
  401. if server.NumStreams() != 0 {
  402. t.Fatalf("bad")
  403. }
  404. }
  405. func TestSendData_Large(t *testing.T) {
  406. client, server := testClientServer()
  407. defer client.Close()
  408. defer server.Close()
  409. const (
  410. sendSize = 250 * 1024 * 1024
  411. recvSize = 4 * 1024
  412. )
  413. data := make([]byte, sendSize)
  414. for idx := range data {
  415. data[idx] = byte(idx % 256)
  416. }
  417. wg := &sync.WaitGroup{}
  418. wg.Add(2)
  419. go func() {
  420. defer wg.Done()
  421. stream, err := server.AcceptStream()
  422. if err != nil {
  423. t.Fatalf("err: %v", err)
  424. }
  425. var sz int
  426. buf := make([]byte, recvSize)
  427. for i := 0; i < sendSize/recvSize; i++ {
  428. n, err := stream.Read(buf)
  429. if err != nil {
  430. t.Fatalf("err: %v", err)
  431. }
  432. if n != recvSize {
  433. t.Fatalf("short read: %d", n)
  434. }
  435. sz += n
  436. for idx := range buf {
  437. if buf[idx] != byte(idx%256) {
  438. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  439. }
  440. }
  441. }
  442. if err := stream.Close(); err != nil {
  443. t.Fatalf("err: %v", err)
  444. }
  445. t.Logf("cap=%d, n=%d\n", stream.recvBuf.Cap(), sz)
  446. }()
  447. go func() {
  448. defer wg.Done()
  449. stream, err := client.Open()
  450. if err != nil {
  451. t.Fatalf("err: %v", err)
  452. }
  453. n, err := stream.Write(data)
  454. if err != nil {
  455. t.Fatalf("err: %v", err)
  456. }
  457. if n != len(data) {
  458. t.Fatalf("short write %d", n)
  459. }
  460. if err := stream.Close(); err != nil {
  461. t.Fatalf("err: %v", err)
  462. }
  463. }()
  464. doneCh := make(chan struct{})
  465. go func() {
  466. wg.Wait()
  467. close(doneCh)
  468. }()
  469. select {
  470. case <-doneCh:
  471. return
  472. case <-time.After(5 * time.Second):
  473. panic("timeout")
  474. }
  475. }
  476. func TestGoAway(t *testing.T) {
  477. client, server := testClientServer()
  478. defer client.Close()
  479. defer server.Close()
  480. if err := server.GoAway(); err != nil {
  481. t.Fatalf("err: %v", err)
  482. }
  483. _, err := client.Open()
  484. if err != ErrRemoteGoAway {
  485. t.Fatalf("err: %v", err)
  486. }
  487. }
  488. func TestManyStreams(t *testing.T) {
  489. client, server := testClientServer()
  490. defer client.Close()
  491. defer server.Close()
  492. wg := &sync.WaitGroup{}
  493. acceptor := func(i int) {
  494. defer wg.Done()
  495. stream, err := server.AcceptStream()
  496. if err != nil {
  497. t.Fatalf("err: %v", err)
  498. }
  499. defer stream.Close()
  500. buf := make([]byte, 512)
  501. for {
  502. n, err := stream.Read(buf)
  503. if err == io.EOF {
  504. return
  505. }
  506. if err != nil {
  507. t.Fatalf("err: %v", err)
  508. }
  509. if n == 0 {
  510. t.Fatalf("err: %v", err)
  511. }
  512. }
  513. }
  514. sender := func(i int) {
  515. defer wg.Done()
  516. stream, err := client.Open()
  517. if err != nil {
  518. t.Fatalf("err: %v", err)
  519. }
  520. defer stream.Close()
  521. msg := fmt.Sprintf("%08d", i)
  522. for i := 0; i < 1000; i++ {
  523. n, err := stream.Write([]byte(msg))
  524. if err != nil {
  525. t.Fatalf("err: %v", err)
  526. }
  527. if n != len(msg) {
  528. t.Fatalf("short write %d", n)
  529. }
  530. }
  531. }
  532. for i := 0; i < 50; i++ {
  533. wg.Add(2)
  534. go acceptor(i)
  535. go sender(i)
  536. }
  537. wg.Wait()
  538. }
  539. func TestManyStreams_PingPong(t *testing.T) {
  540. client, server := testClientServer()
  541. defer client.Close()
  542. defer server.Close()
  543. wg := &sync.WaitGroup{}
  544. ping := []byte("ping")
  545. pong := []byte("pong")
  546. acceptor := func(i int) {
  547. defer wg.Done()
  548. stream, err := server.AcceptStream()
  549. if err != nil {
  550. t.Fatalf("err: %v", err)
  551. }
  552. defer stream.Close()
  553. buf := make([]byte, 4)
  554. for {
  555. // Read the 'ping'
  556. n, err := stream.Read(buf)
  557. if err == io.EOF {
  558. return
  559. }
  560. if err != nil {
  561. t.Fatalf("err: %v", err)
  562. }
  563. if n != 4 {
  564. t.Fatalf("err: %v", err)
  565. }
  566. if !bytes.Equal(buf, ping) {
  567. t.Fatalf("bad: %s", buf)
  568. }
  569. // Shrink the internal buffer!
  570. stream.Shrink()
  571. // Write out the 'pong'
  572. n, err = stream.Write(pong)
  573. if err != nil {
  574. t.Fatalf("err: %v", err)
  575. }
  576. if n != 4 {
  577. t.Fatalf("err: %v", err)
  578. }
  579. }
  580. }
  581. sender := func(i int) {
  582. defer wg.Done()
  583. stream, err := client.OpenStream()
  584. if err != nil {
  585. t.Fatalf("err: %v", err)
  586. }
  587. defer stream.Close()
  588. buf := make([]byte, 4)
  589. for i := 0; i < 1000; i++ {
  590. // Send the 'ping'
  591. n, err := stream.Write(ping)
  592. if err != nil {
  593. t.Fatalf("err: %v", err)
  594. }
  595. if n != 4 {
  596. t.Fatalf("short write %d", n)
  597. }
  598. // Read the 'pong'
  599. n, err = stream.Read(buf)
  600. if err != nil {
  601. t.Fatalf("err: %v", err)
  602. }
  603. if n != 4 {
  604. t.Fatalf("err: %v", err)
  605. }
  606. if !bytes.Equal(buf, pong) {
  607. t.Fatalf("bad: %s", buf)
  608. }
  609. // Shrink the buffer
  610. stream.Shrink()
  611. }
  612. }
  613. for i := 0; i < 50; i++ {
  614. wg.Add(2)
  615. go acceptor(i)
  616. go sender(i)
  617. }
  618. wg.Wait()
  619. }
  620. func TestHalfClose(t *testing.T) {
  621. client, server := testClientServer()
  622. defer client.Close()
  623. defer server.Close()
  624. stream, err := client.Open()
  625. if err != nil {
  626. t.Fatalf("err: %v", err)
  627. }
  628. if _, err = stream.Write([]byte("a")); err != nil {
  629. t.Fatalf("err: %v", err)
  630. }
  631. stream2, err := server.Accept()
  632. if err != nil {
  633. t.Fatalf("err: %v", err)
  634. }
  635. stream2.Close() // Half close
  636. buf := make([]byte, 4)
  637. n, err := stream2.Read(buf)
  638. if err != nil {
  639. t.Fatalf("err: %v", err)
  640. }
  641. if n != 1 {
  642. t.Fatalf("bad: %v", n)
  643. }
  644. // Send more
  645. if _, err = stream.Write([]byte("bcd")); err != nil {
  646. t.Fatalf("err: %v", err)
  647. }
  648. stream.Close()
  649. // Read after close
  650. n, err = stream2.Read(buf)
  651. if err != nil {
  652. t.Fatalf("err: %v", err)
  653. }
  654. if n != 3 {
  655. t.Fatalf("bad: %v", n)
  656. }
  657. // EOF after close
  658. n, err = stream2.Read(buf)
  659. if err != io.EOF {
  660. t.Fatalf("err: %v", err)
  661. }
  662. if n != 0 {
  663. t.Fatalf("bad: %v", n)
  664. }
  665. }
  666. func TestReadDeadline(t *testing.T) {
  667. client, server := testClientServer()
  668. defer client.Close()
  669. defer server.Close()
  670. stream, err := client.Open()
  671. if err != nil {
  672. t.Fatalf("err: %v", err)
  673. }
  674. defer stream.Close()
  675. stream2, err := server.Accept()
  676. if err != nil {
  677. t.Fatalf("err: %v", err)
  678. }
  679. defer stream2.Close()
  680. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  681. t.Fatalf("err: %v", err)
  682. }
  683. buf := make([]byte, 4)
  684. _, err = stream.Read(buf)
  685. if err != ErrTimeout {
  686. t.Fatalf("err: %v", err)
  687. }
  688. // See https://github.com/hashicorp/yamux/issues/90
  689. // The standard library's http server package will read from connections in
  690. // the background to detect if they are alive.
  691. //
  692. // It sets a read deadline on connections and detect if the returned error
  693. // is a network timeout error which implements net.Error.
  694. //
  695. // The HTTP server will cancel all server requests if it isn't timeout error
  696. // from the connection.
  697. //
  698. // We assert that we return an error meeting the interface to avoid
  699. // accidently breaking yamux session compatability with the standard
  700. // library's http server implementation.
  701. if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
  702. t.Fatalf("reading timeout error is expected to implement net.Error and return true when calling Timeout()")
  703. }
  704. }
  705. func TestReadDeadline_BlockedRead(t *testing.T) {
  706. client, server := testClientServer()
  707. defer client.Close()
  708. defer server.Close()
  709. stream, err := client.Open()
  710. if err != nil {
  711. t.Fatalf("err: %v", err)
  712. }
  713. defer stream.Close()
  714. stream2, err := server.Accept()
  715. if err != nil {
  716. t.Fatalf("err: %v", err)
  717. }
  718. defer stream2.Close()
  719. // Start a read that will block
  720. errCh := make(chan error, 1)
  721. go func() {
  722. buf := make([]byte, 4)
  723. _, err := stream.Read(buf)
  724. errCh <- err
  725. close(errCh)
  726. }()
  727. // Wait to ensure the read has started.
  728. time.Sleep(5 * time.Millisecond)
  729. // Update the read deadline
  730. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  731. t.Fatalf("err: %v", err)
  732. }
  733. select {
  734. case <-time.After(100 * time.Millisecond):
  735. t.Fatal("expected read timeout")
  736. case err := <-errCh:
  737. if err != ErrTimeout {
  738. t.Fatalf("expected ErrTimeout; got %v", err)
  739. }
  740. }
  741. }
  742. func TestWriteDeadline(t *testing.T) {
  743. client, server := testClientServer()
  744. defer client.Close()
  745. defer server.Close()
  746. stream, err := client.Open()
  747. if err != nil {
  748. t.Fatalf("err: %v", err)
  749. }
  750. defer stream.Close()
  751. stream2, err := server.Accept()
  752. if err != nil {
  753. t.Fatalf("err: %v", err)
  754. }
  755. defer stream2.Close()
  756. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  757. t.Fatalf("err: %v", err)
  758. }
  759. buf := make([]byte, 512)
  760. for i := 0; i < int(initialStreamWindow); i++ {
  761. _, err := stream.Write(buf)
  762. if err != nil && err == ErrTimeout {
  763. return
  764. } else if err != nil {
  765. t.Fatalf("err: %v", err)
  766. }
  767. }
  768. t.Fatalf("Expected timeout")
  769. }
  770. func TestWriteDeadline_BlockedWrite(t *testing.T) {
  771. client, server := testClientServer()
  772. defer client.Close()
  773. defer server.Close()
  774. stream, err := client.Open()
  775. if err != nil {
  776. t.Fatalf("err: %v", err)
  777. }
  778. defer stream.Close()
  779. stream2, err := server.Accept()
  780. if err != nil {
  781. t.Fatalf("err: %v", err)
  782. }
  783. defer stream2.Close()
  784. // Start a goroutine making writes that will block
  785. errCh := make(chan error, 1)
  786. go func() {
  787. buf := make([]byte, 512)
  788. for i := 0; i < int(initialStreamWindow); i++ {
  789. _, err := stream.Write(buf)
  790. if err == nil {
  791. continue
  792. }
  793. errCh <- err
  794. close(errCh)
  795. return
  796. }
  797. close(errCh)
  798. }()
  799. // Wait to ensure the write has started.
  800. time.Sleep(5 * time.Millisecond)
  801. // Update the write deadline
  802. if err := stream.SetWriteDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  803. t.Fatalf("err: %v", err)
  804. }
  805. select {
  806. case <-time.After(1 * time.Second):
  807. t.Fatal("expected write timeout")
  808. case err := <-errCh:
  809. if err != ErrTimeout {
  810. t.Fatalf("expected ErrTimeout; got %v", err)
  811. }
  812. }
  813. }
  814. func TestBacklogExceeded(t *testing.T) {
  815. client, server := testClientServer()
  816. defer client.Close()
  817. defer server.Close()
  818. // Fill the backlog
  819. max := client.config.AcceptBacklog
  820. for i := 0; i < max; i++ {
  821. stream, err := client.Open()
  822. if err != nil {
  823. t.Fatalf("err: %v", err)
  824. }
  825. defer stream.Close()
  826. if _, err := stream.Write([]byte("foo")); err != nil {
  827. t.Fatalf("err: %v", err)
  828. }
  829. }
  830. // Attempt to open a new stream
  831. errCh := make(chan error, 1)
  832. go func() {
  833. _, err := client.Open()
  834. errCh <- err
  835. }()
  836. // Shutdown the server
  837. go func() {
  838. time.Sleep(10 * time.Millisecond)
  839. server.Close()
  840. }()
  841. select {
  842. case err := <-errCh:
  843. if err == nil {
  844. t.Fatalf("open should fail")
  845. }
  846. case <-time.After(time.Second):
  847. t.Fatalf("timeout")
  848. }
  849. }
  850. func TestKeepAlive(t *testing.T) {
  851. client, server := testClientServer()
  852. defer client.Close()
  853. defer server.Close()
  854. time.Sleep(200 * time.Millisecond)
  855. // Ping value should increase
  856. client.pingLock.Lock()
  857. defer client.pingLock.Unlock()
  858. if client.pingID == 0 {
  859. t.Fatalf("should ping")
  860. }
  861. server.pingLock.Lock()
  862. defer server.pingLock.Unlock()
  863. if server.pingID == 0 {
  864. t.Fatalf("should ping")
  865. }
  866. }
  867. func TestKeepAlive_Timeout(t *testing.T) {
  868. conn1, conn2 := testConn()
  869. clientConf := testConf()
  870. clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
  871. clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
  872. client, _ := Client(conn1, clientConf)
  873. defer client.Close()
  874. server, _ := Server(conn2, testConf())
  875. defer server.Close()
  876. _ = captureLogs(client) // Client logs aren't part of the test
  877. serverLogs := captureLogs(server)
  878. errCh := make(chan error, 1)
  879. go func() {
  880. _, err := server.Accept() // Wait until server closes
  881. errCh <- err
  882. }()
  883. // Prevent the client from responding
  884. clientConn := client.conn.(*pipeConn)
  885. clientConn.writeBlocker.Lock()
  886. select {
  887. case err := <-errCh:
  888. if err != ErrKeepAliveTimeout {
  889. t.Fatalf("unexpected error: %v", err)
  890. }
  891. case <-time.After(1 * time.Second):
  892. t.Fatalf("timeout waiting for timeout")
  893. }
  894. if !server.IsClosed() {
  895. t.Fatalf("server should have closed")
  896. }
  897. if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
  898. t.Fatalf("server log incorect: %v", serverLogs.logs())
  899. }
  900. }
  901. func TestLargeWindow(t *testing.T) {
  902. conf := DefaultConfig()
  903. conf.MaxStreamWindowSize *= 2
  904. client, server := testClientServerConfig(conf)
  905. defer client.Close()
  906. defer server.Close()
  907. stream, err := client.Open()
  908. if err != nil {
  909. t.Fatalf("err: %v", err)
  910. }
  911. defer stream.Close()
  912. stream2, err := server.Accept()
  913. if err != nil {
  914. t.Fatalf("err: %v", err)
  915. }
  916. defer stream2.Close()
  917. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  918. buf := make([]byte, conf.MaxStreamWindowSize)
  919. n, err := stream.Write(buf)
  920. if err != nil {
  921. t.Fatalf("err: %v", err)
  922. }
  923. if n != len(buf) {
  924. t.Fatalf("short write: %d", n)
  925. }
  926. }
  927. type UnlimitedReader struct{}
  928. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  929. runtime.Gosched()
  930. return len(p), nil
  931. }
  932. func TestSendData_VeryLarge(t *testing.T) {
  933. client, server := testClientServer()
  934. defer client.Close()
  935. defer server.Close()
  936. var n int64 = 1 * 1024 * 1024 * 1024
  937. var workers int = 16
  938. wg := &sync.WaitGroup{}
  939. wg.Add(workers * 2)
  940. for i := 0; i < workers; i++ {
  941. go func() {
  942. defer wg.Done()
  943. stream, err := server.AcceptStream()
  944. if err != nil {
  945. t.Fatalf("err: %v", err)
  946. }
  947. defer stream.Close()
  948. buf := make([]byte, 4)
  949. _, err = stream.Read(buf)
  950. if err != nil {
  951. t.Fatalf("err: %v", err)
  952. }
  953. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  954. t.Fatalf("bad header")
  955. }
  956. recv, err := io.Copy(ioutil.Discard, stream)
  957. if err != nil {
  958. t.Fatalf("err: %v", err)
  959. }
  960. if recv != n {
  961. t.Fatalf("bad: %v", recv)
  962. }
  963. }()
  964. }
  965. for i := 0; i < workers; i++ {
  966. go func() {
  967. defer wg.Done()
  968. stream, err := client.Open()
  969. if err != nil {
  970. t.Fatalf("err: %v", err)
  971. }
  972. defer stream.Close()
  973. _, err = stream.Write([]byte{0, 1, 2, 3})
  974. if err != nil {
  975. t.Fatalf("err: %v", err)
  976. }
  977. unlimited := &UnlimitedReader{}
  978. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  979. if err != nil {
  980. t.Fatalf("err: %v", err)
  981. }
  982. if sent != n {
  983. t.Fatalf("bad: %v", sent)
  984. }
  985. }()
  986. }
  987. doneCh := make(chan struct{})
  988. go func() {
  989. wg.Wait()
  990. close(doneCh)
  991. }()
  992. select {
  993. case <-doneCh:
  994. case <-time.After(20 * time.Second):
  995. panic("timeout")
  996. }
  997. }
  998. func TestBacklogExceeded_Accept(t *testing.T) {
  999. client, server := testClientServer()
  1000. defer client.Close()
  1001. defer server.Close()
  1002. max := 5 * client.config.AcceptBacklog
  1003. go func() {
  1004. for i := 0; i < max; i++ {
  1005. stream, err := server.Accept()
  1006. if err != nil {
  1007. t.Fatalf("err: %v", err)
  1008. }
  1009. defer stream.Close()
  1010. }
  1011. }()
  1012. // Fill the backlog
  1013. for i := 0; i < max; i++ {
  1014. stream, err := client.Open()
  1015. if err != nil {
  1016. t.Fatalf("err: %v", err)
  1017. }
  1018. defer stream.Close()
  1019. if _, err := stream.Write([]byte("foo")); err != nil {
  1020. t.Fatalf("err: %v", err)
  1021. }
  1022. }
  1023. }
  1024. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  1025. client, server := testClientServerConfig(testConfNoKeepAlive())
  1026. defer client.Close()
  1027. defer server.Close()
  1028. var wg sync.WaitGroup
  1029. wg.Add(2)
  1030. // Choose a huge flood size that we know will result in a window update.
  1031. flood := int64(client.config.MaxStreamWindowSize) - 1
  1032. // The server will accept a new stream and then flood data to it.
  1033. go func() {
  1034. defer wg.Done()
  1035. stream, err := server.AcceptStream()
  1036. if err != nil {
  1037. t.Fatalf("err: %v", err)
  1038. }
  1039. defer stream.Close()
  1040. n, err := stream.Write(make([]byte, flood))
  1041. if err != nil {
  1042. t.Fatalf("err: %v", err)
  1043. }
  1044. if int64(n) != flood {
  1045. t.Fatalf("short write: %d", n)
  1046. }
  1047. }()
  1048. // The client will open a stream, block outbound writes, and then
  1049. // listen to the flood from the server, which should time out since
  1050. // it won't be able to send the window update.
  1051. go func() {
  1052. defer wg.Done()
  1053. stream, err := client.OpenStream()
  1054. if err != nil {
  1055. t.Fatalf("err: %v", err)
  1056. }
  1057. defer stream.Close()
  1058. conn := client.conn.(*pipeConn)
  1059. conn.writeBlocker.Lock()
  1060. _, err = stream.Read(make([]byte, flood))
  1061. if err != ErrConnectionWriteTimeout {
  1062. t.Fatalf("err: %v", err)
  1063. }
  1064. }()
  1065. wg.Wait()
  1066. }
  1067. func TestSession_PartialReadWindowUpdate(t *testing.T) {
  1068. client, server := testClientServerConfig(testConfNoKeepAlive())
  1069. defer client.Close()
  1070. defer server.Close()
  1071. var wg sync.WaitGroup
  1072. wg.Add(1)
  1073. // Choose a huge flood size that we know will result in a window update.
  1074. flood := int64(client.config.MaxStreamWindowSize)
  1075. var wr *Stream
  1076. // The server will accept a new stream and then flood data to it.
  1077. go func() {
  1078. defer wg.Done()
  1079. var err error
  1080. wr, err = server.AcceptStream()
  1081. if err != nil {
  1082. t.Fatalf("err: %v", err)
  1083. }
  1084. defer wr.Close()
  1085. if wr.sendWindow != client.config.MaxStreamWindowSize {
  1086. t.Fatalf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, wr.sendWindow)
  1087. }
  1088. n, err := wr.Write(make([]byte, flood))
  1089. if err != nil {
  1090. t.Fatalf("err: %v", err)
  1091. }
  1092. if int64(n) != flood {
  1093. t.Fatalf("short write: %d", n)
  1094. }
  1095. if wr.sendWindow != 0 {
  1096. t.Fatalf("sendWindow: exp=%d, got=%d", 0, wr.sendWindow)
  1097. }
  1098. }()
  1099. stream, err := client.OpenStream()
  1100. if err != nil {
  1101. t.Fatalf("err: %v", err)
  1102. }
  1103. defer stream.Close()
  1104. wg.Wait()
  1105. _, err = stream.Read(make([]byte, flood/2+1))
  1106. if exp := uint32(flood/2 + 1); wr.sendWindow != exp {
  1107. t.Errorf("sendWindow: exp=%d, got=%d", exp, wr.sendWindow)
  1108. }
  1109. }
  1110. func TestSession_sendNoWait_Timeout(t *testing.T) {
  1111. client, server := testClientServerConfig(testConfNoKeepAlive())
  1112. defer client.Close()
  1113. defer server.Close()
  1114. var wg sync.WaitGroup
  1115. wg.Add(2)
  1116. go func() {
  1117. defer wg.Done()
  1118. stream, err := server.AcceptStream()
  1119. if err != nil {
  1120. t.Fatalf("err: %v", err)
  1121. }
  1122. defer stream.Close()
  1123. }()
  1124. // The client will open the stream and then block outbound writes, we'll
  1125. // probe sendNoWait once it gets into that state.
  1126. go func() {
  1127. defer wg.Done()
  1128. stream, err := client.OpenStream()
  1129. if err != nil {
  1130. t.Fatalf("err: %v", err)
  1131. }
  1132. defer stream.Close()
  1133. conn := client.conn.(*pipeConn)
  1134. conn.writeBlocker.Lock()
  1135. hdr := header(make([]byte, headerSize))
  1136. hdr.encode(typePing, flagACK, 0, 0)
  1137. for {
  1138. err = client.sendNoWait(hdr)
  1139. if err == nil {
  1140. continue
  1141. } else if err == ErrConnectionWriteTimeout {
  1142. break
  1143. } else {
  1144. t.Fatalf("err: %v", err)
  1145. }
  1146. }
  1147. }()
  1148. wg.Wait()
  1149. }
  1150. func TestSession_PingOfDeath(t *testing.T) {
  1151. client, server := testClientServerConfig(testConfNoKeepAlive())
  1152. defer client.Close()
  1153. defer server.Close()
  1154. var wg sync.WaitGroup
  1155. wg.Add(2)
  1156. var doPingOfDeath sync.Mutex
  1157. doPingOfDeath.Lock()
  1158. // This is used later to block outbound writes.
  1159. conn := server.conn.(*pipeConn)
  1160. // The server will accept a stream, block outbound writes, and then
  1161. // flood its send channel so that no more headers can be queued.
  1162. go func() {
  1163. defer wg.Done()
  1164. stream, err := server.AcceptStream()
  1165. if err != nil {
  1166. t.Fatalf("err: %v", err)
  1167. }
  1168. defer stream.Close()
  1169. conn.writeBlocker.Lock()
  1170. for {
  1171. hdr := header(make([]byte, headerSize))
  1172. hdr.encode(typePing, 0, 0, 0)
  1173. err = server.sendNoWait(hdr)
  1174. if err == nil {
  1175. continue
  1176. } else if err == ErrConnectionWriteTimeout {
  1177. break
  1178. } else {
  1179. t.Fatalf("err: %v", err)
  1180. }
  1181. }
  1182. doPingOfDeath.Unlock()
  1183. }()
  1184. // The client will open a stream and then send the server a ping once it
  1185. // can no longer write. This makes sure the server doesn't deadlock reads
  1186. // while trying to reply to the ping with no ability to write.
  1187. go func() {
  1188. defer wg.Done()
  1189. stream, err := client.OpenStream()
  1190. if err != nil {
  1191. t.Fatalf("err: %v", err)
  1192. }
  1193. defer stream.Close()
  1194. // This ping will never unblock because the ping id will never
  1195. // show up in a response.
  1196. doPingOfDeath.Lock()
  1197. go func() { client.Ping() }()
  1198. // Wait for a while to make sure the previous ping times out,
  1199. // then turn writes back on and make sure a ping works again.
  1200. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  1201. conn.writeBlocker.Unlock()
  1202. if _, err = client.Ping(); err != nil {
  1203. t.Fatalf("err: %v", err)
  1204. }
  1205. }()
  1206. wg.Wait()
  1207. }
  1208. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  1209. client, server := testClientServerConfig(testConfNoKeepAlive())
  1210. defer client.Close()
  1211. defer server.Close()
  1212. var wg sync.WaitGroup
  1213. wg.Add(2)
  1214. go func() {
  1215. defer wg.Done()
  1216. stream, err := server.AcceptStream()
  1217. if err != nil {
  1218. t.Fatalf("err: %v", err)
  1219. }
  1220. defer stream.Close()
  1221. }()
  1222. // The client will open the stream and then block outbound writes, we'll
  1223. // tee up a write and make sure it eventually times out.
  1224. go func() {
  1225. defer wg.Done()
  1226. stream, err := client.OpenStream()
  1227. if err != nil {
  1228. t.Fatalf("err: %v", err)
  1229. }
  1230. defer stream.Close()
  1231. conn := client.conn.(*pipeConn)
  1232. conn.writeBlocker.Lock()
  1233. // Since the write goroutine is blocked then this will return a
  1234. // timeout since it can't get feedback about whether the write
  1235. // worked.
  1236. n, err := stream.Write([]byte("hello"))
  1237. if err != ErrConnectionWriteTimeout {
  1238. t.Fatalf("err: %v", err)
  1239. }
  1240. if n != 0 {
  1241. t.Fatalf("lied about writes: %d", n)
  1242. }
  1243. }()
  1244. wg.Wait()
  1245. }