package kos

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"net"
	"net/http/pprof"
	"os"
	"os/signal"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"git.nspix.com/golang/kos/entry"
	"git.nspix.com/golang/kos/entry/cli"
	"git.nspix.com/golang/kos/entry/http"
	_ "git.nspix.com/golang/kos/pkg/cache"
	"git.nspix.com/golang/kos/pkg/log"
	"git.nspix.com/golang/kos/util/env"
	"github.com/sourcegraph/conc"
)

var (
	// ErrStopping is the cancellation cause attached to the application
	// context when preStop begins shutting the application down.
	ErrStopping = errors.New("stopping")

	// cliFlag, when set (and the command entry is not disabled), makes
	// preStart run the interactive client shell and exit the process instead
	// of starting the server.
	cliFlag = flag.Bool("cli", false, "Go application interactive mode")
)

type (
	// application wires together the gateway, the HTTP and command entries,
	// the optional user server and the registered plugins.
	application struct {
		ctx        context.Context         // created in preStart from opts.Context
		cancelFunc context.CancelCauseFunc // cancels ctx; called with ErrStopping in preStop
		opts       *Options
		gateway    *entry.Gateway // created in preStart; nil before that
		http       *http.Server   // nil when the HTTP entry is disabled
		command    *cli.Server    // nil when the command entry is disabled
		uptime     time.Time      // set in New; used to report start time and uptime
		info       *Info
		plugins    sync.Map       // plugin name -> Plugin
		waitGroup  conc.WaitGroup // tracks the background serve goroutines
		exitFlag   int32          // 0 = running, 1 = preStop has been entered
	}
)

// Log returns the package-wide logger.
func (app *application) Log() log.Logger {
	return log.GetLogger()
}

// Healthy maps the gateway's Processing and Accepting flags onto one of the
// State* health strings.
//
// Each flag is read exactly once so that the result is derived from a single
// observation per flag rather than from repeated loads that may disagree.
// It must only be called after preStart has created the gateway.
func (app *application) Healthy() string {
	state := app.gateway.State()
	processing := atomic.LoadInt32(&state.Processing) == 1
	accepting := atomic.LoadInt32(&state.Accepting) == 1
	switch {
	case processing && accepting:
		return StateHealthy
	case processing:
		return StateNoAccepting
	case accepting:
		return StateNoProgress
	default:
		return StateUnavailable
	}
}

// Info returns the application description built during preStart; it is nil
// before that.
func (app *application) Info() *Info {
	return app.info
}

// Http returns the HTTP entry server, or nil if it has not been started.
func (app *application) Http() *http.Server {
	return app.http
}

// Command returns the command entry server, or nil if it has not been started.
func (app *application) Command() *cli.Server {
	return app.command
}

// Handle registers cb under path on every entry that is currently available:
// as a POST route on the HTTP server and as a command on the command server.
// Entries that are nil (disabled or not yet started) are skipped silently.
func (app *application) Handle(path string, cb HandleFunc) {
	if app.http != nil {
		app.http.Handle(http.MethodPost, path, func(ctx *http.Context) (err error) {
			return cb(ctx)
		})
	}
	if app.command != nil {
		app.command.Handle(path, "", func(ctx *cli.Context) (err error) {
			return cb(ctx)
		})
	}
}

// serveInBackground runs serve on the application wait group and waits a
// short grace period for it to fail.
//
// If serve returns within the grace period its result is returned (this is
// how an immediate startup failure is surfaced). Otherwise the server is
// considered up; when direct is true the listener is additionally passed to
// gateway.Direct, and nil is returned.
func (app *application) serveInBackground(l net.Listener, direct bool, serve func() error) error {
	const startupGrace = 200 * time.Millisecond

	timer := time.NewTimer(startupGrace)
	defer timer.Stop()

	// Buffered so the goroutine can always deliver its result and exit, even
	// when nobody is receiving any more (i.e. after the grace period).
	errChan := make(chan error, 1)
	app.waitGroup.Go(func() {
		errChan <- serve()
	})

	select {
	case err := <-errChan:
		return err
	case <-timer.C:
		if direct {
			app.gateway.Direct(l)
		}
		return nil
	}
}

