server.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "golang.org/x/net/trace"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/encoding"
  37. "google.golang.org/grpc/encoding/proto"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/internal/binarylog"
  40. "google.golang.org/grpc/internal/channelz"
  41. "google.golang.org/grpc/internal/grpcsync"
  42. "google.golang.org/grpc/internal/transport"
  43. "google.golang.org/grpc/keepalive"
  44. "google.golang.org/grpc/metadata"
  45. "google.golang.org/grpc/peer"
  46. "google.golang.org/grpc/stats"
  47. "google.golang.org/grpc/status"
  48. "google.golang.org/grpc/tap"
  49. )
  50. const (
  51. defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
  52. defaultServerMaxSendMessageSize = math.MaxInt32
  53. )
  54. var statusOK = status.New(codes.OK, "")
  55. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  56. // MethodDesc represents an RPC service's method specification.
  57. type MethodDesc struct {
  58. MethodName string
  59. Handler methodHandler
  60. }
  61. // ServiceDesc represents an RPC service's specification.
  62. type ServiceDesc struct {
  63. ServiceName string
  64. // The pointer to the service interface. Used to check whether the user
  65. // provided implementation satisfies the interface requirements.
  66. HandlerType interface{}
  67. Methods []MethodDesc
  68. Streams []StreamDesc
  69. Metadata interface{}
  70. }
  71. // service consists of the information of the server serving this service and
  72. // the methods in this service.
  73. type service struct {
  74. server interface{} // the server for service methods
  75. md map[string]*MethodDesc
  76. sd map[string]*StreamDesc
  77. mdata interface{}
  78. }
  79. // Server is a gRPC server to serve RPC requests.
  80. type Server struct {
  81. opts serverOptions
  82. mu sync.Mutex // guards following
  83. lis map[net.Listener]bool
  84. conns map[transport.ServerTransport]bool
  85. serve bool
  86. drain bool
  87. cv *sync.Cond // signaled when connections close for GracefulStop
  88. m map[string]*service // service name -> service info
  89. events trace.EventLog
  90. quit *grpcsync.Event
  91. done *grpcsync.Event
  92. channelzRemoveOnce sync.Once
  93. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  94. channelzID int64 // channelz unique identification number
  95. czData *channelzData
  96. }
  97. type serverOptions struct {
  98. creds credentials.TransportCredentials
  99. codec baseCodec
  100. cp Compressor
  101. dc Decompressor
  102. unaryInt UnaryServerInterceptor
  103. streamInt StreamServerInterceptor
  104. inTapHandle tap.ServerInHandle
  105. statsHandler stats.Handler
  106. maxConcurrentStreams uint32
  107. maxReceiveMessageSize int
  108. maxSendMessageSize int
  109. unknownStreamDesc *StreamDesc
  110. keepaliveParams keepalive.ServerParameters
  111. keepalivePolicy keepalive.EnforcementPolicy
  112. initialWindowSize int32
  113. initialConnWindowSize int32
  114. writeBufferSize int
  115. readBufferSize int
  116. connectionTimeout time.Duration
  117. maxHeaderListSize *uint32
  118. headerTableSize *uint32
  119. }
  120. var defaultServerOptions = serverOptions{
  121. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  122. maxSendMessageSize: defaultServerMaxSendMessageSize,
  123. connectionTimeout: 120 * time.Second,
  124. writeBufferSize: defaultWriteBufSize,
  125. readBufferSize: defaultReadBufSize,
  126. }
  127. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  128. type ServerOption interface {
  129. apply(*serverOptions)
  130. }
  131. // EmptyServerOption does not alter the server configuration. It can be embedded
  132. // in another structure to build custom server options.
  133. //
  134. // This API is EXPERIMENTAL.
  135. type EmptyServerOption struct{}
  136. func (EmptyServerOption) apply(*serverOptions) {}
  137. // funcServerOption wraps a function that modifies serverOptions into an
  138. // implementation of the ServerOption interface.
  139. type funcServerOption struct {
  140. f func(*serverOptions)
  141. }
  142. func (fdo *funcServerOption) apply(do *serverOptions) {
  143. fdo.f(do)
  144. }
  145. func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
  146. return &funcServerOption{
  147. f: f,
  148. }
  149. }
  150. // WriteBufferSize determines how much data can be batched before doing a write on the wire.
  151. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
  152. // The default value for this buffer is 32KB.
  153. // Zero will disable the write buffer such that each write will be on underlying connection.
  154. // Note: A Send call may not directly translate to a write.
  155. func WriteBufferSize(s int) ServerOption {
  156. return newFuncServerOption(func(o *serverOptions) {
  157. o.writeBufferSize = s
  158. })
  159. }
  160. // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  161. // for one read syscall.
  162. // The default value for this buffer is 32KB.
  163. // Zero will disable read buffer for a connection so data framer can access the underlying
  164. // conn directly.
  165. func ReadBufferSize(s int) ServerOption {
  166. return newFuncServerOption(func(o *serverOptions) {
  167. o.readBufferSize = s
  168. })
  169. }
  170. // InitialWindowSize returns a ServerOption that sets window size for stream.
  171. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  172. func InitialWindowSize(s int32) ServerOption {
  173. return newFuncServerOption(func(o *serverOptions) {
  174. o.initialWindowSize = s
  175. })
  176. }
  177. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  178. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  179. func InitialConnWindowSize(s int32) ServerOption {
  180. return newFuncServerOption(func(o *serverOptions) {
  181. o.initialConnWindowSize = s
  182. })
  183. }
  184. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  185. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  186. if kp.Time > 0 && kp.Time < time.Second {
  187. grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
  188. kp.Time = time.Second
  189. }
  190. return newFuncServerOption(func(o *serverOptions) {
  191. o.keepaliveParams = kp
  192. })
  193. }
  194. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  195. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  196. return newFuncServerOption(func(o *serverOptions) {
  197. o.keepalivePolicy = kep
  198. })
  199. }
  200. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  201. //
  202. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  203. func CustomCodec(codec Codec) ServerOption {
  204. return newFuncServerOption(func(o *serverOptions) {
  205. o.codec = codec
  206. })
  207. }
  208. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  209. // messages. For backward compatibility, all outbound messages will be sent
  210. // using this compressor, regardless of incoming message compression. By
  211. // default, server messages will be sent using the same compressor with which
  212. // request messages were sent.
  213. //
  214. // Deprecated: use encoding.RegisterCompressor instead.
  215. func RPCCompressor(cp Compressor) ServerOption {
  216. return newFuncServerOption(func(o *serverOptions) {
  217. o.cp = cp
  218. })
  219. }
  220. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  221. // messages. It has higher priority than decompressors registered via
  222. // encoding.RegisterCompressor.
  223. //
  224. // Deprecated: use encoding.RegisterCompressor instead.
  225. func RPCDecompressor(dc Decompressor) ServerOption {
  226. return newFuncServerOption(func(o *serverOptions) {
  227. o.dc = dc
  228. })
  229. }
  230. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  231. // If this is not set, gRPC uses the default limit.
  232. //
  233. // Deprecated: use MaxRecvMsgSize instead.
  234. func MaxMsgSize(m int) ServerOption {
  235. return MaxRecvMsgSize(m)
  236. }
  237. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  238. // If this is not set, gRPC uses the default 4MB.
  239. func MaxRecvMsgSize(m int) ServerOption {
  240. return newFuncServerOption(func(o *serverOptions) {
  241. o.maxReceiveMessageSize = m
  242. })
  243. }
  244. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  245. // If this is not set, gRPC uses the default `math.MaxInt32`.
  246. func MaxSendMsgSize(m int) ServerOption {
  247. return newFuncServerOption(func(o *serverOptions) {
  248. o.maxSendMessageSize = m
  249. })
  250. }
  251. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  252. // of concurrent streams to each ServerTransport.
  253. func MaxConcurrentStreams(n uint32) ServerOption {
  254. return newFuncServerOption(func(o *serverOptions) {
  255. o.maxConcurrentStreams = n
  256. })
  257. }
  258. // Creds returns a ServerOption that sets credentials for server connections.
  259. func Creds(c credentials.TransportCredentials) ServerOption {
  260. return newFuncServerOption(func(o *serverOptions) {
  261. o.creds = c
  262. })
  263. }
  264. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  265. // server. Only one unary interceptor can be installed. The construction of multiple
  266. // interceptors (e.g., chaining) can be implemented at the caller.
  267. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  268. return newFuncServerOption(func(o *serverOptions) {
  269. if o.unaryInt != nil {
  270. panic("The unary server interceptor was already set and may not be reset.")
  271. }
  272. o.unaryInt = i
  273. })
  274. }
  275. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  276. // server. Only one stream interceptor can be installed.
  277. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  278. return newFuncServerOption(func(o *serverOptions) {
  279. if o.streamInt != nil {
  280. panic("The stream server interceptor was already set and may not be reset.")
  281. }
  282. o.streamInt = i
  283. })
  284. }
  285. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  286. // transport to be created. Only one can be installed.
  287. func InTapHandle(h tap.ServerInHandle) ServerOption {
  288. return newFuncServerOption(func(o *serverOptions) {
  289. if o.inTapHandle != nil {
  290. panic("The tap handle was already set and may not be reset.")
  291. }
  292. o.inTapHandle = h
  293. })
  294. }
  295. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  296. func StatsHandler(h stats.Handler) ServerOption {
  297. return newFuncServerOption(func(o *serverOptions) {
  298. o.statsHandler = h
  299. })
  300. }
  301. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  302. // unknown service handler. The provided method is a bidi-streaming RPC service
  303. // handler that will be invoked instead of returning the "unimplemented" gRPC
  304. // error whenever a request is received for an unregistered service or method.
  305. // The handling function and stream interceptor (if set) have full access to
  306. // the ServerStream, including its Context.
  307. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  308. return newFuncServerOption(func(o *serverOptions) {
  309. o.unknownStreamDesc = &StreamDesc{
  310. StreamName: "unknown_service_handler",
  311. Handler: streamHandler,
  312. // We need to assume that the users of the streamHandler will want to use both.
  313. ClientStreams: true,
  314. ServerStreams: true,
  315. }
  316. })
  317. }
  318. // ConnectionTimeout returns a ServerOption that sets the timeout for
  319. // connection establishment (up to and including HTTP/2 handshaking) for all
  320. // new connections. If this is not set, the default is 120 seconds. A zero or
  321. // negative value will result in an immediate timeout.
  322. //
  323. // This API is EXPERIMENTAL.
  324. func ConnectionTimeout(d time.Duration) ServerOption {
  325. return newFuncServerOption(func(o *serverOptions) {
  326. o.connectionTimeout = d
  327. })
  328. }
  329. // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
  330. // of header list that the server is prepared to accept.
  331. func MaxHeaderListSize(s uint32) ServerOption {
  332. return newFuncServerOption(func(o *serverOptions) {
  333. o.maxHeaderListSize = &s
  334. })
  335. }
  336. // HeaderTableSize returns a ServerOption that sets the size of dynamic
  337. // header table for stream.
  338. //
  339. // This API is EXPERIMENTAL.
  340. func HeaderTableSize(s uint32) ServerOption {
  341. return newFuncServerOption(func(o *serverOptions) {
  342. o.headerTableSize = &s
  343. })
  344. }
  345. // NewServer creates a gRPC server which has no service registered and has not
  346. // started to accept requests yet.
  347. func NewServer(opt ...ServerOption) *Server {
  348. opts := defaultServerOptions
  349. for _, o := range opt {
  350. o.apply(&opts)
  351. }
  352. s := &Server{
  353. lis: make(map[net.Listener]bool),
  354. opts: opts,
  355. conns: make(map[transport.ServerTransport]bool),
  356. m: make(map[string]*service),
  357. quit: grpcsync.NewEvent(),
  358. done: grpcsync.NewEvent(),
  359. czData: new(channelzData),
  360. }
  361. s.cv = sync.NewCond(&s.mu)
  362. if EnableTracing {
  363. _, file, line, _ := runtime.Caller(1)
  364. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  365. }
  366. if channelz.IsOn() {
  367. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  368. }
  369. return s
  370. }
  371. // printf records an event in s's event log, unless s has been stopped.
  372. // REQUIRES s.mu is held.
  373. func (s *Server) printf(format string, a ...interface{}) {
  374. if s.events != nil {
  375. s.events.Printf(format, a...)
  376. }
  377. }
  378. // errorf records an error in s's event log, unless s has been stopped.
  379. // REQUIRES s.mu is held.
  380. func (s *Server) errorf(format string, a ...interface{}) {
  381. if s.events != nil {
  382. s.events.Errorf(format, a...)
  383. }
  384. }
  385. // RegisterService registers a service and its implementation to the gRPC
  386. // server. It is called from the IDL generated code. This must be called before
  387. // invoking Serve.
  388. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  389. ht := reflect.TypeOf(sd.HandlerType).Elem()
  390. st := reflect.TypeOf(ss)
  391. if !st.Implements(ht) {
  392. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  393. }
  394. s.register(sd, ss)
  395. }
  396. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  397. s.mu.Lock()
  398. defer s.mu.Unlock()
  399. s.printf("RegisterService(%q)", sd.ServiceName)
  400. if s.serve {
  401. grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  402. }
  403. if _, ok := s.m[sd.ServiceName]; ok {
  404. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  405. }
  406. srv := &service{
  407. server: ss,
  408. md: make(map[string]*MethodDesc),
  409. sd: make(map[string]*StreamDesc),
  410. mdata: sd.Metadata,
  411. }
  412. for i := range sd.Methods {
  413. d := &sd.Methods[i]
  414. srv.md[d.MethodName] = d
  415. }
  416. for i := range sd.Streams {
  417. d := &sd.Streams[i]
  418. srv.sd[d.StreamName] = d
  419. }
  420. s.m[sd.ServiceName] = srv
  421. }
  422. // MethodInfo contains the information of an RPC including its method name and type.
  423. type MethodInfo struct {
  424. // Name is the method name only, without the service name or package name.
  425. Name string
  426. // IsClientStream indicates whether the RPC is a client streaming RPC.
  427. IsClientStream bool
  428. // IsServerStream indicates whether the RPC is a server streaming RPC.
  429. IsServerStream bool
  430. }
  431. // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
  432. type ServiceInfo struct {
  433. Methods []MethodInfo
  434. // Metadata is the metadata specified in ServiceDesc when registering service.
  435. Metadata interface{}
  436. }
  437. // GetServiceInfo returns a map from service names to ServiceInfo.
  438. // Service names include the package names, in the form of <package>.<service>.
  439. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  440. ret := make(map[string]ServiceInfo)
  441. for n, srv := range s.m {
  442. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  443. for m := range srv.md {
  444. methods = append(methods, MethodInfo{
  445. Name: m,
  446. IsClientStream: false,
  447. IsServerStream: false,
  448. })
  449. }
  450. for m, d := range srv.sd {
  451. methods = append(methods, MethodInfo{
  452. Name: m,
  453. IsClientStream: d.ClientStreams,
  454. IsServerStream: d.ServerStreams,
  455. })
  456. }
  457. ret[n] = ServiceInfo{
  458. Methods: methods,
  459. Metadata: srv.mdata,
  460. }
  461. }
  462. return ret
  463. }
  464. // ErrServerStopped indicates that the operation is now illegal because of
  465. // the server being stopped.
  466. var ErrServerStopped = errors.New("grpc: the server has been stopped")
  467. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  468. if s.opts.creds == nil {
  469. return rawConn, nil, nil
  470. }
  471. return s.opts.creds.ServerHandshake(rawConn)
  472. }
  473. type listenSocket struct {
  474. net.Listener
  475. channelzID int64
  476. }
  477. func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
  478. return &channelz.SocketInternalMetric{
  479. SocketOptions: channelz.GetSocketOption(l.Listener),
  480. LocalAddr: l.Listener.Addr(),
  481. }
  482. }
  483. func (l *listenSocket) Close() error {
  484. err := l.Listener.Close()
  485. if channelz.IsOn() {
  486. channelz.RemoveEntry(l.channelzID)
  487. }
  488. return err
  489. }
  490. // Serve accepts incoming connections on the listener lis, creating a new
  491. // ServerTransport and service goroutine for each. The service goroutines
  492. // read gRPC requests and then call the registered handlers to reply to them.
  493. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when
  494. // this method returns.
  495. // Serve will return a non-nil error unless Stop or GracefulStop is called.
  496. func (s *Server) Serve(lis net.Listener) error {
  497. s.mu.Lock()
  498. s.printf("serving")
  499. s.serve = true
  500. if s.lis == nil {
  501. // Serve called after Stop or GracefulStop.
  502. s.mu.Unlock()
  503. lis.Close()
  504. return ErrServerStopped
  505. }
  506. s.serveWG.Add(1)
  507. defer func() {
  508. s.serveWG.Done()
  509. if s.quit.HasFired() {
  510. // Stop or GracefulStop called; block until done and return nil.
  511. <-s.done.Done()
  512. }
  513. }()
  514. ls := &listenSocket{Listener: lis}
  515. s.lis[ls] = true
  516. if channelz.IsOn() {
  517. ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
  518. }
  519. s.mu.Unlock()
  520. defer func() {
  521. s.mu.Lock()
  522. if s.lis != nil && s.lis[ls] {
  523. ls.Close()
  524. delete(s.lis, ls)
  525. }
  526. s.mu.Unlock()
  527. }()
  528. var tempDelay time.Duration // how long to sleep on accept failure
  529. for {
  530. rawConn, err := lis.Accept()
  531. if err != nil {
  532. if ne, ok := err.(interface {
  533. Temporary() bool
  534. }); ok && ne.Temporary() {
  535. if tempDelay == 0 {
  536. tempDelay = 5 * time.Millisecond
  537. } else {
  538. tempDelay *= 2
  539. }
  540. if max := 1 * time.Second; tempDelay > max {
  541. tempDelay = max
  542. }
  543. s.mu.Lock()
  544. s.printf("Accept error: %v; retrying in %v", err, tempDelay)
  545. s.mu.Unlock()
  546. timer := time.NewTimer(tempDelay)
  547. select {
  548. case <-timer.C:
  549. case <-s.quit.Done():
  550. timer.Stop()
  551. return nil
  552. }
  553. continue
  554. }
  555. s.mu.Lock()
  556. s.printf("done serving; Accept = %v", err)
  557. s.mu.Unlock()
  558. if s.quit.HasFired() {
  559. return nil
  560. }
  561. return err
  562. }
  563. tempDelay = 0
  564. // Start a new goroutine to deal with rawConn so we don't stall this Accept
  565. // loop goroutine.
  566. //
  567. // Make sure we account for the goroutine so GracefulStop doesn't nil out
  568. // s.conns before this conn can be added.
  569. s.serveWG.Add(1)
  570. go func() {
  571. s.handleRawConn(rawConn)
  572. s.serveWG.Done()
  573. }()
  574. }
  575. }
  576. // handleRawConn forks a goroutine to handle a just-accepted connection that
  577. // has not had any I/O performed on it yet.
  578. func (s *Server) handleRawConn(rawConn net.Conn) {
  579. if s.quit.HasFired() {
  580. rawConn.Close()
  581. return
  582. }
  583. rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
  584. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  585. if err != nil {
  586. // ErrConnDispatched means that the connection was dispatched away from
  587. // gRPC; those connections should be left open.
  588. if err != credentials.ErrConnDispatched {
  589. s.mu.Lock()
  590. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  591. s.mu.Unlock()
  592. grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  593. rawConn.Close()
  594. }
  595. rawConn.SetDeadline(time.Time{})
  596. return
  597. }
  598. // Finish handshaking (HTTP2)
  599. st := s.newHTTP2Transport(conn, authInfo)
  600. if st == nil {
  601. return
  602. }
  603. rawConn.SetDeadline(time.Time{})
  604. if !s.addConn(st) {
  605. return
  606. }
  607. go func() {
  608. s.serveStreams(st)
  609. s.removeConn(st)
  610. }()
  611. }
  612. // newHTTP2Transport sets up a http/2 transport (using the
  613. // gRPC http2 server transport in transport/http2_server.go).
  614. func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
  615. config := &transport.ServerConfig{
  616. MaxStreams: s.opts.maxConcurrentStreams,
  617. AuthInfo: authInfo,
  618. InTapHandle: s.opts.inTapHandle,
  619. StatsHandler: s.opts.statsHandler,
  620. KeepaliveParams: s.opts.keepaliveParams,
  621. KeepalivePolicy: s.opts.keepalivePolicy,
  622. InitialWindowSize: s.opts.initialWindowSize,
  623. InitialConnWindowSize: s.opts.initialConnWindowSize,
  624. WriteBufferSize: s.opts.writeBufferSize,
  625. ReadBufferSize: s.opts.readBufferSize,
  626. ChannelzParentID: s.channelzID,
  627. MaxHeaderListSize: s.opts.maxHeaderListSize,
  628. HeaderTableSize: s.opts.headerTableSize,
  629. }
  630. st, err := transport.NewServerTransport("http2", c, config)
  631. if err != nil {
  632. s.mu.Lock()
  633. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  634. s.mu.Unlock()
  635. c.Close()
  636. grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
  637. return nil
  638. }
  639. return st
  640. }
  641. func (s *Server) serveStreams(st transport.ServerTransport) {
  642. defer st.Close()
  643. var wg sync.WaitGroup
  644. st.HandleStreams(func(stream *transport.Stream) {
  645. wg.Add(1)
  646. go func() {
  647. defer wg.Done()
  648. s.handleStream(st, stream, s.traceInfo(st, stream))
  649. }()
  650. }, func(ctx context.Context, method string) context.Context {
  651. if !EnableTracing {
  652. return ctx
  653. }
  654. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  655. return trace.NewContext(ctx, tr)
  656. })
  657. wg.Wait()
  658. }
  659. var _ http.Handler = (*Server)(nil)
  660. // ServeHTTP implements the Go standard library's http.Handler
  661. // interface by responding to the gRPC request r, by looking up
  662. // the requested gRPC method in the gRPC server s.
  663. //
  664. // The provided HTTP request must have arrived on an HTTP/2
  665. // connection. When using the Go standard library's server,
  666. // practically this means that the Request must also have arrived
  667. // over TLS.
  668. //
  669. // To share one port (such as 443 for https) between gRPC and an
  670. // existing http.Handler, use a root http.Handler such as:
  671. //
  672. // if r.ProtoMajor == 2 && strings.HasPrefix(
  673. // r.Header.Get("Content-Type"), "application/grpc") {
  674. // grpcServer.ServeHTTP(w, r)
  675. // } else {
  676. // yourMux.ServeHTTP(w, r)
  677. // }
  678. //
  679. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  680. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  681. // between the two paths. ServeHTTP does not support some gRPC features
  682. // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
  683. // and subject to change.
  684. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  685. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
  686. if err != nil {
  687. http.Error(w, err.Error(), http.StatusInternalServerError)
  688. return
  689. }
  690. if !s.addConn(st) {
  691. return
  692. }
  693. defer s.removeConn(st)
  694. s.serveStreams(st)
  695. }
  696. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  697. // If tracing is not enabled, it returns nil.
  698. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  699. if !EnableTracing {
  700. return nil
  701. }
  702. tr, ok := trace.FromContext(stream.Context())
  703. if !ok {
  704. return nil
  705. }
  706. trInfo = &traceInfo{
  707. tr: tr,
  708. firstLine: firstLine{
  709. client: false,
  710. remoteAddr: st.RemoteAddr(),
  711. },
  712. }
  713. if dl, ok := stream.Context().Deadline(); ok {
  714. trInfo.firstLine.deadline = time.Until(dl)
  715. }
  716. return trInfo
  717. }
  718. func (s *Server) addConn(st transport.ServerTransport) bool {
  719. s.mu.Lock()
  720. defer s.mu.Unlock()
  721. if s.conns == nil {
  722. st.Close()
  723. return false
  724. }
  725. if s.drain {
  726. // Transport added after we drained our existing conns: drain it
  727. // immediately.
  728. st.Drain()
  729. }
  730. s.conns[st] = true
  731. return true
  732. }
  733. func (s *Server) removeConn(st transport.ServerTransport) {
  734. s.mu.Lock()
  735. defer s.mu.Unlock()
  736. if s.conns != nil {
  737. delete(s.conns, st)
  738. s.cv.Broadcast()
  739. }
  740. }
  741. func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
  742. return &channelz.ServerInternalMetric{
  743. CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
  744. CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
  745. CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
  746. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
  747. }
  748. }
  749. func (s *Server) incrCallsStarted() {
  750. atomic.AddInt64(&s.czData.callsStarted, 1)
  751. atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
  752. }
