YHGW/worker.go

732 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"encoding/binary"
"github.com/tarm/serial"
"log"
"os"
_ "math/rand"
"strconv"
"sync"
"encoding/json"
"time"
"bytes"
)
// Devicedatas is a map of Device values keyed by string, bundled with an
// embedded RWMutex so callers can lock around map access.
// NOTE(review): presumably this is the type of the package-level DEVICEDATAS
// value used throughout this file, whose keys look like "<version>_<num>" —
// confirm against its declaration.
type Devicedatas struct {
sync.RWMutex
// DATA holds the most recent Device value stored under each key.
DATA map[string]Device
}
// Device is the state of one device: identity, version tag, online flag and a
// free-form data map. The exported fields carry JSON tags.
// NOTE(review): the struct embeds an RWMutex but Devicedatas stores Device by
// value, so every map store/load copies the lock — confirm this is intended.
type Device struct {
sync.RWMutex
// ID is the device identifier.
ID string `json:"id"`
// VERSION is the device type/version tag (e.g. "yunhorn_kq_c_v1" in this file).
VERSION string `json:"version"`
// ONLINE reports whether the device is considered online.
ONLINE bool `json:"online"`
// DATA holds the named readings of the device.
DATA map[string]interface{} `json:"data"`
}
// yunhorn_wireless handles one data frame received over TCP from a wireless
// node (Guangzhou board revision).
//
// Frames of four bytes or fewer are ignored. Byte 0 and byte 1 are the address
// high/low bytes (ADRH/ADRL), byte 2 is the frame type and the lowest bit of
// byte 3 is the reported signal. Frame types:
//
//	00 NPN data, 01 magnetic switch data,
//	02 RS-485 sensor report, 03 RS-485 sensor data collection.
//
// Addresses 0x0000 and 0x0001 are handed to yunhorn_ys_l_v1 in a new
// goroutine. For any other address, a frame whose type is not 3 is matched
// against conf.WIRELESSSLAVEDEVICES and one CODE1005 message is sent for every
// entry with the same ADRH/ADRL.
func yunhorn_wireless(data_buf []byte) {
	log.Println("wireless.tcp.data.len:", len(data_buf))
	// At least 5 bytes are required.
	if len(data_buf) <= 4 {
		return
	}

	adrh, adrl := data_buf[0], data_buf[1]
	if adrh == 0 && (adrl == 0 || adrl == 1) {
		go yunhorn_ys_l_v1(data_buf)
		return
	}

	// Type 3 frames are not forwarded.
	if data_buf[2] == 3 {
		return
	}

	signal := Bytes2Bits(data_buf[3:4])[0]
	for _, item := range conf.WIRELESSSLAVEDEVICES {
		if int(adrh) != item.ADRH || int(adrl) != item.ADRL {
			continue
		}

		// Invert per entry, starting from the received bit each time. The
		// previous code flipped a variable shared by all iterations, so a
		// second matching entry with FLIP set saw an already-inverted value.
		value := signal
		if item.FLIP {
			if value == 0 {
				value = 1
			} else {
				value = 0
			}
		}

		var device Device
		device.ID = item.ID
		device.VERSION = item.VERSION
		device.ONLINE = true
		device.DATA = map[string]interface{}{"data": value}
		send_data(CODE1005, device)
	}
}
// yunhorn_xdy_l_v1 is an empty placeholder: it accepts a data frame and does
// nothing with it.
func yunhorn_xdy_l_v1(data_buf []byte) {
}
// write_log_channel sends data on logch, the channel that
// record_log_channel receives from.
// NOTE(review): whether this send can block depends on how logch is created
// (buffered or not), which is not visible here.
func write_log_channel(data []byte){
logch <- data
}
// record_log_channel receives frames from logch forever and passes them to
// record_log, rate-limited per node address.
//
// The first four bytes of a frame are rendered as hexadecimal (without zero
// padding) and used as the key into nodeUpdateMap:
//   - unknown key: the current time is stored under the key and the frame is
//     recorded;
//   - known key: the frame is recorded only when the time elapsed since the
//     stored timestamp, truncated to whole seconds, is greater than 1.
//
// Frames shorter than four bytes are logged and skipped instead of panicking
// on the slice expression, which would terminate the process.
//
// NOTE(review): the stored timestamp is not refreshed here when a known key is
// recorded; presumably it is updated elsewhere — confirm.
// NOTE(review): because the hex digits are not zero padded, distinct addresses
// can map to the same key (e.g. 01 10 .. and 11 00 ..). The format is kept
// unchanged in case other code builds nodeUpdateMap keys the same way.
func record_log_channel() {
	for {
		data_buf := <-logch
		if len(data_buf) < 4 {
			log.Println("record_log_channel: frame shorter than 4 bytes, skipped:", data_buf)
			continue
		}

		var key bytes.Buffer
		for _, b := range data_buf[:4] {
			key.WriteString(strconv.FormatInt(int64(b), 16))
		}
		adr := key.String()

		now := time.Now()
		stored, seen := nodeUpdateMap.Load(adr)
		// A value of an unexpected type is treated like an unknown key rather
		// than panicking on the type assertion.
		last, isTime := stored.(time.Time)
		if !seen || !isTime {
			nodeUpdateMap.Store(adr, now)
			record_log(data_buf)
			continue
		}

		if int(now.Sub(last).Seconds()) > 1 {
			record_log(data_buf)
		}
	}
}
// record_log appends the raw frame data_buf to the daily raw-data log file
// conf.RECORD_LOG_PATH/<today>/<today>.log, creating the directory and the
// file when they do not exist. Each line is prefixed with date, time and
// short source location.
//
// A failure to create the directory is logged and the open is still
// attempted; a failure to open the file terminates the process, as before,
// but now reports the path and the underlying error. The file is closed
// before returning — previously every call leaked one open file descriptor.
func record_log(data_buf []byte) {
	day := today
	folder := conf.RECORD_LOG_PATH + "/" + day
	if err := os.MkdirAll(folder, os.ModePerm); err != nil {
		log.Println(err)
	}

	path := folder + "/" + day + ".log"
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalln("record_log: open", path+":", err)
	}
	defer func() {
		if cerr := f.Close(); cerr != nil {
			log.Println("record_log: close", path+":", cerr)
		}
	}()

	record := log.New(f, "", log.Ldate|log.Ltime|log.Lshortfile)
	record.Println(data_buf)
}
// yunhorn_lora_l_v1 decodes a frame into the demo device.
//
// The lowest bit of byte 3 is stored in demoDevice.DATA["data"]; the lowest
// bits of bytes 0, 1 and 2 are only logged (as ADRH, ADRL and dataType). A
// Device carrying demoDevice's identity and data map is built and, when
// conf.DEBUG is set, logged. Nothing is sent from here.
//
// Frames shorter than four bytes are logged and ignored instead of panicking
// on the slice expressions.
//
// NOTE(review): the first log line is labelled "ys_l_v1_data:", the same label
// yunhorn_ys_l_v1 uses; it is left unchanged in case log consumers match on it.
func yunhorn_lora_l_v1(data_buf []byte) {
	log.Println("ys_l_v1_data:", data_buf)
	if len(data_buf) < 4 {
		log.Println("yunhorn_lora_l_v1: frame shorter than 4 bytes, ignored")
		return
	}

	signal := Bytes2Bits(data_buf[3:4])[0]
	adrhBit := Bytes2Bits(data_buf[0:1])[0]
	adrlBit := Bytes2Bits(data_buf[1:2])[0]
	typeBit := Bytes2Bits(data_buf[2:3])[0]
	log.Println("data_int:", signal)
	log.Printf("ADRH:%d,ADRL:%d,dataType:%d,", adrhBit, adrlBit, typeBit)

	demoDevice.DATA["data"] = signal

	var device Device
	device.ID = demoDevice.ID
	device.VERSION = demoDevice.VERSION
	device.ONLINE = demoDevice.ONLINE
	// The map is shared with demoDevice, not copied.
	device.DATA = demoDevice.DATA
	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_hb_dpark_v1 builds and posts a heartbeat packet.
//
// The packet is a P1004_6 with CODE1009, ID conf.DPARKLOCATION, VERSION
// "yunhorn_hb_dpark", ONLINE true and the current Unix time as TIMESTAMP. Its
// DATA["data"] is a slice with one LoraNode per entry of conf.LORANODES, in
// the same order: CURRENT_DATA, TIME, LOCATION_DATA and ADR are copied from
// the value stored in loraNodeMap under the entry's ADR, while LOCATION_TYPE
// and LOCATION_CODE are blanked. The JSON encoding is handed to
// post_to_server in a new goroutine.
//
// An entry with no usable value in loraNodeMap keeps its zero-valued slot so
// the positions in the list stay aligned with conf.LORANODES. If the packet
// cannot be encoded the error is logged and nothing is posted (previously the
// error was discarded and an empty body was posted).
func yunhorn_hb_dpark_v1() {
	log.Println("begin yunhorn_hb_dpark_v1")

	datalist := make([]LoraNode, len(conf.LORANODES))
	for i, v := range conf.LORANODES {
		stored, found := loraNodeMap.Load(v.ADR)
		if !found {
			// Log the address that failed; the old code logged the
			// always-false lookup flag.
			log.Println("hb load loraNode fail ", v.ADR)
			continue
		}
		node, isNode := stored.(LoraNode)
		if !isNode {
			log.Println("hb loraNode has unexpected type ", v.ADR)
			continue
		}
		datalist[i] = LoraNode{
			CURRENT_DATA:  node.CURRENT_DATA,
			TIME:          node.TIME,
			LOCATION_TYPE: "",
			LOCATION_CODE: "",
			LOCATION_DATA: node.LOCATION_DATA,
			ADR:           node.ADR,
		}
	}

	var R_data P1004_6
	R_data.CODE = CODE1009
	R_data.ID = conf.DPARKLOCATION
	R_data.VERSION = "yunhorn_hb_dpark"
	R_data.ONLINE = true
	R_data.DATA = map[string]interface{}{"data": datalist}
	R_data.TIMESTAMP = time.Now().Unix()

	payload, err := json.Marshal(R_data)
	if err != nil {
		log.Println("yunhorn_hb_dpark_v1: marshal heartbeat:", err)
		return
	}
	go post_to_server(string(payload))
}
// yunhorn_ys_l_v1 drives the demo device (a single toilet stall shown on the
// big screen) from one wireless frame and sends the result with CODE1005.
//
// Only the lowest bit of each of the first four bytes is used: bytes 0, 1, 2
// give the ADRH, ADRL and type bits, byte 3 gives the signal bit.
//   - demoDevice.DATA["data1"], ["data2"] and ["data3"] are reset to 0.
//   - When the ADRH, ADRL and type bits are all 0, the signal bit is stored in
//     demoDevice.DATA["data8"] and in the cached device's "data8".
//   - Otherwise the inverted signal is stored in demoDevice.DATA["data7"],
//     while the cached device's "data7" receives the signal bit as received.
//
// The cached device is DEVICEDATAS.DATA[demoDevice.VERSION+"_1"]. It is now
// updated under the DEVICEDATAS lock, like the other writers in this file,
// and the update is skipped when that entry (or its data map) does not exist
// — previously that case panicked with a write to a nil map. Frames shorter
// than four bytes are logged and ignored.
//
// NOTE(review): in the "data7" case the demo device gets the inverted bit but
// the cache gets the raw bit. That asymmetry is preserved from the original —
// confirm it is intended.
func yunhorn_ys_l_v1(data_buf []byte) {
	log.Println("ys_l_v1_data:", data_buf)
	if len(data_buf) < 4 {
		log.Println("yunhorn_ys_l_v1: frame shorter than 4 bytes, ignored")
		return
	}

	data_int := Bytes2Bits(data_buf[3:4])
	adrhBits := Bytes2Bits(data_buf[0:1])
	adrlBits := Bytes2Bits(data_buf[1:2])
	data_type := Bytes2Bits(data_buf[2:3])
	log.Println("data_int:", data_int)
	log.Printf("ADRH:%d,ADRL:%d,dataType:%d,", adrhBits[0], adrlBits[0], data_type[0])

	signal := data_int[0]
	demoDevice.DATA["data1"] = 0
	demoDevice.DATA["data2"] = 0
	demoDevice.DATA["data3"] = 0

	// field names the entry of the cached device that receives the raw bit.
	var field string
	if adrhBits[0] == 0 && adrlBits[0] == 0 && data_type[0] == 0 {
		field = "data8"
		demoDevice.DATA["data8"] = signal
	} else {
		field = "data7"
		if signal == 0 {
			demoDevice.DATA["data7"] = 1
		} else {
			demoDevice.DATA["data7"] = 0
		}
	}

	DEVICEDATAS.Lock()
	if cached := DEVICEDATAS.DATA[demoDevice.VERSION+"_1"].DATA; cached != nil {
		cached[field] = signal
	}
	DEVICEDATAS.Unlock()

	var device Device
	device.ID = demoDevice.ID
	device.VERSION = demoDevice.VERSION
	device.ONLINE = demoDevice.ONLINE
	// The map is shared with demoDevice, not copied.
	device.DATA = demoDevice.DATA
	if conf.DEBUG {
		log.Println(device)
	}
	send_data(CODE1005, device)
}
// yunhorn_lorawan_l_v1 reads frames from the serial port forever.
//
// One frame is everything read until Read returns an error. A frame of
// exactly four bytes is passed to yunhorn_kgl_l_v1 in a new goroutine; frames
// of any other length are dropped.
func yunhorn_lorawan_l_v1(s *serial.Port) {
	for {
		chunk := make([]byte, 128)
		frame := []byte{}
		total := 0

		for done := false; !done; {
			n, err := s.Read(chunk)
			total += n
			frame = append(frame, chunk[:n]...)
			done = err != nil
		}

		if total == 4 {
			go yunhorn_kgl_l_v1(frame[:total])
		}
	}
}
// yunhorn_kgl_l_v1 logs the received frame, then stores and sends a fixed
// "yunhorn_kgl_l_v1" device whose DATA["data1"] is always 0; the frame
// contents are not decoded. The device is stored in DEVICEDATAS.DATA under
// "yunhorn_kgl_l_v1_1" and sent with CODE1005.
// (The original header comment read "air quality data, wired".)
func yunhorn_kgl_l_v1(data_buf []byte) {
	log.Println(data_buf)

	readings := make(map[string]interface{})
	readings["data1"] = 0

	var device Device
	device.Lock()
	device.ID, device.VERSION, device.ONLINE =
		"4E5D52DD-EB81-43C0-A2B6-04E432412EBA", "yunhorn_kgl_l_v1", true
	device.DATA = readings
	device.Unlock()

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA["yunhorn_kgl_l_v1_1"] = device
	DEVICEDATAS.Unlock()

	if conf.DEBUG {
		log.Println(device)
	}
	send_data(CODE1005, device)
}
// yunhorn_sb_c_v1 polls a water meter and caches its reading.
//
// It writes command[0] to s and reads until Read returns an error. A reply
// that is not exactly 39 bytes long is discarded. Bytes 17-18 of the reply are
// decoded as a big-endian uint16 and stored as "totalWater" in a Device that
// is saved to DEVICEDATAS.DATA["yunhorn_sb_c_v1_"+num].
//
//   - s: serial port the meter is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames; only command[0] is used.
//   - num: suffix of the DEVICEDATAS key.
func yunhorn_sb_c_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	if len(command) == 0 {
		log.Println("yunhorn_sb_c_v1: no command configured")
		return
	}

	// The write error is kept in a local variable; the previous code assigned
	// it to a package-level err shared by every collector.
	_, err := s.Write(command[0])
	check(err)
	if conf.DEBUG {
		log.Printf("命令:\t%x", command[0])
	}

	buf := make([]byte, 128)
	var reply []byte
	for {
		n, err := s.Read(buf)
		reply = append(reply, buf[:n]...)
		if err != nil {
			break
		}
	}
	if len(reply) != 39 {
		return
	}

	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_sb_c_v1",
		ONLINE:  true,
		DATA: map[string]interface{}{
			"totalWater": binary.BigEndian.Uint16(reply[17:19]),
		},
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA["yunhorn_sb_c_v1_"+num] = device
	DEVICEDATAS.Unlock()

	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_xsy_l_v1 polls a hand-soap sensor, caches its reading and reports
// it when it changed.
//
// It writes command[0] to s and reads until Read returns an error. A reply
// that is not exactly 9 bytes long is discarded. Bytes 3-4 of the reply are
// decoded as a big-endian uint16 and stored as "data" in a Device saved to
// DEVICEDATAS.DATA["yunhorn_xsy_l_v1_"+num]. When the value differs from the
// "data" of the previously cached Device (including when there was none), the
// new Device is sent with CODE1005.
//
//   - s: serial port the sensor is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames; only command[0] is used.
//   - num: suffix of the DEVICEDATAS key.
func yunhorn_xsy_l_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	if len(command) == 0 {
		log.Println("yunhorn_xsy_l_v1: no command configured")
		return
	}

	// The write error is kept in a local variable; the previous code assigned
	// it to a package-level err shared by every collector.
	_, err := s.Write(command[0])
	check(err)
	if conf.DEBUG {
		log.Printf("命令:\t%x", command[0])
	}

	buf := make([]byte, 128)
	var reply []byte
	for {
		n, err := s.Read(buf)
		reply = append(reply, buf[:n]...)
		if err != nil {
			break
		}
	}
	if len(reply) != 9 {
		return
	}

	key := "yunhorn_xsy_l_v1_" + num
	DEVICEDATAS.RLock()
	olddata := DEVICEDATAS.DATA[key]
	DEVICEDATAS.RUnlock()

	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_xsy_l_v1",
		ONLINE:  true,
		DATA: map[string]interface{}{
			"data": binary.BigEndian.Uint16(reply[3:5]),
		},
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA[key] = device
	DEVICEDATAS.Unlock()

	// Reading from a nil olddata.DATA yields nil, so a first reading always
	// counts as a change.
	if device.DATA["data"] != olddata.DATA["data"] {
		send_data(CODE1005, device)
	}
	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_db_c_v1 polls an electricity meter and caches its reading.
//
// It writes command[0] to s and reads until Read returns an error. A reply
// that is not exactly 20 bytes long is discarded. Bytes 14-17 of the reply
// carry the reading: 0x33 is subtracted from each byte, the bytes are taken
// in reverse order, and every nibble is one decimal digit (high nibble
// first), giving an 8-digit number. That number divided by 100 is stored as a
// float32 under "totalElectric" in a Device saved to
// DEVICEDATAS.DATA["yunhorn_db_c_v1_"+num].
//
//   - s: serial port the meter is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames; only command[0] is used.
//   - num: suffix of the DEVICEDATAS key.
//
// NOTE(review): the +0x33 offset and reversed BCD bytes look like DL/T 645
// encoding — confirm against the meter documentation.
// NOTE(review): float32 cannot represent every 8-digit value exactly; the
// type is kept so consumers of DATA["totalElectric"] see the same type.
func yunhorn_db_c_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	if len(command) == 0 {
		log.Println("yunhorn_db_c_v1: no command configured")
		return
	}

	// The write error is kept in a local variable; the previous code assigned
	// it to a package-level err shared by every collector.
	_, err := s.Write(command[0])
	check(err)
	if conf.DEBUG {
		log.Printf("命令:\t%x", command[0])
	}

	buf := make([]byte, 128)
	var reply []byte
	for {
		n, err := s.Read(buf)
		reply = append(reply, buf[:n]...)
		if err != nil {
			break
		}
	}
	if len(reply) != 20 {
		return
	}

	// Walk the four data bytes from last to first; each contributes two
	// decimal digits. Equivalent to the former reverse-then-expand code.
	raw := reply[14:18]
	var db_data uint32
	for i := len(raw) - 1; i >= 0; i-- {
		b := raw[i] - 0x33
		db_data = db_data*100 + uint32(b>>4)*10 + uint32(b&0x0f)
	}

	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_db_c_v1",
		ONLINE:  true,
		DATA: map[string]interface{}{
			"totalElectric": float32(db_data) / 100,
		},
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA["yunhorn_db_c_v1_"+num] = device
	DEVICEDATAS.Unlock()

	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_kq_c_v1 polls an air-quality sensor and caches its readings.
//
// It writes command[0] to s and reads until Read returns an error. A reply
// that is not exactly 19 bytes long is discarded. The 14 payload bytes
// (reply bytes 3-16) hold seven big-endian uint16 values, stored as float32
// in a Device saved to DEVICEDATAS.DATA["yunhorn_kq_c_v1_"+num]:
//
//	payload 0-1   "二氧化碳" (carbon dioxide)
//	payload 2-3   "TVOC"                      value / 10
//	payload 4-5   "甲醛" (formaldehyde)        value / 10
//	payload 6-7   "pm2.5"
//	payload 8-9   "湿度" (humidity)            value / 10
//	payload 10-11 "温度" (temperature)         value / 10
//	payload 12-13 "PM10"
//
//   - s: serial port the sensor is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames; only command[0] is used.
//   - num: suffix of the DEVICEDATAS key.
//
// NOTE(review): temperature is decoded as unsigned, so a sensor that reports
// sub-zero values as a signed 16-bit number would be misread — confirm.
func yunhorn_kq_c_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	if len(command) == 0 {
		log.Println("yunhorn_kq_c_v1: no command configured")
		return
	}

	// The write error is kept in a local variable; the previous code assigned
	// it to a package-level err shared by every collector.
	_, err := s.Write(command[0])
	check(err)
	if conf.DEBUG {
		log.Printf("命令:\t%x", command[0])
	}

	buf := make([]byte, 128)
	var reply []byte
	for {
		n, err := s.Read(buf)
		reply = append(reply, buf[:n]...)
		if err != nil {
			break
		}
	}
	if len(reply) != 19 {
		return
	}

	payload := reply[3:17]
	// word returns the i-th big-endian 16-bit value of the payload.
	word := func(i int) float32 {
		return float32(binary.BigEndian.Uint16(payload[2*i : 2*i+2]))
	}

	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_kq_c_v1",
		ONLINE:  true,
		DATA: map[string]interface{}{
			"二氧化碳":  word(0),
			"TVOC":  word(1) / 10,
			"甲醛":    word(2) / 10,
			"pm2.5": word(3),
			"湿度":    word(4) / 10,
			// Was sliced as [10:13], overlapping PM10; only the first two
			// bytes were ever decoded, so the value is unchanged.
			"温度":   word(5) / 10,
			"PM10": word(6),
		},
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA["yunhorn_kq_c_v1_"+num] = device
	DEVICEDATAS.Unlock()

	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_aq_c_v1 polls an ammonia sensor and caches its reading.
//
// It writes command[0] to s and reads until Read returns an error. A reply
// that is not exactly 7 bytes long is discarded. Bytes 3-4 of the reply are
// decoded as a big-endian uint16 and stored under "氨气" (ammonia) in a Device
// saved to DEVICEDATAS.DATA["yunhorn_aq_c_v1_"+num].
//
//   - s: serial port the sensor is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames; only command[0] is used.
//   - num: suffix of the DEVICEDATAS key.
func yunhorn_aq_c_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	if len(command) == 0 {
		log.Println("yunhorn_aq_c_v1: no command configured")
		return
	}

	// The write error is kept in a local variable; the previous code assigned
	// it to a package-level err shared by every collector.
	_, err := s.Write(command[0])
	check(err)
	if conf.DEBUG {
		log.Printf("命令:\t%x", command[0])
	}

	buf := make([]byte, 128)
	var reply []byte
	for {
		n, err := s.Read(buf)
		reply = append(reply, buf[:n]...)
		if err != nil {
			break
		}
	}
	if len(reply) != 7 {
		return
	}

	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_aq_c_v1",
		ONLINE:  true,
		DATA: map[string]interface{}{
			"氨气": binary.BigEndian.Uint16(reply[3:5]),
		},
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA["yunhorn_aq_c_v1_"+num] = device
	DEVICEDATAS.Unlock()

	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_lhq_c_v1 polls a hydrogen-sulfide sensor and caches its reading.
//
// It writes command[0] to s and reads until Read returns an error. A reply
// that is not exactly 7 bytes long is discarded. Bytes 3-4 of the reply are
// decoded as a big-endian uint16 and stored under "硫化氢" (hydrogen sulfide)
// in a Device saved to DEVICEDATAS.DATA["yunhorn_lhq_c_v1_"+num].
//
//   - s: serial port the sensor is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames; only command[0] is used.
//   - num: suffix of the DEVICEDATAS key.
func yunhorn_lhq_c_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	if len(command) == 0 {
		log.Println("yunhorn_lhq_c_v1: no command configured")
		return
	}

	// The write error is kept in a local variable; the previous code assigned
	// it to a package-level err shared by every collector.
	_, err := s.Write(command[0])
	check(err)
	if conf.DEBUG {
		log.Printf("命令:\t%x", command[0])
	}

	buf := make([]byte, 128)
	var reply []byte
	for {
		n, err := s.Read(buf)
		reply = append(reply, buf[:n]...)
		if err != nil {
			break
		}
	}
	if len(reply) != 7 {
		return
	}

	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_lhq_c_v1",
		ONLINE:  true,
		DATA: map[string]interface{}{
			"硫化氢": binary.BigEndian.Uint16(reply[3:5]),
		},
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA["yunhorn_lhq_c_v1_"+num] = device
	DEVICEDATAS.Unlock()

	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_kgl_c8_v1 polls a switch-input module one command at a time, caches
// the result and reports it when anything changed.
//
// For every entry of command it writes the request to s, performs a single
// Read and keeps byte 3 of the reply. The kept bytes are stored as uint16
// values under "data1", "data2", ... (one per command, in order) in a Device
// saved to DEVICEDATAS.DATA["yunhorn_kgl_c8_v1_"+num]. If any value differs
// from the previously cached Device, the new Device is sent with CODE1005.
//
// The whole poll is abandoned, without touching the cache, when a Read fails
// or returns fewer than four bytes. Previously the byte count was ignored, so
// a short reply caused a stale or zero byte left in the buffer to be recorded.
//
//   - s: serial port the module is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames, one per input.
//   - num: suffix of the DEVICEDATAS key.
func yunhorn_kgl_c8_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	buf := make([]byte, 128)
	states := make([]byte, 0, len(command))
	for _, cmd := range command {
		// Errors are kept in local variables; the previous code assigned
		// them to a package-level err shared by every collector.
		_, err := s.Write(cmd)
		check(err)
		if conf.DEBUG {
			log.Printf("命令:\t%x", cmd)
		}

		n, err := s.Read(buf)
		if err != nil {
			return
		}
		if n < 4 {
			log.Println("yunhorn_kgl_c8_v1: short reply, bytes read:", n)
			return
		}
		states = append(states, buf[3])
	}

	key := "yunhorn_kgl_c8_v1_" + num
	DEVICEDATAS.RLock()
	olddata := DEVICEDATAS.DATA[key]
	DEVICEDATAS.RUnlock()

	readings := make(map[string]interface{}, len(states))
	for i, state := range states {
		readings["data"+strconv.Itoa(i+1)] = uint16(state)
	}
	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_kgl_c8_v1",
		ONLINE:  true,
		DATA:    readings,
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA[key] = device
	DEVICEDATAS.Unlock()

	// One message is enough no matter how many inputs changed.
	for name, value := range device.DATA {
		if value != olddata.DATA[name] {
			send_data(CODE1005, device)
			break
		}
	}
	if conf.DEBUG {
		log.Println(device)
	}
}
// yunhorn_kgl_c16_v1 polls a 16-input switch module with one command, caches
// the result and reports it when anything changed.
//
// It writes command[0] to s and performs a single Read. Bytes 3 and 4 of the
// reply are expanded with Bytes2Bits into 16 bit values, stored under "data1"
// .. "data16" in a Device saved to DEVICEDATAS.DATA["yunhorn_kgl_c16_v1_"+num].
// If any value differs from the previously cached Device, the new Device is
// sent with CODE1005.
//
// The poll is abandoned, without touching the cache, when the Read fails or
// returns fewer than five bytes. Previously the byte count was ignored, so a
// short reply caused zero bytes from the fresh buffer to be recorded.
//
//   - s: serial port the module is attached to.
//   - device_id: ID given to the stored Device.
//   - command: request frames; only command[0] is used.
//   - num: suffix of the DEVICEDATAS key.
func yunhorn_kgl_c16_v1(s *serial.Port, device_id string, command [][]byte, num string) {
	if len(command) == 0 {
		log.Println("yunhorn_kgl_c16_v1: no command configured")
		return
	}

	// Errors are kept in local variables; the previous code assigned them to
	// a package-level err shared by every collector.
	_, err := s.Write(command[0])
	check(err)
	if conf.DEBUG {
		log.Printf("命令:\t%x", command[0])
	}

	buf := make([]byte, 128)
	n, err := s.Read(buf)
	if err != nil {
		return
	}
	if n < 5 {
		log.Println("yunhorn_kgl_c16_v1: short reply, bytes read:", n)
		return
	}
	bits := Bytes2Bits(buf[3:5])

	key := "yunhorn_kgl_c16_v1_" + num
	DEVICEDATAS.RLock()
	olddata := DEVICEDATAS.DATA[key]
	DEVICEDATAS.RUnlock()

	readings := make(map[string]interface{}, len(bits))
	for i, bit := range bits {
		readings["data"+strconv.Itoa(i+1)] = bit
	}
	device := Device{
		ID:      device_id,
		VERSION: "yunhorn_kgl_c16_v1",
		ONLINE:  true,
		DATA:    readings,
	}

	DEVICEDATAS.Lock()
	DEVICEDATAS.DATA[key] = device
	DEVICEDATAS.Unlock()

	// One message is enough no matter how many inputs changed.
	for name, value := range device.DATA {
		if value != olddata.DATA[name] {
			send_data(CODE1005, device)
			break
		}
	}
	if conf.DEBUG {
		log.Println(device)
	}
}
// Bytes2Bits expands every byte of data into eight uint16 values, each 0 or
// 1, ordered from the least significant bit to the most significant bit. The
// result has len(data)*8 elements and is never nil.
func Bytes2Bits(data []byte) []uint16 {
	const bitsPerByte = 8
	bits := make([]uint16, len(data)*bitsPerByte)
	for byteIdx, b := range data {
		base := byteIdx * bitsPerByte
		for shift := uint(0); shift < bitsPerByte; shift++ {
			bits[base+int(shift)] = uint16((b >> shift) & 1)
		}
	}
	return bits
}