Browse Source

add 32bit supported

sugar 1 năm trước cách đây
mục cha
commit
9c47b20864
6 tập tin đã thay đổi với 77 bổ sung26 xóa
  1. 1 2
      cmd/main.go
  2. 23 11
      entry/cli/server.go
  3. 3 3
      entry/conn.go
  4. 4 3
      entry/gateway.go
  5. 33 0
      entry/state.go
  6. 13 7
      service.go

+ 1 - 2
cmd/main.go

@@ -4,8 +4,8 @@ import (
 	"context"
 	"embed"
 	"flag"
+
 	"git.nspix.com/golang/kos"
-	"git.nspix.com/golang/kos/pkg/log"
 )
 
 //go:embed web
@@ -20,7 +20,6 @@ func (s *subServer) Start(ctx context.Context) (err error) {
 }
 
 func (s *subServer) Stop() (err error) {
-	log.Debugf("stopxxx")
 	return
 }
 

+ 23 - 11
entry/cli/server.go

@@ -4,15 +4,16 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"git.nspix.com/golang/kos/util/env"
-	"github.com/sourcegraph/conc"
+	"math"
 	"net"
 	"path"
 	"runtime"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
+
+	"git.nspix.com/golang/kos/util/env"
+	"github.com/sourcegraph/conc"
 )
 
 var (
@@ -20,13 +21,14 @@ var (
 )
 
 type Server struct {
-	ctx        context.Context
-	sequence   int64
-	ctxMap     sync.Map
-	waitGroup  conc.WaitGroup
-	middleware []Middleware
-	router     *Router
-	l          net.Listener
+	ctx            context.Context
+	sequenceLocker sync.Mutex
+	sequence       int64
+	ctxMap         sync.Map
+	waitGroup      conc.WaitGroup
+	middleware     []Middleware
+	router         *Router
+	l              net.Listener
 }
 
 func (svr *Server) applyContext() *Context {
@@ -75,6 +77,16 @@ func (svr *Server) handle(ctx *Context, frame *Frame) {
 	}
 }
 
+func (svr *Server) nextSequence() int64 {
+	svr.sequenceLocker.Lock()
+	defer svr.sequenceLocker.Unlock()
+	if svr.sequence >= math.MaxInt64 {
+		svr.sequence = 1
+	}
+	svr.sequence++
+	return svr.sequence
+}
+
 func (svr *Server) process(conn net.Conn) {
 	var (
 		err   error
@@ -82,7 +94,7 @@ func (svr *Server) process(conn net.Conn) {
 		frame *Frame
 	)
 	ctx = svr.applyContext()
-	ctx.reset(atomic.AddInt64(&svr.sequence, 1), conn)
+	ctx.reset(svr.nextSequence(), conn)
 	svr.ctxMap.Store(ctx.Id, ctx)
 	defer func() {
 		_ = conn.Close()

+ 3 - 3
entry/conn.go

@@ -23,20 +23,20 @@ func (c *Conn) Read(b []byte) (n int, err error) {
 	}
 	n, err = c.conn.Read(b[m:])
 	n += m
-	atomic.AddInt64(&c.state.Traffic.In, int64(n))
+	c.state.IncTrafficIn(int64(n))
 	return
 }
 
 func (c *Conn) Write(b []byte) (n int, err error) {
 	n, err = c.conn.Write(b)
-	atomic.AddInt64(&c.state.Traffic.Out, int64(n))
+	c.state.IncTrafficOut(int64(n))
 	return
 }
 
 func (c *Conn) Close() error {
 	if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
 		atomic.AddInt32(&c.state.Concurrency, -1)
-		atomic.AddInt64(&c.state.Request.Processed, 1)
+		c.state.IncRequestProcessed(1)
 		return c.conn.Close()
 	}
 	return nil

+ 4 - 3
entry/gateway.go

@@ -4,11 +4,12 @@ import (
 	"bytes"
 	"context"
 	"errors"
-	"github.com/sourcegraph/conc"
 	"io"
 	"net"
 	"sync/atomic"
 	"time"
+
+	"github.com/sourcegraph/conc"
 )
 
 const (
@@ -53,7 +54,7 @@ func (gw *Gateway) handle(conn net.Conn) {
 	defer func() {
 		if atomic.LoadInt32(&success) != 1 {
 			atomic.AddInt32(&gw.state.Concurrency, -1)
-			atomic.AddInt64(&gw.state.Request.Discarded, 1)
+			gw.state.IncRequestDiscarded(1)
 			_ = conn.Close()
 		}
 	}()
@@ -93,7 +94,7 @@ func (gw *Gateway) accept() {
 			} else {
 				select {
 				case gw.ch <- conn:
-					atomic.AddInt64(&gw.state.Request.Total, 1)
+					gw.state.IncRequest(1)
 				case <-gw.ctx.Done():
 					return
 				}

+ 33 - 0
entry/state.go

@@ -1,6 +1,9 @@
 package entry
 
+import "sync"
+
 type State struct {
+	mutex       sync.Mutex
 	Accepting   int32 `json:"accepting"`  //是否正在接收连接
 	Processing  int32 `json:"processing"` //是否正在处理连接
 	Concurrency int32 `json:"concurrency"`
@@ -14,3 +17,33 @@ type State struct {
 		Out int64 `json:"out"` //出网流量
 	} `json:"traffic"`
 }
+
+func (s *State) IncRequest(n int64) {
+	s.mutex.Lock()
+	s.Request.Total += n
+	s.mutex.Unlock()
+}
+
+func (s *State) IncRequestProcessed(n int64) {
+	s.mutex.Lock()
+	s.Request.Processed += n
+	s.mutex.Unlock()
+}
+
+func (s *State) IncRequestDiscarded(n int64) {
+	s.mutex.Lock()
+	s.Request.Discarded += n
+	s.mutex.Unlock()
+}
+
+func (s *State) IncTrafficIn(n int64) {
+	s.mutex.Lock()
+	s.Traffic.In += n
+	s.mutex.Unlock()
+}
+
+func (s *State) IncTrafficOut(n int64) {
+	s.mutex.Lock()
+	s.Traffic.Out += n
+	s.mutex.Unlock()
+}

+ 13 - 7
service.go

@@ -5,13 +5,6 @@ import (
 	"errors"
 	"flag"
 	"fmt"
-	"git.nspix.com/golang/kos/entry"
-	"git.nspix.com/golang/kos/entry/cli"
-	"git.nspix.com/golang/kos/entry/http"
-	_ "git.nspix.com/golang/kos/pkg/cache"
-	"git.nspix.com/golang/kos/pkg/log"
-	"git.nspix.com/golang/kos/util/env"
-	"github.com/sourcegraph/conc"
 	"net"
 	"net/http/pprof"
 	"os"
@@ -22,6 +15,14 @@ import (
 	"sync/atomic"
 	"syscall"
 	"time"
+
+	"git.nspix.com/golang/kos/entry"
+	"git.nspix.com/golang/kos/entry/cli"
+	"git.nspix.com/golang/kos/entry/http"
+	_ "git.nspix.com/golang/kos/pkg/cache"
+	"git.nspix.com/golang/kos/pkg/log"
+	"git.nspix.com/golang/kos/util/env"
+	"github.com/sourcegraph/conc"
 )
 
 var (
@@ -273,6 +274,11 @@ func (app *application) preStop() (err error) {
 		return
 	}
 	app.Log().Infof("server stopping")
+	if app.opts.server != nil {
+		if err = app.opts.server.Stop(); err != nil {
+			app.Log().Warnf("app server stop error: %s", err.Error())
+		}
+	}
 	app.cancelFunc(ErrStopping)
 	app.plugins.Range(func(key, value any) bool {
 		if plugin, ok := value.(Plugin); ok {