linux.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. // +build linux
  2. /*
  3. Copyright 2015 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package bandwidth
  15. import (
  16. "bufio"
  17. "bytes"
  18. "encoding/hex"
  19. "fmt"
  20. "net"
  21. "strings"
  22. "k8s.io/kubernetes/pkg/api/resource"
  23. "k8s.io/kubernetes/pkg/util/exec"
  24. "k8s.io/kubernetes/pkg/util/sets"
  25. "github.com/golang/glog"
  26. )
  27. // tcShaper provides an implementation of the BandwidthShaper interface on Linux using the 'tc' tool.
  28. // In general, using this requires that the caller posses the NET_CAP_ADMIN capability, though if you
  29. // do this within an container, it only requires the NS_CAPABLE capability for manipulations to that
  30. // container's network namespace.
  31. // Uses the hierarchical token bucket queuing discipline (htb), this requires Linux 2.4.20 or newer
  32. // or a custom kernel with that queuing discipline backported.
  33. type tcShaper struct {
  34. e exec.Interface
  35. iface string
  36. }
  37. func NewTCShaper(iface string) BandwidthShaper {
  38. shaper := &tcShaper{
  39. e: exec.New(),
  40. iface: iface,
  41. }
  42. return shaper
  43. }
  44. func (t *tcShaper) execAndLog(cmdStr string, args ...string) error {
  45. glog.V(6).Infof("Running: %s %s", cmdStr, strings.Join(args, " "))
  46. cmd := t.e.Command(cmdStr, args...)
  47. out, err := cmd.CombinedOutput()
  48. glog.V(6).Infof("Output from tc: %s", string(out))
  49. return err
  50. }
  51. func (t *tcShaper) nextClassID() (int, error) {
  52. data, err := t.e.Command("tc", "class", "show", "dev", t.iface).CombinedOutput()
  53. if err != nil {
  54. return -1, err
  55. }
  56. scanner := bufio.NewScanner(bytes.NewBuffer(data))
  57. classes := sets.String{}
  58. for scanner.Scan() {
  59. line := strings.TrimSpace(scanner.Text())
  60. // skip empty lines
  61. if len(line) == 0 {
  62. continue
  63. }
  64. parts := strings.Split(line, " ")
  65. // expected tc line:
  66. // class htb 1:1 root prio 0 rate 1000Kbit ceil 1000Kbit burst 1600b cburst 1600b
  67. if len(parts) != 14 {
  68. return -1, fmt.Errorf("unexpected output from tc: %s (%v)", scanner.Text(), parts)
  69. }
  70. classes.Insert(parts[2])
  71. }
  72. // Make sure it doesn't go forever
  73. for nextClass := 1; nextClass < 10000; nextClass++ {
  74. if !classes.Has(fmt.Sprintf("1:%d", nextClass)) {
  75. return nextClass, nil
  76. }
  77. }
  78. // This should really never happen
  79. return -1, fmt.Errorf("exhausted class space, please try again")
  80. }
  81. // Convert a CIDR from text to a hex representation
  82. // Strips any masked parts of the IP, so 1.2.3.4/16 becomes hex(1.2.0.0)/ffffffff
  83. func hexCIDR(cidr string) (string, error) {
  84. ip, ipnet, err := net.ParseCIDR(cidr)
  85. if err != nil {
  86. return "", err
  87. }
  88. ip = ip.Mask(ipnet.Mask)
  89. hexIP := hex.EncodeToString([]byte(ip.To4()))
  90. hexMask := ipnet.Mask.String()
  91. return hexIP + "/" + hexMask, nil
  92. }
  93. // Convert a CIDR from hex representation to text, opposite of the above.
  94. func asciiCIDR(cidr string) (string, error) {
  95. parts := strings.Split(cidr, "/")
  96. if len(parts) != 2 {
  97. return "", fmt.Errorf("unexpected CIDR format: %s", cidr)
  98. }
  99. ipData, err := hex.DecodeString(parts[0])
  100. if err != nil {
  101. return "", err
  102. }
  103. ip := net.IP(ipData)
  104. maskData, err := hex.DecodeString(parts[1])
  105. mask := net.IPMask(maskData)
  106. size, _ := mask.Size()
  107. return fmt.Sprintf("%s/%d", ip.String(), size), nil
  108. }
  109. func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, found bool, err error) {
  110. data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
  111. if err != nil {
  112. return "", "", false, err
  113. }
  114. hex, err := hexCIDR(cidr)
  115. if err != nil {
  116. return "", "", false, err
  117. }
  118. spec := fmt.Sprintf("match %s", hex)
  119. scanner := bufio.NewScanner(bytes.NewBuffer(data))
  120. filter := ""
  121. for scanner.Scan() {
  122. line := strings.TrimSpace(scanner.Text())
  123. if len(line) == 0 {
  124. continue
  125. }
  126. if strings.HasPrefix(line, "filter") {
  127. filter = line
  128. continue
  129. }
  130. if strings.Contains(line, spec) {
  131. parts := strings.Split(filter, " ")
  132. // expected tc line:
  133. // filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1
  134. if len(parts) != 19 {
  135. return "", "", false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts)
  136. }
  137. return parts[18], parts[9], true, nil
  138. }
  139. }
  140. return "", "", false, nil
  141. }
  142. func makeKBitString(rsrc *resource.Quantity) string {
  143. return fmt.Sprintf("%dkbit", (rsrc.Value() / 1000))
  144. }
  145. func (t *tcShaper) makeNewClass(rate string) (int, error) {
  146. class, err := t.nextClassID()
  147. if err != nil {
  148. return -1, err
  149. }
  150. if err := t.execAndLog("tc", "class", "add",
  151. "dev", t.iface,
  152. "parent", "1:",
  153. "classid", fmt.Sprintf("1:%d", class),
  154. "htb", "rate", rate); err != nil {
  155. return -1, err
  156. }
  157. return class, nil
  158. }
  159. func (t *tcShaper) Limit(cidr string, upload, download *resource.Quantity) (err error) {
  160. var downloadClass, uploadClass int
  161. if download != nil {
  162. if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil {
  163. return err
  164. }
  165. if err := t.execAndLog("tc", "filter", "add",
  166. "dev", t.iface,
  167. "protocol", "ip",
  168. "parent", "1:0",
  169. "prio", "1", "u32",
  170. "match", "ip", "dst", cidr,
  171. "flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil {
  172. return err
  173. }
  174. }
  175. if upload != nil {
  176. if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil {
  177. return err
  178. }
  179. if err := t.execAndLog("tc", "filter", "add",
  180. "dev", t.iface,
  181. "protocol", "ip",
  182. "parent", "1:0",
  183. "prio", "1", "u32",
  184. "match", "ip", "src", cidr,
  185. "flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil {
  186. return err
  187. }
  188. }
  189. return nil
  190. }
  191. // tests to see if an interface exists, if it does, return true and the status line for the interface
  192. // returns false, "", <err> if an error occurs.
  193. func (t *tcShaper) interfaceExists() (bool, string, error) {
  194. data, err := t.e.Command("tc", "qdisc", "show", "dev", t.iface).CombinedOutput()
  195. if err != nil {
  196. return false, "", err
  197. }
  198. value := strings.TrimSpace(string(data))
  199. if len(value) == 0 {
  200. return false, "", nil
  201. }
  202. // Newer versions of tc and/or the kernel return the following instead of nothing:
  203. // qdisc noqueue 0: root refcnt 2
  204. fields := strings.Fields(value)
  205. if len(fields) > 1 && fields[1] == "noqueue" {
  206. return false, "", nil
  207. }
  208. return true, value, nil
  209. }
  210. func (t *tcShaper) ReconcileCIDR(cidr string, upload, download *resource.Quantity) error {
  211. _, _, found, err := t.findCIDRClass(cidr)
  212. if err != nil {
  213. return err
  214. }
  215. if !found {
  216. return t.Limit(cidr, upload, download)
  217. }
  218. // TODO: actually check bandwidth limits here
  219. return nil
  220. }
  221. func (t *tcShaper) ReconcileInterface() error {
  222. exists, output, err := t.interfaceExists()
  223. if err != nil {
  224. return err
  225. }
  226. if !exists {
  227. glog.V(4).Info("Didn't find bandwidth interface, creating")
  228. return t.initializeInterface()
  229. }
  230. fields := strings.Split(output, " ")
  231. if len(fields) < 12 || fields[1] != "htb" || fields[2] != "1:" {
  232. if err := t.deleteInterface(fields[2]); err != nil {
  233. return err
  234. }
  235. return t.initializeInterface()
  236. }
  237. return nil
  238. }
  239. func (t *tcShaper) initializeInterface() error {
  240. return t.execAndLog("tc", "qdisc", "add", "dev", t.iface, "root", "handle", "1:", "htb", "default", "30")
  241. }
  242. func (t *tcShaper) Reset(cidr string) error {
  243. class, handle, found, err := t.findCIDRClass(cidr)
  244. if err != nil {
  245. return err
  246. }
  247. if !found {
  248. return fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface)
  249. }
  250. if err := t.execAndLog("tc", "filter", "del",
  251. "dev", t.iface,
  252. "parent", "1:",
  253. "proto", "ip",
  254. "prio", "1",
  255. "handle", handle, "u32"); err != nil {
  256. return err
  257. }
  258. return t.execAndLog("tc", "class", "del", "dev", t.iface, "parent", "1:", "classid", class)
  259. }
  260. func (t *tcShaper) deleteInterface(class string) error {
  261. return t.execAndLog("tc", "qdisc", "delete", "dev", t.iface, "root", "handle", class)
  262. }
  263. func (t *tcShaper) GetCIDRs() ([]string, error) {
  264. data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
  265. if err != nil {
  266. return nil, err
  267. }
  268. result := []string{}
  269. scanner := bufio.NewScanner(bytes.NewBuffer(data))
  270. for scanner.Scan() {
  271. line := strings.TrimSpace(scanner.Text())
  272. if len(line) == 0 {
  273. continue
  274. }
  275. if strings.Contains(line, "match") {
  276. parts := strings.Split(line, " ")
  277. // expected tc line:
  278. // match <cidr> at <number>
  279. if len(parts) != 4 {
  280. return nil, fmt.Errorf("unexpected output: %v", parts)
  281. }
  282. cidr, err := asciiCIDR(parts[1])
  283. if err != nil {
  284. return nil, err
  285. }
  286. result = append(result, cidr)
  287. }
  288. }
  289. return result, nil
  290. }