YHGW/mq.go

85 lines
2.1 KiB
Go

package main
import(
"log"
"os"
"time"
"strconv"
"github.com/eclipse/paho.mqtt.golang"
"math/rand"
"strings"
"encoding/json"
)
// CommandInfo is the JSON payload decoded from MQTT messages whose topic ends
// in "rt" (the restart-command topic).
type CommandInfo struct {
// Command is the numeric command code, read from the JSON key "command".
// The message handler in this file only checks it against the values 1 and 2;
// NOTE(review): which value targets the sensor and which the board is not
// visible here - confirm against sendCommand.
Command int `json:"command"`
// Addr is the target address, read from the JSON key "addr"; it is handed to
// sendCommand together with Command.
Addr string `json:"addr"`
}
// onMessageReceived is the callback invoked for every MQTT message delivered
// to this client.
//
// When the last "/"-separated segment of the topic is "rt" the payload is
// treated as a restart command (restart the sensor or the board) and handed to
// handleRestartCommand. Every message, whatever its topic, is logged after
// handling.
//
// client is the MQTT client that delivered the message (unused); msg is the
// received message.
func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
	// strings.Split always returns at least one element for a non-empty
	// separator, so indexing the last segment is safe even for an empty topic.
	segments := strings.Split(msg.Topic(), "/")
	if segments[len(segments)-1] == "rt" {
		handleRestartCommand(msg.Payload())
	}
	log.Printf("mqtt.received:%s|%s\n", msg.Topic(), msg.Payload())
}

// handleRestartCommand decodes payload as a JSON CommandInfo, validates it and
// forwards it to sendCommand. Decode and validation failures are logged and
// the command is dropped.
func handleRestartCommand(payload []byte) {
	var info CommandInfo
	if err := json.Unmarshal(payload, &info); err != nil {
		log.Println("Can't decode json message", err)
		return
	}
	log.Println("get command:", info)

	// Reject when EITHER the address is missing OR the command is not one of
	// the accepted codes (1, 2). The previous check joined these with &&, so a
	// command with only one of the two problems was still sent.
	if info.Addr == "" || (info.Command != 1 && info.Command != 2) {
		log.Println("received.fail.invalid command or addr")
		return
	}
	sendCommand(info.Addr, info.Command)
}
// mqttClient is the package-level MQTT client. It is assigned in init (only
// when conf.MQTT_OPEN is true) and used by doSubscribe.
var mqttClient mqtt.Client
// doSubscribe subscribes the package-level mqttClient to conf.MQTT_TOPIC at
// QoS 1, with onMessageReceived as the message handler. It waits for the
// subscription token to complete; if the token reports an error, the error is
// logged and the process exits with status 1.
func doSubscribe() {
	subToken := mqttClient.Subscribe(conf.MQTT_TOPIC, 1, onMessageReceived)
	if !subToken.Wait() {
		return
	}
	subErr := subToken.Error()
	if subErr == nil {
		return
	}
	log.Println(subErr)
	os.Exit(1)
}
// init sets up the MQTT connection at package load time.
//
// It does nothing when conf.MQTT_OPEN is false. Otherwise it routes the paho
// library's error log to stdout, builds the client options from conf (broker,
// optional username/password, keep-alive and ping timeout in seconds),
// connects, and subscribes via doSubscribe. A failed connection panics.
func init() {
	if !conf.MQTT_OPEN {
		return
	}
	mqtt.ERROR = log.New(os.Stdout, "", 0)

	// Build the client ID as "unit" + a number in [0,100). A privately seeded
	// generator is used because the global math/rand source is deterministic
	// on Go toolchains before 1.20, which would give every process the same
	// ID; brokers disconnect an existing session when a second client
	// connects with the same ID.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	clientID := "unit" + strconv.Itoa(rng.Intn(100))

	opts := mqtt.NewClientOptions().AddBroker(conf.MQTT_BROKER).SetClientID(clientID)
	// Credentials are applied only when both username and password are set.
	if conf.MQTT_USERNAME != "" && conf.MQTT_PASSWORD != "" {
		opts.SetUsername(conf.MQTT_USERNAME)
		opts.SetPassword(conf.MQTT_PASSWORD)
	}
	opts.SetKeepAlive(time.Duration(conf.MQTT_KEEPALIVE) * time.Second)
	opts.SetDefaultPublishHandler(onMessageReceived)
	opts.SetPingTimeout(time.Duration(conf.MQTT_TIMEOUT) * time.Second)
	// Subscribe from the connect handler so the subscription is set up
	// whenever the library reports a connection.
	opts.OnConnect = func(c mqtt.Client) {
		log.Println("OnConnect success!")
		doSubscribe()
	}

	// Create the client and connect; mqttClient must be assigned before
	// Connect because doSubscribe (called from OnConnect) reads it.
	mqttClient = mqtt.NewClient(opts)
	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	// NOTE(review): OnConnect above also calls doSubscribe, so this likely
	// subscribes a second time on the initial connection. Kept so that the
	// subscription is complete before init returns - confirm whether it can
	// be dropped.
	doSubscribe()
}