hat.drivers.modbus
Modbus communication protocol
1"""Modbus communication protocol""" 2 3from hat.drivers.modbus.common import (ModbusType, 4 DataType, 5 Error, 6 apply_mask) 7from hat.drivers.modbus.master import (create_tcp_master, 8 create_serial_master, 9 Master) 10from hat.drivers.modbus.slave import (SlaveCb, 11 ReadCb, 12 WriteCb, 13 WriteMaskCb, 14 create_tcp_server, 15 create_serial_slave, 16 Slave) 17 18 19__all__ = ['ModbusType', 20 'DataType', 21 'Error', 22 'apply_mask', 23 'create_tcp_master', 24 'create_serial_master', 25 'Master', 26 'SlaveCb', 27 'ReadCb', 28 'WriteCb', 29 'WriteMaskCb', 30 'create_tcp_server', 31 'create_serial_slave', 32 'Slave']
class
ModbusType(enum.Enum):
An enumeration.
TCP =
<ModbusType.TCP: 0>
RTU =
<ModbusType.RTU: 1>
ASCII =
<ModbusType.ASCII: 2>
class
DataType(enum.Enum):
13class DataType(enum.Enum): 14 COIL = 1 15 DISCRETE_INPUT = 2 16 HOLDING_REGISTER = 3 17 INPUT_REGISTER = 4 18 QUEUE = 5
An enumeration.
COIL =
<DataType.COIL: 1>
DISCRETE_INPUT =
<DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER =
<DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER =
<DataType.INPUT_REGISTER: 4>
QUEUE =
<DataType.QUEUE: 5>
class
Error(enum.Enum):
21class Error(enum.Enum): 22 INVALID_FUNCTION_CODE = 0x01 23 INVALID_DATA_ADDRESS = 0x02 24 INVALID_DATA_VALUE = 0x03 25 FUNCTION_ERROR = 0x04 26 GATEWAY_PATH_UNAVAILABLE = 0x0a 27 GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0b
An enumeration.
INVALID_FUNCTION_CODE =
<Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS =
<Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE =
<Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR =
<Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE =
<Error.GATEWAY_PATH_UNAVAILABLE: 10>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND =
<Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 11>
def
apply_mask(value: int, and_mask: int, or_mask: int) -> int:
30def apply_mask(value: int, and_mask: int, or_mask: int) -> int: 31 """Apply mask to value""" 32 return (value & and_mask) | (or_mask & (~and_mask))
Apply mask to value
async def
create_tcp_master( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, response_timeout: float | None = None, **kwargs) -> Master:
20async def create_tcp_master(modbus_type: common.ModbusType, 21 addr: tcp.Address, 22 response_timeout: float | None = None, 23 **kwargs 24 ) -> 'Master': 25 """Create TCP master 26 27 Args: 28 modbus_type: modbus type 29 addr: remote host address 30 response_timeout: response timeout in seconds 31 kwargs: additional arguments used for creating TCP connection 32 (see `tcp.connect`) 33 34 """ 35 conn = await transport.tcp_connect(addr, **kwargs) 36 return Master(conn, modbus_type, response_timeout)
Create TCP master
Arguments:
- modbus_type: modbus type
- addr: remote host address
- response_timeout: response timeout in seconds
- kwargs: additional arguments used for creating TCP connection
(see
tcp.connect
)
async def
create_serial_master( modbus_type: ModbusType, port: str, *, silent_interval: float = 0.005, response_timeout: float | None = None, **kwargs) -> Master:
39async def create_serial_master(modbus_type: common.ModbusType, 40 port: str, *, 41 silent_interval: float = 0.005, 42 response_timeout: float | None = None, 43 **kwargs 44 ) -> 'Master': 45 """Create serial master 46 47 Args: 48 modbus_type: modbus type 49 port: port name (see `serial.create`) 50 silent_interval: silent interval (see `serial.create`) 51 response_timeout: response timeout in seconds 52 kwargs: additional arguments used for opening serial connection 53 (see `serial.create`) 54 55 """ 56 conn = await transport.serial_create(port, 57 silent_interval=silent_interval, 58 **kwargs) 59 return Master(conn, modbus_type, response_timeout)
Create serial master
Arguments:
- modbus_type: modbus type
- port: port name (see
serial.create
) - silent_interval: silent interval (see
serial.create
) - response_timeout: response timeout in seconds
- kwargs: additional arguments used for opening serial connection
(see
serial.create
)
class
Master(hat.aio.group.Resource):
62class Master(aio.Resource): 63 """Modbus master""" 64 65 def __init__(self, 66 conn: transport.Connection, 67 modbus_type: common.ModbusType, 68 response_timeout: float | None): 69 self._conn = conn 70 self._modbus_type = modbus_type 71 self._response_timeout = response_timeout 72 self._send_queue = aio.Queue() 73 74 if modbus_type == common.ModbusType.TCP: 75 self._next_transaction_ids = iter(i % 0x10000 76 for i in itertools.count(1)) 77 78 self.async_group.spawn(self._send_loop) 79 80 @property 81 def async_group(self) -> aio.Group: 82 """Async group""" 83 return self._conn.async_group 84 85 @property 86 def log_prefix(self) -> str: 87 """Logging prefix""" 88 return self._conn.log_prefix 89 90 async def read(self, 91 device_id: int, 92 data_type: common.DataType, 93 start_address: int, 94 quantity: int = 1 95 ) -> list[int] | common.Error: 96 """Read data from modbus device 97 98 Argument `quantity` is ignored if `data_type` is `QUEUE`. 99 100 Args: 101 device_id: slave device identifier 102 data_type: data type 103 start_address: starting modbus data address 104 quantity: number of data values 105 106 Raises: 107 ConnectionError 108 TimeoutError 109 110 """ 111 if device_id == 0: 112 raise ValueError('unsupported device id') 113 114 if data_type == common.DataType.COIL: 115 req = transport.ReadCoilsReq(address=start_address, 116 quantity=quantity) 117 118 elif data_type == common.DataType.DISCRETE_INPUT: 119 req = transport.ReadDiscreteInputsReq(address=start_address, 120 quantity=quantity) 121 122 elif data_type == common.DataType.HOLDING_REGISTER: 123 req = transport.ReadHoldingRegistersReq(address=start_address, 124 quantity=quantity) 125 126 elif data_type == common.DataType.INPUT_REGISTER: 127 req = transport.ReadInputRegistersReq(address=start_address, 128 quantity=quantity) 129 130 elif data_type == common.DataType.QUEUE: 131 req = transport.ReadFifoQueueReq(address=start_address) 132 133 else: 134 raise ValueError('unsupported data type') 135 136 res = await self._send(device_id, req) 137 138 if isinstance(res, transport.ErrorRes): 139 return res.error 140 141 if isinstance(res, (transport.ReadCoilsRes, 142 transport.ReadDiscreteInputsRes, 143 transport.ReadHoldingRegistersRes, 144 transport.ReadInputRegistersRes)): 145 return res.values[:quantity] 146 147 if isinstance(res, transport.ReadFifoQueueRes): 148 return res.values 149 150 raise ValueError("unsupported response pdu") 151 152 async def write(self, 153 device_id: int, 154 data_type: common.DataType, 155 start_address: int, 156 values: typing.List[int] 157 ) -> common.Error | None: 158 """Write data to modbus device 159 160 Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not 161 supported. 162 163 Args: 164 device_id: slave device identifier 165 data_type: data type 166 start_address: starting modbus data address 167 values: values 168 169 Raises: 170 ConnectionError 171 TimeoutError 172 173 """ 174 if data_type == common.DataType.COIL: 175 if len(values) == 1: 176 req = transport.WriteSingleCoilReq(address=start_address, 177 value=values[0]) 178 179 else: 180 req = transport.WriteMultipleCoilsReq(address=start_address, 181 values=values) 182 183 elif data_type == common.DataType.HOLDING_REGISTER: 184 if len(values) == 1: 185 req = transport.WriteSingleRegisterReq(address=start_address, 186 value=values[0]) 187 188 else: 189 req = transport.WriteMultipleRegistersReq( 190 address=start_address, 191 values=values) 192 193 else: 194 raise ValueError('unsupported data type') 195 196 res = await self._send(device_id, req) 197 198 if isinstance(res, transport.ErrorRes): 199 return res.error 200 201 if not isinstance(res, (transport.WriteSingleCoilRes, 202 transport.WriteMultipleCoilsRes, 203 transport.WriteSingleRegisterRes, 204 transport.WriteMultipleRegistersRes)): 205 raise ValueError("unsupported response pdu") 206 207 if (res.address != start_address): 208 raise Exception("invalid response pdu address") 209 210 if isinstance(res, (transport.WriteSingleCoilRes, 211 transport.WriteSingleRegisterRes)): 212 if (res.value != values[0]): 213 raise Exception("invalid response pdu value") 214 215 if isinstance(res, (transport.WriteMultipleCoilsRes, 216 transport.WriteMultipleRegistersRes)): 217 if (res.quantity != len(values)): 218 raise Exception("invalid response pdu quantity") 219 220 async def write_mask(self, 221 device_id: int, 222 address: int, 223 and_mask: int, 224 or_mask: int 225 ) -> common.Error | None: 226 """Write mask to modbus device HOLDING_REGISTER 227 228 Args: 229 device_id: slave device identifier 230 address: modbus data address 231 and_mask: and mask 232 or_mask: or mask 233 234 Raises: 235 ConnectionError 236 TimeoutError 237 238 """ 239 req = transport.MaskWriteRegisterReq(address=address, 240 and_mask=and_mask, 241 or_mask=or_mask) 242 243 res = await self._send(device_id, req) 244 245 if isinstance(res, transport.ErrorRes): 246 return res.error 247 248 if not isinstance(res, transport.MaskWriteRegisterRes): 249 raise ValueError("unsupported response pdu") 250 251 if (res.address != address): 252 raise Exception("invalid response pdu address") 253 254 if (res.and_mask != and_mask): 255 raise Exception("invalid response pdu and mask") 256 257 if (res.or_mask != or_mask): 258 raise Exception("invalid response pdu or mask") 259 260 async def _send(self, device_id, req): 261 if self._modbus_type == common.ModbusType.TCP: 262 req_adu = transport.TcpAdu( 263 transaction_id=next(self._next_transaction_ids), 264 device_id=device_id, 265 pdu=req) 266 267 elif self._modbus_type == common.ModbusType.RTU: 268 req_adu = transport.RtuAdu(device_id=device_id, 269 pdu=req) 270 271 elif self._modbus_type == common.ModbusType.ASCII: 272 req_adu = transport.AsciiAdu(device_id=device_id, 273 pdu=req) 274 275 else: 276 raise ValueError("unsupported modbus type") 277 278 future = asyncio.Future() 279 try: 280 self._send_queue.put_nowait((req_adu, future)) 281 res_adu = await future 282 283 except aio.QueueClosedError: 284 raise ConnectionError() 285 286 return res_adu.pdu 287 288 async def _receive(self, req_adu): 289 while True: 290 res_adu = await self._conn.receive(self._modbus_type, 291 transport.Direction.RESPONSE) 292 293 if isinstance(res_adu, transport.TcpAdu): 294 if res_adu.transaction_id != req_adu.transaction_id: 295 self._log(logging.WARNING, 296 "discarding response adu: " 297 "invalid response transaction id") 298 continue 299 300 if res_adu.device_id != req_adu.device_id: 301 self._log(logging.WARNING, 302 "discarding response adu: " 303 "invalid response device id") 304 continue 305 306 req_fc = transport.get_pdu_function_code(req_adu.pdu) 307 res_fc = transport.get_pdu_function_code(res_adu.pdu) 308 if req_fc != res_fc: 309 self._log(logging.WARNING, 310 "discarding response adu: " 311 "invalid response function code") 312 continue 313 314 return res_adu 315 316 async def _send_loop(self): 317 self._log(logging.DEBUG, "starting master send loop") 318 future = None 319 try: 320 while self.is_open: 321 # req_adu, future = await self._send_queue.get() 322 323 async with self.async_group.create_subgroup() as subgroup: 324 subgroup.spawn(self._reset_input_buffer_loop) 325 self._log(logging.DEBUG, 326 "started discarding incomming data") 327 328 while not future or future.done(): 329 req_adu, future = await self._send_queue.get() 330 331 await self._reset_input_buffer() 332 self._log(logging.DEBUG, "stopped discarding incomming data") 333 334 await self._conn.send(req_adu) 335 await self._conn.drain() 336 337 async with self.async_group.create_subgroup( 338 log_exceptions=False) as subgroup: 339 receive_task = subgroup.spawn(self._receive, req_adu) 340 341 await asyncio.wait([receive_task, future], 342 timeout=self._response_timeout, 343 return_when=asyncio.FIRST_COMPLETED) 344 345 if future.done(): 346 continue 347 348 if receive_task.done(): 349 future.set_result(receive_task.result()) 350 351 else: 352 future.set_exception(TimeoutError()) 353 354 except ConnectionError: 355 pass 356 357 except Exception as e: 358 self._log(logging.ERROR, "error in send loop: %s", e, exc_info=e) 359 360 finally: 361 self._log(logging.DEBUG, "stopping master send loop") 362 self.close() 363 self._send_queue.close() 364 365 while True: 366 if future and not future.done(): 367 future.set_exception(ConnectionError()) 368 if self._send_queue.empty(): 369 break 370 _, future = self._send_queue.get_nowait() 371 372 async def _reset_input_buffer_loop(self): 373 try: 374 while True: 375 await self._reset_input_buffer() 376 377 await self._conn.read_byte() 378 self._log(logging.DEBUG, "discarded 1 byte from input buffer") 379 380 except ConnectionError: 381 self.close() 382 383 except Exception as e: 384 self._log(logging.ERROR, "error in reset input buffer loop: %s", e, 385 exc_info=e) 386 self.close() 387 388 async def _reset_input_buffer(self): 389 count = await self._conn.reset_input_buffer() 390 if not count: 391 return 392 self._log(logging.DEBUG, "discarded %s bytes from input buffer", count) 393 394 def _log(self, level, msg, *args, **kwargs): 395 if not mlog.isEnabledFor(level): 396 return 397 398 mlog.log(level, f"{self.log_prefix}: {msg}", *args, **kwargs)
Modbus master
Master( conn: hat.drivers.modbus.transport.connection.Connection, modbus_type: ModbusType, response_timeout: float | None)
65 def __init__(self, 66 conn: transport.Connection, 67 modbus_type: common.ModbusType, 68 response_timeout: float | None): 69 self._conn = conn 70 self._modbus_type = modbus_type 71 self._response_timeout = response_timeout 72 self._send_queue = aio.Queue() 73 74 if modbus_type == common.ModbusType.TCP: 75 self._next_transaction_ids = iter(i % 0x10000 76 for i in itertools.count(1)) 77 78 self.async_group.spawn(self._send_loop)
async_group: hat.aio.group.Group
80 @property 81 def async_group(self) -> aio.Group: 82 """Async group""" 83 return self._conn.async_group
Async group
log_prefix: str
85 @property 86 def log_prefix(self) -> str: 87 """Logging prefix""" 88 return self._conn.log_prefix
Logging prefix
async def
read( self, device_id: int, data_type: DataType, start_address: int, quantity: int = 1) -> list[int] | Error:
90 async def read(self, 91 device_id: int, 92 data_type: common.DataType, 93 start_address: int, 94 quantity: int = 1 95 ) -> list[int] | common.Error: 96 """Read data from modbus device 97 98 Argument `quantity` is ignored if `data_type` is `QUEUE`. 99 100 Args: 101 device_id: slave device identifier 102 data_type: data type 103 start_address: starting modbus data address 104 quantity: number of data values 105 106 Raises: 107 ConnectionError 108 TimeoutError 109 110 """ 111 if device_id == 0: 112 raise ValueError('unsupported device id') 113 114 if data_type == common.DataType.COIL: 115 req = transport.ReadCoilsReq(address=start_address, 116 quantity=quantity) 117 118 elif data_type == common.DataType.DISCRETE_INPUT: 119 req = transport.ReadDiscreteInputsReq(address=start_address, 120 quantity=quantity) 121 122 elif data_type == common.DataType.HOLDING_REGISTER: 123 req = transport.ReadHoldingRegistersReq(address=start_address, 124 quantity=quantity) 125 126 elif data_type == common.DataType.INPUT_REGISTER: 127 req = transport.ReadInputRegistersReq(address=start_address, 128 quantity=quantity) 129 130 elif data_type == common.DataType.QUEUE: 131 req = transport.ReadFifoQueueReq(address=start_address) 132 133 else: 134 raise ValueError('unsupported data type') 135 136 res = await self._send(device_id, req) 137 138 if isinstance(res, transport.ErrorRes): 139 return res.error 140 141 if isinstance(res, (transport.ReadCoilsRes, 142 transport.ReadDiscreteInputsRes, 143 transport.ReadHoldingRegistersRes, 144 transport.ReadInputRegistersRes)): 145 return res.values[:quantity] 146 147 if isinstance(res, transport.ReadFifoQueueRes): 148 return res.values 149 150 raise ValueError("unsupported response pdu")
Read data from modbus device
Argument quantity
is ignored if data_type
is QUEUE
.
Arguments:
- device_id: slave device identifier
- data_type: data type
- start_address: starting modbus data address
- quantity: number of data values
Raises:
- ConnectionError
- TimeoutError
async def
write( self, device_id: int, data_type: DataType, start_address: int, values: List[int]) -> Error | None:
152 async def write(self, 153 device_id: int, 154 data_type: common.DataType, 155 start_address: int, 156 values: typing.List[int] 157 ) -> common.Error | None: 158 """Write data to modbus device 159 160 Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not 161 supported. 162 163 Args: 164 device_id: slave device identifier 165 data_type: data type 166 start_address: starting modbus data address 167 values: values 168 169 Raises: 170 ConnectionError 171 TimeoutError 172 173 """ 174 if data_type == common.DataType.COIL: 175 if len(values) == 1: 176 req = transport.WriteSingleCoilReq(address=start_address, 177 value=values[0]) 178 179 else: 180 req = transport.WriteMultipleCoilsReq(address=start_address, 181 values=values) 182 183 elif data_type == common.DataType.HOLDING_REGISTER: 184 if len(values) == 1: 185 req = transport.WriteSingleRegisterReq(address=start_address, 186 value=values[0]) 187 188 else: 189 req = transport.WriteMultipleRegistersReq( 190 address=start_address, 191 values=values) 192 193 else: 194 raise ValueError('unsupported data type') 195 196 res = await self._send(device_id, req) 197 198 if isinstance(res, transport.ErrorRes): 199 return res.error 200 201 if not isinstance(res, (transport.WriteSingleCoilRes, 202 transport.WriteMultipleCoilsRes, 203 transport.WriteSingleRegisterRes, 204 transport.WriteMultipleRegistersRes)): 205 raise ValueError("unsupported response pdu") 206 207 if (res.address != start_address): 208 raise Exception("invalid response pdu address") 209 210 if isinstance(res, (transport.WriteSingleCoilRes, 211 transport.WriteSingleRegisterRes)): 212 if (res.value != values[0]): 213 raise Exception("invalid response pdu value") 214 215 if isinstance(res, (transport.WriteMultipleCoilsRes, 216 transport.WriteMultipleRegistersRes)): 217 if (res.quantity != len(values)): 218 raise Exception("invalid response pdu quantity")
Write data to modbus device
Data types DISCRETE_INPUT
, INPUT_REGISTER
and QUEUE
are not
supported.
Arguments:
- device_id: slave device identifier
- data_type: data type
- start_address: starting modbus data address
- values: values
Raises:
- ConnectionError
- TimeoutError
async def
write_mask( self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None:
220 async def write_mask(self, 221 device_id: int, 222 address: int, 223 and_mask: int, 224 or_mask: int 225 ) -> common.Error | None: 226 """Write mask to modbus device HOLDING_REGISTER 227 228 Args: 229 device_id: slave device identifier 230 address: modbus data address 231 and_mask: and mask 232 or_mask: or mask 233 234 Raises: 235 ConnectionError 236 TimeoutError 237 238 """ 239 req = transport.MaskWriteRegisterReq(address=address, 240 and_mask=and_mask, 241 or_mask=or_mask) 242 243 res = await self._send(device_id, req) 244 245 if isinstance(res, transport.ErrorRes): 246 return res.error 247 248 if not isinstance(res, transport.MaskWriteRegisterRes): 249 raise ValueError("unsupported response pdu") 250 251 if (res.address != address): 252 raise Exception("invalid response pdu address") 253 254 if (res.and_mask != and_mask): 255 raise Exception("invalid response pdu and mask") 256 257 if (res.or_mask != or_mask): 258 raise Exception("invalid response pdu or mask")
Write mask to modbus device HOLDING_REGISTER
Arguments:
- device_id: slave device identifier
- address: modbus data address
- and_mask: and mask
- or_mask: or mask
Raises:
- ConnectionError
- TimeoutError
SlaveCb =
typing.Callable[[ForwardRef('Slave')], typing.Optional[typing.Awaitable[NoneType]]]
async def
create_tcp_server( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, slave_cb: Optional[Callable[[Slave], Optional[Awaitable[NoneType]]]] = None, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], Union[list[int], Error, Awaitable[list[int] | Error]]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Union[Error, NoneType, Awaitable[Error | None]]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Union[Error, NoneType, Awaitable[Error | None]]]] = None, **kwargs) -> hat.drivers.tcp.Server:
81async def create_tcp_server(modbus_type: common.ModbusType, 82 addr: tcp.Address, 83 slave_cb: typing.Optional[SlaveCb] = None, 84 read_cb: typing.Optional[ReadCb] = None, 85 write_cb: typing.Optional[WriteCb] = None, 86 write_mask_cb: typing.Optional[WriteMaskCb] = None, 87 **kwargs 88 ) -> tcp.Server: 89 """Create TCP server 90 91 Closing server closes all active associated slaves. 92 93 Args: 94 modbus_type: modbus type 95 addr: local listening host address 96 slave_cb: slave callback 97 read_cb: read callback 98 write_cb: write callback 99 write_mask_cb: write mask callback 100 kwargs: additional arguments used for creating TCP server 101 (see `tcp.listen`) 102 103 """ 104 105 async def on_connection(conn): 106 slave = Slave(conn=conn, 107 modbus_type=modbus_type, 108 read_cb=read_cb, 109 write_cb=write_cb, 110 write_mask_cb=write_mask_cb) 111 112 try: 113 if slave_cb: 114 await aio.call(slave_cb, slave) 115 116 await slave.wait_closing() 117 118 except Exception as e: 119 if mlog.isEnabledFor(logging.ERROR): 120 mlog.error(f"tcp local ({addr.host}:{addr.port}): " 121 f"error in slave callback: %s", e, exc_info=e) 122 123 finally: 124 slave.close() 125 126 return await transport.tcp_listen(on_connection, addr, **kwargs)
Create TCP server
Closing server closes all active associated slaves.
Arguments:
- modbus_type: modbus type
- addr: local listening host address
- slave_cb: slave callback
- read_cb: read callback
- write_cb: write callback
- write_mask_cb: write mask callback
- kwargs: additional arguments used for creating TCP server
(see
tcp.listen
)
async def
create_serial_slave( modbus_type: ModbusType, port: str, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], Union[list[int], Error, Awaitable[list[int] | Error]]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Union[Error, NoneType, Awaitable[Error | None]]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Union[Error, NoneType, Awaitable[Error | None]]]] = None, silent_interval: float = 0.005, **kwargs) -> Slave:
129async def create_serial_slave(modbus_type: common.ModbusType, 130 port: str, 131 read_cb: typing.Optional[ReadCb] = None, 132 write_cb: typing.Optional[WriteCb] = None, 133 write_mask_cb: typing.Optional[WriteMaskCb] = None, # NOQA 134 silent_interval: float = 0.005, 135 **kwargs 136 ) -> 'Slave': 137 """Create serial slave 138 139 Args: 140 modbus_type: modbus type 141 port: port name (see `serial.create`) 142 read_cb: read callback 143 write_cb: write callback 144 write_mask_cb: write mask callback 145 silent_interval: silent interval (see `serial.create`) 146 kwargs: additional arguments used for opening serial connection 147 (see `serial.create`) 148 149 """ 150 conn = await transport.serial_create(port, 151 silent_interval=silent_interval, 152 **kwargs) 153 return Slave(conn=conn, 154 modbus_type=modbus_type, 155 read_cb=read_cb, 156 write_cb=write_cb, 157 write_mask_cb=write_mask_cb)
Create serial slave
Arguments:
- modbus_type: modbus type
- port: port name (see
serial.create
) - read_cb: read callback
- write_cb: write callback
- write_mask_cb: write mask callback
- silent_interval: silent interval (see
serial.create
) - kwargs: additional arguments used for opening serial connection
(see
serial.create
)
class
Slave(hat.aio.group.Resource):
160class Slave(aio.Resource): 161 """Modbus slave""" 162 163 def __init__(self, 164 conn: transport.Connection, 165 modbus_type: common.ModbusType, 166 read_cb: typing.Optional[ReadCb] = None, 167 write_cb: typing.Optional[WriteCb] = None, 168 write_mask_cb: typing.Optional[WriteMaskCb] = None): 169 self._conn = conn 170 self._modbus_type = modbus_type 171 self._read_cb = read_cb 172 self._write_cb = write_cb 173 self._write_mask_cb = write_mask_cb 174 175 self.async_group.spawn(self._receive_loop) 176 177 @property 178 def async_group(self) -> aio.Group: 179 """Async group""" 180 return self._conn.async_group 181 182 @property 183 def log_prefix(self) -> str: 184 """Logging prefix""" 185 return self._conn.log_prefix 186 187 async def _receive_loop(self): 188 self._log(logging.DEBUG, "starting slave receive loop") 189 try: 190 while True: 191 try: 192 self._log(logging.DEBUG, "waiting for request") 193 req_adu = await self._conn.receive( 194 modbus_type=self._modbus_type, 195 direction=transport.Direction.REQUEST) 196 197 except ConnectionError: 198 break 199 200 except Exception as e: 201 self._log(logging.WARNING, "error receiving request: %s", 202 e, exc_info=e) 203 continue 204 205 device_id = req_adu.device_id 206 req = req_adu.pdu 207 208 try: 209 self._log(logging.DEBUG, 210 "processing request (device_id %s): %s", 211 device_id, req) 212 res = await self._process_request(device_id, req) 213 214 except Exception as e: 215 self._log(logging.WARNING, "error processing request: %s", 216 e, exc_info=e) 217 continue 218 219 if device_id == 0: 220 self._log(logging.DEBUG, 221 "skip sending response (broadcast request): %s", 222 res) 223 continue 224 225 self._log(logging.DEBUG, "sending response: %s", res) 226 227 if self._modbus_type == common.ModbusType.TCP: 228 res_adu = transport.TcpAdu( 229 transaction_id=req_adu.transaction_id, 230 device_id=req_adu.device_id, 231 pdu=res) 232 233 elif self._modbus_type == common.ModbusType.RTU: 234 res_adu = transport.RtuAdu(device_id=req_adu.device_id, 235 pdu=res) 236 237 elif self._modbus_type == common.ModbusType.ASCII: 238 res_adu = transport.AsciiAdu(device_id=req_adu.device_id, 239 pdu=res) 240 241 else: 242 raise ValueError("invalid modbus type") 243 244 try: 245 await self._conn.send(res_adu) 246 247 except ConnectionError: 248 break 249 250 except Exception as e: 251 self._log(logging.WARNING, "error sending response: %s", e, 252 exc_info=e) 253 continue 254 255 except Exception as e: 256 self._log(logging.ERROR, "receive loop error: %s", e, exc_info=e) 257 258 finally: 259 self._log(logging.DEBUG, "closing slave receive loop") 260 self.close() 261 262 async def _process_request(self, device_id, req): 263 if isinstance(req, transport.ReadCoilsReq): 264 result = await self._call_read_cb( 265 device_id=device_id, 266 data_type=common.DataType.COIL, 267 start_address=req.address, 268 quantity=req.quantity) 269 270 if isinstance(result, common.Error): 271 return transport.ErrorRes( 272 fc=transport.FunctionCode.READ_COILS, 273 error=result) 274 275 return transport.ReadCoilsRes(values=result) 276 277 if isinstance(req, transport.ReadDiscreteInputsReq): 278 result = await self._call_read_cb( 279 device_id=device_id, 280 data_type=common.DataType.DISCRETE_INPUT, 281 start_address=req.address, 282 quantity=req.quantity) 283 284 if isinstance(result, common.Error): 285 return transport.ErrorRes( 286 fc=transport.FunctionCode.READ_DISCRETE_INPUTS, 287 error=result) 288 289 return transport.ReadDiscreteInputsRes(values=result) 290 291 if isinstance(req, transport.ReadHoldingRegistersReq): 292 result = await self._call_read_cb( 293 device_id=device_id, 294 data_type=common.DataType.HOLDING_REGISTER, 295 start_address=req.address, 296 quantity=req.quantity) 297 298 if isinstance(result, common.Error): 299 return transport.ErrorRes( 300 fc=transport.FunctionCode.READ_HOLDING_REGISTERS, 301 error=result) 302 303 return transport.ReadHoldingRegistersRes(values=result) 304 305 if isinstance(req, transport.ReadInputRegistersReq): 306 result = await self._call_read_cb( 307 device_id=device_id, 308 data_type=common.DataType.INPUT_REGISTER, 309 start_address=req.address, 310 quantity=req.quantity) 311 312 if isinstance(result, common.Error): 313 return transport.ErrorRes( 314 fc=transport.FunctionCode.READ_INPUT_REGISTERS, 315 error=result) 316 317 return transport.ReadInputRegistersRes(values=result) 318 319 if isinstance(req, transport.WriteSingleCoilReq): 320 result = await self._call_write_cb( 321 device_id=device_id, 322 data_type=common.DataType.COIL, 323 start_address=req.address, 324 values=[req.value]) 325 326 if isinstance(result, common.Error): 327 return transport.ErrorRes( 328 fc=transport.FunctionCode.WRITE_SINGLE_COIL, 329 error=result) 330 331 return transport.WriteSingleCoilRes(address=req.address, 332 value=req.value) 333 334 if isinstance(req, transport.WriteSingleRegisterReq): 335 result = await self._call_write_cb( 336 device_id=device_id, 337 data_type=common.DataType.HOLDING_REGISTER, 338 start_address=req.address, 339 values=[req.value]) 340 341 if isinstance(result, common.Error): 342 return transport.ErrorRes( 343 fc=transport.FunctionCode.WRITE_SINGLE_REGISTER, 344 error=result) 345 346 return transport.WriteSingleRegisterRes(address=req.address, 347 value=req.value) 348 349 if isinstance(req, transport.WriteMultipleCoilsReq): 350 result = await self._call_write_cb( 351 device_id=device_id, 352 data_type=common.DataType.COIL, 353 start_address=req.address, 354 values=req.values) 355 356 if isinstance(result, common.Error): 357 return transport.ErrorRes( 358 fc=transport.FunctionCode.WRITE_MULTIPLE_COILS, 359 error=result) 360 361 return transport.WriteMultipleCoilsRes(address=req.address, 362 quantity=len(req.values)) 363 364 if isinstance(req, transport.WriteMultipleRegistersReq): 365 result = await self._call_write_cb( 366 device_id=device_id, 367 data_type=common.DataType.HOLDING_REGISTER, 368 start_address=req.address, 369 values=req.values) 370 371 if isinstance(result, common.Error): 372 return transport.ErrorRes( 373 fc=transport.FunctionCode.WRITE_MULTIPLE_REGISTER, 374 error=result) 375 376 return transport.WriteMultipleRegistersRes( 377 address=req.address, 378 quantity=len(req.values)) 379 380 if isinstance(req, transport.MaskWriteRegisterReq): 381 result = await self._call_write_mask_cb( 382 device_id=device_id, 383 address=req.address, 384 and_mask=req.and_mask, 385 or_mask=req.or_mask) 386 387 if isinstance(result, common.Error): 388 return transport.ErrorRes( 389 fc=transport.FunctionCode.MASK_WRITE_REGISTER, 390 error=result) 391 392 return transport.MaskWriteRegisterRes(address=req.address, 393 and_mask=req.and_mask, 394 or_mask=req.or_mask) 395 396 if isinstance(req, transport.ReadFifoQueueReq): 397 result = await self._call_read_cb( 398 device_id=device_id, 399 data_type=common.DataType.QUEUE, 400 start_address=req.address, 401 quantity=None) 402 403 if isinstance(result, common.Error): 404 return transport.ErrorRes( 405 fc=transport.FunctionCode.READ_FIFO_QUEUE, 406 error=result) 407 408 return transport.ReadFifoQueueRes(values=result) 409 410 return transport.ErrorRes(fc=transport.get_pdu_function_code(req), 411 error=common.Error.INVALID_FUNCTION_CODE) 412 413 async def _call_read_cb(self, device_id, data_type, start_address, 414 quantity): 415 if not self._read_cb: 416 self._log(logging.DEBUG, "read callback not defined") 417 return common.Error.FUNCTION_ERROR 418 419 try: 420 return await aio.call(self._read_cb, self, device_id, data_type, 421 start_address, quantity) 422 423 except Exception as e: 424 self._log(logging.WARNING, "error in read callback: %s", e, 425 exc_info=e) 426 return common.Error.FUNCTION_ERROR 427 428 async def _call_write_cb(self, device_id, data_type, start_address, 429 values): 430 if not self._write_cb: 431 self._log(logging.DEBUG, "write callback not defined") 432 return common.Error.FUNCTION_ERROR 433 434 try: 435 return await aio.call(self._write_cb, self, device_id, data_type, 436 start_address, values) 437 438 except Exception as e: 439 self._log(logging.WARNING, "error in write callback: %s", e, 440 exc_info=e) 441 return common.Error.FUNCTION_ERROR 442 443 async def _call_write_mask_cb(self, device_id, address, and_mask, 444 or_mask): 445 if not self._write_mask_cb: 446 self._log(logging.DEBUG, "write mask callback not defined") 447 return common.Error.FUNCTION_ERROR 448 449 try: 450 return await aio.call(self._write_mask_cb, self, device_id, 451 address, and_mask, or_mask) 452 453 except Exception as e: 454 self._log(logging.WARNING, "error in write mask callback: %s", e, 455 exc_info=e) 456 return common.Error.FUNCTION_ERROR 457 458 def _log(self, level, msg, *args, **kwargs): 459 if not mlog.isEnabledFor(level): 460 return 461 462 mlog.log(level, f"{self.log_prefix}: {msg}", *args, **kwargs)
Modbus slave
Slave( conn: hat.drivers.modbus.transport.connection.Connection, modbus_type: ModbusType, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], Union[list[int], Error, Awaitable[list[int] | Error]]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Union[Error, NoneType, Awaitable[Error | None]]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Union[Error, NoneType, Awaitable[Error | None]]]] = None)
163 def __init__(self, 164 conn: transport.Connection, 165 modbus_type: common.ModbusType, 166 read_cb: typing.Optional[ReadCb] = None, 167 write_cb: typing.Optional[WriteCb] = None, 168 write_mask_cb: typing.Optional[WriteMaskCb] = None): 169 self._conn = conn 170 self._modbus_type = modbus_type 171 self._read_cb = read_cb 172 self._write_cb = write_cb 173 self._write_mask_cb = write_mask_cb 174 175 self.async_group.spawn(self._receive_loop)