// incrCallsSucceeded atomically adds one to the channelz counter of calls that
// succeeded.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}
// incrCallsFailed atomically adds one to the channelz counter of calls that
// failed.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}
  759. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  760. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  761. if err != nil {
  762. grpclog.Errorln("grpc: server failed to encode response: ", err)
  763. return err
  764. }
  765. compData, err := compress(data, cp, comp)
  766. if err != nil {
  767. grpclog.Errorln("grpc: server failed to compress response: ", err)
  768. return err
  769. }
  770. hdr, payload := msgHeader(data, compData)
  771. // TODO(dfawley): should we be checking len(data) instead?
  772. if len(payload) > s.opts.maxSendMessageSize {
  773. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  774. }
  775. err = t.Write(stream, hdr, payload, opts)
  776. if err == nil && s.opts.statsHandler != nil {
  777. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
  778. }
  779. return err
  780. }
// processUnaryRPC serves one unary RPC on stream: it receives and decompresses
// the single request message, invokes md.Handler (passing the server's unary
// interceptor), and writes either the reply followed by an OK status or the
// error status back on t.
//
// trInfo may be nil, in which case no tracing is done. The named result err is
// read by the deferred function below to finish the trace, emit the stats End
// event and update the channelz success/failure counters; io.EOF is not
// counted as a failure there.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	// Skip all of the bookkeeping (and the defer) when nothing observes the RPC.
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}
			if sh != nil {
				// statsBegin is non-nil here: it is assigned above whenever
				// sh != nil.
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}
			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}
	// Binary logging: record the client header before anything else is read.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		// md here is the incoming metadata and shadows the *MethodDesc
		// parameter for the rest of this block only.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}
	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor
	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			// No way to decode the request: reject it. The WriteStatus result
			// is not checked here.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}
	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}
	// payInfo is only allocated when a consumer exists. It is always non-nil
	// when sh != nil, which is what makes the payInfo.wireLength read inside df
	// safe.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		// Only errors that already are gRPC statuses are written back to the
		// peer; the error is returned to the caller either way.
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is handed to the method handler, which calls it to decode the already
	// received request bytes d into v. Stats, binary logging and tracing of the
	// request payload happen here, after a successful unmarshal.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    v,
				WireLength: payInfo.wireLength,
				Data:       d,
				Length:     len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	// Expose the transport stream through the handler's context so that
	// SetHeader/SendHeader/SetTrailer/Method can find it.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}
	// err below is scoped to this if statement and shadows the named result;
	// the explicit `return err` statements are what set the result.
	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		// s shadows the *Server receiver inside this branch.
		if s, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, s); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is always nil on this path (the non-nil case
			// returned above), so the logged trailer carries no error even
			// though sendResponse failed with err -- confirm this is intended.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		// appErr is nil here, so the trailer is logged without an error.
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
// processStreamingRPC serves one streaming RPC on stream: it wraps the
// transport stream in a serverStream, runs sd.Handler (through the server's
// stream interceptor when one is configured) and then writes the final status
// on t.
//
// srv may be nil (handleStream passes nil for the unknown-service handler), in
// which case the handler receives a nil server value. trInfo may be nil. The
// named result err is read by the deferred function to finish the trace, emit
// the stats End event and update the channelz counters; io.EOF is not counted
// as a failure there.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	// The handler sees the transport stream through this context so that
	// SetHeader/SendHeader/SetTrailer/Method can find it.
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}
	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				// Trace access goes through ss.trInfo under ss.mu, and tr is
				// cleared once finished -- presumably so serverStream methods
				// can tell the trace is gone; confirm against serverStream.
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}
			if sh != nil {
				// statsBegin is non-nil here: it is assigned above whenever
				// sh != nil.
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}
			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}
	// Binary logging: record the client header before the handler runs.
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}
	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			// No way to decode incoming messages: reject the RPC. The
			// WriteStatus result is not checked here.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}
	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	var appErr error
	// server stays a nil interface value when srv is nil.
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	// Run the handler directly, or through the stream interceptor when one is
	// configured.
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Not a gRPC status error: report it to the peer as Unknown.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		// appErr is nil here, so the trailer is logged without an error.
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
  1180. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1181. sm := stream.Method()
  1182. if sm != "" && sm[0] == '/' {
  1183. sm = sm[1:]
  1184. }
  1185. pos := strings.LastIndex(sm, "/")
  1186. if pos == -1 {
  1187. if trInfo != nil {
  1188. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1189. trInfo.tr.SetError()
  1190. }
  1191. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1192. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  1193. if trInfo != nil {
  1194. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1195. trInfo.tr.SetError()
  1196. }
  1197. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  1198. }
  1199. if trInfo != nil {
  1200. trInfo.tr.Finish()
  1201. }
  1202. return
  1203. }
  1204. service := sm[:pos]
  1205. method := sm[pos+1:]
  1206. srv, knownService := s.m[service]
  1207. if knownService {
  1208. if md, ok := srv.md[method]; ok {
  1209. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1210. return
  1211. }
  1212. if sd, ok := srv.sd[method]; ok {
  1213. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1214. return
  1215. }
  1216. }
  1217. // Unknown service, or known server unknown method.
  1218. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1219. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1220. return
  1221. }
  1222. var errDesc string
  1223. if !knownService {
  1224. errDesc = fmt.Sprintf("unknown service %v", service)
  1225. } else {
  1226. errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1227. }
  1228. if trInfo != nil {
  1229. trInfo.tr.LazyPrintf("%s", errDesc)
  1230. trInfo.tr.SetError()
  1231. }
  1232. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1233. if trInfo != nil {
  1234. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1235. trInfo.tr.SetError()
  1236. }
  1237. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  1238. }
  1239. if trInfo != nil {
  1240. trInfo.tr.Finish()
  1241. }
  1242. }
