iocopy.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. // Copyright 2016 CoreOS Inc
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package progressutil
  15. import (
  16. "errors"
  17. "fmt"
  18. "io"
  19. "sync"
  20. "time"
  21. )
  22. var (
  23. ErrAlreadyStarted = errors.New("cannot add copies after PrintAndWait has been called")
  24. )
  25. type copyReader struct {
  26. reader io.Reader
  27. current int64
  28. total int64
  29. pb *ProgressBar
  30. }
  31. func (cr *copyReader) Read(p []byte) (int, error) {
  32. n, err := cr.reader.Read(p)
  33. cr.current += int64(n)
  34. err1 := cr.updateProgressBar()
  35. if err == nil {
  36. err = err1
  37. }
  38. return n, err
  39. }
  40. func (cr *copyReader) updateProgressBar() error {
  41. cr.pb.SetPrintAfter(cr.formattedProgress())
  42. progress := float64(cr.current) / float64(cr.total)
  43. if progress > 1 {
  44. progress = 1
  45. }
  46. return cr.pb.SetCurrentProgress(progress)
  47. }
  48. // NewCopyProgressPrinter returns a new CopyProgressPrinter
  49. func NewCopyProgressPrinter() *CopyProgressPrinter {
  50. return &CopyProgressPrinter{results: make(chan error), cancel: make(chan struct{})}
  51. }
  52. // CopyProgressPrinter will perform an arbitrary number of io.Copy calls, while
  53. // continually printing the progress of each copy.
  54. type CopyProgressPrinter struct {
  55. results chan error
  56. cancel chan struct{}
  57. // `lock` mutex protects all fields below it in CopyProgressPrinter struct
  58. lock sync.Mutex
  59. readers []*copyReader
  60. started bool
  61. pbp *ProgressBarPrinter
  62. }
  63. // AddCopy adds a copy for this CopyProgressPrinter to perform. An io.Copy call
  64. // will be made to copy bytes from reader to dest, and name and size will be
  65. // used to label the progress bar and display how much progress has been made.
  66. // If size is 0, the total size of the reader is assumed to be unknown.
  67. // AddCopy can only be called before PrintAndWait; otherwise, ErrAlreadyStarted
  68. // will be returned.
  69. func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) error {
  70. cpp.lock.Lock()
  71. defer cpp.lock.Unlock()
  72. if cpp.started {
  73. return ErrAlreadyStarted
  74. }
  75. if cpp.pbp == nil {
  76. cpp.pbp = &ProgressBarPrinter{}
  77. cpp.pbp.PadToBeEven = true
  78. }
  79. cr := &copyReader{
  80. reader: reader,
  81. current: 0,
  82. total: size,
  83. pb: cpp.pbp.AddProgressBar(),
  84. }
  85. cr.pb.SetPrintBefore(name)
  86. cr.pb.SetPrintAfter(cr.formattedProgress())
  87. cpp.readers = append(cpp.readers, cr)
  88. go func() {
  89. _, err := io.Copy(dest, cr)
  90. select {
  91. case <-cpp.cancel:
  92. return
  93. case cpp.results <- err:
  94. return
  95. }
  96. }()
  97. return nil
  98. }
  99. // PrintAndWait will print the progress for each copy operation added with
  100. // AddCopy to printTo every printInterval. This will continue until every added
  101. // copy is finished, or until cancel is written to.
  102. // PrintAndWait may only be called once; any subsequent calls will immediately
  103. // return ErrAlreadyStarted. After PrintAndWait has been called, no more
  104. // copies may be added to the CopyProgressPrinter.
  105. func (cpp *CopyProgressPrinter) PrintAndWait(printTo io.Writer, printInterval time.Duration, cancel chan struct{}) error {
  106. cpp.lock.Lock()
  107. if cpp.started {
  108. cpp.lock.Unlock()
  109. return ErrAlreadyStarted
  110. }
  111. cpp.started = true
  112. cpp.lock.Unlock()
  113. n := len(cpp.readers)
  114. if n == 0 {
  115. // Nothing to do.
  116. return nil
  117. }
  118. defer close(cpp.cancel)
  119. t := time.NewTicker(printInterval)
  120. allDone := false
  121. for i := 0; i < n; {
  122. select {
  123. case <-cancel:
  124. return nil
  125. case <-t.C:
  126. _, err := cpp.pbp.Print(printTo)
  127. if err != nil {
  128. return err
  129. }
  130. case err := <-cpp.results:
  131. i++
  132. // Once completion is signaled, further on this just drains
  133. // (unlikely) errors from the channel.
  134. if err == nil && !allDone {
  135. allDone, err = cpp.pbp.Print(printTo)
  136. }
  137. if err != nil {
  138. return err
  139. }
  140. }
  141. }
  142. return nil
  143. }
  144. func (cr *copyReader) formattedProgress() string {
  145. var totalStr string
  146. if cr.total == 0 {
  147. totalStr = "?"
  148. } else {
  149. totalStr = ByteUnitStr(cr.total)
  150. }
  151. return fmt.Sprintf("%s / %s", ByteUnitStr(cr.current), totalStr)
  152. }
  153. var byteUnits = []string{"B", "KB", "MB", "GB", "TB", "PB"}
  154. // ByteUnitStr pretty prints a number of bytes.
  155. func ByteUnitStr(n int64) string {
  156. var unit string
  157. size := float64(n)
  158. for i := 1; i < len(byteUnits); i++ {
  159. if size < 1000 {
  160. unit = byteUnits[i-1]
  161. break
  162. }
  163. size = size / 1000
  164. }
  165. return fmt.Sprintf("%.3g %s", size, unit)
  166. }