diff --git a/device.go b/device.go index 4d54ad91..8ee0e443 100644 --- a/device.go +++ b/device.go @@ -34,6 +34,10 @@ const ( subscribeChCap = 100 ) +// DeviceClient inplements mqtt.Client interface. +// Publishing messages and subscribing topics are queued in the DeviceClient if +// the network connection is lost. They are re-tried after the connection is +// resumed. type DeviceClient struct { opt *Options mqttOpt *mqtt.ClientOptions @@ -92,12 +96,15 @@ func New(opt *Options) *DeviceClient { d.mqttOpt.SetAutoReconnect(false) // MQTT AutoReconnect doesn't work well for mqtts d.mqttOpt.SetConnectTimeout(time.Second * 5) - d.connect() - go connectionHandler(d) - return d } +func (s *DeviceClient) Connect() mqtt.Token { + s.connect() + go connectionHandler(s) + return &mqtt.DummyToken{} +} + func (s *DeviceClient) connect() { s.cli = mqtt.NewClient(s.mqttOpt) @@ -111,17 +118,46 @@ func (s *DeviceClient) connect() { }() } -func (s *DeviceClient) Disconnect() { +func (s *DeviceClient) Disconnect(quiesce uint) { s.stateUpdateCh <- terminating } -func (s *DeviceClient) Publish(topic string, payload interface{}) { +func (s *DeviceClient) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token { s.publishCh <- &pubqueue.Data{Topic: topic, Payload: payload} + return &mqtt.DummyToken{} } -func (s *DeviceClient) Subscribe(topic string, cb mqtt.MessageHandler) { +func (s *DeviceClient) Subscribe(topic string, qos byte, cb mqtt.MessageHandler) mqtt.Token { s.subscribeCh <- &subqueue.Subscription{Type: subqueue.Subscribe, Topic: topic, Cb: cb} + return &mqtt.DummyToken{} +} + +func (s *DeviceClient) SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token { + for topic, qos := range filters { + s.Subscribe(topic, qos, callback) + } + return &mqtt.DummyToken{} +} + +func (s *DeviceClient) Unsubscribe(topics ...string) mqtt.Token { + for _, topic := range topics { + s.subscribeCh <- &subqueue.Subscription{Type: subqueue.Unsubscribe, Topic: topic, Cb: nil} + } + return &mqtt.DummyToken{} +} + +func (s *DeviceClient) AddRoute(topic string, callback mqtt.MessageHandler) { + panic("awsiotdev doesn't support AddRoute") +} + +func (s *DeviceClient) IsConnected() bool { + return s.cli.IsConnected() +} +func (s *DeviceClient) IsConnectionOpen() bool { + // paho.mqtt.golang v1.1.1 don't have it. + // this will be added in the next version. + return true // since offline queued } -func (s *DeviceClient) Unsubscribe(topic string) { - s.subscribeCh <- &subqueue.Subscription{Type: subqueue.Unsubscribe, Topic: topic, Cb: nil} +func (s *DeviceClient) OptionsReader() mqtt.ClientOptionsReader { + return s.cli.OptionsReader() } diff --git a/device_test.go b/device_test.go new file mode 100644 index 00000000..f84d8f24 --- /dev/null +++ b/device_test.go @@ -0,0 +1,27 @@ +// Copyright 2018 SEQSENSE, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package awsiotdev + +import ( + "testing" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +func TestClientInterface(t *testing.T) { + var cli mqtt.Client + cli = &DeviceClient{} + _ = cli +} diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..3ecb5e44 --- /dev/null +++ b/go.mod @@ -0,0 +1,6 @@ +module github.com/seqsense/aws-iot-device-sdk-go + +require ( + github.com/eclipse/paho.mqtt.golang v1.1.1 + golang.org/x/net v0.0.0-20181102091132-c10e9556a7bc // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..2a39bb98 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/eclipse/paho.mqtt.golang v1.1.1 h1:iPJYXJLaViCshRTW/PSqImSS6HJ2Rf671WR0bXZ2GIU= +github.com/eclipse/paho.mqtt.golang v1.1.1/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +golang.org/x/net v0.0.0-20181102091132-c10e9556a7bc h1:ZMCWScCvS2fUVFw8LOpxyUUW5qiviqr4Dg5NdjLeiLU= +golang.org/x/net v0.0.0-20181102091132-c10e9556a7bc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/sample/main.go b/sample/main.go index 4581dd7a..38ed4f9e 100644 --- a/sample/main.go +++ b/sample/main.go @@ -73,7 +73,8 @@ func main() { AutoResubscribe: true, } cli := awsiot.New(o) - cli.Subscribe("test", messageHandler) + cli.Connect() + cli.Subscribe("test", 1, messageHandler) sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM) @@ -85,7 +86,7 @@ func main() { case <-sig: return case <-tick.C: - cli.Publish("notification", "{\"status\": \"tick\"}") + cli.Publish("notification", 1, false, "{\"status\": \"tick\"}") } } }