// streamKey is the context key under which
// NewContextWithServerTransportStream stores a ServerTransportStream and
// ServerTransportStreamFromContext looks it up.
type streamKey struct{}
  1245. // NewContextWithServerTransportStream creates a new context from ctx and
  1246. // attaches stream to it.
  1247. //
  1248. // This API is EXPERIMENTAL.
  1249. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1250. return context.WithValue(ctx, streamKey{}, stream)
  1251. }
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method is what the package-level Method function returns for the
	// stream found in a context.
	Method() string
	// SetHeader is what the package-level SetHeader function delegates to.
	SetHeader(md metadata.MD) error
	// SendHeader is what the package-level SendHeader function delegates to.
	SendHeader(md metadata.MD) error
	// SetTrailer is what the package-level SetTrailer function delegates to.
	SetTrailer(md metadata.MD) error
}
  1266. // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1267. // ctx. Returns nil if the given context has no stream associated with it
  1268. // (which implies it is not an RPC invocation context).
  1269. //
  1270. // This API is EXPERIMENTAL.
  1271. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1272. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1273. return s
  1274. }
  1275. // Stop stops the gRPC server. It immediately closes all open
  1276. // connections and listeners.
  1277. // It cancels all active RPCs on the server side and the corresponding
  1278. // pending RPCs on the client side will get notified by connection
  1279. // errors.
  1280. func (s *Server) Stop() {
  1281. s.quit.Fire()
  1282. defer func() {
  1283. s.serveWG.Wait()
  1284. s.done.Fire()
  1285. }()
  1286. s.channelzRemoveOnce.Do(func() {
  1287. if channelz.IsOn() {
  1288. channelz.RemoveEntry(s.channelzID)
  1289. }
  1290. })
  1291. s.mu.Lock()
  1292. listeners := s.lis
  1293. s.lis = nil
  1294. st := s.conns
  1295. s.conns = nil
  1296. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  1297. s.cv.Broadcast()
  1298. s.mu.Unlock()
  1299. for lis := range listeners {
  1300. lis.Close()
  1301. }
  1302. for c := range st {
  1303. c.Close()
  1304. }
  1305. s.mu.Lock()
  1306. if s.events != nil {
  1307. s.events.Finish()
  1308. s.events = nil
  1309. }
  1310. s.mu.Unlock()
  1311. }
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
//
// If the connection set is already nil (Stop and a completed GracefulStop both
// leave it nil) it returns right away. A concurrent Stop interrupts the wait:
// Stop sets the connection set to nil and broadcasts on s.cv, which ends the
// wait loop below.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		// Already stopped: nothing left to drain.
		s.mu.Unlock()
		return
	}
	// Close the listeners first so that no further connections are accepted.
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Drain each existing transport once; s.drain makes addConn drain any
	// transport registered after this point.
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}
	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()
	// removeConn and Stop broadcast on s.cv whenever the connection set
	// changes; keep waiting until it is empty (a nil map also has length 0).
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
  1353. // contentSubtype must be lowercase
  1354. // cannot return nil
  1355. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1356. if s.opts.codec != nil {
  1357. return s.opts.codec
  1358. }
  1359. if contentSubtype == "" {
  1360. return encoding.GetCodec(proto.Name)
  1361. }
  1362. codec := encoding.GetCodec(contentSubtype)
  1363. if codec == nil {
  1364. return encoding.GetCodec(proto.Name)
  1365. }
  1366. return codec
  1367. }
  1368. // SetHeader sets the header metadata.
  1369. // When called multiple times, all the provided metadata will be merged.
  1370. // All the metadata will be sent out when one of the following happens:
  1371. // - grpc.SendHeader() is called;
  1372. // - The first response is sent out;
  1373. // - An RPC status is sent out (error or success).
  1374. func SetHeader(ctx context.Context, md metadata.MD) error {
  1375. if md.Len() == 0 {
  1376. return nil
  1377. }
  1378. stream := ServerTransportStreamFromContext(ctx)
  1379. if stream == nil {
  1380. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1381. }
  1382. return stream.SetHeader(md)
  1383. }
  1384. // SendHeader sends header metadata. It may be called at most once.
  1385. // The provided md and headers set by SetHeader() will be sent.
  1386. func SendHeader(ctx context.Context, md metadata.MD) error {
  1387. stream := ServerTransportStreamFromContext(ctx)
  1388. if stream == nil {
  1389. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1390. }
  1391. if err := stream.SendHeader(md); err != nil {
  1392. return toRPCErr(err)
  1393. }
  1394. return nil
  1395. }
  1396. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1397. // When called more than once, all the provided metadata will be merged.
  1398. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1399. if md.Len() == 0 {
  1400. return nil
  1401. }
  1402. stream := ServerTransportStreamFromContext(ctx)
  1403. if stream == nil {
  1404. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1405. }
  1406. return stream.SetTrailer(md)
  1407. }
  1408. // Method returns the method string for the server context. The returned
  1409. // string is in the format of "/service/method".
  1410. func Method(ctx context.Context) (string, bool) {
  1411. s := ServerTransportStreamFromContext(ctx)
  1412. if s == nil {
  1413. return "", false
  1414. }
  1415. return s.Method(), true
  1416. }
// channelzServer wraps a *Server so that its channelz metrics can be read
// through the ChannelzMetric method.
type channelzServer struct {
	s *Server // the server whose metrics are reported
}
  1420. func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
  1421. return c.s.channelzMetric()
  1422. }