forked from gonium/gosdm630
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.go
120 lines (107 loc) · 3.01 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package sdm630
import (
"fmt"
"log"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// MqttClient publishes the readings it receives on a channel to an MQTT
// broker, optionally limiting how often each topic is published.
type MqttClient struct {
	// client is the broker client used for every publish.
	client MQTT.Client
	// mqttTopic is the prefix from which all published topics are built.
	mqttTopic string
	// mqttRate is the number of seconds added to the publish time to compute
	// when a topic may be published again; 0 disables rate limiting.
	mqttRate int
	// mqttQos is the quality-of-service level passed to each publish.
	mqttQos int
	// in is the channel the readings are received from.
	in QuerySnipChannel
	// verbose enables per-message log output.
	verbose bool
}
// Run receives readings from the input channel and publishes each one as a
// retained message on "<mqttTopic>/<uniqueID>/<IEC61850>", where uniqueID is
// the device id formatted with UniqueIdFormat and the payload is the value
// formatted with three decimals.
//
// When mqttRate is non-zero, a topic that was just published is skipped until
// its rate window (tracked in whole Unix seconds) has passed; skipped readings
// are dropped. Run blocks until the input channel is closed. A failed publish
// is logged and does not terminate the process; the topic's rate window is
// left untouched so the next reading for that topic is attempted again.
func (mqttClient *MqttClient) Run() {
	// nextPublish maps a topic to the Unix time (in seconds) that must be
	// exceeded before the topic is published again.
	nextPublish := make(map[string]int64)

	// Ranging over the channel (instead of a bare receive in an endless loop)
	// ends the loop when the channel is closed rather than spinning on
	// zero-value readings.
	for snip := range mqttClient.in {
		if mqttClient.verbose {
			log.Printf("MQTT: got meter data (device %d: data: %s, value: %.3f W, desc: %s)",
				snip.DeviceId,
				snip.IEC61850,
				snip.Value,
				GetIecDescription(snip.IEC61850))
		}

		uniqueID := fmt.Sprintf(UniqueIdFormat, snip.DeviceId)
		topic := fmt.Sprintf("%s/%s/%s", mqttClient.mqttTopic, uniqueID, snip.IEC61850)

		now := time.Now().Unix()
		if mqttClient.mqttRate != 0 && now <= nextPublish[topic] {
			if mqttClient.verbose {
				log.Printf("MQTT: skipped %s, rate to high", topic)
			}
			continue
		}

		message := fmt.Sprintf("%.3f", snip.Value)
		token := mqttClient.client.Publish(topic, byte(mqttClient.mqttQos), true, message)
		if mqttClient.verbose {
			log.Printf("MQTT: push %s, message: %s", topic, message)
		}
		if token.Wait() && token.Error() != nil {
			// Do not abort the whole process on a failed publish: log it
			// and keep consuming so later readings can still be delivered.
			log.Printf("MQTT: error publishing %s: %v", topic, token.Error())
			continue
		}
		nextPublish[topic] = now + int64(mqttClient.mqttRate)
	}
}
// NewMqttClient connects to an MQTT broker and returns a client that is
// ready to publish the readings arriving on in.
//
// Parameters:
//   - in: channel the returned client reads from in Run.
//   - mqttBroker: broker address handed to the client options.
//   - mqttTopic: topic prefix; the status topic is "<mqttTopic>/status".
//   - mqttUser, mqttPassword: credentials set on the client options.
//   - mqttClientID: client id set on the client options.
//   - mqttQos: quality-of-service level used for the will, the status
//     message and all later publishes.
//   - mqttRate: per-topic rate limit in seconds used by Run (0 disables it).
//   - mqttCleanSession: clean-session flag set on the client options.
//   - verbose: enables additional log output.
//
// A retained will message "disconnected" is registered on the status topic,
// and after a successful connect a retained "connected" message is published
// there. Automatic reconnect is enabled on the client.
//
// If the initial connect fails the process is terminated via log.Fatal. A
// failure to publish the "connected" status is only logged.
func NewMqttClient(
	in QuerySnipChannel,
	mqttBroker string,
	mqttTopic string,
	mqttUser string,
	mqttPassword string,
	mqttClientID string,
	mqttQos int,
	mqttRate int,
	mqttCleanSession bool,
	verbose bool,
) *MqttClient {
	const (
		statusDisconnected = "disconnected"
		statusConnected    = "connected"
	)
	statusTopic := fmt.Sprintf("%s/status", mqttTopic)

	mqttOpts := MQTT.NewClientOptions()
	mqttOpts.AddBroker(mqttBroker)
	mqttOpts.SetUsername(mqttUser)
	mqttOpts.SetPassword(mqttPassword)
	mqttOpts.SetClientID(mqttClientID)
	mqttOpts.SetCleanSession(mqttCleanSession)
	mqttOpts.SetAutoReconnect(true)
	mqttOpts.SetWill(statusTopic, statusDisconnected, byte(mqttQos), true)

	log.Printf("Connecting MQTT at %s", mqttBroker)
	if verbose {
		log.Printf("\tclientid: %s\n", mqttClientID)
		log.Printf("\tuser: %s\n", mqttUser)
		if mqttPassword != "" {
			// Never log the password itself, only that one was supplied.
			log.Printf("\tpassword: ****\n")
		}
		log.Printf("\ttopic: %s\n", mqttTopic)
		log.Printf("\tqos: %d\n", mqttQos)
		log.Printf("\tcleansession: %v\n", mqttCleanSession)
	}

	mqttClient := MQTT.NewClient(mqttOpts)
	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
		// log.Fatal exits the process, so nothing after it would run.
		log.Fatal("MQTT: error connecting: ", token.Error())
	}
	if verbose {
		log.Println("MQTT: connected")
	}

	// notify connection
	token := mqttClient.Publish(statusTopic, byte(mqttQos), true, statusConnected)
	if verbose {
		log.Printf("MQTT: push %s, message: %s", statusTopic, statusConnected)
	}
	if token.Wait() && token.Error() != nil {
		// The connection itself succeeded; a failed status message is not a
		// reason to abort, so report it and carry on.
		log.Printf("MQTT: error publishing %s: %v", statusTopic, token.Error())
	}

	return &MqttClient{
		in:        in,
		client:    mqttClient,
		mqttTopic: mqttTopic,
		mqttRate:  mqttRate,
		mqttQos:   mqttQos,
		verbose:   verbose,
	}
}