85 lines
2.1 KiB
Go
85 lines
2.1 KiB
Go
package main
|
|
|
|
import(
|
|
"log"
|
|
"os"
|
|
"time"
|
|
"strconv"
|
|
"github.com/eclipse/paho.mqtt.golang"
|
|
"math/rand"
|
|
"strings"
|
|
"encoding/json"
|
|
)
|
|
|
|
// CommandInfo is the JSON payload decoded from messages whose topic ends in
// "rt". It carries the command code and the address that are handed to
// sendCommand.
type CommandInfo struct {
	// Command is the numeric command code (JSON key "command").
	Command int `json:"command"`
	// Addr is the target address (JSON key "addr").
	Addr string `json:"addr"`
}
|
|
|
|
/**
|
|
* mqtt接收到msg后的回调函数
|
|
**/
|
|
func onMessageReceived(client mqtt.Client, msg mqtt.Message){
|
|
topic := strings.Split(msg.Topic(),"/")
|
|
if topic[len(topic)-1] == "rt"{
|
|
//重启sensor或板子 rt 重启命令message
|
|
var commandInfo CommandInfo
|
|
err := json.Unmarshal([]byte(msg.Payload()), &commandInfo)
|
|
if err != nil {
|
|
log.Println("Can't decode json message", err)
|
|
}else{
|
|
log.Println("get command:",commandInfo)
|
|
if commandInfo.Addr =="" && (commandInfo.Command !=1 && commandInfo.Command !=2) {
|
|
log.Println("received.fail.invalid command or addr")
|
|
}else{
|
|
sendCommand(commandInfo.Addr,commandInfo.Command)
|
|
}
|
|
}
|
|
}
|
|
// log.Println(topic[len(topic)-1])
|
|
log.Printf("mqtt.received:%s|%s\n",msg.Topic(),msg.Payload())
|
|
}
|
|
|
|
// mqttClient is the package-level MQTT client. It is assigned in init and
// used by doSubscribe to register the topic subscription.
var mqttClient mqtt.Client
|
|
|
|
func doSubscribe(){
|
|
//subscribe topic
|
|
if token := mqttClient.Subscribe(conf.MQTT_TOPIC, 1, onMessageReceived); token.Wait() && token.Error() != nil {
|
|
log.Println(token.Error())
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
if !conf.MQTT_OPEN {
|
|
return;
|
|
}
|
|
//var data []byte = []byte{0x01,0x01,0x01,0x00}
|
|
mqtt.ERROR = log.New(os.Stdout, "", 0)
|
|
|
|
//connect mqtt-server and set clientID
|
|
b := strconv.Itoa(rand.Intn(100))
|
|
clientId := "unit"+b
|
|
opts := mqtt.NewClientOptions().AddBroker(conf.MQTT_BROKER).SetClientID(clientId)
|
|
if conf.MQTT_USERNAME!="" && conf.MQTT_PASSWORD !=""{
|
|
opts.SetUsername(conf.MQTT_USERNAME)
|
|
opts.SetPassword(conf.MQTT_PASSWORD)
|
|
}
|
|
|
|
opts.SetKeepAlive(time.Duration(conf.MQTT_KEEPALIVE) * time.Second)
|
|
opts.SetDefaultPublishHandler(onMessageReceived)
|
|
opts.SetPingTimeout(time.Duration(conf.MQTT_TIMEOUT) * time.Second)
|
|
opts.OnConnect = func(c mqtt.Client) {
|
|
log.Println("OnConnect success!")
|
|
doSubscribe()
|
|
}
|
|
|
|
//create object
|
|
mqttClient = mqtt.NewClient(opts)
|
|
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
|
panic(token.Error())
|
|
}
|
|
doSubscribe()
|
|
|
|
}
|