conn.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package entry
  2. import (
  3. "net"
  4. "sync/atomic"
  5. "time"
  6. )
  7. type Conn struct {
  8. buf []byte
  9. conn net.Conn
  10. exitFlag int32
  11. state *State
  12. }
  13. func (c *Conn) Read(b []byte) (n int, err error) {
  14. var m int
  15. if len(c.buf) > 0 {
  16. if len(b) >= len(c.buf) {
  17. m = copy(b[:], c.buf[:])
  18. c.buf = c.buf[m:]
  19. }
  20. }
  21. n, err = c.conn.Read(b[m:])
  22. n += m
  23. atomic.AddInt64(&c.state.Traffic.In, int64(n))
  24. return
  25. }
  26. func (c *Conn) Write(b []byte) (n int, err error) {
  27. n, err = c.conn.Write(b)
  28. atomic.AddInt64(&c.state.Traffic.Out, int64(n))
  29. return
  30. }
  31. func (c *Conn) Close() error {
  32. if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
  33. atomic.AddInt32(&c.state.Concurrency, -1)
  34. atomic.AddInt64(&c.state.Request.Processed, 1)
  35. return c.conn.Close()
  36. }
  37. return nil
  38. }
  39. func (c *Conn) LocalAddr() net.Addr {
  40. return c.conn.LocalAddr()
  41. }
  42. func (c *Conn) RemoteAddr() net.Addr {
  43. return c.conn.RemoteAddr()
  44. }
  45. func (c *Conn) SetDeadline(t time.Time) error {
  46. return c.conn.SetDeadline(t)
  47. }
  48. func (c *Conn) SetReadDeadline(t time.Time) error {
  49. return c.conn.SetReadDeadline(t)
  50. }
  51. func (c *Conn) SetWriteDeadline(t time.Time) error {
  52. return c.conn.SetWriteDeadline(t)
  53. }
  54. func wrapConn(c net.Conn, state *State, buf []byte) net.Conn {
  55. conn := &Conn{conn: c, buf: buf, state: state}
  56. if buf != nil {
  57. conn.buf = make([]byte, len(buf))
  58. copy(conn.buf, buf)
  59. }
  60. return conn
  61. }