-
Notifications
You must be signed in to change notification settings - Fork 1
/
blem.go
117 lines (94 loc) · 3.4 KB
/
blem.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
binlogTop
Author: 3manuek
License: GNU
*/
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/juju/errors"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
//https://github.com/percona/go-mysql
)
// Command-line flags. Values are only meaningful after flag.Parse has run.
var (
	// Example invocation:
	//   ./blem -port=22695 -password="msandbox" -user="msandbox" -interval=5
	serverid      = flag.Uint("serverid", 9999, "Server Id (must be unique)")
	flavor        = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
	user          = flag.String("user", "root", "MySQL user, must have replication privilege")
	password      = flag.String("password", "", "MySQL password.")
	port          = flag.Uint("port", 3306, "MySQL port.")
	host          = flag.String("host", "127.0.0.1", "MySQL host.")
	interval      = flag.Int("interval", 5, "Interval in seconds.")
	binfile       = flag.String("binfile", "", "Binlog File name")
	binpos        = flag.Int("binpos", 0, "Binlog File Pos")
	keyParsing    = flag.String("keyParsing", "", "Which keys do you want to analyze?")
	tableParsing  = flag.String("tableParsing", "", "Which table do you want to trace?")
	columnParsing = flag.String("columnParsing", "", "Which column do you want to examine?")
	dbpath        = flag.String("dbpath", "./binlogtop.db", "Generally you won't set this unless you have issues with space")
	//removeStatsDB = flag.Bool("removeStatsDB", true, "Do I remove the stats DB or do you want to keep it?")
	mode = flag.String("mode", "aggregated", "Possible modes: full, aggregated")
)
// main parses the command-line flags, determines the binlog position to
// start from, opens a replication stream against the configured server and
// then prints the accumulated per-table statistics once per -interval
// seconds until the process receives an interrupt or SIGTERM.
//
// Exit codes: 2 for invalid flags or a received signal, 3 when the server
// cannot be reached or the binlog stream cannot be started.
func main() {
	flag.Parse()

	validModes := []string{"full", "aggregated"}
	if !IsValidMode(*mode, validModes) {
		fmt.Printf("Mode is not valid.\n")
		os.Exit(2)
	}
	// A zero or negative interval would make time.Sleep return immediately
	// and turn the reporting loop below into a busy loop.
	if *interval <= 0 {
		fmt.Printf("Interval must be a positive number of seconds.\n")
		os.Exit(2)
	}

	hostport := fmt.Sprintf("%s:%d", *host, *port)
	refresh := time.Duration(*interval) * time.Second

	// Terminate with status 2 on Ctrl-C / SIGTERM.
	// Note: os.Exit does not run deferred calls, so the deferred
	// syncer.Close below is skipped on this path.
	c := make(chan os.Signal, 2)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-c
		os.Exit(2)
	}()

	//MapTable := make(map[uint64]replication.TableMapEvent)
	//MapCounters := make(map[TypeKeyEvent]uint64)
	tableMap := make(map[uint64]TypeTableName)
	mapStats := make(map[TypeKeyEvent]TypeDataEvent)

	//
	// Connection
	//
	cfg := replication.BinlogSyncerConfig{
		ServerID:        uint32(*serverid),
		Flavor:          *flavor,
		Host:            *host,
		Port:            uint16(*port),
		User:            *user,
		Password:        *password,
		SemiSyncEnabled: false,
		//RawModeEnabled: false,
	}

	// A short-lived regular client connection (to the "test" schema) is
	// opened only to look up the starting coordinates; it is closed before
	// the replication stream is started.
	conn, err := client.Connect(hostport, cfg.User, cfg.Password, "test")
	if err != nil {
		fmt.Printf("Cannot connect to host: %v \n", errors.ErrorStack(err))
		os.Exit(3)
	}

	// Use the coordinates given on the command line only when both the file
	// name and the position were supplied; otherwise obtain them through
	// getCoordinates.
	currPos := mysql.Position{Name: *binfile, Pos: uint32(*binpos)}
	if *binfile == "" || *binpos == 0 {
		currPos = getCoordinates(conn) //currPos is mysql.Position
	}
	conn.Close()

	syncer := replication.NewBinlogSyncer(&cfg)
	defer syncer.Close()

	streamer, err := syncer.StartSync(currPos)
	if err != nil {
		fmt.Printf("Cannot start binlog sync: %v \n", errors.ErrorStack(err))
		// os.Exit skips the deferred Close, so release the syncer explicitly.
		syncer.Close()
		os.Exit(3)
	}

	// NOTE(review): feedingThread presumably updates tableMap and mapStats
	// from its own goroutine while the loop below reads them. If so, this is
	// an unsynchronized concurrent map access and needs a mutex or a channel
	// hand-off shared with feedingThread — confirm against its implementation.
	go feedingThread(streamer, tableMap, mapStats) //better to add a time context and send info by channel?

	//basic timer method.
	for {
		for ix, val := range mapStats {
			//fmt.Println("Ix", ix)
			fmt.Printf("Table: %s.%s | Event: %s | Accum Size: %d | Counted Events: %d \n", tableMap[ix.TableID].Schema, tableMap[ix.TableID].Table, ix.Event, val.AccumSize, val.Counted)
			//fmt.Println(MapStats)
		}
		//fmt.Printf("--\n")
		time.Sleep(refresh) //Calculate time by getting the Event timestamp
	}
}