Skip to content

Commit

Permalink
Merge pull request #15 from seqsense/mqtt-cli-interface-compat
Browse files Browse the repository at this point in the history
Make DeviceClient compatible with mqtt.Client interface
  • Loading branch information
at-wat authored Nov 5, 2018
2 parents b2be7c8 + e48051d commit 3807b97
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 10 deletions.
52 changes: 44 additions & 8 deletions device.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
subscribeChCap = 100
)

// DeviceClient inplements mqtt.Client interface.
// Publishing messages and subscribing topics are queued in the DeviceClient if
// the network connection is lost. They are re-tried after the connection is
// resumed.
type DeviceClient struct {
opt *Options
mqttOpt *mqtt.ClientOptions
Expand Down Expand Up @@ -92,12 +96,15 @@ func New(opt *Options) *DeviceClient {
d.mqttOpt.SetAutoReconnect(false) // MQTT AutoReconnect doesn't work well for mqtts
d.mqttOpt.SetConnectTimeout(time.Second * 5)

d.connect()
go connectionHandler(d)

return d
}

func (s *DeviceClient) Connect() mqtt.Token {
s.connect()
go connectionHandler(s)
return &mqtt.DummyToken{}
}

func (s *DeviceClient) connect() {
s.cli = mqtt.NewClient(s.mqttOpt)

Expand All @@ -111,17 +118,46 @@ func (s *DeviceClient) connect() {
}()
}

func (s *DeviceClient) Disconnect() {
func (s *DeviceClient) Disconnect(quiesce uint) {
s.stateUpdateCh <- terminating
}

func (s *DeviceClient) Publish(topic string, payload interface{}) {
func (s *DeviceClient) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token {
s.publishCh <- &pubqueue.Data{Topic: topic, Payload: payload}
return &mqtt.DummyToken{}
}

func (s *DeviceClient) Subscribe(topic string, cb mqtt.MessageHandler) {
func (s *DeviceClient) Subscribe(topic string, qos byte, cb mqtt.MessageHandler) mqtt.Token {
s.subscribeCh <- &subqueue.Subscription{Type: subqueue.Subscribe, Topic: topic, Cb: cb}
return &mqtt.DummyToken{}
}

func (s *DeviceClient) SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token {
for topic, qos := range filters {
s.Subscribe(topic, qos, callback)
}
return &mqtt.DummyToken{}
}

func (s *DeviceClient) Unsubscribe(topics ...string) mqtt.Token {
for _, topic := range topics {
s.subscribeCh <- &subqueue.Subscription{Type: subqueue.Unsubscribe, Topic: topic, Cb: nil}
}
return &mqtt.DummyToken{}
}

func (s *DeviceClient) AddRoute(topic string, callback mqtt.MessageHandler) {
panic("awsiotdev doesn't support AddRoute")
}

func (s *DeviceClient) IsConnected() bool {
return s.cli.IsConnected()
}
func (s *DeviceClient) IsConnectionOpen() bool {
// paho.mqtt.golang v1.1.1 don't have it.
// this will be added in the next version.
return true // since offline queued
}
func (s *DeviceClient) Unsubscribe(topic string) {
s.subscribeCh <- &subqueue.Subscription{Type: subqueue.Unsubscribe, Topic: topic, Cb: nil}
func (s *DeviceClient) OptionsReader() mqtt.ClientOptionsReader {
return s.cli.OptionsReader()
}
27 changes: 27 additions & 0 deletions device_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2018 SEQSENSE, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awsiotdev

import (
"testing"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

func TestClientInterface(t *testing.T) {
var cli mqtt.Client
cli = &DeviceClient{}
_ = cli
}
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module github.com/seqsense/aws-iot-device-sdk-go

require (
github.com/eclipse/paho.mqtt.golang v1.1.1
golang.org/x/net v0.0.0-20181102091132-c10e9556a7bc // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/eclipse/paho.mqtt.golang v1.1.1 h1:iPJYXJLaViCshRTW/PSqImSS6HJ2Rf671WR0bXZ2GIU=
github.com/eclipse/paho.mqtt.golang v1.1.1/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
golang.org/x/net v0.0.0-20181102091132-c10e9556a7bc h1:ZMCWScCvS2fUVFw8LOpxyUUW5qiviqr4Dg5NdjLeiLU=
golang.org/x/net v0.0.0-20181102091132-c10e9556a7bc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
5 changes: 3 additions & 2 deletions sample/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func main() {
AutoResubscribe: true,
}
cli := awsiot.New(o)
cli.Subscribe("test", messageHandler)
cli.Connect()
cli.Subscribe("test", 1, messageHandler)

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
Expand All @@ -85,7 +86,7 @@ func main() {
case <-sig:
return
case <-tick.C:
cli.Publish("notification", "{\"status\": \"tick\"}")
cli.Publish("notification", 1, false, "{\"status\": \"tick\"}")
}
}
}

0 comments on commit 3807b97

Please sign in to comment.