-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.go
153 lines (131 loc) · 4.03 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package main
import (
"flag"
"fmt"
"image"
"log"
"math/rand"
"os"
"os/signal"
"runtime/pprof"
"sync"
"time"
"github.com/SpeckiJ/Hochwasser/pixelflut"
"github.com/SpeckiJ/Hochwasser/render"
"github.com/SpeckiJ/Hochwasser/rpc"
)
var (
imgPath = flag.String("image", "", "Filepath of an image to flut")
ránAddr = flag.String("rán", "", "Start RPC server to distribute jobs, listening on the given address/port")
hevringAddr = flag.String("hevring", "", "Connect to RPC server at given address/port")
address = flag.String("host", ":1234", "Target server address")
connections = flag.Int("connections", 4, "Number of simultaneous connections. Each connection posts a subimage")
x = flag.Int("x", 0, "Offset of posted image from left border")
y = flag.Int("y", 0, "Offset of posted image from top border")
order = flag.String("order", "rtl", "Draw order (shuffle, ltr, rtl, ttb, btt)")
fetchImgPath = flag.String("fetch", "", "Enable fetching the screen area to the given local file, updating it each second")
hevringImgPath = flag.String("hevring-preview", "", "Write the current task image to the given PNG file")
cpuprofile = flag.String("cpuprofile", "", "Destination file for CPU Profile")
)
func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
task := runWithExitHandler(taskFromFlags)
if *cpuprofile != "" {
runWithProfiler(*cpuprofile, task)
} else {
task()
}
}
func taskFromFlags(stop chan bool, wg *sync.WaitGroup) {
rán := *ránAddr
hev := *hevringAddr
startServer := rán != "" || (hev == "" && *imgPath != "")
startClient := hev != "" || (rán == "" && *imgPath != "")
fetchImg := *fetchImgPath != ""
if !(startServer || startClient || fetchImg) {
fmt.Println("Error: At least one of the following flags is needed:\n -image -rán -hevring\n")
flag.Usage()
os.Exit(1)
}
if startServer && startClient && rán == "" && hev == "" {
rán = fmt.Sprintf(":%d", rand.Intn(30000)+1000)
hev = rán
}
if startServer {
r := rpc.SummonRán(rán, stop, wg)
var img *image.NRGBA
if *imgPath != "" {
var err error
if img, err = render.ReadImage(*imgPath); err != nil {
log.Fatal(err)
}
}
r.SetTask(pixelflut.FlutTask{
FlutTaskOpts: pixelflut.FlutTaskOpts{
Address: *address,
MaxConns: *connections,
Offset: image.Pt(*x, *y),
RenderOrder: pixelflut.NewOrder(*order),
},
Img: img,
})
}
if startClient {
hevring := rpc.ConnectHevring(hev, stop, wg)
hevring.PreviewPath = *hevringImgPath
}
if fetchImg {
canvasToFile(*fetchImgPath, *address, time.Second, stop, wg)
}
}
func canvasToFile(filepath, server string, interval time.Duration, stop chan bool, wg *sync.WaitGroup) {
// async fetch the image
fetchedImg := pixelflut.FetchImage(nil, server, 1, stop)
// write it in a fixed interval
go func() {
wg.Add(1)
defer wg.Done()
for loop := true; loop; {
select {
case <-stop:
loop = false
case <-time.Tick(interval):
}
render.WriteImage(filepath, fetchedImg)
}
}()
}
// Takes a non-blocking function, and provides it an interface for graceful shutdown:
// stop chan is closed if the routine should be stopped. before quitting, wg is awaited.
func runWithExitHandler(task func(stop chan bool, wg *sync.WaitGroup)) func() {
return func() {
wg := sync.WaitGroup{}
stopChan := make(chan bool)
interruptChan := make(chan os.Signal)
signal.Notify(interruptChan, os.Interrupt)
task(stopChan, &wg)
// block until we get an interrupt, or somebody says we need to quit (by closing stopChan)
select {
case <-interruptChan:
case <-stopChan:
stopChan = nil
}
if stopChan != nil {
// notify all async tasks to stop on interrupt, if channel wasn't closed already
close(stopChan)
}
// then wait for clean shutdown of all tasks before exiting
wg.Wait()
}
}
func runWithProfiler(outfile string, task func()) {
f, err := os.Create(outfile)
if err != nil {
log.Fatal(err)
}
defer f.Close()
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
task()
}