logging.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. // Copyright 2015 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package logging contains a Google Cloud Logging client.
  15. //
  16. // This package is experimental and subject to API changes.
  17. package logging // import "google.golang.org/cloud/logging"
  18. import (
  19. "errors"
  20. "io"
  21. "log"
  22. "sync"
  23. "time"
  24. "golang.org/x/net/context"
  25. api "google.golang.org/api/logging/v1beta3"
  26. "google.golang.org/cloud"
  27. "google.golang.org/cloud/internal/transport"
  28. )
  29. // Scope is the OAuth2 scope necessary to use Google Cloud Logging.
  30. const Scope = api.LoggingWriteScope
  31. // Level is the log level.
  32. type Level int
  33. const (
  34. // Default means no assigned severity level.
  35. Default Level = iota
  36. Debug
  37. Info
  38. Warning
  39. Error
  40. Critical
  41. Alert
  42. Emergency
  43. nLevel
  44. )
  45. var levelName = [nLevel]string{
  46. Default: "",
  47. Debug: "DEBUG",
  48. Info: "INFO",
  49. Warning: "WARNING",
  50. Error: "ERROR",
  51. Critical: "CRITICAL",
  52. Alert: "ALERT",
  53. Emergency: "EMERGENCY",
  54. }
  55. func (v Level) String() string {
  56. return levelName[v]
  57. }
  58. // Client is a Google Cloud Logging client.
  59. // It must be constructed via NewClient.
  60. type Client struct {
  61. svc *api.Service
  62. logs *api.ProjectsLogsEntriesService
  63. projID string
  64. logName string
  65. writer [nLevel]io.Writer
  66. logger [nLevel]*log.Logger
  67. mu sync.Mutex
  68. queued []*api.LogEntry
  69. curFlush *flushCall // currently in-flight flush
  70. flushTimer *time.Timer // nil before first use
  71. timerActive bool // whether flushTimer is armed
  72. inFlight int // number of log entries sent to API service but not yet ACKed
  73. // For testing:
  74. timeNow func() time.Time // optional
  75. // ServiceName may be "appengine.googleapis.com",
  76. // "compute.googleapis.com" or "custom.googleapis.com".
  77. //
  78. // The default is "custom.googleapis.com".
  79. //
  80. // The service name is only used by the API server to
  81. // determine which of the labels are used to index the logs.
  82. ServiceName string
  83. // CommonLabels are metadata labels that apply to all log
  84. // entries in this request, so that you don't have to repeat
  85. // them in each log entry's metadata.labels field. If any of
  86. // the log entries contains a (key, value) with the same key
  87. // that is in CommonLabels, then the entry's (key, value)
  88. // overrides the one in CommonLabels.
  89. CommonLabels map[string]string
  90. // BufferLimit is the maximum number of items to keep in memory
  91. // before flushing. Zero means automatic. A value of 1 means to
  92. // flush after each log entry.
  93. // The default is currently 10,000.
  94. BufferLimit int
  95. // FlushAfter optionally specifies a threshold count at which buffered
  96. // log entries are flushed, even if the BufferInterval has not yet
  97. // been reached.
  98. // The default is currently 10.
  99. FlushAfter int
  100. // BufferInterval is the maximum amount of time that an item
  101. // should remain buffered in memory before being flushed to
  102. // the logging service.
  103. // The default is currently 1 second.
  104. BufferInterval time.Duration
  105. // Overflow is a function which runs when the Log function
  106. // overflows its configured buffer limit. If nil, the log
  107. // entry is dropped. The return value from Overflow is
  108. // returned by Log.
  109. Overflow func(*Client, Entry) error
  110. }
  111. func (c *Client) flushAfter() int {
  112. if v := c.FlushAfter; v > 0 {
  113. return v
  114. }
  115. return 10
  116. }
  117. func (c *Client) bufferInterval() time.Duration {
  118. if v := c.BufferInterval; v > 0 {
  119. return v
  120. }
  121. return time.Second
  122. }
  123. func (c *Client) bufferLimit() int {
  124. if v := c.BufferLimit; v > 0 {
  125. return v
  126. }
  127. return 10000
  128. }
  129. func (c *Client) serviceName() string {
  130. if v := c.ServiceName; v != "" {
  131. return v
  132. }
  133. return "custom.googleapis.com"
  134. }
  135. func (c *Client) now() time.Time {
  136. if now := c.timeNow; now != nil {
  137. return now()
  138. }
  139. return time.Now()
  140. }
  141. // Writer returns an io.Writer for the provided log level.
  142. //
  143. // Each Write call on the returned Writer generates a log entry.
  144. //
  145. // This Writer accessor does not allocate, so callers do not need to
  146. // cache.
  147. func (c *Client) Writer(v Level) io.Writer { return c.writer[v] }
  148. // Logger returns a *log.Logger for the provided log level.
  149. //
  150. // A Logger for each Level is pre-allocated by NewClient with an empty
  151. // prefix and no flags. This Logger accessor does not allocate.
  152. // Callers wishing to use alternate flags (such as log.Lshortfile) may
  153. // mutate the returned Logger with SetFlags. Such mutations affect all
  154. // callers in the program.
  155. func (c *Client) Logger(v Level) *log.Logger { return c.logger[v] }
  156. type levelWriter struct {
  157. level Level
  158. c *Client
  159. }
  160. func (w levelWriter) Write(p []byte) (n int, err error) {
  161. return len(p), w.c.Log(Entry{
  162. Level: w.level,
  163. Payload: string(p),
  164. })
  165. }
  166. // Entry is a log entry.
  167. type Entry struct {
  168. // Time is the time of the entry. If the zero value, the current time is used.
  169. Time time.Time
  170. // Level is log entry's severity level.
  171. // The zero value means no assigned severity level.
  172. Level Level
  173. // Payload must be either a string, []byte, or something that
  174. // marshals via the encoding/json package to a JSON object
  175. // (and not any other type of JSON value).
  176. Payload interface{}
  177. // Labels optionally specifies key/value labels for the log entry.
  178. // Depending on the Client's ServiceName, these are indexed differently
  179. // by the Cloud Logging Service.
  180. // See https://cloud.google.com/logging/docs/logs_index
  181. // The Client.Log method takes ownership of this map.
  182. Labels map[string]string
  183. // TODO: de-duping id
  184. }
  185. func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) {
  186. t := e.Time
  187. if t.IsZero() {
  188. t = c.now()
  189. }
  190. ent := &api.LogEntry{
  191. Metadata: &api.LogEntryMetadata{
  192. Timestamp: t.UTC().Format(time.RFC3339Nano),
  193. ServiceName: c.serviceName(),
  194. Severity: e.Level.String(),
  195. Labels: e.Labels,
  196. },
  197. }
  198. switch p := e.Payload.(type) {
  199. case string:
  200. ent.TextPayload = p
  201. case []byte:
  202. ent.TextPayload = string(p)
  203. default:
  204. ent.StructPayload = api.LogEntryStructPayload(p)
  205. }
  206. return ent, nil
  207. }
  208. // LogSync logs e synchronously without any buffering.
  209. // This is mostly intended for debugging or critical errors.
  210. func (c *Client) LogSync(e Entry) error {
  211. ent, err := c.apiEntry(e)
  212. if err != nil {
  213. return err
  214. }
  215. _, err = c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
  216. CommonLabels: c.CommonLabels,
  217. Entries: []*api.LogEntry{ent},
  218. }).Do()
  219. return err
  220. }
  221. var ErrOverflow = errors.New("logging: log entry overflowed buffer limits")
  222. // Log queues an entry to be sent to the logging service, subject to the
  223. // Client's parameters. By default, the log will be flushed within
  224. // one second.
  225. // Log only returns an error if the entry is invalid or the queue is at
  226. // capacity. If the queue is at capacity and the entry can't be added,
  227. // Log returns either ErrOverflow when c.Overflow is nil, or the
  228. // value returned by c.Overflow.
  229. func (c *Client) Log(e Entry) error {
  230. ent, err := c.apiEntry(e)
  231. if err != nil {
  232. return err
  233. }
  234. c.mu.Lock()
  235. buffered := len(c.queued) + c.inFlight
  236. if buffered >= c.bufferLimit() {
  237. c.mu.Unlock()
  238. if fn := c.Overflow; fn != nil {
  239. return fn(c, e)
  240. }
  241. return ErrOverflow
  242. }
  243. defer c.mu.Unlock()
  244. c.queued = append(c.queued, ent)
  245. if len(c.queued) >= c.flushAfter() {
  246. c.scheduleFlushLocked(0)
  247. return nil
  248. }
  249. c.scheduleFlushLocked(c.bufferInterval())
  250. return nil
  251. }
  252. // c.mu must be held.
  253. //
  254. // d will be one of two values: either c.BufferInterval (or its
  255. // default value) or 0.
  256. func (c *Client) scheduleFlushLocked(d time.Duration) {
  257. if c.inFlight > 0 {
  258. // For now to keep things simple, only allow one HTTP
  259. // request in flight at a time.
  260. return
  261. }
  262. switch {
  263. case c.flushTimer == nil:
  264. // First flush.
  265. c.timerActive = true
  266. c.flushTimer = time.AfterFunc(d, c.timeoutFlush)
  267. case c.timerActive && d == 0:
  268. // Make it happen sooner. For example, this is the
  269. // case of transitioning from a 1 second flush after
  270. // the 1st item to an immediate flush after the 10th
  271. // item.
  272. c.flushTimer.Reset(0)
  273. case !c.timerActive:
  274. c.timerActive = true
  275. c.flushTimer.Reset(d)
  276. default:
  277. // else timer was already active, also at d > 0,
  278. // so we don't touch it and let it fire as previously
  279. // scheduled.
  280. }
  281. }
  282. // timeoutFlush runs in its own goroutine (from time.AfterFunc) and
  283. // flushes c.queued.
  284. func (c *Client) timeoutFlush() {
  285. c.mu.Lock()
  286. c.timerActive = false
  287. c.mu.Unlock()
  288. if err := c.Flush(); err != nil {
  289. // schedule another try
  290. // TODO: smarter back-off?
  291. c.mu.Lock()
  292. c.scheduleFlushLocked(5 * time.Second)
  293. c.mu.Unlock()
  294. }
  295. }
  296. // Ping reports whether the client's connection to Google Cloud
  297. // Logging and the authentication configuration are valid.
  298. func (c *Client) Ping() error {
  299. _, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
  300. Entries: []*api.LogEntry{},
  301. }).Do()
  302. return err
  303. }
  304. // Flush flushes any buffered log entries.
  305. func (c *Client) Flush() error {
  306. var numFlush int
  307. c.mu.Lock()
  308. for {
  309. // We're already flushing (or we just started flushing
  310. // ourselves), so wait for it to finish.
  311. if f := c.curFlush; f != nil {
  312. wasEmpty := len(c.queued) == 0
  313. c.mu.Unlock()
  314. <-f.donec // wait for it
  315. numFlush++
  316. // Terminate whenever there's an error, we've
  317. // already flushed twice (one that was already
  318. // in-flight when flush was called, and then
  319. // one we instigated), or the queue was empty
  320. // when we released the locked (meaning this
  321. // in-flight flush removes everything present
  322. // when Flush was called, and we don't need to
  323. // kick off a new flush for things arriving
  324. // afterward)
  325. if f.err != nil || numFlush == 2 || wasEmpty {
  326. return f.err
  327. }
  328. // Otherwise, re-obtain the lock and loop,
  329. // starting over with seeing if a flush is in
  330. // progress, which might've been started by a
  331. // different goroutine before aquiring this
  332. // lock again.
  333. c.mu.Lock()
  334. continue
  335. }
  336. // Terminal case:
  337. if len(c.queued) == 0 {
  338. c.mu.Unlock()
  339. return nil
  340. }
  341. c.startFlushLocked()
  342. }
  343. }
  344. // requires c.mu be held.
  345. func (c *Client) startFlushLocked() {
  346. if c.curFlush != nil {
  347. panic("internal error: flush already in flight")
  348. }
  349. if len(c.queued) == 0 {
  350. panic("internal error: no items queued")
  351. }
  352. logEntries := c.queued
  353. c.inFlight = len(logEntries)
  354. c.queued = nil
  355. flush := &flushCall{
  356. donec: make(chan struct{}),
  357. }
  358. c.curFlush = flush
  359. go func() {
  360. defer close(flush.donec)
  361. _, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
  362. CommonLabels: c.CommonLabels,
  363. Entries: logEntries,
  364. }).Do()
  365. flush.err = err
  366. c.mu.Lock()
  367. defer c.mu.Unlock()
  368. c.inFlight = 0
  369. c.curFlush = nil
  370. if err != nil {
  371. c.queued = append(c.queued, logEntries...)
  372. } else if len(c.queued) > 0 {
  373. c.scheduleFlushLocked(c.bufferInterval())
  374. }
  375. }()
  376. }
  377. const prodAddr = "https://logging.googleapis.com/"
  378. const userAgent = "gcloud-golang-logging/20150922"
  379. // NewClient returns a new log client, logging to the named log in the
  380. // provided project.
  381. //
  382. // The exported fields on the returned client may be modified before
  383. // the client is used for logging. Once log entries are in flight,
  384. // the fields must not be modified.
  385. func NewClient(ctx context.Context, projectID, logName string, opts ...cloud.ClientOption) (*Client, error) {
  386. httpClient, endpoint, err := transport.NewHTTPClient(ctx, append([]cloud.ClientOption{
  387. cloud.WithEndpoint(prodAddr),
  388. cloud.WithScopes(api.CloudPlatformScope),
  389. cloud.WithUserAgent(userAgent),
  390. }, opts...)...)
  391. if err != nil {
  392. return nil, err
  393. }
  394. svc, err := api.New(httpClient)
  395. if err != nil {
  396. return nil, err
  397. }
  398. svc.BasePath = endpoint
  399. c := &Client{
  400. svc: svc,
  401. logs: api.NewProjectsLogsEntriesService(svc),
  402. logName: logName,
  403. projID: projectID,
  404. }
  405. for i := range c.writer {
  406. level := Level(i)
  407. c.writer[level] = levelWriter{level, c}
  408. c.logger[level] = log.New(c.writer[level], "", 0)
  409. }
  410. return c, nil
  411. }
  412. // flushCall is an in-flight or completed flush.
  413. type flushCall struct {
  414. donec chan struct{} // closed when response is in
  415. err error // error is valid after wg is Done
  416. }