stream.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. package yamux
  2. import (
  3. "bytes"
  4. "io"
  5. "sync"
  6. "time"
  7. )
  8. type streamState int
  9. const (
  10. streamInit streamState = iota
  11. streamSYNSent
  12. streamSYNReceived
  13. streamEstablished
  14. streamLocalClose
  15. streamRemoteClose
  16. streamClosed
  17. )
  18. // Stream is used to represent a logical stream
  19. // within a session.
  20. type Stream struct {
  21. id uint32
  22. session *Session
  23. state streamState
  24. lock sync.Mutex
  25. recvBuf bytes.Buffer
  26. sendHdr header
  27. recvWindow uint32
  28. sendWindow uint32
  29. notifyCh chan struct{}
  30. readDeadline time.Time
  31. writeDeadline time.Time
  32. }
  33. // newStream is used to construct a new stream within
  34. // a given session for an ID
  35. func newStream(session *Session, id uint32, state streamState) *Stream {
  36. s := &Stream{
  37. id: id,
  38. session: session,
  39. state: state,
  40. sendHdr: header(make([]byte, headerSize)),
  41. recvWindow: initialStreamWindow,
  42. sendWindow: initialStreamWindow,
  43. notifyCh: make(chan struct{}, 1),
  44. }
  45. return s
  46. }
  47. // Session returns the associated stream session
  48. func (s *Stream) Session() *Session {
  49. return s.session
  50. }
  51. // StreamID returns the ID of this stream
  52. func (s *Stream) StreamID() uint32 {
  53. return s.id
  54. }
  55. // Read is used to read from the stream
  56. func (s *Stream) Read(b []byte) (n int, err error) {
  57. START:
  58. s.lock.Lock()
  59. switch s.state {
  60. case streamRemoteClose:
  61. fallthrough
  62. case streamClosed:
  63. if s.recvBuf.Len() == 0 {
  64. s.lock.Unlock()
  65. return 0, io.EOF
  66. }
  67. }
  68. // If there is no data available, block
  69. if s.recvBuf.Len() == 0 {
  70. s.lock.Unlock()
  71. goto WAIT
  72. }
  73. // Read any bytes
  74. n, _ = s.recvBuf.Read(b)
  75. // Send a window update potentially
  76. err = s.sendWindowUpdate()
  77. s.lock.Unlock()
  78. return n, err
  79. WAIT:
  80. var timeout <-chan time.Time
  81. if !s.readDeadline.IsZero() {
  82. delay := s.readDeadline.Sub(time.Now())
  83. timeout = time.After(delay)
  84. }
  85. select {
  86. case <-s.notifyCh:
  87. goto START
  88. case <-timeout:
  89. return 0, ErrTimeout
  90. }
  91. }
  92. // Write is used to write to the stream
  93. func (s *Stream) Write(b []byte) (n int, err error) {
  94. total := 0
  95. for total < len(b) {
  96. n, err := s.write(b[total:])
  97. total += n
  98. if err != nil {
  99. return total, err
  100. }
  101. }
  102. return total, nil
  103. }
  104. // write is used to write to the stream, may return on
  105. // a short write.
  106. func (s *Stream) write(b []byte) (n int, err error) {
  107. var flags uint16
  108. var max uint32
  109. var body io.Reader
  110. START:
  111. s.lock.Lock()
  112. switch s.state {
  113. case streamLocalClose:
  114. fallthrough
  115. case streamClosed:
  116. s.lock.Unlock()
  117. return 0, ErrStreamClosed
  118. }
  119. // If there is no data available, block
  120. if s.sendWindow == 0 {
  121. s.lock.Unlock()
  122. goto WAIT
  123. }
  124. // Determine the flags if any
  125. flags = s.sendFlags()
  126. // Send up to our send window
  127. max = min(s.sendWindow, uint32(len(b)))
  128. body = bytes.NewReader(b[:max])
  129. // Send the header
  130. s.sendHdr.encode(typeData, flags, s.id, max)
  131. if err := s.session.waitForSend(s.sendHdr, body); err != nil {
  132. s.lock.Unlock()
  133. return 0, err
  134. }
  135. // Reduce our send window
  136. s.sendWindow -= max
  137. // Unlock
  138. s.lock.Unlock()
  139. return int(max), err
  140. WAIT:
  141. var timeout <-chan time.Time
  142. if !s.writeDeadline.IsZero() {
  143. delay := s.writeDeadline.Sub(time.Now())
  144. timeout = time.After(delay)
  145. }
  146. select {
  147. case <-s.notifyCh:
  148. goto START
  149. case <-timeout:
  150. return 0, ErrTimeout
  151. }
  152. return 0, nil
  153. }
  154. // sendFlags determines any flags that are appropriate
  155. // based on the current stream state
  156. func (s *Stream) sendFlags() uint16 {
  157. // Determine the flags if any
  158. var flags uint16
  159. switch s.state {
  160. case streamInit:
  161. flags |= flagSYN
  162. s.state = streamSYNSent
  163. case streamSYNReceived:
  164. flags |= flagACK
  165. s.state = streamEstablished
  166. }
  167. return flags
  168. }
  169. // sendWindowUpdate potentially sends a window update enabling
  170. // further writes to take place. Must be invoked with the lock.
  171. func (s *Stream) sendWindowUpdate() error {
  172. // Determine the delta update
  173. max := s.session.config.MaxStreamWindowSize
  174. delta := max - s.recvWindow
  175. // Determine the flags if any
  176. flags := s.sendFlags()
  177. // Check if we can omit the update
  178. if delta < (max/2) && flags == 0 {
  179. return nil
  180. }
  181. // Send the header
  182. s.sendHdr.encode(typeWindowUpdate, flags, s.id, delta)
  183. if err := s.session.waitForSend(s.sendHdr, nil); err != nil {
  184. return err
  185. }
  186. // Update our window
  187. s.recvWindow += delta
  188. return nil
  189. }
  190. // sendClose is used to send a FIN
  191. func (s *Stream) sendClose() error {
  192. flags := s.sendFlags()
  193. flags |= flagFIN
  194. s.sendHdr.encode(typeWindowUpdate, flags, s.id, 0)
  195. if err := s.session.waitForSend(s.sendHdr, nil); err != nil {
  196. return err
  197. }
  198. return nil
  199. }
  200. // Close is used to close the stream
  201. func (s *Stream) Close() error {
  202. s.lock.Lock()
  203. defer s.lock.Unlock()
  204. switch s.state {
  205. // Local or full close means nothing to do
  206. case streamLocalClose:
  207. fallthrough
  208. case streamClosed:
  209. return nil
  210. // Remote close, weneed to send FIN and we are done
  211. case streamRemoteClose:
  212. s.state = streamClosed
  213. s.session.closeStream(s.id, false)
  214. s.sendClose()
  215. return nil
  216. // Opened means we need to signal a close
  217. case streamSYNSent:
  218. fallthrough
  219. case streamSYNReceived:
  220. fallthrough
  221. case streamEstablished:
  222. s.state = streamLocalClose
  223. s.sendClose()
  224. return nil
  225. }
  226. panic("unhandled state")
  227. }
  228. // forceClose is used for when the session is exiting
  229. func (s *Stream) forceClose() {
  230. s.lock.Lock()
  231. defer s.lock.Unlock()
  232. s.state = streamClosed
  233. asyncNotify(s.notifyCh)
  234. }
  235. // processFlags is used to update the state of the stream
  236. // based on set flags, if any. Lock must be held
  237. func (s *Stream) processFlags(flags uint16) error {
  238. if flags&flagACK == flagACK {
  239. if s.state == streamSYNSent {
  240. s.state = streamEstablished
  241. }
  242. } else if flags&flagFIN == flagFIN {
  243. switch s.state {
  244. case streamSYNSent:
  245. fallthrough
  246. case streamSYNReceived:
  247. fallthrough
  248. case streamEstablished:
  249. s.state = streamRemoteClose
  250. case streamLocalClose:
  251. s.state = streamClosed
  252. s.session.closeStream(s.id, true)
  253. default:
  254. return ErrUnexpectedFlag
  255. }
  256. } else if flags&flagRST == flagRST {
  257. s.state = streamClosed
  258. s.session.closeStream(s.id, true)
  259. }
  260. return nil
  261. }
  262. // incrSendWindow updates the size of our send window
  263. func (s *Stream) incrSendWindow(hdr header, flags uint16) error {
  264. s.lock.Lock()
  265. defer s.lock.Unlock()
  266. if err := s.processFlags(flags); err != nil {
  267. return err
  268. }
  269. // Increase window, unblock a sender
  270. s.sendWindow += hdr.Length()
  271. asyncNotify(s.notifyCh)
  272. return nil
  273. }
  274. // readData is used to handle a data frame
  275. func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
  276. s.lock.Lock()
  277. defer s.lock.Unlock()
  278. if err := s.processFlags(flags); err != nil {
  279. return err
  280. }
  281. // Check that our recv window is not exceeded
  282. length := hdr.Length()
  283. if length > s.recvWindow {
  284. return ErrRecvWindowExceeded
  285. }
  286. // Decrement the receive window
  287. s.recvWindow -= length
  288. // Wrap in a limited reader
  289. conn = &io.LimitedReader{R: conn, N: int64(length)}
  290. // Copy to our buffer
  291. if _, err := io.Copy(&s.recvBuf, conn); err != nil {
  292. return err
  293. }
  294. // Unblock any readers
  295. asyncNotify(s.notifyCh)
  296. return nil
  297. }
  298. // SetDeadline sets the read and write deadlines
  299. func (s *Stream) SetDeadline(t time.Time) error {
  300. if err := s.SetReadDeadline(t); err != nil {
  301. return err
  302. }
  303. if err := s.SetWriteDeadline(t); err != nil {
  304. return err
  305. }
  306. return nil
  307. }
  308. // SetReadDeadline sets the deadline for future Read calls.
  309. func (s *Stream) SetReadDeadline(t time.Time) error {
  310. s.readDeadline = t
  311. return nil
  312. }
  313. // SetWriteDeadline sets the deadline for future Write calls
  314. func (s *Stream) SetWriteDeadline(t time.Time) error {
  315. s.writeDeadline = t
  316. return nil
  317. }