123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- // Copyright 2015 Google Inc. All Rights Reserved.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // Package logging contains a Google Cloud Logging client.
- //
- // This package is experimental and subject to API changes.
- package logging // import "google.golang.org/cloud/logging"
- import (
- "errors"
- "io"
- "log"
- "sync"
- "time"
- "golang.org/x/net/context"
- api "google.golang.org/api/logging/v1beta3"
- "google.golang.org/cloud"
- "google.golang.org/cloud/internal/transport"
- )
- // Scope is the OAuth2 scope necessary to use Google Cloud Logging.
- const Scope = api.LoggingWriteScope
- // Level is the log level.
- type Level int
- const (
- // Default means no assigned severity level.
- Default Level = iota
- Debug
- Info
- Warning
- Error
- Critical
- Alert
- Emergency
- nLevel
- )
- var levelName = [nLevel]string{
- Default: "",
- Debug: "DEBUG",
- Info: "INFO",
- Warning: "WARNING",
- Error: "ERROR",
- Critical: "CRITICAL",
- Alert: "ALERT",
- Emergency: "EMERGENCY",
- }
- func (v Level) String() string {
- return levelName[v]
- }
- // Client is a Google Cloud Logging client.
- // It must be constructed via NewClient.
- type Client struct {
- svc *api.Service
- logs *api.ProjectsLogsEntriesService
- projID string
- logName string
- writer [nLevel]io.Writer
- logger [nLevel]*log.Logger
- mu sync.Mutex
- queued []*api.LogEntry
- curFlush *flushCall // currently in-flight flush
- flushTimer *time.Timer // nil before first use
- timerActive bool // whether flushTimer is armed
- inFlight int // number of log entries sent to API service but not yet ACKed
- // For testing:
- timeNow func() time.Time // optional
- // ServiceName may be "appengine.googleapis.com",
- // "compute.googleapis.com" or "custom.googleapis.com".
- //
- // The default is "custom.googleapis.com".
- //
- // The service name is only used by the API server to
- // determine which of the labels are used to index the logs.
- ServiceName string
- // CommonLabels are metadata labels that apply to all log
- // entries in this request, so that you don't have to repeat
- // them in each log entry's metadata.labels field. If any of
- // the log entries contains a (key, value) with the same key
- // that is in CommonLabels, then the entry's (key, value)
- // overrides the one in CommonLabels.
- CommonLabels map[string]string
- // BufferLimit is the maximum number of items to keep in memory
- // before flushing. Zero means automatic. A value of 1 means to
- // flush after each log entry.
- // The default is currently 10,000.
- BufferLimit int
- // FlushAfter optionally specifies a threshold count at which buffered
- // log entries are flushed, even if the BufferInterval has not yet
- // been reached.
- // The default is currently 10.
- FlushAfter int
- // BufferInterval is the maximum amount of time that an item
- // should remain buffered in memory before being flushed to
- // the logging service.
- // The default is currently 1 second.
- BufferInterval time.Duration
- // Overflow is a function which runs when the Log function
- // overflows its configured buffer limit. If nil, the log
- // entry is dropped. The return value from Overflow is
- // returned by Log.
- Overflow func(*Client, Entry) error
- }
- func (c *Client) flushAfter() int {
- if v := c.FlushAfter; v > 0 {
- return v
- }
- return 10
- }
- func (c *Client) bufferInterval() time.Duration {
- if v := c.BufferInterval; v > 0 {
- return v
- }
- return time.Second
- }
- func (c *Client) bufferLimit() int {
- if v := c.BufferLimit; v > 0 {
- return v
- }
- return 10000
- }
- func (c *Client) serviceName() string {
- if v := c.ServiceName; v != "" {
- return v
- }
- return "custom.googleapis.com"
- }
- func (c *Client) now() time.Time {
- if now := c.timeNow; now != nil {
- return now()
- }
- return time.Now()
- }
- // Writer returns an io.Writer for the provided log level.
- //
- // Each Write call on the returned Writer generates a log entry.
- //
- // This Writer accessor does not allocate, so callers do not need to
- // cache.
- func (c *Client) Writer(v Level) io.Writer { return c.writer[v] }
- // Logger returns a *log.Logger for the provided log level.
- //
- // A Logger for each Level is pre-allocated by NewClient with an empty
- // prefix and no flags. This Logger accessor does not allocate.
- // Callers wishing to use alternate flags (such as log.Lshortfile) may
- // mutate the returned Logger with SetFlags. Such mutations affect all
- // callers in the program.
- func (c *Client) Logger(v Level) *log.Logger { return c.logger[v] }
- type levelWriter struct {
- level Level
- c *Client
- }
- func (w levelWriter) Write(p []byte) (n int, err error) {
- return len(p), w.c.Log(Entry{
- Level: w.level,
- Payload: string(p),
- })
- }
- // Entry is a log entry.
- type Entry struct {
- // Time is the time of the entry. If the zero value, the current time is used.
- Time time.Time
- // Level is log entry's severity level.
- // The zero value means no assigned severity level.
- Level Level
- // Payload must be either a string, []byte, or something that
- // marshals via the encoding/json package to a JSON object
- // (and not any other type of JSON value).
- Payload interface{}
- // Labels optionally specifies key/value labels for the log entry.
- // Depending on the Client's ServiceName, these are indexed differently
- // by the Cloud Logging Service.
- // See https://cloud.google.com/logging/docs/logs_index
- // The Client.Log method takes ownership of this map.
- Labels map[string]string
- // TODO: de-duping id
- }
- func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) {
- t := e.Time
- if t.IsZero() {
- t = c.now()
- }
- ent := &api.LogEntry{
- Metadata: &api.LogEntryMetadata{
- Timestamp: t.UTC().Format(time.RFC3339Nano),
- ServiceName: c.serviceName(),
- Severity: e.Level.String(),
- Labels: e.Labels,
- },
- }
- switch p := e.Payload.(type) {
- case string:
- ent.TextPayload = p
- case []byte:
- ent.TextPayload = string(p)
- default:
- ent.StructPayload = api.LogEntryStructPayload(p)
- }
- return ent, nil
- }
- // LogSync logs e synchronously without any buffering.
- // This is mostly intended for debugging or critical errors.
- func (c *Client) LogSync(e Entry) error {
- ent, err := c.apiEntry(e)
- if err != nil {
- return err
- }
- _, err = c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
- CommonLabels: c.CommonLabels,
- Entries: []*api.LogEntry{ent},
- }).Do()
- return err
- }
- var ErrOverflow = errors.New("logging: log entry overflowed buffer limits")
- // Log queues an entry to be sent to the logging service, subject to the
- // Client's parameters. By default, the log will be flushed within
- // one second.
- // Log only returns an error if the entry is invalid or the queue is at
- // capacity. If the queue is at capacity and the entry can't be added,
- // Log returns either ErrOverflow when c.Overflow is nil, or the
- // value returned by c.Overflow.
- func (c *Client) Log(e Entry) error {
- ent, err := c.apiEntry(e)
- if err != nil {
- return err
- }
- c.mu.Lock()
- buffered := len(c.queued) + c.inFlight
- if buffered >= c.bufferLimit() {
- c.mu.Unlock()
- if fn := c.Overflow; fn != nil {
- return fn(c, e)
- }
- return ErrOverflow
- }
- defer c.mu.Unlock()
- c.queued = append(c.queued, ent)
- if len(c.queued) >= c.flushAfter() {
- c.scheduleFlushLocked(0)
- return nil
- }
- c.scheduleFlushLocked(c.bufferInterval())
- return nil
- }
- // c.mu must be held.
- //
- // d will be one of two values: either c.BufferInterval (or its
- // default value) or 0.
- func (c *Client) scheduleFlushLocked(d time.Duration) {
- if c.inFlight > 0 {
- // For now to keep things simple, only allow one HTTP
- // request in flight at a time.
- return
- }
- switch {
- case c.flushTimer == nil:
- // First flush.
- c.timerActive = true
- c.flushTimer = time.AfterFunc(d, c.timeoutFlush)
- case c.timerActive && d == 0:
- // Make it happen sooner. For example, this is the
- // case of transitioning from a 1 second flush after
- // the 1st item to an immediate flush after the 10th
- // item.
- c.flushTimer.Reset(0)
- case !c.timerActive:
- c.timerActive = true
- c.flushTimer.Reset(d)
- default:
- // else timer was already active, also at d > 0,
- // so we don't touch it and let it fire as previously
- // scheduled.
- }
- }
- // timeoutFlush runs in its own goroutine (from time.AfterFunc) and
- // flushes c.queued.
- func (c *Client) timeoutFlush() {
- c.mu.Lock()
- c.timerActive = false
- c.mu.Unlock()
- if err := c.Flush(); err != nil {
- // schedule another try
- // TODO: smarter back-off?
- c.mu.Lock()
- c.scheduleFlushLocked(5 * time.Second)
- c.mu.Unlock()
- }
- }
- // Ping reports whether the client's connection to Google Cloud
- // Logging and the authentication configuration are valid.
- func (c *Client) Ping() error {
- _, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
- Entries: []*api.LogEntry{},
- }).Do()
- return err
- }
- // Flush flushes any buffered log entries.
- func (c *Client) Flush() error {
- var numFlush int
- c.mu.Lock()
- for {
- // We're already flushing (or we just started flushing
- // ourselves), so wait for it to finish.
- if f := c.curFlush; f != nil {
- wasEmpty := len(c.queued) == 0
- c.mu.Unlock()
- <-f.donec // wait for it
- numFlush++
- // Terminate whenever there's an error, we've
- // already flushed twice (one that was already
- // in-flight when flush was called, and then
- // one we instigated), or the queue was empty
- // when we released the locked (meaning this
- // in-flight flush removes everything present
- // when Flush was called, and we don't need to
- // kick off a new flush for things arriving
- // afterward)
- if f.err != nil || numFlush == 2 || wasEmpty {
- return f.err
- }
- // Otherwise, re-obtain the lock and loop,
- // starting over with seeing if a flush is in
- // progress, which might've been started by a
- // different goroutine before aquiring this
- // lock again.
- c.mu.Lock()
- continue
- }
- // Terminal case:
- if len(c.queued) == 0 {
- c.mu.Unlock()
- return nil
- }
- c.startFlushLocked()
- }
- }
- // requires c.mu be held.
- func (c *Client) startFlushLocked() {
- if c.curFlush != nil {
- panic("internal error: flush already in flight")
- }
- if len(c.queued) == 0 {
- panic("internal error: no items queued")
- }
- logEntries := c.queued
- c.inFlight = len(logEntries)
- c.queued = nil
- flush := &flushCall{
- donec: make(chan struct{}),
- }
- c.curFlush = flush
- go func() {
- defer close(flush.donec)
- _, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
- CommonLabels: c.CommonLabels,
- Entries: logEntries,
- }).Do()
- flush.err = err
- c.mu.Lock()
- defer c.mu.Unlock()
- c.inFlight = 0
- c.curFlush = nil
- if err != nil {
- c.queued = append(c.queued, logEntries...)
- } else if len(c.queued) > 0 {
- c.scheduleFlushLocked(c.bufferInterval())
- }
- }()
- }
- const prodAddr = "https://logging.googleapis.com/"
- const userAgent = "gcloud-golang-logging/20150922"
- // NewClient returns a new log client, logging to the named log in the
- // provided project.
- //
- // The exported fields on the returned client may be modified before
- // the client is used for logging. Once log entries are in flight,
- // the fields must not be modified.
- func NewClient(ctx context.Context, projectID, logName string, opts ...cloud.ClientOption) (*Client, error) {
- httpClient, endpoint, err := transport.NewHTTPClient(ctx, append([]cloud.ClientOption{
- cloud.WithEndpoint(prodAddr),
- cloud.WithScopes(api.CloudPlatformScope),
- cloud.WithUserAgent(userAgent),
- }, opts...)...)
- if err != nil {
- return nil, err
- }
- svc, err := api.New(httpClient)
- if err != nil {
- return nil, err
- }
- svc.BasePath = endpoint
- c := &Client{
- svc: svc,
- logs: api.NewProjectsLogsEntriesService(svc),
- logName: logName,
- projID: projectID,
- }
- for i := range c.writer {
- level := Level(i)
- c.writer[level] = levelWriter{level, c}
- c.logger[level] = log.New(c.writer[level], "", 0)
- }
- return c, nil
- }
- // flushCall is an in-flight or completed flush.
- type flushCall struct {
- donec chan struct{} // closed when response is in
- err error // error is valid after wg is Done
- }
|