// httpServe creates the HTTP entry, claims the HTTP method features on the
// gateway, optionally registers the pprof debug routes and starts serving in
// the background.
func (app *application) httpServe() (err error) {
	var l net.Listener

	app.http = http.New(app.ctx)
	if l, err = app.gateway.Apply(
		entry.Feature(http.MethodGet),
		entry.Feature(http.MethodHead),
		entry.Feature(http.MethodPost),
		entry.Feature(http.MethodPut),
		entry.Feature(http.MethodPatch),
		entry.Feature(http.MethodDelete),
		entry.Feature(http.MethodConnect),
		entry.Feature(http.MethodOptions),
		entry.Feature(http.MethodTrace),
	); err != nil {
		return
	}

	if app.opts.EnableDebug {
		// The named profiles are routed to pprof.Index, which selects the
		// profile from the request path.
		// NOTE(review): this relies on http.Wrap forwarding the original
		// request path unchanged — confirm.
		app.http.Handle(http.MethodGet, "/debug/pprof/", http.Wrap(pprof.Index))
		app.http.Handle(http.MethodGet, "/debug/pprof/goroutine", http.Wrap(pprof.Index))
		app.http.Handle(http.MethodGet, "/debug/pprof/heap", http.Wrap(pprof.Index))
		app.http.Handle(http.MethodGet, "/debug/pprof/mutex", http.Wrap(pprof.Index))
		app.http.Handle(http.MethodGet, "/debug/pprof/threadcreate", http.Wrap(pprof.Index))
		app.http.Handle(http.MethodGet, "/debug/pprof/cmdline", http.Wrap(pprof.Cmdline))
		app.http.Handle(http.MethodGet, "/debug/pprof/profile", http.Wrap(pprof.Profile))
		app.http.Handle(http.MethodGet, "/debug/pprof/symbol", http.Wrap(pprof.Symbol))
		app.http.Handle(http.MethodGet, "/debug/pprof/trace", http.Wrap(pprof.Trace))
	}

	return app.serveInBackground(l, app.opts.EnableDirectHttp, func() error {
		return app.http.Serve(l)
	})
}

// commandServe creates the command entry, claims the cli feature on the
// gateway and starts serving in the background.
func (app *application) commandServe() (err error) {
	var l net.Listener

	app.command = cli.New(app.ctx)
	if l, err = app.gateway.Apply(cli.Feature); err != nil {
		return
	}

	return app.serveInBackground(l, app.opts.EnableDirectCommand, func() error {
		return app.command.Serve(l)
	})
}

// gotoInteractive connects a cli client to the configured address and port
// and runs its shell, returning whatever the shell returns.
func (app *application) gotoInteractive() (err error) {
	addr := net.JoinHostPort(app.opts.Address, strconv.Itoa(app.opts.Port))
	return cli.NewClient(app.ctx, addr).Shell()
}

// buildInfo assembles the Info record from the options and adds runtime
// details (OS, CPU count, Go version, short name, start time) to its metadata.
//
// When opts.Metadata is non-nil the same map is reused, so the runtime keys
// are written into the map supplied by the caller.
func (app *application) buildInfo() *Info {
	info := &Info{
		ID:       app.opts.Name,
		Name:     app.opts.Name,
		Version:  app.opts.Version,
		Status:   StateHealthy,
		Address:  app.opts.Address,
		Port:     app.opts.Port,
		Metadata: app.opts.Metadata,
	}
	if info.Metadata == nil {
		info.Metadata = make(map[string]string)
	}
	info.Metadata["os"] = runtime.GOOS
	info.Metadata["numOfCPU"] = strconv.Itoa(runtime.NumCPU())
	info.Metadata["goVersion"] = runtime.Version()
	info.Metadata["shortName"] = app.opts.ShortName()
	info.Metadata["upTime"] = app.uptime.Format(time.DateTime)
	return info
}

// preStart brings the application up: it creates the cancellable context,
// handles interactive mode, starts the gateway and the enabled entries, runs
// the plugin BeforeStart hooks, starts the optional user server, registers the
// built-in state/health handlers and finally runs the plugin AfterStart hooks.
//
// The first error encountered aborts startup and is returned. Components that
// were already started are not torn down here.
func (app *application) preStart() (err error) {
	app.ctx, app.cancelFunc = context.WithCancelCause(app.opts.Context)

	// Interactive mode never returns: the process exits once the shell ends.
	if *cliFlag && !app.opts.DisableCommand {
		if err = app.gotoInteractive(); err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		os.Exit(0)
	}

	app.info = app.buildInfo()
	app.Log().Infof("server starting")
	env.Set(EnvAppName, app.opts.ShortName())
	env.Set(EnvAppVersion, app.opts.Version)

	addr := net.JoinHostPort(app.opts.Address, strconv.Itoa(app.opts.Port))
	app.Log().Infof("server listen on: %s", addr)
	app.gateway = entry.New(addr)
	if err = app.gateway.Start(app.ctx); err != nil {
		return
	}

	if !app.opts.DisableHttp {
		if err = app.httpServe(); err != nil {
			return
		}
	}
	if !app.opts.DisableCommand {
		if err = app.commandServe(); err != nil {
			return
		}
	}

	app.plugins.Range(func(key, value any) bool {
		if plugin, ok := value.(Plugin); ok {
			if err = plugin.BeforeStart(); err != nil {
				return false
			}
		}
		return true
	})
	// A failing BeforeStart hook must abort startup; without this check the
	// error would be overwritten by the assignments below and silently lost.
	if err != nil {
		return
	}

	if app.opts.server != nil {
		if err = app.opts.server.Start(app.ctx); err != nil {
			app.Log().Warnf("server start error: %s", err.Error())
			return
		}
	}

	if !app.opts.DisableStateApi {
		app.Handle("/-/run/state", func(ctx Context) (err error) {
			return ctx.Success(State{
				ID:      app.opts.Name,
				Name:    app.opts.Name,
				Version: app.opts.Version,
				Uptime:  time.Since(app.uptime).String(),
				Gateway: app.gateway.State(),
			})
		})
		app.Handle("/-/healthy", func(ctx Context) (err error) {
			return ctx.Success(app.Healthy())
		})
	}

	app.plugins.Range(func(key, value any) bool {
		if plugin, ok := value.(Plugin); ok {
			if err = plugin.AfterStart(); err != nil {
				return false
			}
		}
		return true
	})
	if err != nil {
		// Do not announce a successful start when a hook has failed.
		return
	}

	app.Log().Infof("server started")
	return
}

