subscription.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package dbus
  14. import (
  15. "errors"
  16. "time"
  17. "github.com/godbus/dbus"
  18. )
  19. const (
  20. cleanIgnoreInterval = int64(10 * time.Second)
  21. ignoreInterval = int64(30 * time.Millisecond)
  22. )
  23. // Subscribe sets up this connection to subscribe to all systemd dbus events.
  24. // This is required before calling SubscribeUnits. When the connection closes
  25. // systemd will automatically stop sending signals so there is no need to
  26. // explicitly call Unsubscribe().
  27. func (c *Conn) Subscribe() error {
  28. c.sysconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
  29. "type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'")
  30. c.sysconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
  31. "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
  32. err := c.sysobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
  33. if err != nil {
  34. return err
  35. }
  36. return nil
  37. }
  38. // Unsubscribe this connection from systemd dbus events.
  39. func (c *Conn) Unsubscribe() error {
  40. err := c.sysobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
  41. if err != nil {
  42. return err
  43. }
  44. return nil
  45. }
  46. func (c *Conn) initSubscription() {
  47. c.subscriber.ignore = make(map[dbus.ObjectPath]int64)
  48. }
  49. func (c *Conn) initDispatch() {
  50. ch := make(chan *dbus.Signal, signalBuffer)
  51. c.sysconn.Signal(ch)
  52. go func() {
  53. for {
  54. signal, ok := <-ch
  55. if !ok {
  56. return
  57. }
  58. switch signal.Name {
  59. case "org.freedesktop.systemd1.Manager.JobRemoved":
  60. c.jobComplete(signal)
  61. unitName := signal.Body[2].(string)
  62. var unitPath dbus.ObjectPath
  63. c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath)
  64. if unitPath != dbus.ObjectPath("") {
  65. c.sendSubStateUpdate(unitPath)
  66. }
  67. case "org.freedesktop.systemd1.Manager.UnitNew":
  68. c.sendSubStateUpdate(signal.Body[1].(dbus.ObjectPath))
  69. case "org.freedesktop.DBus.Properties.PropertiesChanged":
  70. if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
  71. // we only care about SubState updates, which are a Unit property
  72. c.sendSubStateUpdate(signal.Path)
  73. }
  74. }
  75. }
  76. }()
  77. }
  78. // Returns two unbuffered channels which will receive all changed units every
  79. // interval. Deleted units are sent as nil.
  80. func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
  81. return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
  82. }
  83. // SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
  84. // size of the channels, the comparison function for detecting changes and a filter
  85. // function for cutting down on the noise that your channel receives.
  86. func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func (string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
  87. old := make(map[string]*UnitStatus)
  88. statusChan := make(chan map[string]*UnitStatus, buffer)
  89. errChan := make(chan error, buffer)
  90. go func() {
  91. for {
  92. timerChan := time.After(interval)
  93. units, err := c.ListUnits()
  94. if err == nil {
  95. cur := make(map[string]*UnitStatus)
  96. for i := range units {
  97. if filterUnit != nil && filterUnit(units[i].Name) {
  98. continue
  99. }
  100. cur[units[i].Name] = &units[i]
  101. }
  102. // add all new or changed units
  103. changed := make(map[string]*UnitStatus)
  104. for n, u := range cur {
  105. if oldU, ok := old[n]; !ok || isChanged(oldU, u) {
  106. changed[n] = u
  107. }
  108. delete(old, n)
  109. }
  110. // add all deleted units
  111. for oldN := range old {
  112. changed[oldN] = nil
  113. }
  114. old = cur
  115. if len(changed) != 0 {
  116. statusChan <- changed
  117. }
  118. } else {
  119. errChan <- err
  120. }
  121. <-timerChan
  122. }
  123. }()
  124. return statusChan, errChan
  125. }
  126. type SubStateUpdate struct {
  127. UnitName string
  128. SubState string
  129. }
  130. // SetSubStateSubscriber writes to updateCh when any unit's substate changes.
  131. // Although this writes to updateCh on every state change, the reported state
  132. // may be more recent than the change that generated it (due to an unavoidable
  133. // race in the systemd dbus interface). That is, this method provides a good
  134. // way to keep a current view of all units' states, but is not guaranteed to
  135. // show every state transition they go through. Furthermore, state changes
  136. // will only be written to the channel with non-blocking writes. If updateCh
  137. // is full, it attempts to write an error to errCh; if errCh is full, the error
  138. // passes silently.
  139. func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
  140. c.subscriber.Lock()
  141. defer c.subscriber.Unlock()
  142. c.subscriber.updateCh = updateCh
  143. c.subscriber.errCh = errCh
  144. }
  145. func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) {
  146. c.subscriber.Lock()
  147. defer c.subscriber.Unlock()
  148. if c.subscriber.updateCh == nil {
  149. return
  150. }
  151. if c.shouldIgnore(path) {
  152. return
  153. }
  154. info, err := c.GetUnitProperties(string(path))
  155. if err != nil {
  156. select {
  157. case c.subscriber.errCh <- err:
  158. default:
  159. }
  160. }
  161. name := info["Id"].(string)
  162. substate := info["SubState"].(string)
  163. update := &SubStateUpdate{name, substate}
  164. select {
  165. case c.subscriber.updateCh <- update:
  166. default:
  167. select {
  168. case c.subscriber.errCh <- errors.New("update channel full!"):
  169. default:
  170. }
  171. }
  172. c.updateIgnore(path, info)
  173. }
  174. // The ignore functions work around a wart in the systemd dbus interface.
  175. // Requesting the properties of an unloaded unit will cause systemd to send a
  176. // pair of UnitNew/UnitRemoved signals. Because we need to get a unit's
  177. // properties on UnitNew (as that's the only indication of a new unit coming up
  178. // for the first time), we would enter an infinite loop if we did not attempt
  179. // to detect and ignore these spurious signals. The signal themselves are
  180. // indistinguishable from relevant ones, so we (somewhat hackishly) ignore an
  181. // unloaded unit's signals for a short time after requesting its properties.
  182. // This means that we will miss e.g. a transient unit being restarted
  183. // *immediately* upon failure and also a transient unit being started
  184. // immediately after requesting its status (with systemctl status, for example,
  185. // because this causes a UnitNew signal to be sent which then causes us to fetch
  186. // the properties).
  187. func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
  188. t, ok := c.subscriber.ignore[path]
  189. return ok && t >= time.Now().UnixNano()
  190. }
  191. func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
  192. c.cleanIgnore()
  193. // unit is unloaded - it will trigger bad systemd dbus behavior
  194. if info["LoadState"].(string) == "not-found" {
  195. c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
  196. }
  197. }
  198. // without this, ignore would grow unboundedly over time
  199. func (c *Conn) cleanIgnore() {
  200. now := time.Now().UnixNano()
  201. if c.subscriber.cleanIgnore < now {
  202. c.subscriber.cleanIgnore = now + cleanIgnoreInterval
  203. for p, t := range c.subscriber.ignore {
  204. if t < now {
  205. delete(c.subscriber.ignore, p)
  206. }
  207. }
  208. }
  209. }