Skip to content

Commit

Permalink
fix virt-v2v progress
Browse files Browse the repository at this point in the history
Signed-off-by: Bella Khizgiyaev <[email protected]>
  • Loading branch information
bkhizgiy committed May 19, 2024
1 parent a674470 commit f7b6f8c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
24 changes: 19 additions & 5 deletions cmd/virt-v2v-monitor/virt-v2v-monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"flag"
"fmt"
"net/http"
"os"
"regexp"
Expand All @@ -15,7 +16,7 @@ import (
)

var COPY_DISK_RE = regexp.MustCompile(`^.*Copying disk (\d+)/(\d+)`)
var DISK_PROGRESS_RE = regexp.MustCompile(`^..\s*(\d+)% \[.*\]`)
var DISK_PROGRESS_RE = regexp.MustCompile(`\s+\((\d+).*|.+ (\d+)% \[[*-]+\]`)
var FINISHED_RE = regexp.MustCompile(`^\[[ .0-9]*\] Finishing off`)

// Here is a scan function that imposes limit on returned line length. virt-v2v
Expand All @@ -28,7 +29,7 @@ func LimitedScanLines(data []byte, atEOF bool) (advance int, token []byte, err e
if token != nil || err != nil {
return
}
if len(data) == bufio.MaxScanTokenSize {
if len(data) == 1*1024*1024 {
// Line is too long for the buffer. Trim it.
advance = len(data)
token = data
Expand Down Expand Up @@ -57,13 +58,22 @@ func updateProgress(progressCounter *prometheus.CounterVec, disk, progress uint6
return
}

func NewBufferedScanner(r *bufio.Reader, bufferSize int) *bufio.Scanner {
scanner := bufio.NewScanner(r)
buf := make([]byte, bufferSize)
scanner.Buffer(buf, bufferSize)
scanner.Split(LimitedScanLines)
return scanner
}

func main() {
klog.InitFlags(nil)
defer klog.Flush()
flag.Parse()

// Start prometheus metrics HTTP handler
klog.Info("Setting up prometheus endpoint :2112/metrics")
klog.Info("this is Bella test")
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":2112", nil)

Expand All @@ -87,8 +97,9 @@ func main() {
var disks uint64 = 0
var progress uint64 = 0

scanner := bufio.NewScanner(os.Stdin)
scanner.Split(LimitedScanLines)
reader := bufio.NewReader(os.Stdin)
scanner := NewBufferedScanner(reader, 1*1024*1024)

for scanner.Scan() {
line := scanner.Bytes()
os.Stdout.Write(line)
Expand All @@ -98,13 +109,16 @@ func main() {
klog.Fatal("Output monitoring failed! ", err)
}

fmt.Println("this is the line we scanning now ", string(line))

if match := COPY_DISK_RE.FindSubmatch(line); match != nil {
diskNumber, _ = strconv.ParseUint(string(match[1]), 10, 0)
disks, _ = strconv.ParseUint(string(match[2]), 10, 0)
klog.Infof("Copying disk %d out of %d", diskNumber, disks)
progress = 0
err = updateProgress(progressCounter, diskNumber, progress)
} else if match := DISK_PROGRESS_RE.FindSubmatch(line); match != nil {
klog.Info("we are here at progress ", line)
progress, _ = strconv.ParseUint(string(match[1]), 10, 0)
klog.Infof("Progress update, completed %d %%", progress)
err = updateProgress(progressCounter, diskNumber, progress)
Expand All @@ -116,7 +130,7 @@ func main() {
err = updateProgress(progressCounter, disk, 100)
}
} else {
klog.V(1).Info("Ignoring line: ", string(line))
klog.Infof("Ignoring line: ", string(line))
}
if err != nil {
// Don't make processing errors fatal.
Expand Down
25 changes: 9 additions & 16 deletions virt-v2v/cold/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
}

func buildCommand() []string {
virtV2vArgs := []string{"virt-v2v", "-v", "-x"}
virtV2vArgs := []string{"virt-v2v"}
source := os.Getenv("V2V_source")

if !isValidSource(source) {
Expand Down Expand Up @@ -194,33 +194,26 @@ func LinkDisks(diskKind string, num int) (err error) {

func executeVirtV2v(source string, args []string) (err error) {
virtV2vCmd := exec.Command(args[0], args[1:]...)
virtV2vStdoutPipe, err := virtV2vCmd.StdoutPipe()
stdoutPipe, err := virtV2vCmd.StdoutPipe()
if err != nil {
fmt.Printf("Error setting up stdout pipe: %v\n", err)
return
}
teeOut := io.TeeReader(virtV2vStdoutPipe, os.Stdout)

var teeErr io.Reader
if source == OVA {
virtV2vStderrPipe, err := virtV2vCmd.StderrPipe()
if err != nil {
fmt.Printf("Error setting up stdout pipe: %v\n", err)
return err
}
teeErr = io.TeeReader(virtV2vStderrPipe, os.Stderr)
} else {
virtV2vCmd.Stderr = os.Stderr
stderrPipe, err := virtV2vCmd.StderrPipe()
if err != nil {
fmt.Printf("Error setting up stderr pipe: %v\n", err)
return
}

fmt.Println("exec ", virtV2vCmd)
if err = virtV2vCmd.Start(); err != nil {
fmt.Printf("Error executing command: %v\n", err)
return
}

combinedReader := io.MultiReader(stdoutPipe, stderrPipe)
virtV2vMonitorCmd := exec.Command("/usr/local/bin/virt-v2v-monitor")
virtV2vMonitorCmd.Stdin = teeOut
virtV2vMonitorCmd.Stdin = combinedReader
virtV2vMonitorCmd.Stdout = os.Stdout
virtV2vMonitorCmd.Stderr = os.Stderr

Expand All @@ -230,7 +223,7 @@ func executeVirtV2v(source string, args []string) (err error) {
}

if source == OVA {
scanner := bufio.NewScanner(teeErr)
scanner := bufio.NewScanner(stderrPipe)
const maxCapacity = 1024 * 1024
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, maxCapacity)
Expand Down

0 comments on commit f7b6f8c

Please sign in to comment.