Browse Source

添加延时任务处理

lxg 3 years ago
parent
commit
5aa06b2b98

+ 5 - 5
bus/bus.go → broker/bus.go

@@ -1,10 +1,10 @@
-package bus
+package broker
 
 import (
 	"context"
 )
 
-type Bus interface {
+type Broker interface {
 	Publish(e *Event)
 	Dispatch(e *Event) (err error)
 	DispatchCtx(ctx context.Context, e *Event) (err error)
@@ -13,14 +13,14 @@ type Bus interface {
 	Subscribers() []Subscriber
 }
 
-func FromContext(ctx context.Context) Bus {
+func FromContext(ctx context.Context) Broker {
 	if v := ctx.Value("bus"); v != nil {
-		return v.(Bus)
+		return v.(Broker)
 	} else {
 		return nil
 	}
 }
 
-func WitchContext(ctx context.Context, b Bus) context.Context {
+func WitchContext(ctx context.Context, b Broker) context.Context {
 	return context.WithValue(ctx, "bus", b)
 }

+ 4 - 4
bus/event.go → broker/event.go

@@ -1,4 +1,4 @@
-package bus
+package broker
 
 import (
 	"encoding/json"
@@ -29,8 +29,8 @@ func releaseEvent(e *Event) {
 }
 
 type Event struct {
+	Namespace string            `json:"-"`
 	Name      string            `json:"name"`
-	Domain    string            `json:"-"`
 	Trigger   string            `json:"-"` //触发的用户,用于一些订阅者过滤数据
 	Timestamp int64             `json:"ts"`
 	Header    map[string]string `json:"meta"`
@@ -75,9 +75,9 @@ func (e *Event) Encode() (buf []byte) {
 	return
 }
 
-func NewEvent(name string, domain string) *Event {
+func NewEvent(namespace string, name string) *Event {
 	e := applyEvent()
+	e.Namespace = namespace
 	e.Name = name
-	e.Domain = domain
 	return e
 }

+ 4 - 4
bus/global.go → broker/global.go

@@ -1,20 +1,20 @@
-package bus
+package broker
 
 import "context"
 
 var (
-	Default Bus
+	Default Broker
 )
 
 func init() {
 	Default = NewInPrcBus(context.Background())
 }
 
-func SetGlobal(bus Bus) {
+func SetGlobal(bus Broker) {
 	Default = bus
 }
 
-func GetGlobal() Bus {
+func GetGlobal() Broker {
 	return Default
 }
 

+ 17 - 12
bus/inprocess.go → broker/inprocess.go

@@ -1,4 +1,4 @@
-package bus
+package broker
 
 import (
 	"context"
@@ -15,17 +15,6 @@ type InProcBus struct {
 	subscribers      map[string]Subscriber
 }
 
-func (bus *InProcBus) Subscribers() []Subscriber {
-	bus.subscriberLocker.RLock()
-	defer bus.subscriberLocker.RUnlock()
-	vs := make([]Subscriber, len(bus.subscribers))
-	i := 0
-	for _, v := range bus.subscribers {
-		vs[i] = v
-		i++
-	}
-	return vs
-}
 
 func (bus *InProcBus) worker() {
 	for {
@@ -40,6 +29,19 @@ func (bus *InProcBus) worker() {
 	}
 }
 
+//Subscribers 获取所有订阅者的快照
+func (bus *InProcBus) Subscribers() []Subscriber {
+	bus.subscriberLocker.RLock()
+	defer bus.subscriberLocker.RUnlock()
+	vs := make([]Subscriber, len(bus.subscribers))
+	i := 0
+	for _, v := range bus.subscribers {
+		vs[i] = v
+		i++
+	}
+	return vs
+}
+
 // Publish 发布一个事件
 func (bus *InProcBus) Publish(e *Event) {
 	bus.once.Do(func() {
@@ -59,6 +61,7 @@ func (bus *InProcBus) Dispatch(e *Event) (err error) {
 	return bus.DispatchCtx(context.Background(), e)
 }
 
+//DispatchCtx 分配一个事件
 func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) {
 	bus.subscriberLocker.RLock()
 	defer bus.subscriberLocker.RUnlock()
@@ -71,6 +74,7 @@ func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) {
 	return
 }
 
+//Subscribe 订阅一个事件
 func (bus *InProcBus) Subscribe(sub Subscriber) (err error) {
 	bus.subscriberLocker.Lock()
 	defer bus.subscriberLocker.Unlock()
@@ -85,6 +89,7 @@ func (bus *InProcBus) Subscribe(sub Subscriber) (err error) {
 	return
 }
 
+//UnSubscribe 取消一个订阅
 func (bus *InProcBus) UnSubscribe(sub Subscriber) (err error) {
 	bus.subscriberLocker.Lock()
 	defer bus.subscriberLocker.Unlock()

+ 1 - 1
bus/subscriber.go → broker/subscriber.go

@@ -1,4 +1,4 @@
-package bus
+package broker
 
 import (
 	"context"

+ 2 - 2
bus/subscriber/http.go → broker/subscriber/http.go

@@ -7,7 +7,7 @@ import (
 	"encoding/json"
 	"encoding/xml"
 	"fmt"
-	"git.nspix.com/golang/micro/bus"
+	"git.nspix.com/golang/micro/broker"
 	"net/http"
 	"strconv"
 	"time"
@@ -47,7 +47,7 @@ func (sub *Http) OnAttach() (err error) {
 	return
 }
 
-func (sub *Http) Process(ctx context.Context, e *bus.Event) (err error) {
+func (sub *Http) Process(ctx context.Context, e *broker.Event) (err error) {
 	var (
 		req  *http.Request
 		resp *http.Response

+ 4 - 4
bus/subscriber/memory.go → broker/subscriber/memory.go

@@ -3,14 +3,14 @@ package subscriber
 import (
 	"context"
 	"fmt"
-	"git.nspix.com/golang/micro/bus"
+	"git.nspix.com/golang/micro/broker"
 	"reflect"
 )
 
 type Memory struct {
 	Id    string
 	topic string
-	cb    bus.HandleFunc
+	cb    broker.HandleFunc
 }
 
 func (sub *Memory) ID() string {
@@ -25,7 +25,7 @@ func (sub *Memory) OnAttach() (err error) {
 	return
 }
 
-func (sub *Memory) Process(ctx context.Context, e *bus.Event) (err error) {
+func (sub *Memory) Process(ctx context.Context, e *broker.Event) (err error) {
 	if sub.cb != nil {
 		err = sub.cb(ctx, e)
 	}
@@ -36,7 +36,7 @@ func (sub *Memory) OnDetach() (err error) {
 	return
 }
 
-func NewMemorySubscriber(topic string, cb bus.HandleFunc) *Memory {
+func NewMemorySubscriber(topic string, cb broker.HandleFunc) *Memory {
 	id := topic + ":" + fmt.Sprint(reflect.ValueOf(cb).Pointer())
 	return &Memory{
 		Id:    id,

+ 17 - 19
bus/subscriber/websocket.go → broker/subscriber/websocket.go

@@ -3,23 +3,23 @@ package subscriber
 import (
 	"context"
 	"encoding/json"
-	"git.nspix.com/golang/micro/bus"
+	"git.nspix.com/golang/micro/broker"
 	"golang.org/x/net/websocket"
 )
 
 type Websocket struct {
-	id     string
-	sid    string
-	domain string
-	topics []string
-	conn   *websocket.Conn
+	id        string
+	sid       string
+	namespace string
+	topics    []string
+	conn      *websocket.Conn
 }
 
 type websocketMessageWrapper struct {
-	Type string     `json:"type"`
-	From string     `json:"from"`
-	To   string     `json:"to"`
-	Body *bus.Event `json:"body"`
+	Type string        `json:"type"`
+	From string        `json:"from"`
+	To   string        `json:"to"`
+	Body *broker.Event `json:"body"`
 }
 
 func (sub *Websocket) ID() string {
@@ -39,12 +39,10 @@ func (sub *Websocket) OnAttach() (err error) {
 	return
 }
 
-func (sub *Websocket) Process(ctx context.Context, e *bus.Event) (err error) {
-	//websocket 只处理改域下面的消息
-	if e.Domain != sub.domain {
+func (sub *Websocket) Process(ctx context.Context, e *broker.Event) (err error) {
+	if e.Namespace != sub.namespace {
 		return
 	}
-	//websocket 只处理输出它自己的消息
 	if e.Trigger != "" && e.Trigger != sub.sid {
 		return
 	}
@@ -91,11 +89,11 @@ func (sub *Websocket) Unsubscribe(topic string) {
 	}
 }
 
-func NewWebsocketSubscriber(id string, sid string, domain string, conn *websocket.Conn) *Websocket {
+func NewWebsocketSubscriber(id string, sid string, namespace string, conn *websocket.Conn) *Websocket {
 	return &Websocket{
-		id:     "websocket:" + id,
-		sid:    sid,
-		domain: domain,
-		conn:   conn,
+		id:        "websocket:" + id,
+		sid:       sid,
+		namespace: namespace,
+		conn:      conn,
 	}
 }

+ 20 - 1
cmd/main.go

@@ -1,7 +1,11 @@
 package main
 
 import (
+	"context"
+	"fmt"
 	"git.nspix.com/golang/micro"
+	"math/rand"
+	"time"
 )
 
 type (
@@ -16,6 +20,7 @@ func main() {
 	svr := micro.New(
 		micro.WithName("git.nspix.com/test", "0.0.01"),
 		micro.WithoutRegister(),
+		micro.WithHttpDebug(),
 		micro.WithStats(),
 		micro.WithCli(),
 		micro.WithPort(6567),
@@ -30,5 +35,19 @@ func main() {
 	}, func(o *micro.HandleOptions) {
 		o.DisableCli = false
 	})
-	svr.Run()
+
+	go func() {
+		time.AfterFunc(time.Second*5, func() {
+			for i := 0; i < 100000; i++ {
+				r := rand.Int63n(10000)
+				svr.DeferTick(time.Duration(r)*time.Millisecond, func(ctx context.Context) {
+					fmt.Println(time.Now().Unix())
+				})
+			}
+		})
+	}()
+
+	if err := svr.Run(); err != nil {
+		fmt.Println(err)
+	}
 }

+ 7 - 4
gateway/gateway.go

@@ -86,7 +86,7 @@ func (g *Gateway) process(conn net.Conn) {
 		requestCounter.Add(1)
 	}
 	//set deadline
-	if err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 800)); err != nil {
+	if err = conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
 		if g.enableStats {
 			requestDropCounter.Add("ERROR", 1)
 		}
@@ -121,7 +121,7 @@ func (g *Gateway) process(conn net.Conn) {
 	}
 }
 
-func (g *Gateway) wroker(ctx context.Context) {
+func (g *Gateway) worker(ctx context.Context) {
 	var (
 		ok   bool
 		conn net.Conn
@@ -152,12 +152,12 @@ func (g *Gateway) schedule(ctx context.Context) {
 	}
 }
 
-//运行项目
+//Run 运行项目
 func (g *Gateway) Run(ctx context.Context) {
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
-		g.wroker(ctx)
+		g.worker(ctx)
 		wg.Done()
 	}()
 	go func() {
@@ -165,6 +165,9 @@ func (g *Gateway) Run(ctx context.Context) {
 		wg.Done()
 	}()
 	wg.Wait()
+	for _, l := range g.listeners {
+		_ = l.l.Close()
+	}
 }
 
 func New(l net.Listener, stats bool) *Gateway {

+ 7 - 3
gateway/listener.go

@@ -3,11 +3,13 @@ package gateway
 import (
 	"io"
 	"net"
+	"sync/atomic"
 )
 
 type Listener struct {
-	addr net.Addr
-	ch   chan net.Conn
+	addr     net.Addr
+	ch       chan net.Conn
+	exitFlag int32
 }
 
 func (l *Listener) Receive(conn net.Conn) {
@@ -28,7 +30,9 @@ func (l *Listener) Accept() (net.Conn, error) {
 }
 
 func (l *Listener) Close() error {
-	close(l.ch)
+	if atomic.CompareAndSwapInt32(&l.exitFlag, 0, 1) {
+		close(l.ch)
+	}
 	return nil
 }
 

+ 338 - 0
internal/rbtree/rbtree.go

@@ -0,0 +1,338 @@
+package rbtree
+
+// color of node
+const (
+	RED   = 0
+	BLACK = 1
+)
+
+type Keytype interface {
+	LessThan(interface{}) bool
+}
+
+type valuetype interface{}
+
+type node struct {
+	left, right, parent *node
+	color               int
+	Key                 Keytype
+	Value               valuetype
+}
+
+// Tree is a struct of red-black tree.
+type Tree struct {
+	root *node
+	size int
+}
+
+// NewTree creates a new rbtree.
+func NewTree() *Tree {
+	return &Tree{}
+}
+
+// Find finds the node and return its value.
+func (t *Tree) Find(key Keytype) interface{} {
+	n := t.findnode(key)
+	if n != nil {
+		return n.Value
+	}
+	return nil
+}
+
+// FindIt finds the node and return it as an iterator.
+func (t *Tree) FindIt(key Keytype) *node {
+	return t.findnode(key)
+}
+
+// Empty checks whether the rbtree is empty.
+func (t *Tree) Empty() bool {
+	if t.root == nil {
+		return true
+	}
+	return false
+}
+
+// Iterator creates the rbtree's iterator that points to the minmum node.
+func (t *Tree) Iterator() *node {
+	return minimum(t.root)
+}
+
+// Size returns the size of the rbtree.
+func (t *Tree) Size() int {
+	return t.size
+}
+
+// Clear destroys the rbtree.
+func (t *Tree) Clear() {
+	t.root = nil
+	t.size = 0
+}
+
+// Insert inserts the key-value pair into the rbtree.
+func (t *Tree) Insert(key Keytype, value valuetype) {
+	x := t.root
+	var y *node
+
+	for x != nil {
+		y = x
+		if key.LessThan(x.Key) {
+			x = x.left
+		} else {
+			x = x.right
+		}
+	}
+
+	z := &node{parent: y, color: RED, Key: key, Value: value}
+	t.size++
+
+	if y == nil {
+		z.color = BLACK
+		t.root = z
+		return
+	} else if z.Key.LessThan(y.Key) {
+		y.left = z
+	} else {
+		y.right = z
+	}
+	t.rbInsertFixup(z)
+
+}
+
+// Delete deletes the node by key
+func (t *Tree) Delete(key Keytype) {
+	z := t.findnode(key)
+	if z == nil {
+		return
+	}
+
+	var x, y *node
+	if z.left != nil && z.right != nil {
+		y = successor(z)
+	} else {
+		y = z
+	}
+
+	if y.left != nil {
+		x = y.left
+	} else {
+		x = y.right
+	}
+
+	xparent := y.parent
+	if x != nil {
+		x.parent = xparent
+	}
+	if y.parent == nil {
+		t.root = x
+	} else if y == y.parent.left {
+		y.parent.left = x
+	} else {
+		y.parent.right = x
+	}
+
+	if y != z {
+		z.Key = y.Key
+		z.Value = y.Value
+	}
+
+	if y.color == BLACK {
+		t.rbDeleteFixup(x, xparent)
+	}
+	t.size--
+}
+
+func (t *Tree) rbInsertFixup(z *node) {
+	var y *node
+	for z.parent != nil && z.parent.color == RED {
+		if z.parent == z.parent.parent.left {
+			y = z.parent.parent.right
+			if y != nil && y.color == RED {
+				z.parent.color = BLACK
+				y.color = BLACK
+				z.parent.parent.color = RED
+				z = z.parent.parent
+			} else {
+				if z == z.parent.right {
+					z = z.parent
+					t.leftRotate(z)
+				}
+				z.parent.color = BLACK
+				z.parent.parent.color = RED
+				t.rightRotate(z.parent.parent)
+			}
+		} else {
+			y = z.parent.parent.left
+			if y != nil && y.color == RED {
+				z.parent.color = BLACK
+				y.color = BLACK
+				z.parent.parent.color = RED
+				z = z.parent.parent
+			} else {
+				if z == z.parent.left {
+					z = z.parent
+					t.rightRotate(z)
+				}
+				z.parent.color = BLACK
+				z.parent.parent.color = RED
+				t.leftRotate(z.parent.parent)
+			}
+		}
+	}
+	t.root.color = BLACK
+}
+
+func (t *Tree) rbDeleteFixup(x, parent *node) {
+	var w *node
+
+	for x != t.root && getColor(x) == BLACK {
+		if x != nil {
+			parent = x.parent
+		}
+		if x == parent.left {
+			w = parent.right
+			if w.color == RED {
+				w.color = BLACK
+				parent.color = RED
+				t.leftRotate(parent)
+				w = parent.right
+			}
+			if getColor(w.left) == BLACK && getColor(w.right) == BLACK {
+				w.color = RED
+				x = parent
+			} else {
+				if getColor(w.right) == BLACK {
+					if w.left != nil {
+						w.left.color = BLACK
+					}
+					w.color = RED
+					t.rightRotate(w)
+					w = parent.right
+				}
+				w.color = parent.color
+				parent.color = BLACK
+				if w.right != nil {
+					w.right.color = BLACK
+				}
+				t.leftRotate(parent)
+				x = t.root
+			}
+		} else {
+			w = parent.left
+			if w.color == RED {
+				w.color = BLACK
+				parent.color = RED
+				t.rightRotate(parent)
+				w = parent.left
+			}
+			if getColor(w.left) == BLACK && getColor(w.right) == BLACK {
+				w.color = RED
+				x = parent
+			} else {
+				if getColor(w.left) == BLACK {
+					if w.right != nil {
+						w.right.color = BLACK
+					}
+					w.color = RED
+					t.leftRotate(w)
+					w = parent.left
+				}
+				w.color = parent.color
+				parent.color = BLACK
+				if w.left != nil {
+					w.left.color = BLACK
+				}
+				t.rightRotate(parent)
+				x = t.root
+			}
+		}
+	}
+	if x != nil {
+		x.color = BLACK
+	}
+}
+
+func (t *Tree) leftRotate(x *node) {
+	y := x.right
+	x.right = y.left
+	if y.left != nil {
+		y.left.parent = x
+	}
+	y.parent = x.parent
+	if x.parent == nil {
+		t.root = y
+	} else if x == x.parent.left {
+		x.parent.left = y
+	} else {
+		x.parent.right = y
+	}
+	y.left = x
+	x.parent = y
+}
+
+func (t *Tree) rightRotate(x *node) {
+	y := x.left
+	x.left = y.right
+	if y.right != nil {
+		y.right.parent = x
+	}
+	y.parent = x.parent
+	if x.parent == nil {
+		t.root = y
+	} else if x == x.parent.right {
+		x.parent.right = y
+	} else {
+		x.parent.left = y
+	}
+	y.right = x
+	x.parent = y
+}
+
+// findnode finds the node by key and return it, if not exists return nil.
+func (t *Tree) findnode(key Keytype) *node {
+	x := t.root
+	for x != nil {
+		if key.LessThan(x.Key) {
+			x = x.left
+		} else {
+			if key == x.Key {
+				return x
+			}
+			x = x.right
+		}
+	}
+	return nil
+}
+
+// Next returns the node's successor as an iterator.
+func (n *node) Next() *node {
+	return successor(n)
+}
+
+// successor returns the successor of the node
+func successor(x *node) *node {
+	if x.right != nil {
+		return minimum(x.right)
+	}
+	y := x.parent
+	for y != nil && x == y.right {
+		x = y
+		y = x.parent
+	}
+	return y
+}
+
+// getColor gets color of the node.
+func getColor(n *node) int {
+	if n == nil {
+		return BLACK
+	}
+	return n.color
+}
+
+// minimum finds the minimum node of subtree n.
+func minimum(n *node) *node {
+	for n.left != nil {
+		n = n.left
+	}
+	return n
+}

+ 42 - 12
micro.go

@@ -3,6 +3,7 @@ package micro
 import (
 	"context"
 	"git.nspix.com/golang/micro/gateway/cli"
+	"time"
 
 	"git.nspix.com/golang/micro/gateway/http"
 	"git.nspix.com/golang/micro/gateway/rpc"
@@ -18,12 +19,30 @@ type (
 	applicationKey struct {
 	}
 
+	HandleTickerFunc func(ctx context.Context)
+
+	tickPtr struct {
+		sequence int64
+		next     time.Time
+		running  int32
+		options  *TickOptions
+		callback HandleTickerFunc
+	}
+
+	TickOptions struct {
+		Context       context.Context
+		Canceled      bool
+		MustBeExecute bool
+	}
+
+	TickOption func(o *TickOptions)
+
 	HandleOptions struct {
-		DisableRpc     bool   //禁用RPC功能
-		DisableHttp    bool   //禁用HTTP功能
-		DisableCli     bool   //禁用CLI功能
-		HttpPath       string //重定向HTTP路由
-		HttpMethod     string //HTTP路径
+		DisableRpc  bool   //禁用RPC功能
+		DisableHttp bool   //禁用HTTP功能
+		DisableCli  bool   //禁用CLI功能
+		HttpPath    string //重定向HTTP路由
+		HttpMethod  string //HTTP路径
 	}
 
 	HandleOption func(o *HandleOptions)
@@ -31,13 +50,14 @@ type (
 	HandleFunc func(ctx Context) (err error)
 
 	Application interface {
-		Node() *registry.ServiceNode                                                //获取节点信息
-		HttpServe() *http.Server                                                    //获取HTTP实例
-		RPCServe() *rpc.Server                                                      //获取RPC实例
-		CliServe() *cli.Server                                                      //获取cli服务端
-		PeekService(name string) ([]*registry.ServiceNode, error)                   //选择一个服务
-		Handle(method string, cb HandleFunc, opts ...HandleOption)                  //注册一个处理器
-		NewRequest(name, method string, body interface{}) (req *Request, err error) //创建一个rpc请求
+		Node() *registry.ServiceNode                                                           //获取节点信息
+		HttpServe() *http.Server                                                               //获取HTTP实例
+		RPCServe() *rpc.Server                                                                 //获取RPC实例
+		CliServe() *cli.Server                                                                 //获取cli服务端
+		PeekService(name string) ([]*registry.ServiceNode, error)                              //选择一个服务
+		Handle(method string, cb HandleFunc, opts ...HandleOption)                             //注册一个处理器
+		NewRequest(name, method string, body interface{}) (req *Request, err error)            //创建一个rpc请求
+		DeferTick(duration time.Duration, callback HandleTickerFunc, opts ...TickOption) int64 //异步任务
 		Environment() string
 	}
 
@@ -51,6 +71,16 @@ var (
 	contextKey = applicationKey{}
 )
 
+func (p tickPtr) LessThan(i interface{}) bool {
+	q := i.(*tickPtr)
+	if p.next.Before(q.next) {
+		return true
+	} else if p.sequence < q.sequence {
+		return true
+	}
+	return false
+}
+
 func FromContext(ctx context.Context) Application {
 	if v := ctx.Value(contextKey); v != nil {
 		return v.(Application)

+ 97 - 23
service.go

@@ -4,10 +4,14 @@ import (
 	"context"
 	"crypto/md5"
 	"encoding/hex"
+	"git.nspix.com/golang/micro/broker"
 	"git.nspix.com/golang/micro/gateway/cli"
+	"git.nspix.com/golang/micro/internal/rbtree"
 	"git.nspix.com/golang/micro/stats/prometheusbackend"
 	"git.nspix.com/golang/micro/utils/machineid"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"math"
+	"math/rand"
 	"net"
 	hp "net/http"
 	"net/http/pprof"
@@ -15,6 +19,7 @@ import (
 	"os/signal"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -28,6 +33,10 @@ import (
 	"git.nspix.com/golang/micro/utils/unsafestr"
 )
 
+var (
+	tickSequence int64
+)
+
 type Service struct {
 	opts        *Options
 	ctx         context.Context
@@ -42,10 +51,12 @@ type Service struct {
 	cliSvr      *cli.Server
 	upTime      time.Time
 	client      *Client
+	timer       *time.Timer
+	tickTree    *rbtree.Tree
 	environment string
 }
 
-func (svr *Service) wrapSync(f func()) {
+func (svr *Service) async(f func()) {
 	svr.wg.Add(1)
 	go func() {
 		f()
@@ -53,36 +64,73 @@ func (svr *Service) wrapSync(f func()) {
 	}()
 }
 
-func (svr *Service) eventLoop() {
+func (svr *Service) worker() {
 	var (
-		err            error
-		registryTicker *time.Ticker
-		collectTicker  *time.Ticker
+		err    error
+		ticker *time.Ticker
 	)
-	registryTicker = time.NewTicker(time.Second * 20)
-	collectTicker = time.NewTicker(time.Hour * 6)
+	ticker = time.NewTicker(time.Second * 20)
 	defer func() {
-		collectTicker.Stop()
-		registryTicker.Stop()
+		ticker.Stop()
 	}()
 	for {
 		select {
-		case <-registryTicker.C:
+		case <-svr.timer.C:
+			node := svr.tickTree.Iterator()
+			if node == nil {
+				svr.timer.Reset(math.MaxInt64)
+				break
+			}
+			tick := node.Value.(*tickPtr)
+			if tick.next.Before(time.Now()) {
+				svr.tickTree.Delete(node.Key)
+				if !tick.options.Canceled {
+					if atomic.CompareAndSwapInt32(&tick.running, 0, 1) {
+						svr.async(func() {
+							tick.callback(tick.options.Context)
+						})
+					}
+				}
+				next := node.Next()
+				if next == nil {
+					svr.timer.Reset(math.MaxInt64)
+				} else {
+					svr.timer.Reset(next.Value.(*tickPtr).next.Sub(time.Now()))
+				}
+			} else {
+				svr.timer.Reset(tick.next.Sub(time.Now()))
+			}
+		case <-ticker.C:
 			if !svr.opts.DisableRegister {
 				if err = svr.registry.Register(svr.node); err != nil {
 					log.Warnf("registry service %s error: %s", svr.opts.Name, err.Error())
 				}
 			}
-		case <-collectTicker.C:
-			if svr.opts.EnableReport {
-				_ = defaultReporter.Do(svr.opts.Name)
-			}
 		case <-svr.ctx.Done():
 			return
 		}
 	}
 }
 
+//DeferTick 定时执行一个任务
+func (svr *Service) DeferTick(duration time.Duration, callback HandleTickerFunc, opts ...TickOption) int64 {
+	t := &tickPtr{
+		sequence: atomic.AddInt64(&tickSequence, 1),
+		next:     time.Now().Add(duration),
+		options:  &TickOptions{},
+		callback: callback,
+	}
+	for _, optFunc := range opts {
+		optFunc(t.options)
+	}
+	if t.options.Context == nil {
+		t.options.Context = svr.ctx
+	}
+	svr.timer.Reset(0)
+	svr.tickTree.Insert(t, t)
+	return t.sequence
+}
+
 //Handle 处理函数
 func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 	//disable cli default
@@ -117,6 +165,7 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 	return
 }
 
+//NewRequest 创建一个请求
 func (svr *Service) NewRequest(name, method string, body interface{}) (req *Request, err error) {
 	return &Request{
 		ServiceName: name,
@@ -211,12 +260,12 @@ func (svr *Service) instance() *registry.ServiceNode {
 func (svr *Service) startHTTPServe() (err error) {
 	l := gateway.NewListener(svr.listener.Addr())
 	if err = svr.gateway.Attaches([][]byte{[]byte("GET"), []byte("POST"), []byte("PUT"), []byte("DELETE"), []byte("OPTIONS")}, l); err == nil {
-		svr.wrapSync(func() {
+		svr.async(func() {
 			if err = svr.httpSvr.Serve(l); err != nil {
 				log.Warnf("http serve error: %s", err.Error())
 			}
+			log.Infof("http server stopped")
 		})
-
 		svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
 			return ctx.Success(map[string]interface{}{
 				"id":      svr.node.ID,
@@ -251,10 +300,11 @@ func (svr *Service) startHTTPServe() (err error) {
 func (svr *Service) startRPCServe() (err error) {
 	l := gateway.NewListener(svr.listener.Addr())
 	if err = svr.gateway.Attach([]byte("RPC"), l); err == nil {
-		svr.wrapSync(func() {
+		svr.async(func() {
 			if err = svr.rpcSvr.Serve(l); err != nil {
 				log.Warnf("rpc serve start error: %s", err.Error())
 			}
+			log.Infof("rpc server stopped")
 		})
 		log.Infof("attach rpc listener success")
 	} else {
@@ -266,10 +316,11 @@ func (svr *Service) startRPCServe() (err error) {
 func (svr *Service) startCliServe() (err error) {
 	l := gateway.NewListener(svr.listener.Addr())
 	if err = svr.gateway.Attach([]byte("CLI"), l); err == nil {
-		svr.wrapSync(func() {
+		svr.async(func() {
 			if err = svr.cliSvr.Serve(l); err != nil {
 				log.Warnf("cli serve start error: %s", err.Error())
 			}
+			log.Infof("cli server stopped")
 		})
 		log.Infof("attach cli listener success")
 	} else {
@@ -307,15 +358,13 @@ func (svr *Service) prepare() (err error) {
 		}
 		log.Infof("server listen on: %s", svr.listener.Addr())
 		svr.gateway = gateway.New(svr.listener, svr.opts.EnableStats)
-		svr.wrapSync(func() {
+		svr.async(func() {
 			svr.gateway.Run(svr.ctx)
 		})
-
 		//开启HTTP服务
 		if svr.opts.EnableHttp {
 			err = svr.startHTTPServe()
 		}
-
 		//开启RCP服务
 		if svr.opts.EnableRPC {
 			err = svr.startRPCServe()
@@ -326,12 +375,18 @@ func (svr *Service) prepare() (err error) {
 		}
 	}
 	svr.node = svr.instance()
-	svr.wrapSync(func() {
-		svr.eventLoop()
+	svr.async(func() {
+		svr.worker()
 	})
 	if !svr.opts.DisableRegister {
 		_ = svr.registry.Register(svr.node)
 	}
+	if svr.opts.EnableReport {
+		rn := rand.Int31n(48) + 60
+		svr.DeferTick(time.Hour*time.Duration(rn), func(ctx context.Context) {
+			_ = defaultReporter.Do(svr.opts.Name)
+		})
+	}
 	return
 }
 
@@ -348,11 +403,26 @@ func (svr *Service) destroy() (err error) {
 	if svr.listener != nil {
 		if err = svr.listener.Close(); err != nil {
 			log.Warnf(err.Error())
+		} else {
+			log.Infof("listener closed")
 		}
 	}
 	if err = svr.client.Close(); err != nil {
 		log.Warnf(err.Error())
 	}
+	svr.timer.Stop()
+	if svr.tickTree.Size() > 0 {
+		log.Warnf("num of %d defer tick dropped", svr.tickTree.Size())
+		node := svr.tickTree.Iterator()
+		for node != nil {
+			tick := node.Value.(*tickPtr)
+			if tick.options.MustBeExecute {
+				tick.callback(tick.options.Context)
+			}
+			node = node.Next()
+		}
+	}
+	svr.wg.Wait()
 	log.Infof("service stopped")
 	return
 }
@@ -365,6 +435,8 @@ func (svr *Service) Run() (err error) {
 	if err = svr.prepare(); err != nil {
 		return
 	}
+	//设置全局的代理组件
+	broker.SetGlobal(broker.NewInPrcBus(svr.ctx))
 	//start server
 	if svr.opts.Server != nil {
 		if err = svr.opts.Server.Start(svr.ctx); err != nil {
@@ -398,6 +470,8 @@ func New(opts ...Option) *Service {
 		cliSvr:      cli.New(),
 		rpcSvr:      rpc.NewServer(),
 		registry:    o.registry,
+		timer:       time.NewTimer(math.MaxInt64),
+		tickTree:    rbtree.NewTree(),
 		client:      NewClient(o.registry),
 		environment: EnvironmentHost,
 	}