1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
#
# Serial communication
#
# Copyright (c) 2013 Michael Buesch <m@bues.ch>
# Licensed under the terms of the GNU General Public License version 2.
#
import sys
import struct
import serial
import time
class SerialError(Exception):
pass
class SerialMessage(object):
SER_PAYLOAD_LEN = 8
SER_HDR_LEN = 4
SER_FCS_LEN = 2
MSG_SIZE = SER_HDR_LEN + SER_PAYLOAD_LEN + SER_FCS_LEN
@classmethod
def crc16Update(cls, crc, data):
crc ^= data
for i in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc = crc >> 1
return crc
@classmethod
def crc16(cls, data):
crc = 0xFFFF
for d in data:
crc = cls.crc16Update(crc, d)
return crc ^ 0xFFFF
def __init__(self, fc=0, payload=b''):
self.fc = fc
self.seq = 0
self.sa = 0
self.da = 0
pl = list(payload)
pl.extend( [ 0 ] * (SerialMessage.SER_PAYLOAD_LEN - len(pl)) )
self.payload = bytes(pl)
self.fcs = 0
def __calcFcs(self):
return self.crc16(self.__getBytes()[0:-2])
def __getBytes(self):
return struct.pack("14B", self.fc, self.seq,
(self.sa & 0xF) | ((self.da & 0xF) << 4),
0,
self.payload[0], self.payload[1],
self.payload[2], self.payload[3],
self.payload[4], self.payload[5],
self.payload[6], self.payload[7],
self.fcs & 0xFF, (self.fcs >> 8) & 0xFF)
def getBytes(self):
self.fcs = self.__calcFcs()
return self.__getBytes()
def __setBytes(self, data):
if len(data) != self.MSG_SIZE:
raise SerialError("Msg: Invalid number of bytes")
fields = struct.unpack("14B", data)
self.fc = fields[0]
self.seq = fields[1]
self.sa = fields[2] & 0xF
self.da = (fields[2] >> 4) & 0xF
self.payload = bytes(fields[4:12])
self.fcs = fields[12] | (fields[13] << 8)
def setBytes(self, data):
self.__setBytes(data)
if self.__calcFcs() != self.fcs:
raise SerialError("Msg: FCS mismatch")
def __repr__(self):
ret = "SerialMessage: "
for b in self.getBytes():
ret += "%02X" % b
return ret
class SerialComm(object):
def __init__(self, device, baudrate=9600, nrbits=8,
parity=serial.PARITY_NONE, nrstop=1,
localAddress=1):
self.sendDelay = 0
self.serial = serial.Serial(device, baudrate, nrbits,
parity, nrstop)
self.localAddress = localAddress
self.seq = 0
def close(self):
self.serial.close()
def setSendDelay(self, seconds):
self.sendDelay = seconds
def receive(self):
while 1:
b = self.serial.read(SerialMessage.MSG_SIZE)
msg = SerialMessage()
msg.setBytes(b)
if msg.da == self.localAddress:
return msg
def send(self, msg, destinationAddress=0):
msg.da = destinationAddress & 0xF
msg.seq = self.seq
self.seq += 1
data = msg.getBytes()
if self.sendDelay:
for b in data:
self.serial.write(bytes((b,)))
self.serial.flush()
time.sleep(self.sendDelay)
else:
self.serial.write(data)
self.serial.flush()
|