1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# -*- coding: utf-8 -*-
#
# PROFIBUS DP - Communication Processor PHY access library
#
# Copyright (c) 2016-2021 Michael Buesch <m@bues.ch>
#
# Licensed under the terms of the GNU General Public License version 2,
# or (at your option) any later version.
#
from __future__ import division, absolute_import, print_function, unicode_literals
from pyprofibus.compat import *
from pyprofibus.phy import *
from pyprofibus.fdl import *
from pyprofibus.dp import *
from pyprofibus.util import *
__all__ = [
"CpPhyDummySlave",
]
class CpPhyDummySlave(CpPhy):
"""Dummy slave PROFIBUS CP PHYsical layer
"""
__slots__ = (
"__echoDX",
"__echoDXSize",
"__pollQueue",
)
def __init__(self, *args, **kwargs):
super(CpPhyDummySlave, self).__init__(*args, **kwargs)
self.__echoDX = kwargs.get("echoDX", True)
self.__echoDXSize = kwargs.get("echoDXSize", None)
self.__pollQueue = []
def __msg(self, message):
if self.debug:
print("CpPhyDummySlave: %s" % message)
def close(self):
"""Close the PHY device.
"""
self.__pollQueue = []
super(CpPhyDummySlave, self).close()
def sendData(self, telegramData, srd):
"""Send data to the physical line.
"""
telegramData = bytearray(telegramData)
self.__msg("Sending %s %s" % ("SRD" if srd else "SDN",
bytesToHex(telegramData)))
self.__mockSend(telegramData, srd = srd)
def pollData(self, timeout=0.0):
"""Poll received data from the physical line.
timeout => timeout in seconds.
0.0 = no timeout, return immediately.
negative = unlimited.
"""
try:
telegramData = self.__pollQueue.pop(0)
except IndexError as e:
return None
self.__msg("Receiving %s" % bytesToHex(telegramData))
return telegramData
def setConfig(self, baudrate=CpPhy.BAUD_9600, *args, **kwargs):
self.__msg("Baudrate = %d" % baudrate)
self.__pollQueue = []
super(CpPhyDummySlave, self).setConfig(baudrate=baudrate, *args, **kwargs)
def __mockSend(self, telegramData, srd):
if not srd:
return
try:
fdl = FdlTelegram.fromRawData(telegramData)
if (fdl.fc & FdlTelegram.FC_REQFUNC_MASK) == FdlTelegram.FC_FDL_STAT:
telegram = FdlTelegram_FdlStat_Con(da = fdl.sa,
sa = fdl.da)
self.__pollQueue.append(telegram.getRawData())
return
dp = DpTelegram.fromFdlTelegram(fdl, thisIsMaster = False)
if DpTelegram_SlaveDiag_Req.checkType(dp):
telegram = DpTelegram_SlaveDiag_Con(da = fdl.sa,
sa = fdl.da)
telegram.b1 |= DpTelegram_SlaveDiag_Con.B1_ONE
self.__pollQueue.append(telegram.toFdlTelegram().getRawData())
return
if DpTelegram_SetPrm_Req.checkType(dp):
telegram = FdlTelegram_ack()
self.__pollQueue.append(telegram.getRawData())
return
if DpTelegram_ChkCfg_Req.checkType(dp):
telegram = FdlTelegram_ack()
self.__pollQueue.append(telegram.getRawData())
return
if DpTelegram_DataExchange_Req.checkType(dp):
if self.__echoDX:
du = bytearray([ d ^ 0xFF for d in dp.du ])
if self.__echoDXSize is not None:
if len(du) > self.__echoDXSize:
du = du[ : self.__echoDXSize]
if len(du) < self.__echoDXSize:
du += bytearray(self.__echoDXSize - len(du))
telegram = DpTelegram_DataExchange_Con(da = fdl.sa,
sa = fdl.da,
du = du)
self.__pollQueue.append(telegram.toFdlTelegram().getRawData())
else:
telegram = FdlTelegram_ack()
self.__pollQueue.append(telegram.getRawData())
return
self.__msg("Dropping SRD telegram: %s" % str(fdl))
except ProfibusError as e:
text = "SRD mock-send error: %s" % str(e)
self.__msg(text)
raise PhyError(text)
|