完善mqtt机制,接收下行数据,完善重启sensor机制

This commit is contained in:
fish 2019-08-16 17:07:52 +08:00
parent 99bfea5d5a
commit caa7c40311
9 changed files with 183 additions and 29 deletions

View File

@ -56,6 +56,15 @@ lora_nodes:
- {"adr":"0c0c1d18",location_type":"changing_desk","location_code":"spao","location_data":"data12","current_data":0,"device":"yunhorn_kgl_c16_v1_1"}
- {"adr":"0c3b1518",location_type":"changing_desk","location_code":"spao","location_data":"data12","current_data":0,"device":"yunhorn_kgl_c16_v1_1"}
- {"adr":"0c4c4c17",location_type":"","location_code":"spao","location_data":"data12","current_data":0,"device":"yunhorn_kgl_c16_v1_1"}
#吸顶
- {"adr":"0c4b5a17",location_type":"","location_code":"spao","location_data":"data12","current_data":0,"device":"yunhorn_kgl_c16_v1_1"}
#门磁
- {"adr":"0c255d17",location_type":"","location_code":"spao","location_data":"data12","current_data":0,"device":"yunhorn_kgl_c16_v1_1"}
#AQ
- {"adr":"dc390000",location_type":"","location_code":"spao","location_data":"data12","current_data":0,"device":"yunhorn_kgl_c16_v1_1"}
devices_list:
# num0 设备ID号
- ["4E5D52DD-EB81-43C0-A2B6-04E432412EBA", "loraport", "yunhorn_kgl_l_v1", "1"]
@ -95,7 +104,7 @@ loradevices_list:
- [85, 2, 242, 170]
iconurl: "http://192.168.3.188/icons/"
websocketurl: ["192.168.3.216:8080", "/echo"]
post_to_server: "https://smartoilets.cn/socketServer/statis/push" #数据收集服务器
post_to_server: "http://192.168.3.154:8082/statis/push" #"https://smartoilets.cn/socketServer/statis/push" #数据收集服务器
debug: true #是否打印debug log
openserial: false #是否采集串口数据
open_tcp_server: true #是否开启TCP端口接收无线数据
@ -106,8 +115,11 @@ api_port: 10086 #本地web api port
pro_category: 1
syn_data: false
syn_extension_Time: false
record_log_path: "/usr/local/dpark" #/usr/local/dpark #无线原始数据收集的log目录
record_log_path: "" #/usr/local/dpark #无线原始数据收集的log目录
mqtt_open: false
mqtt_broker: "tcp://localhost:1883"
mqtt_username: ""
mqtt_password: ""
mqtt_keepAlive: 30 #mqtt心跳时间间隔 单位是s
mqtt_timeout: 10 #mqtt超时时间 单位s
mqtt_topic: "yunhorn/dpark/1/#"
mqtt_broker: "tcp://192.168.3.159:1883"
mqtt_username: "admin"
mqtt_password: "admin"

View File

@ -107,15 +107,22 @@ func build_data(code int, data []Device) []byte {
// post到服务器
func post_to_server(post_data string) {
url := conf.POST_TO_SERVER
//完善 如果没有http前缀则自动加上
payload := strings.NewReader("------WebKitFormBoundary7MA4YWxkTrZu0gW\r\nContent-Disposition: form-data; name=\"data\"\r\n\r\n" + post_data + "\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW--")
req, _ := http.NewRequest("POST", url, payload)
req.Header.Add("content-type", "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW")
res, err := http.DefaultClient.Do(req)
if err != nil {
log.Println(err)
} else {
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
log.Println("post_to_server:", string(body))
req, error := http.NewRequest("POST", url, payload)
if error != nil{
log.Println("new request err:",error)
}else{
req.Header.Add("content-type", "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW")
res, err := http.DefaultClient.Do(req)
if err != nil {
log.Println("do post err:",err)
} else {
// log.Println("end close resp")
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
log.Println("post_to_server:", string(body))
}
}
}

1
go.mod
View File

@ -3,6 +3,7 @@ module yunhorn_gateway
go 1.12
require (
github.com/eclipse/paho.mqtt.golang v1.2.0 // indirect
github.com/gin-contrib/cors v1.3.0
github.com/gin-gonic/gin v1.4.0
github.com/mattn/go-sqlite3 v1.11.0

2
go.sum
View File

@ -6,6 +6,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=

View File

@ -55,6 +55,9 @@ type Conf struct {
MQTT_BROKER string `json:"mqtt_broker"`
MQTT_USERNAME string `json:"mqtt_username"`
MQTT_PASSWORD string `json:"mqtt_password"`
MQTT_KEEPALIVE int `json:"mqtt_keepAlive"`
MQTT_TOPIC string `json:"mqtt_topic"`
MQTT_TIMEOUT int `json:"mqtt_timeout"`
}
//无线 下位机

View File

@ -119,8 +119,6 @@ func main() {
if conf.OPENTCPSETVER {
//开启mqtt默认认定是无线方案
go record_log_channel()
tcp_port := strconv.Itoa(conf.TCPPORT)

76
mq.go
View File

@ -1,6 +1,82 @@
package main
import(
"log"
"os"
"time"
"strconv"
"github.com/eclipse/paho.mqtt.golang"
"math/rand"
"strings"
"encoding/json"
)
type CommandInfo struct {
Command int `json:"command"`
Addr string `json:"addr"`
}
/**
* mqtt接收到msg后的回调函数
**/
func onMessageReceived(client mqtt.Client, msg mqtt.Message){
topic := strings.Split(msg.Topic(),"/")
if topic[len(topic)-1] == "rt"{
//重启sensor或板子 rt 重启命令message
var commandInfo CommandInfo
err := json.Unmarshal([]byte(msg.Payload()), &commandInfo)
if err != nil {
log.Println("Can't decode json message", err)
}else{
log.Println("get command:",commandInfo)
if commandInfo.Addr =="" && (commandInfo.Command !=1 && commandInfo.Command !=2) {
log.Println("received.fail.invalid command or addr")
}else{
sendCommand(commandInfo.Addr,commandInfo.Command)
}
}
}
// log.Println(topic[len(topic)-1])
log.Printf("mqtt.received:%s|%s\n",msg.Topic(),msg.Payload())
}
var mqttClient mqtt.Client
func doSubscribe(){
//subscribe topic
if token := mqttClient.Subscribe(conf.MQTT_TOPIC, 1, onMessageReceived); token.Wait() && token.Error() != nil {
log.Println(token.Error())
os.Exit(1)
}
}
func init() {
if !conf.MQTT_OPEN {
return;
}
//var data []byte = []byte{0x01,0x01,0x01,0x00}
mqtt.ERROR = log.New(os.Stdout, "", 0)
//connect mqtt-server and set clientID
b := strconv.Itoa(rand.Intn(100))
clientId := "unit"+b
opts := mqtt.NewClientOptions().AddBroker(conf.MQTT_BROKER).SetClientID(clientId)
opts.SetUsername(conf.MQTT_USERNAME)
opts.SetPassword(conf.MQTT_PASSWORD)
opts.SetKeepAlive(time.Duration(conf.MQTT_KEEPALIVE) * time.Second)
opts.SetDefaultPublishHandler(onMessageReceived)
opts.SetPingTimeout(time.Duration(conf.MQTT_TIMEOUT) * time.Second)
opts.OnConnect = func(c mqtt.Client) {
log.Println("OnConnect success!")
doSubscribe()
}
//create object
mqttClient = mqtt.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
doSubscribe()
}

View File

@ -38,7 +38,7 @@ func gw_router() {
v1.GET("/getConf")
v1.GET("/sendCommand",sendCommand)
v1.GET("/writeCommand",writeCommand)
}
@ -54,7 +54,7 @@ type DparkConf struct {
ExtensionOutTime2 int `json:"out2"`
}
func sendCommand(c *gin.Context) {
func writeCommand(c *gin.Context) {
// var heartbeat []byte = []byte{0x01}
addr := c.Query("addr")
command := c.Query("command")
@ -446,10 +446,11 @@ func dealDatav3(data []byte,loragwip string) string{
}
buffer.WriteString(wdint2)
log.Println("aqdata|wdfh",data[6])
// log.Println("aqdata|wdfh",data[6])
wddata := buffer.String()
log.Println("aqdata|wddata",wddata)
// log.Println("aqdata|wddata",wddata)
buffer = new(bytes.Buffer)
@ -463,7 +464,7 @@ func dealDatav3(data []byte,loragwip string) string{
buffer.WriteString(sdint2)
sddata := buffer.String()
log.Println("aqdata|sddata",sddata)
// log.Println("aqdata|sddata",sddata)
buffer = new(bytes.Buffer)
@ -477,7 +478,7 @@ func dealDatav3(data []byte,loragwip string) string{
buffer.WriteString(nh3int2)
nh3data := buffer.String()
log.Println("aqdata|nh3",nh3data)
// log.Println("aqdata|nh3",nh3data)
buffer = new(bytes.Buffer)
@ -490,7 +491,7 @@ func dealDatav3(data []byte,loragwip string) string{
}
buffer.WriteString(h2sint2)
h2sdata := buffer.String()
log.Println("aqdata|h2s",h2sdata)
// log.Println("aqdata|h2s",h2sdata)
buffer = new(bytes.Buffer)
ch2o1 := strconv.Itoa(int(data[15]))
@ -504,23 +505,27 @@ func dealDatav3(data []byte,loragwip string) string{
ch2o := buffer.String()
// ch2o := uint32(data[14])*256+uint32(data[15])
log.Println("aqdata|ch2o",ch2o)
// log.Println("aqdata|ch2o",ch2o)
co2data := uint32(data[17])*256+uint32(data[18])
log.Println("aqdata|co2data",co2data)
// log.Println("aqdata|co2data",co2data)
tvocdata := data[19]
log.Println("aqdata|tvocdata",tvocdata)
// log.Println("aqdata|tvocdata",tvocdata)
// log.Println("pm25data1",data[19])
// log.Println("pm25data2",data[20])
pm25data := uint32(data[20])*256+uint32(data[21])
log.Println("aqdata|pm25data",pm25data)
// log.Println("aqdata|pm25data",pm25data)
// log.Println("pm10data1",data[21])
// log.Println("pm10data2",data[22])
pm10data := uint32(data[22])*256+uint32(data[23])
log.Println("aqdata|pm10data",pm10data)
// log.Println("aqdata|pm10data",pm10data)
//wdfh
log.Printf("aqdata|wddata|sddata|nh3data|h2sdata|ch2o|co2data|tvocdata|pm25data|pm10data")
log.Printf("aqdata|%s|%s|%s|%s|%s|%d|%d|%d|%d",wddata,sddata,nh3data,h2sdata,ch2o,co2data,tvocdata,pm25data,pm10data)
var R_data P1004_6
R_data.CODE = 1005

View File

@ -15,6 +15,54 @@ import (
)
/**
* 从loragw的组中获取到范围内的所有node给符合addr的node发送command
**/
func sendCommand(addr string,command int){
log.Println("begin sendCommand")
log.Println("yunhorn.loragw.sendCommand|",addr,command)
commands := []byte{0x01}
byteAddr,ok := sensorMap.Load(addr)
if !ok{
log.Println("send command fail,invalid addr|",addr)
return;
}
loraNode,ok := loraNodeMap.Load(addr)
if !ok{
log.Println("send command fail,invalid addr|",addr)
return;
}
if command == 1{
commands = []byte{0x01}
}else if command == 2{
commands = []byte{0x02}
}else{
log.Println("send command fail,invalid command|",command)
return;
}
// strconv.Itoa
var buffer bytes.Buffer
buffer.Write(byteAddr.([]byte))
buffer.Write(commands)
// log.Println("loragws ",loraNode.(LoraNode).LORA_GWS)
for _,loragwip := range loraNode.(LoraNode).LORA_GWS{
loraconn,ok := loragwMap.Load(loragwip)
if ok{
log.Println("yunhorn.loragw.begin push tcp command ",buffer.Bytes())
_, err := loraconn.(net.Conn).Write(buffer.Bytes())
if err != nil {
log.Println("yunhorn.loragw.write msg error!", err)
return;
}
}
}
}
func handleConn(conn net.Conn) {
@ -239,7 +287,9 @@ func readConn(conn net.Conn, readChan chan<- []byte, stopChan chan<- bool) {
buffer = new(bytes.Buffer)
go write_log_channel(data[i-4:i+end])
if conf.RECORD_LOG_PATH!=""{
go write_log_channel(data[i-4:i+end])
}
go dealDatav3(data[i-4:i+end],conn.RemoteAddr().String())