read.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. // Copyright 2015 RedHat, Inc.
  2. // Copyright 2015 CoreOS, Inc.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package sdjournal
  16. import (
  17. "errors"
  18. "fmt"
  19. "io"
  20. "log"
  21. "strings"
  22. "time"
  23. )
  24. var (
  25. ErrExpired = errors.New("Timeout expired")
  26. )
  27. // JournalReaderConfig represents options to drive the behavior of a JournalReader.
  28. type JournalReaderConfig struct {
  29. // The Since, NumFromTail and Cursor options are mutually exclusive and
  30. // determine where the reading begins within the journal. The order in which
  31. // options are written is exactly the order of precedence.
  32. Since time.Duration // start relative to a Duration from now
  33. NumFromTail uint64 // start relative to the tail
  34. Cursor string // start relative to the cursor
  35. // Show only journal entries whose fields match the supplied values. If
  36. // the array is empty, entries will not be filtered.
  37. Matches []Match
  38. // If not empty, the journal instance will point to a journal residing
  39. // in this directory. The supplied path may be relative or absolute.
  40. Path string
  41. }
  42. // JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
  43. // systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines.
  44. type JournalReader struct {
  45. journal *Journal
  46. msgReader *strings.Reader
  47. }
  48. // NewJournalReader creates a new JournalReader with configuration options that are similar to the
  49. // systemd journalctl tool's iteration and filtering features.
  50. func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
  51. r := &JournalReader{}
  52. // Open the journal
  53. var err error
  54. if config.Path != "" {
  55. r.journal, err = NewJournalFromDir(config.Path)
  56. } else {
  57. r.journal, err = NewJournal()
  58. }
  59. if err != nil {
  60. return nil, err
  61. }
  62. // Add any supplied matches
  63. for _, m := range config.Matches {
  64. r.journal.AddMatch(m.String())
  65. }
  66. // Set the start position based on options
  67. if config.Since != 0 {
  68. // Start based on a relative time
  69. start := time.Now().Add(config.Since)
  70. if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
  71. return nil, err
  72. }
  73. } else if config.NumFromTail != 0 {
  74. // Start based on a number of lines before the tail
  75. if err := r.journal.SeekTail(); err != nil {
  76. return nil, err
  77. }
  78. // Move the read pointer into position near the tail. Go one further than
  79. // the option so that the initial cursor advancement positions us at the
  80. // correct starting point.
  81. skip, err := r.journal.PreviousSkip(config.NumFromTail + 1)
  82. if err != nil {
  83. return nil, err
  84. }
  85. // If we skipped fewer lines than expected, we have reached journal start.
  86. // Thus, we seek to head so that next invocation can read the first line.
  87. if skip != config.NumFromTail+1 {
  88. if err := r.journal.SeekHead(); err != nil {
  89. return nil, err
  90. }
  91. }
  92. } else if config.Cursor != "" {
  93. // Start based on a custom cursor
  94. if err := r.journal.SeekCursor(config.Cursor); err != nil {
  95. return nil, err
  96. }
  97. }
  98. return r, nil
  99. }
  100. // Read reads entries from the journal. Read follows the Reader interface so
  101. // it must be able to read a specific amount of bytes. Journald on the other
  102. // hand only allows us to read full entries of arbitrary size (without byte
  103. // granularity). JournalReader is therefore internally buffering entries that
  104. // don't fit in the read buffer. Callers should keep calling until 0 and/or an
  105. // error is returned.
  106. func (r *JournalReader) Read(b []byte) (int, error) {
  107. var err error
  108. if r.msgReader == nil {
  109. var c uint64
  110. // Advance the journal cursor. It has to be called at least one time
  111. // before reading
  112. c, err = r.journal.Next()
  113. // An unexpected error
  114. if err != nil {
  115. return 0, err
  116. }
  117. // EOF detection
  118. if c == 0 {
  119. return 0, io.EOF
  120. }
  121. // Build a message
  122. var msg string
  123. msg, err = r.buildMessage()
  124. if err != nil {
  125. return 0, err
  126. }
  127. r.msgReader = strings.NewReader(msg)
  128. }
  129. // Copy and return the message
  130. var sz int
  131. sz, err = r.msgReader.Read(b)
  132. if err == io.EOF {
  133. // The current entry has been fully read. Don't propagate this
  134. // EOF, so the next entry can be read at the next Read()
  135. // iteration.
  136. r.msgReader = nil
  137. return sz, nil
  138. }
  139. if err != nil {
  140. return sz, err
  141. }
  142. if r.msgReader.Len() == 0 {
  143. r.msgReader = nil
  144. }
  145. return sz, nil
  146. }
  147. // Close closes the JournalReader's handle to the journal.
  148. func (r *JournalReader) Close() error {
  149. return r.journal.Close()
  150. }
  151. // Rewind attempts to rewind the JournalReader to the first entry.
  152. func (r *JournalReader) Rewind() error {
  153. r.msgReader = nil
  154. return r.journal.SeekHead()
  155. }
  156. // Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
  157. // follow will continue until a single time.Time is received on the until channel.
  158. func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) {
  159. // Process journal entries and events. Entries are flushed until the tail or
  160. // timeout is reached, and then we wait for new events or the timeout.
  161. var msg = make([]byte, 64*1<<(10))
  162. process:
  163. for {
  164. c, err := r.Read(msg)
  165. if err != nil && err != io.EOF {
  166. break process
  167. }
  168. select {
  169. case <-until:
  170. return ErrExpired
  171. default:
  172. if c > 0 {
  173. if _, err = writer.Write(msg[:c]); err != nil {
  174. break process
  175. }
  176. continue process
  177. }
  178. }
  179. // We're at the tail, so wait for new events or time out.
  180. // Holds journal events to process. Tightly bounded for now unless there's a
  181. // reason to unblock the journal watch routine more quickly.
  182. events := make(chan int, 1)
  183. pollDone := make(chan bool, 1)
  184. go func() {
  185. for {
  186. select {
  187. case <-pollDone:
  188. return
  189. default:
  190. events <- r.journal.Wait(time.Duration(1) * time.Second)
  191. }
  192. }
  193. }()
  194. select {
  195. case <-until:
  196. pollDone <- true
  197. return ErrExpired
  198. case e := <-events:
  199. pollDone <- true
  200. switch e {
  201. case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
  202. // TODO: need to account for any of these?
  203. default:
  204. log.Printf("Received unknown event: %d\n", e)
  205. }
  206. continue process
  207. }
  208. }
  209. return
  210. }
  211. // buildMessage returns a string representing the current journal entry in a simple format which
  212. // includes the entry timestamp and MESSAGE field.
  213. func (r *JournalReader) buildMessage() (string, error) {
  214. var msg string
  215. var usec uint64
  216. var err error
  217. if msg, err = r.journal.GetData("MESSAGE"); err != nil {
  218. return "", err
  219. }
  220. if usec, err = r.journal.GetRealtimeUsec(); err != nil {
  221. return "", err
  222. }
  223. timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
  224. return fmt.Sprintf("%s %s\n", timestamp, msg), nil
  225. }