// Copyright 2011 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.

// +build !appengine

package internal

  6. import (
  7. "bytes"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "net"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "runtime"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/golang/protobuf/proto"
  23. netcontext "golang.org/x/net/context"
  24. basepb "google.golang.org/appengine/internal/base"
  25. logpb "google.golang.org/appengine/internal/log"
  26. remotepb "google.golang.org/appengine/internal/remote_api"
  27. )
  28. const (
  29. apiPath = "/rpc_http"
  30. )
  31. var (
  32. // Incoming headers.
  33. ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
  34. dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
  35. traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
  36. curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
  37. userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
  38. remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
  39. // Outgoing headers.
  40. apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
  41. apiEndpointHeaderValue = []string{"app-engine-apis"}
  42. apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
  43. apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
  44. apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
  45. apiContentType = http.CanonicalHeaderKey("Content-Type")
  46. apiContentTypeValue = []string{"application/octet-stream"}
  47. logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
  48. apiHTTPClient = &http.Client{
  49. Transport: &http.Transport{
  50. Proxy: http.ProxyFromEnvironment,
  51. Dial: limitDial,
  52. },
  53. }
  54. )
  55. func apiURL() *url.URL {
  56. host, port := "appengine.googleapis.internal", "10001"
  57. if h := os.Getenv("API_HOST"); h != "" {
  58. host = h
  59. }
  60. if p := os.Getenv("API_PORT"); p != "" {
  61. port = p
  62. }
  63. return &url.URL{
  64. Scheme: "http",
  65. Host: host + ":" + port,
  66. Path: apiPath,
  67. }
  68. }
  69. func handleHTTP(w http.ResponseWriter, r *http.Request) {
  70. c := &context{
  71. req: r,
  72. outHeader: w.Header(),
  73. apiURL: apiURL(),
  74. }
  75. stopFlushing := make(chan int)
  76. ctxs.Lock()
  77. ctxs.m[r] = c
  78. ctxs.Unlock()
  79. defer func() {
  80. ctxs.Lock()
  81. delete(ctxs.m, r)
  82. ctxs.Unlock()
  83. }()
  84. // Patch up RemoteAddr so it looks reasonable.
  85. if addr := r.Header.Get(userIPHeader); addr != "" {
  86. r.RemoteAddr = addr
  87. } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
  88. r.RemoteAddr = addr
  89. } else {
  90. // Should not normally reach here, but pick a sensible default anyway.
  91. r.RemoteAddr = "127.0.0.1"
  92. }
  93. // The address in the headers will most likely be of these forms:
  94. // 123.123.123.123
  95. // 2001:db8::1
  96. // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
  97. if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
  98. // Assume the remote address is only a host; add a default port.
  99. r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
  100. }
  101. // Start goroutine responsible for flushing app logs.
  102. // This is done after adding c to ctx.m (and stopped before removing it)
  103. // because flushing logs requires making an API call.
  104. go c.logFlusher(stopFlushing)
  105. executeRequestSafely(c, r)
  106. c.outHeader = nil // make sure header changes aren't respected any more
  107. stopFlushing <- 1 // any logging beyond this point will be dropped
  108. // Flush any pending logs asynchronously.
  109. c.pendingLogs.Lock()
  110. flushes := c.pendingLogs.flushes
  111. if len(c.pendingLogs.lines) > 0 {
  112. flushes++
  113. }
  114. c.pendingLogs.Unlock()
  115. go c.flushLog(false)
  116. w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
  117. // Avoid nil Write call if c.Write is never called.
  118. if c.outCode != 0 {
  119. w.WriteHeader(c.outCode)
  120. }
  121. if c.outBody != nil {
  122. w.Write(c.outBody)
  123. }
  124. }
  125. func executeRequestSafely(c *context, r *http.Request) {
  126. defer func() {
  127. if x := recover(); x != nil {
  128. logf(c, 4, "%s", renderPanic(x)) // 4 == critical
  129. c.outCode = 500
  130. }
  131. }()
  132. http.DefaultServeMux.ServeHTTP(c, r)
  133. }
  134. func renderPanic(x interface{}) string {
  135. buf := make([]byte, 16<<10) // 16 KB should be plenty
  136. buf = buf[:runtime.Stack(buf, false)]
  137. // Remove the first few stack frames:
  138. // this func
  139. // the recover closure in the caller
  140. // That will root the stack trace at the site of the panic.
  141. const (
  142. skipStart = "internal.renderPanic"
  143. skipFrames = 2
  144. )
  145. start := bytes.Index(buf, []byte(skipStart))
  146. p := start
  147. for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
  148. p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
  149. if p < 0 {
  150. break
  151. }
  152. }
  153. if p >= 0 {
  154. // buf[start:p+1] is the block to remove.
  155. // Copy buf[p+1:] over buf[start:] and shrink buf.
  156. copy(buf[start:], buf[p+1:])
  157. buf = buf[:len(buf)-(p+1-start)]
  158. }
  159. // Add panic heading.
  160. head := fmt.Sprintf("panic: %v\n\n", x)
  161. if len(head) > len(buf) {
  162. // Extremely unlikely to happen.
  163. return head
  164. }
  165. copy(buf[len(head):], buf)
  166. copy(buf, head)
  167. return string(buf)
  168. }
  169. var ctxs = struct {
  170. sync.Mutex
  171. m map[*http.Request]*context
  172. bg *context // background context, lazily initialized
  173. // dec is used by tests to decorate the netcontext.Context returned
  174. // for a given request. This allows tests to add overrides (such as
  175. // WithAppIDOverride) to the context. The map is nil outside tests.
  176. dec map[*http.Request]func(netcontext.Context) netcontext.Context
  177. }{
  178. m: make(map[*http.Request]*context),
  179. }
  180. // context represents the context of an in-flight HTTP request.
  181. // It implements the appengine.Context and http.ResponseWriter interfaces.
  182. type context struct {
  183. req *http.Request
  184. outCode int
  185. outHeader http.Header
  186. outBody []byte
  187. pendingLogs struct {
  188. sync.Mutex
  189. lines []*logpb.UserAppLogLine
  190. flushes int
  191. }
  192. apiURL *url.URL
  193. }
  194. var contextKey = "holds a *context"
  195. func fromContext(ctx netcontext.Context) *context {
  196. c, _ := ctx.Value(&contextKey).(*context)
  197. return c
  198. }
  199. func withContext(parent netcontext.Context, c *context) netcontext.Context {
  200. ctx := netcontext.WithValue(parent, &contextKey, c)
  201. if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
  202. ctx = withNamespace(ctx, ns)
  203. }
  204. return ctx
  205. }
  206. func toContext(c *context) netcontext.Context {
  207. return withContext(netcontext.Background(), c)
  208. }
  209. func IncomingHeaders(ctx netcontext.Context) http.Header {
  210. if c := fromContext(ctx); c != nil {
  211. return c.req.Header
  212. }
  213. return nil
  214. }
  215. func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
  216. ctxs.Lock()
  217. c := ctxs.m[req]
  218. d := ctxs.dec[req]
  219. ctxs.Unlock()
  220. if d != nil {
  221. parent = d(parent)
  222. }
  223. if c == nil {
  224. // Someone passed in an http.Request that is not in-flight.
  225. // We panic here rather than panicking at a later point
  226. // so that stack traces will be more sensible.
  227. log.Panic("appengine: NewContext passed an unknown http.Request")
  228. }
  229. return withContext(parent, c)
  230. }
  231. func BackgroundContext() netcontext.Context {
  232. ctxs.Lock()
  233. defer ctxs.Unlock()
  234. if ctxs.bg != nil {
  235. return toContext(ctxs.bg)
  236. }
  237. // Compute background security ticket.
  238. appID := partitionlessAppID()
  239. escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
  240. majVersion := VersionID(nil)
  241. if i := strings.Index(majVersion, "."); i > 0 {
  242. majVersion = majVersion[:i]
  243. }
  244. ticket := fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
  245. ctxs.bg = &context{
  246. req: &http.Request{
  247. Header: http.Header{
  248. ticketHeader: []string{ticket},
  249. },
  250. },
  251. apiURL: apiURL(),
  252. }
  253. // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
  254. go ctxs.bg.logFlusher(make(chan int))
  255. return toContext(ctxs.bg)
  256. }
  257. // RegisterTestRequest registers the HTTP request req for testing, such that
  258. // any API calls are sent to the provided URL. It returns a closure to delete
  259. // the registration.
  260. // It should only be used by aetest package.
  261. func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) func() {
  262. c := &context{
  263. req: req,
  264. apiURL: apiURL,
  265. }
  266. ctxs.Lock()
  267. defer ctxs.Unlock()
  268. if _, ok := ctxs.m[req]; ok {
  269. log.Panic("req already associated with context")
  270. }
  271. if _, ok := ctxs.dec[req]; ok {
  272. log.Panic("req already associated with context")
  273. }
  274. if ctxs.dec == nil {
  275. ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context)
  276. }
  277. ctxs.m[req] = c
  278. ctxs.dec[req] = decorate
  279. return func() {
  280. ctxs.Lock()
  281. delete(ctxs.m, req)
  282. delete(ctxs.dec, req)
  283. ctxs.Unlock()
  284. }
  285. }
  286. var errTimeout = &CallError{
  287. Detail: "Deadline exceeded",
  288. Code: int32(remotepb.RpcError_CANCELLED),
  289. Timeout: true,
  290. }
  291. func (c *context) Header() http.Header { return c.outHeader }
  292. // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
  293. // codes do not permit a response body (nor response entity headers such as
  294. // Content-Length, Content-Type, etc).
  295. func bodyAllowedForStatus(status int) bool {
  296. switch {
  297. case status >= 100 && status <= 199:
  298. return false
  299. case status == 204:
  300. return false
  301. case status == 304:
  302. return false
  303. }
  304. return true
  305. }
  306. func (c *context) Write(b []byte) (int, error) {
  307. if c.outCode == 0 {
  308. c.WriteHeader(http.StatusOK)
  309. }
  310. if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
  311. return 0, http.ErrBodyNotAllowed
  312. }
  313. c.outBody = append(c.outBody, b...)
  314. return len(b), nil
  315. }
  316. func (c *context) WriteHeader(code int) {
  317. if c.outCode != 0 {
  318. logf(c, 3, "WriteHeader called multiple times on request.") // error level
  319. return
  320. }
  321. c.outCode = code
  322. }
  323. func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
  324. hreq := &http.Request{
  325. Method: "POST",
  326. URL: c.apiURL,
  327. Header: http.Header{
  328. apiEndpointHeader: apiEndpointHeaderValue,
  329. apiMethodHeader: apiMethodHeaderValue,
  330. apiContentType: apiContentTypeValue,
  331. apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
  332. },
  333. Body: ioutil.NopCloser(bytes.NewReader(body)),
  334. ContentLength: int64(len(body)),
  335. Host: c.apiURL.Host,
  336. }
  337. if info := c.req.Header.Get(dapperHeader); info != "" {
  338. hreq.Header.Set(dapperHeader, info)
  339. }
  340. if info := c.req.Header.Get(traceHeader); info != "" {
  341. hreq.Header.Set(traceHeader, info)
  342. }
  343. tr := apiHTTPClient.Transport.(*http.Transport)
  344. var timedOut int32 // atomic; set to 1 if timed out
  345. t := time.AfterFunc(timeout, func() {
  346. atomic.StoreInt32(&timedOut, 1)
  347. tr.CancelRequest(hreq)
  348. })
  349. defer t.Stop()
  350. defer func() {
  351. // Check if timeout was exceeded.
  352. if atomic.LoadInt32(&timedOut) != 0 {
  353. err = errTimeout
  354. }
  355. }()
  356. hresp, err := apiHTTPClient.Do(hreq)
  357. if err != nil {
  358. return nil, &CallError{
  359. Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
  360. Code: int32(remotepb.RpcError_UNKNOWN),
  361. }
  362. }
  363. defer hresp.Body.Close()
  364. hrespBody, err := ioutil.ReadAll(hresp.Body)
  365. if hresp.StatusCode != 200 {
  366. return nil, &CallError{
  367. Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
  368. Code: int32(remotepb.RpcError_UNKNOWN),
  369. }
  370. }
  371. if err != nil {
  372. return nil, &CallError{
  373. Detail: fmt.Sprintf("service bridge response bad: %v", err),
  374. Code: int32(remotepb.RpcError_UNKNOWN),
  375. }
  376. }
  377. return hrespBody, nil
  378. }
  379. func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
  380. if ns := NamespaceFromContext(ctx); ns != "" {
  381. if fn, ok := NamespaceMods[service]; ok {
  382. fn(in, ns)
  383. }
  384. }
  385. if f, ctx, ok := callOverrideFromContext(ctx); ok {
  386. return f(ctx, service, method, in, out)
  387. }
  388. // Handle already-done contexts quickly.
  389. select {
  390. case <-ctx.Done():
  391. return ctx.Err()
  392. default:
  393. }
  394. c := fromContext(ctx)
  395. if c == nil {
  396. // Give a good error message rather than a panic lower down.
  397. return errors.New("not an App Engine context")
  398. }
  399. // Apply transaction modifications if we're in a transaction.
  400. if t := transactionFromContext(ctx); t != nil {
  401. if t.finished {
  402. return errors.New("transaction context has expired")
  403. }
  404. applyTransaction(in, &t.transaction)
  405. }
  406. // Default RPC timeout is 60s.
  407. timeout := 60 * time.Second
  408. if deadline, ok := ctx.Deadline(); ok {
  409. timeout = deadline.Sub(time.Now())
  410. }
  411. data, err := proto.Marshal(in)
  412. if err != nil {
  413. return err
  414. }
  415. ticket := c.req.Header.Get(ticketHeader)
  416. req := &remotepb.Request{
  417. ServiceName: &service,
  418. Method: &method,
  419. Request: data,
  420. RequestId: &ticket,
  421. }
  422. hreqBody, err := proto.Marshal(req)
  423. if err != nil {
  424. return err
  425. }
  426. hrespBody, err := c.post(hreqBody, timeout)
  427. if err != nil {
  428. return err
  429. }
  430. res := &remotepb.Response{}
  431. if err := proto.Unmarshal(hrespBody, res); err != nil {
  432. return err
  433. }
  434. if res.RpcError != nil {
  435. ce := &CallError{
  436. Detail: res.RpcError.GetDetail(),
  437. Code: *res.RpcError.Code,
  438. }
  439. switch remotepb.RpcError_ErrorCode(ce.Code) {
  440. case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
  441. ce.Timeout = true
  442. }
  443. return ce
  444. }
  445. if res.ApplicationError != nil {
  446. return &APIError{
  447. Service: *req.ServiceName,
  448. Detail: res.ApplicationError.GetDetail(),
  449. Code: *res.ApplicationError.Code,
  450. }
  451. }
  452. if res.Exception != nil || res.JavaException != nil {
  453. // This shouldn't happen, but let's be defensive.
  454. return &CallError{
  455. Detail: "service bridge returned exception",
  456. Code: int32(remotepb.RpcError_UNKNOWN),
  457. }
  458. }
  459. return proto.Unmarshal(res.Response, out)
  460. }
  461. func (c *context) Request() *http.Request {
  462. return c.req
  463. }
  464. func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
  465. // Truncate long log lines.
  466. // TODO(dsymonds): Check if this is still necessary.
  467. const lim = 8 << 10
  468. if len(*ll.Message) > lim {
  469. suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
  470. ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
  471. }
  472. c.pendingLogs.Lock()
  473. c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
  474. c.pendingLogs.Unlock()
  475. }
  476. var logLevelName = map[int64]string{
  477. 0: "DEBUG",
  478. 1: "INFO",
  479. 2: "WARNING",
  480. 3: "ERROR",
  481. 4: "CRITICAL",
  482. }
  483. func logf(c *context, level int64, format string, args ...interface{}) {
  484. s := fmt.Sprintf(format, args...)
  485. s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
  486. c.addLogLine(&logpb.UserAppLogLine{
  487. TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
  488. Level: &level,
  489. Message: &s,
  490. })
  491. log.Print(logLevelName[level] + ": " + s)
  492. }
  493. // flushLog attempts to flush any pending logs to the appserver.
  494. // It should not be called concurrently.
  495. func (c *context) flushLog(force bool) (flushed bool) {
  496. c.pendingLogs.Lock()
  497. // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
  498. n, rem := 0, 30<<20
  499. for ; n < len(c.pendingLogs.lines); n++ {
  500. ll := c.pendingLogs.lines[n]
  501. // Each log line will require about 3 bytes of overhead.
  502. nb := proto.Size(ll) + 3
  503. if nb > rem {
  504. break
  505. }
  506. rem -= nb
  507. }
  508. lines := c.pendingLogs.lines[:n]
  509. c.pendingLogs.lines = c.pendingLogs.lines[n:]
  510. c.pendingLogs.Unlock()
  511. if len(lines) == 0 && !force {
  512. // Nothing to flush.
  513. return false
  514. }
  515. rescueLogs := false
  516. defer func() {
  517. if rescueLogs {
  518. c.pendingLogs.Lock()
  519. c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
  520. c.pendingLogs.Unlock()
  521. }
  522. }()
  523. buf, err := proto.Marshal(&logpb.UserAppLogGroup{
  524. LogLine: lines,
  525. })
  526. if err != nil {
  527. log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
  528. rescueLogs = true
  529. return false
  530. }
  531. req := &logpb.FlushRequest{
  532. Logs: buf,
  533. }
  534. res := &basepb.VoidProto{}
  535. c.pendingLogs.Lock()
  536. c.pendingLogs.flushes++
  537. c.pendingLogs.Unlock()
  538. if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
  539. log.Printf("internal.flushLog: Flush RPC: %v", err)
  540. rescueLogs = true
  541. return false
  542. }
  543. return true
  544. }
  545. const (
  546. // Log flushing parameters.
  547. flushInterval = 1 * time.Second
  548. forceFlushInterval = 60 * time.Second
  549. )
  550. func (c *context) logFlusher(stop <-chan int) {
  551. lastFlush := time.Now()
  552. tick := time.NewTicker(flushInterval)
  553. for {
  554. select {
  555. case <-stop:
  556. // Request finished.
  557. tick.Stop()
  558. return
  559. case <-tick.C:
  560. force := time.Now().Sub(lastFlush) > forceFlushInterval
  561. if c.flushLog(force) {
  562. lastFlush = time.Now()
  563. }
  564. }
  565. }
  566. }
  567. func ContextForTesting(req *http.Request) netcontext.Context {
  568. return toContext(&context{req: req})
  569. }