package main import( "log" "os" "time" "strconv" "github.com/eclipse/paho.mqtt.golang" "math/rand" "strings" "encoding/json" ) type CommandInfo struct { Command int `json:"command"` Addr string `json:"addr"` } /** * mqtt接收到msg后的回调函数 **/ func onMessageReceived(client mqtt.Client, msg mqtt.Message){ topic := strings.Split(msg.Topic(),"/") if topic[len(topic)-1] == "rt"{ //重启sensor或板子 rt 重启命令message var commandInfo CommandInfo err := json.Unmarshal([]byte(msg.Payload()), &commandInfo) if err != nil { log.Println("Can't decode json message", err) }else{ log.Println("get command:",commandInfo) if commandInfo.Addr =="" && (commandInfo.Command !=1 && commandInfo.Command !=2) { log.Println("received.fail.invalid command or addr") }else{ sendCommand(commandInfo.Addr,commandInfo.Command) } } } // log.Println(topic[len(topic)-1]) log.Printf("mqtt.received:%s|%s\n",msg.Topic(),msg.Payload()) } var mqttClient mqtt.Client func doSubscribe(){ //subscribe topic if token := mqttClient.Subscribe(conf.MQTT_TOPIC, 1, onMessageReceived); token.Wait() && token.Error() != nil { log.Println(token.Error()) os.Exit(1) } } func init() { if !conf.MQTT_OPEN { return; } //var data []byte = []byte{0x01,0x01,0x01,0x00} mqtt.ERROR = log.New(os.Stdout, "", 0) //connect mqtt-server and set clientID b := strconv.Itoa(rand.Intn(100)) clientId := "unit"+b opts := mqtt.NewClientOptions().AddBroker(conf.MQTT_BROKER).SetClientID(clientId) if conf.MQTT_USERNAME!="" && conf.MQTT_PASSWORD !=""{ opts.SetUsername(conf.MQTT_USERNAME) opts.SetPassword(conf.MQTT_PASSWORD) } opts.SetKeepAlive(time.Duration(conf.MQTT_KEEPALIVE) * time.Second) opts.SetDefaultPublishHandler(onMessageReceived) opts.SetPingTimeout(time.Duration(conf.MQTT_TIMEOUT) * time.Second) opts.OnConnect = func(c mqtt.Client) { log.Println("OnConnect success!") doSubscribe() } //create object mqttClient = mqtt.NewClient(opts) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } doSubscribe() }