123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- // Copyright 2016 CoreOS Inc
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package progressutil
- import (
- "errors"
- "fmt"
- "io"
- "sync"
- "time"
- )
- var (
- ErrAlreadyStarted = errors.New("cannot add copies after PrintAndWait has been called")
- )
- type copyReader struct {
- reader io.Reader
- current int64
- total int64
- pb *ProgressBar
- }
- func (cr *copyReader) Read(p []byte) (int, error) {
- n, err := cr.reader.Read(p)
- cr.current += int64(n)
- err1 := cr.updateProgressBar()
- if err == nil {
- err = err1
- }
- return n, err
- }
- func (cr *copyReader) updateProgressBar() error {
- cr.pb.SetPrintAfter(cr.formattedProgress())
- progress := float64(cr.current) / float64(cr.total)
- if progress > 1 {
- progress = 1
- }
- return cr.pb.SetCurrentProgress(progress)
- }
- // NewCopyProgressPrinter returns a new CopyProgressPrinter
- func NewCopyProgressPrinter() *CopyProgressPrinter {
- return &CopyProgressPrinter{results: make(chan error), cancel: make(chan struct{})}
- }
- // CopyProgressPrinter will perform an arbitrary number of io.Copy calls, while
- // continually printing the progress of each copy.
- type CopyProgressPrinter struct {
- results chan error
- cancel chan struct{}
- // `lock` mutex protects all fields below it in CopyProgressPrinter struct
- lock sync.Mutex
- readers []*copyReader
- started bool
- pbp *ProgressBarPrinter
- }
- // AddCopy adds a copy for this CopyProgressPrinter to perform. An io.Copy call
- // will be made to copy bytes from reader to dest, and name and size will be
- // used to label the progress bar and display how much progress has been made.
- // If size is 0, the total size of the reader is assumed to be unknown.
- // AddCopy can only be called before PrintAndWait; otherwise, ErrAlreadyStarted
- // will be returned.
- func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) error {
- cpp.lock.Lock()
- defer cpp.lock.Unlock()
- if cpp.started {
- return ErrAlreadyStarted
- }
- if cpp.pbp == nil {
- cpp.pbp = &ProgressBarPrinter{}
- cpp.pbp.PadToBeEven = true
- }
- cr := ©Reader{
- reader: reader,
- current: 0,
- total: size,
- pb: cpp.pbp.AddProgressBar(),
- }
- cr.pb.SetPrintBefore(name)
- cr.pb.SetPrintAfter(cr.formattedProgress())
- cpp.readers = append(cpp.readers, cr)
- go func() {
- _, err := io.Copy(dest, cr)
- select {
- case <-cpp.cancel:
- return
- case cpp.results <- err:
- return
- }
- }()
- return nil
- }
- // PrintAndWait will print the progress for each copy operation added with
- // AddCopy to printTo every printInterval. This will continue until every added
- // copy is finished, or until cancel is written to.
- // PrintAndWait may only be called once; any subsequent calls will immediately
- // return ErrAlreadyStarted. After PrintAndWait has been called, no more
- // copies may be added to the CopyProgressPrinter.
- func (cpp *CopyProgressPrinter) PrintAndWait(printTo io.Writer, printInterval time.Duration, cancel chan struct{}) error {
- cpp.lock.Lock()
- if cpp.started {
- cpp.lock.Unlock()
- return ErrAlreadyStarted
- }
- cpp.started = true
- cpp.lock.Unlock()
- n := len(cpp.readers)
- if n == 0 {
- // Nothing to do.
- return nil
- }
- defer close(cpp.cancel)
- t := time.NewTicker(printInterval)
- allDone := false
- for i := 0; i < n; {
- select {
- case <-cancel:
- return nil
- case <-t.C:
- _, err := cpp.pbp.Print(printTo)
- if err != nil {
- return err
- }
- case err := <-cpp.results:
- i++
- // Once completion is signaled, further on this just drains
- // (unlikely) errors from the channel.
- if err == nil && !allDone {
- allDone, err = cpp.pbp.Print(printTo)
- }
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- func (cr *copyReader) formattedProgress() string {
- var totalStr string
- if cr.total == 0 {
- totalStr = "?"
- } else {
- totalStr = ByteUnitStr(cr.total)
- }
- return fmt.Sprintf("%s / %s", ByteUnitStr(cr.current), totalStr)
- }
- var byteUnits = []string{"B", "KB", "MB", "GB", "TB", "PB"}
- // ByteUnitStr pretty prints a number of bytes.
- func ByteUnitStr(n int64) string {
- var unit string
- size := float64(n)
- for i := 1; i < len(byteUnits); i++ {
- if size < 1000 {
- unit = byteUnits[i-1]
- break
- }
- size = size / 1000
- }
- return fmt.Sprintf("%.3g %s", size, unit)
- }
|