// preStop shuts the application down. Only the first call does any work;
// later calls return nil immediately.
//
// Shutdown is best-effort: a failing step is logged (for the server, entries
// and gateway) and the remaining steps still run. The BeforeStop and AfterStop
// hook loops each stop at the first plugin that returns an error.
//
// NOTE(review): err is reassigned by every step, so the value returned is the
// outcome of whichever step assigned it last, not an aggregate of all
// failures. Kept as-is to avoid changing what Run reports to callers.
func (app *application) preStop() (err error) {
	if !atomic.CompareAndSwapInt32(&app.exitFlag, 0, 1) {
		return
	}
	app.Log().Infof("server stopping")

	if app.opts.server != nil {
		if err = app.opts.server.Stop(); err != nil {
			app.Log().Warnf("app server stop error: %s", err.Error())
		}
	}

	app.cancelFunc(ErrStopping)

	app.plugins.Range(func(key, value any) bool {
		if plugin, ok := value.(Plugin); ok {
			if err = plugin.BeforeStop(); err != nil {
				return false
			}
		}
		return true
	})

	if app.http != nil {
		if err = app.http.Shutdown(); err != nil {
			app.Log().Warnf("server http shutdown error: %s", err.Error())
		}
	}
	if app.command != nil {
		if err = app.command.Shutdown(); err != nil {
			app.Log().Warnf("server command shutdown error: %s", err.Error())
		}
	}
	if err = app.gateway.Stop(); err != nil {
		app.Log().Warnf("server gateway shutdown error: %s", err.Error())
	}

	app.plugins.Range(func(key, value any) bool {
		if plugin, ok := value.(Plugin); ok {
			if err = plugin.AfterStop(); err != nil {
				return false
			}
		}
		return true
	})

	// Wait for the background serve goroutines to finish.
	app.waitGroup.Wait()
	app.Log().Infof("server stopped")
	return
}

// Use mounts plugin and registers it under its name. It returns an error if a
// plugin with the same name is already registered or if Mount fails; in the
// latter case the plugin is not registered.
//
// NOTE(review): app.ctx is only assigned in preStart, so a plugin registered
// before Run receives a nil context in Mount — confirm this is intended.
func (app *application) Use(plugin Plugin) (err error) {
	name := plugin.Name()
	if _, exists := app.plugins.Load(name); exists {
		return fmt.Errorf("plugin %s already registered", name)
	}
	if err = plugin.Mount(app.ctx); err != nil {
		return
	}
	app.plugins.Store(name, plugin)
	return
}

// Run starts the application and blocks until one of the configured signals
// is received or the application context is cancelled, then shuts down and
// returns the result of the shutdown. A startup error is returned directly,
// without running the shutdown sequence.
//
// When no signals are configured, SIGTERM, SIGINT and SIGQUIT are used.
// SIGKILL is deliberately not in that list: it cannot be trapped, so
// registering it has no effect.
func (app *application) Run() (err error) {
	if err = app.preStart(); err != nil {
		return
	}

	if app.opts.Signals == nil {
		app.opts.Signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT}
	}
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, app.opts.Signals...)
	// Restore default signal handling once Run returns.
	defer signal.Stop(ch)

	select {
	case <-ch:
	case <-app.ctx.Done():
	}
	return app.preStop()
}

// New builds an application from the default options with each of cbs applied
// in order. The creation time is recorded as the start of the uptime.
func New(cbs ...Option) *application {
	opts := NewOptions()
	for _, cb := range cbs {
		cb(opts)
	}
	return &application{
		opts:   opts,
		uptime: time.Now(),
	}
}