hat.drivers.modbus
Modbus communication protocol
1"""Modbus communication protocol""" 2 3from hat.drivers.modbus.common import (ModbusType, 4 DataType, 5 Error, 6 apply_mask) 7from hat.drivers.modbus.master import (create_tcp_master, 8 create_serial_master, 9 Master) 10from hat.drivers.modbus.slave import (SlaveCb, 11 ReadCb, 12 WriteCb, 13 WriteMaskCb, 14 create_tcp_server, 15 create_serial_slave, 16 Slave) 17 18 19__all__ = ['ModbusType', 20 'DataType', 21 'Error', 22 'apply_mask', 23 'create_tcp_master', 24 'create_serial_master', 25 'Master', 26 'SlaveCb', 27 'ReadCb', 28 'WriteCb', 29 'WriteMaskCb', 30 'create_tcp_server', 31 'create_serial_slave', 32 'Slave']
class
ModbusType(enum.Enum):
TCP =
<ModbusType.TCP: 0>
RTU =
<ModbusType.RTU: 1>
ASCII =
<ModbusType.ASCII: 2>
class
DataType(enum.Enum):
13class DataType(enum.Enum): 14 COIL = 1 15 DISCRETE_INPUT = 2 16 HOLDING_REGISTER = 3 17 INPUT_REGISTER = 4 18 QUEUE = 5
COIL =
<DataType.COIL: 1>
DISCRETE_INPUT =
<DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER =
<DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER =
<DataType.INPUT_REGISTER: 4>
QUEUE =
<DataType.QUEUE: 5>
class
Error(enum.Enum):
21class Error(enum.Enum): 22 INVALID_FUNCTION_CODE = 0x01 23 INVALID_DATA_ADDRESS = 0x02 24 INVALID_DATA_VALUE = 0x03 25 FUNCTION_ERROR = 0x04 26 GATEWAY_PATH_UNAVAILABLE = 0x0a 27 GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0b
INVALID_FUNCTION_CODE =
<Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS =
<Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE =
<Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR =
<Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE =
<Error.GATEWAY_PATH_UNAVAILABLE: 10>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND =
<Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 11>
def
apply_mask(value: int, and_mask: int, or_mask: int) -> int:
30def apply_mask(value: int, and_mask: int, or_mask: int) -> int: 31 """Apply mask to value""" 32 return (value & and_mask) | (or_mask & (~and_mask))
Apply mask to value
async def
create_tcp_master( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, *, response_timeout: float | None = None, **kwargs) -> Master:
21async def create_tcp_master(modbus_type: common.ModbusType, 22 addr: tcp.Address, 23 *, 24 response_timeout: float | None = None, 25 **kwargs 26 ) -> 'Master': 27 """Create TCP master 28 29 Args: 30 modbus_type: modbus type 31 addr: remote host address 32 response_timeout: response timeout in seconds 33 kwargs: additional arguments used for creating TCP connection 34 (see `tcp.connect`) 35 36 """ 37 conn = await tcp.connect(addr, **kwargs) 38 39 try: 40 return Master(link=transport.TcpLink(conn), 41 modbus_type=modbus_type, 42 response_timeout=response_timeout) 43 44 except BaseException: 45 await aio.uncancellable(conn.async_close()) 46 raise
Create TCP master
Arguments:
- modbus_type: modbus type
- addr: remote host address
- response_timeout: response timeout in seconds
- kwargs: additional arguments used for creating TCP connection
(see
tcp.connect)
async def
create_serial_master( modbus_type: ModbusType, port: str, *, silent_interval: float = 0.005, response_timeout: float | None = None, **kwargs) -> Master:
49async def create_serial_master(modbus_type: common.ModbusType, 50 port: str, 51 *, 52 silent_interval: float = 0.005, 53 response_timeout: float | None = None, 54 **kwargs 55 ) -> 'Master': 56 """Create serial master 57 58 Args: 59 modbus_type: modbus type 60 port: port name (see `serial.create`) 61 silent_interval: silent interval (see `serial.create`) 62 response_timeout: response timeout in seconds 63 kwargs: additional arguments used for opening serial connection 64 (see `serial.create`) 65 66 """ 67 endpoint = await serial.create(port, 68 silent_interval=silent_interval, 69 **kwargs) 70 71 try: 72 return Master(link=transport.SerialLink(endpoint), 73 modbus_type=modbus_type, 74 response_timeout=response_timeout) 75 76 except BaseException: 77 await aio.uncancellable(endpoint.async_close()) 78 raise
Create serial master
Arguments:
- modbus_type: modbus type
- port: port name (see
serial.create) - silent_interval: silent interval (see
serial.create) - response_timeout: response timeout in seconds
- kwargs: additional arguments used for opening serial connection
(see
serial.create)
class
Master(hat.aio.group.Resource):
81class Master(aio.Resource): 82 """Modbus master""" 83 84 def __init__(self, 85 link: transport.Link, 86 modbus_type: common.ModbusType, 87 response_timeout: float | None): 88 self._modbus_type = modbus_type 89 self._response_timeout = response_timeout 90 self._conn = transport.Connection(link) 91 self._send_queue = aio.Queue() 92 self._log = _create_logger_adapter(self._conn.info) 93 94 if modbus_type == common.ModbusType.TCP: 95 self._next_transaction_ids = iter(i % 0x10000 96 for i in itertools.count(1)) 97 98 self.async_group.spawn(self._send_loop) 99 100 self._log.debug('master created') 101 102 @property 103 def async_group(self) -> aio.Group: 104 """Async group""" 105 return self._conn.async_group 106 107 @property 108 def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo: 109 """Connection or endpoint info""" 110 return self._conn.info 111 112 async def read(self, 113 device_id: int, 114 data_type: common.DataType, 115 start_address: int, 116 quantity: int = 1 117 ) -> list[int] | common.Error: 118 """Read data from modbus device 119 120 Argument `quantity` is ignored if `data_type` is `QUEUE`. 121 122 Args: 123 device_id: slave device identifier 124 data_type: data type 125 start_address: starting modbus data address 126 quantity: number of data values 127 128 Raises: 129 ConnectionError 130 TimeoutError 131 132 """ 133 if device_id == 0: 134 raise ValueError('unsupported device id') 135 136 if data_type == common.DataType.COIL: 137 req = transport.ReadCoilsReq(address=start_address, 138 quantity=quantity) 139 140 elif data_type == common.DataType.DISCRETE_INPUT: 141 req = transport.ReadDiscreteInputsReq(address=start_address, 142 quantity=quantity) 143 144 elif data_type == common.DataType.HOLDING_REGISTER: 145 req = transport.ReadHoldingRegistersReq(address=start_address, 146 quantity=quantity) 147 148 elif data_type == common.DataType.INPUT_REGISTER: 149 req = transport.ReadInputRegistersReq(address=start_address, 150 quantity=quantity) 151 152 elif data_type == common.DataType.QUEUE: 153 req = transport.ReadFifoQueueReq(address=start_address) 154 155 else: 156 raise ValueError('unsupported data type') 157 158 res = await self._send(device_id, req) 159 160 if isinstance(res, transport.ErrorRes): 161 return res.error 162 163 if isinstance(res, (transport.ReadCoilsRes, 164 transport.ReadDiscreteInputsRes, 165 transport.ReadHoldingRegistersRes, 166 transport.ReadInputRegistersRes)): 167 return res.values[:quantity] 168 169 if isinstance(res, transport.ReadFifoQueueRes): 170 return res.values 171 172 raise ValueError("unsupported response pdu") 173 174 async def write(self, 175 device_id: int, 176 data_type: common.DataType, 177 start_address: int, 178 values: typing.List[int] 179 ) -> common.Error | None: 180 """Write data to modbus device 181 182 Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not 183 supported. 184 185 Args: 186 device_id: slave device identifier 187 data_type: data type 188 start_address: starting modbus data address 189 values: values 190 191 Raises: 192 ConnectionError 193 TimeoutError 194 195 """ 196 if data_type == common.DataType.COIL: 197 if len(values) == 1: 198 req = transport.WriteSingleCoilReq(address=start_address, 199 value=values[0]) 200 201 else: 202 req = transport.WriteMultipleCoilsReq(address=start_address, 203 values=values) 204 205 elif data_type == common.DataType.HOLDING_REGISTER: 206 if len(values) == 1: 207 req = transport.WriteSingleRegisterReq(address=start_address, 208 value=values[0]) 209 210 else: 211 req = transport.WriteMultipleRegistersReq( 212 address=start_address, 213 values=values) 214 215 else: 216 raise ValueError('unsupported data type') 217 218 res = await self._send(device_id, req) 219 220 if isinstance(res, transport.ErrorRes): 221 return res.error 222 223 if not isinstance(res, (transport.WriteSingleCoilRes, 224 transport.WriteMultipleCoilsRes, 225 transport.WriteSingleRegisterRes, 226 transport.WriteMultipleRegistersRes)): 227 raise ValueError("unsupported response pdu") 228 229 if (res.address != start_address): 230 raise Exception("invalid response pdu address") 231 232 if isinstance(res, (transport.WriteSingleCoilRes, 233 transport.WriteSingleRegisterRes)): 234 if (res.value != values[0]): 235 raise Exception("invalid response pdu value") 236 237 if isinstance(res, (transport.WriteMultipleCoilsRes, 238 transport.WriteMultipleRegistersRes)): 239 if (res.quantity != len(values)): 240 raise Exception("invalid response pdu quantity") 241 242 async def write_mask(self, 243 device_id: int, 244 address: int, 245 and_mask: int, 246 or_mask: int 247 ) -> common.Error | None: 248 """Write mask to modbus device HOLDING_REGISTER 249 250 Args: 251 device_id: slave device identifier 252 address: modbus data address 253 and_mask: and mask 254 or_mask: or mask 255 256 Raises: 257 ConnectionError 258 TimeoutError 259 260 """ 261 req = transport.MaskWriteRegisterReq(address=address, 262 and_mask=and_mask, 263 or_mask=or_mask) 264 265 res = await self._send(device_id, req) 266 267 if isinstance(res, transport.ErrorRes): 268 return res.error 269 270 if not isinstance(res, transport.MaskWriteRegisterRes): 271 raise ValueError("unsupported response pdu") 272 273 if (res.address != address): 274 raise Exception("invalid response pdu address") 275 276 if (res.and_mask != and_mask): 277 raise Exception("invalid response pdu and mask") 278 279 if (res.or_mask != or_mask): 280 raise Exception("invalid response pdu or mask") 281 282 async def _send(self, device_id, req): 283 if self._modbus_type == common.ModbusType.TCP: 284 req_adu = transport.TcpAdu( 285 transaction_id=next(self._next_transaction_ids), 286 device_id=device_id, 287 pdu=req) 288 289 elif self._modbus_type == common.ModbusType.RTU: 290 req_adu = transport.RtuAdu(device_id=device_id, 291 pdu=req) 292 293 elif self._modbus_type == common.ModbusType.ASCII: 294 req_adu = transport.AsciiAdu(device_id=device_id, 295 pdu=req) 296 297 else: 298 raise ValueError("unsupported modbus type") 299 300 future = asyncio.Future() 301 try: 302 self._send_queue.put_nowait((req_adu, future)) 303 res_adu = await future 304 305 except aio.QueueClosedError: 306 raise ConnectionError() 307 308 return res_adu.pdu 309 310 async def _receive(self, req_adu): 311 while True: 312 res_adu = await self._conn.receive(self._modbus_type, 313 transport.Direction.RESPONSE) 314 315 if isinstance(res_adu, transport.TcpAdu): 316 if res_adu.transaction_id != req_adu.transaction_id: 317 self._log.warning("discarding response adu: " 318 "invalid response transaction id") 319 continue 320 321 if res_adu.device_id != req_adu.device_id: 322 self._log.warning("discarding response adu: " 323 "invalid response device id") 324 continue 325 326 req_fc = transport.get_pdu_function_code(req_adu.pdu) 327 res_fc = transport.get_pdu_function_code(res_adu.pdu) 328 if req_fc != res_fc: 329 self._log.warning("discarding response adu: " 330 "invalid response function code") 331 continue 332 333 return res_adu 334 335 async def _send_loop(self): 336 self._log.debug("starting master send loop") 337 future = None 338 try: 339 while self.is_open: 340 # req_adu, future = await self._send_queue.get() 341 342 async with self.async_group.create_subgroup() as subgroup: 343 subgroup.spawn(self._clear_input_buffer_loop) 344 self._log.debug("started discarding incomming data") 345 346 while not future or future.done(): 347 req_adu, future = await self._send_queue.get() 348 349 await self._clear_input_buffer() 350 self._log.debug("stopped discarding incomming data") 351 352 await self._conn.send(req_adu) 353 await self._conn.drain() 354 355 async with self.async_group.create_subgroup( 356 log_exceptions=False) as subgroup: 357 receive_task = subgroup.spawn(self._receive, req_adu) 358 359 await asyncio.wait([receive_task, future], 360 timeout=self._response_timeout, 361 return_when=asyncio.FIRST_COMPLETED) 362 363 if future.done(): 364 continue 365 366 if receive_task.done(): 367 future.set_result(receive_task.result()) 368 369 else: 370 future.set_exception(TimeoutError()) 371 372 except ConnectionError: 373 pass 374 375 except Exception as e: 376 self._log.error("error in send loop: %s", e, exc_info=e) 377 378 finally: 379 self._log.debug("stopping master send loop") 380 self.close() 381 self._send_queue.close() 382 383 while True: 384 if future and not future.done(): 385 future.set_exception(ConnectionError()) 386 if self._send_queue.empty(): 387 break 388 _, future = self._send_queue.get_nowait() 389 390 async def _clear_input_buffer_loop(self): 391 try: 392 while True: 393 await self._clear_input_buffer() 394 395 await self._conn.read_byte() 396 self._log.debug("discarded 1 byte from input buffer") 397 398 except ConnectionError: 399 self.close() 400 401 except Exception as e: 402 self._log.error("error in reset input buffer loop: %s", e, 403 exc_info=e) 404 self.close() 405 406 async def _clear_input_buffer(self): 407 count = await self._conn.clear_input_buffer() 408 if not count: 409 return 410 self._log.debug("discarded %s bytes from input buffer", count)
Modbus master
Master( link: hat.drivers.modbus.transport.connection.Link, modbus_type: ModbusType, response_timeout: float | None)
84 def __init__(self, 85 link: transport.Link, 86 modbus_type: common.ModbusType, 87 response_timeout: float | None): 88 self._modbus_type = modbus_type 89 self._response_timeout = response_timeout 90 self._conn = transport.Connection(link) 91 self._send_queue = aio.Queue() 92 self._log = _create_logger_adapter(self._conn.info) 93 94 if modbus_type == common.ModbusType.TCP: 95 self._next_transaction_ids = iter(i % 0x10000 96 for i in itertools.count(1)) 97 98 self.async_group.spawn(self._send_loop) 99 100 self._log.debug('master created')
async_group: hat.aio.group.Group
102 @property 103 def async_group(self) -> aio.Group: 104 """Async group""" 105 return self._conn.async_group
Async group
107 @property 108 def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo: 109 """Connection or endpoint info""" 110 return self._conn.info
Connection or endpoint info
async def
read( self, device_id: int, data_type: DataType, start_address: int, quantity: int = 1) -> list[int] | Error:
112 async def read(self, 113 device_id: int, 114 data_type: common.DataType, 115 start_address: int, 116 quantity: int = 1 117 ) -> list[int] | common.Error: 118 """Read data from modbus device 119 120 Argument `quantity` is ignored if `data_type` is `QUEUE`. 121 122 Args: 123 device_id: slave device identifier 124 data_type: data type 125 start_address: starting modbus data address 126 quantity: number of data values 127 128 Raises: 129 ConnectionError 130 TimeoutError 131 132 """ 133 if device_id == 0: 134 raise ValueError('unsupported device id') 135 136 if data_type == common.DataType.COIL: 137 req = transport.ReadCoilsReq(address=start_address, 138 quantity=quantity) 139 140 elif data_type == common.DataType.DISCRETE_INPUT: 141 req = transport.ReadDiscreteInputsReq(address=start_address, 142 quantity=quantity) 143 144 elif data_type == common.DataType.HOLDING_REGISTER: 145 req = transport.ReadHoldingRegistersReq(address=start_address, 146 quantity=quantity) 147 148 elif data_type == common.DataType.INPUT_REGISTER: 149 req = transport.ReadInputRegistersReq(address=start_address, 150 quantity=quantity) 151 152 elif data_type == common.DataType.QUEUE: 153 req = transport.ReadFifoQueueReq(address=start_address) 154 155 else: 156 raise ValueError('unsupported data type') 157 158 res = await self._send(device_id, req) 159 160 if isinstance(res, transport.ErrorRes): 161 return res.error 162 163 if isinstance(res, (transport.ReadCoilsRes, 164 transport.ReadDiscreteInputsRes, 165 transport.ReadHoldingRegistersRes, 166 transport.ReadInputRegistersRes)): 167 return res.values[:quantity] 168 169 if isinstance(res, transport.ReadFifoQueueRes): 170 return res.values 171 172 raise ValueError("unsupported response pdu")
Read data from modbus device
Argument quantity is ignored if data_type is QUEUE.
Arguments:
- device_id: slave device identifier
- data_type: data type
- start_address: starting modbus data address
- quantity: number of data values
Raises:
- ConnectionError
- TimeoutError
async def
write( self, device_id: int, data_type: DataType, start_address: int, values: List[int]) -> Error | None:
174 async def write(self, 175 device_id: int, 176 data_type: common.DataType, 177 start_address: int, 178 values: typing.List[int] 179 ) -> common.Error | None: 180 """Write data to modbus device 181 182 Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not 183 supported. 184 185 Args: 186 device_id: slave device identifier 187 data_type: data type 188 start_address: starting modbus data address 189 values: values 190 191 Raises: 192 ConnectionError 193 TimeoutError 194 195 """ 196 if data_type == common.DataType.COIL: 197 if len(values) == 1: 198 req = transport.WriteSingleCoilReq(address=start_address, 199 value=values[0]) 200 201 else: 202 req = transport.WriteMultipleCoilsReq(address=start_address, 203 values=values) 204 205 elif data_type == common.DataType.HOLDING_REGISTER: 206 if len(values) == 1: 207 req = transport.WriteSingleRegisterReq(address=start_address, 208 value=values[0]) 209 210 else: 211 req = transport.WriteMultipleRegistersReq( 212 address=start_address, 213 values=values) 214 215 else: 216 raise ValueError('unsupported data type') 217 218 res = await self._send(device_id, req) 219 220 if isinstance(res, transport.ErrorRes): 221 return res.error 222 223 if not isinstance(res, (transport.WriteSingleCoilRes, 224 transport.WriteMultipleCoilsRes, 225 transport.WriteSingleRegisterRes, 226 transport.WriteMultipleRegistersRes)): 227 raise ValueError("unsupported response pdu") 228 229 if (res.address != start_address): 230 raise Exception("invalid response pdu address") 231 232 if isinstance(res, (transport.WriteSingleCoilRes, 233 transport.WriteSingleRegisterRes)): 234 if (res.value != values[0]): 235 raise Exception("invalid response pdu value") 236 237 if isinstance(res, (transport.WriteMultipleCoilsRes, 238 transport.WriteMultipleRegistersRes)): 239 if (res.quantity != len(values)): 240 raise Exception("invalid response pdu quantity")
Write data to modbus device
Data types DISCRETE_INPUT, INPUT_REGISTER and QUEUE are not
supported.
Arguments:
- device_id: slave device identifier
- data_type: data type
- start_address: starting modbus data address
- values: values
Raises:
- ConnectionError
- TimeoutError
async def
write_mask( self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None:
242 async def write_mask(self, 243 device_id: int, 244 address: int, 245 and_mask: int, 246 or_mask: int 247 ) -> common.Error | None: 248 """Write mask to modbus device HOLDING_REGISTER 249 250 Args: 251 device_id: slave device identifier 252 address: modbus data address 253 and_mask: and mask 254 or_mask: or mask 255 256 Raises: 257 ConnectionError 258 TimeoutError 259 260 """ 261 req = transport.MaskWriteRegisterReq(address=address, 262 and_mask=and_mask, 263 or_mask=or_mask) 264 265 res = await self._send(device_id, req) 266 267 if isinstance(res, transport.ErrorRes): 268 return res.error 269 270 if not isinstance(res, transport.MaskWriteRegisterRes): 271 raise ValueError("unsupported response pdu") 272 273 if (res.address != address): 274 raise Exception("invalid response pdu address") 275 276 if (res.and_mask != and_mask): 277 raise Exception("invalid response pdu and mask") 278 279 if (res.or_mask != or_mask): 280 raise Exception("invalid response pdu or mask")
Write mask to modbus device HOLDING_REGISTER
Arguments:
- device_id: slave device identifier
- address: modbus data address
- and_mask: and mask
- or_mask: or mask
Raises:
- ConnectionError
- TimeoutError
SlaveCb =
typing.Callable[[ForwardRef('Slave')], None | collections.abc.Awaitable[None]]
async def
create_tcp_server( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, *, slave_cb: Optional[Callable[[Slave], None | Awaitable[None]]] = None, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], list[int] | Error | Awaitable[list[int] | Error]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Error | None | Awaitable[Error | None]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Error | None | Awaitable[Error | None]]] = None, **kwargs) -> hat.drivers.tcp.Server:
82async def create_tcp_server(modbus_type: common.ModbusType, 83 addr: tcp.Address, 84 *, 85 slave_cb: SlaveCb | None = None, 86 read_cb: ReadCb | None = None, 87 write_cb: WriteCb | None = None, 88 write_mask_cb: WriteMaskCb | None = None, 89 **kwargs 90 ) -> tcp.Server: 91 """Create TCP server 92 93 Closing server closes all active associated slaves. 94 95 Args: 96 modbus_type: modbus type 97 addr: local listening host address 98 slave_cb: slave callback 99 read_cb: read callback 100 write_cb: write callback 101 write_mask_cb: write mask callback 102 kwargs: additional arguments used for creating TCP server 103 (see `tcp.listen`) 104 105 """ 106 107 async def on_connection(conn): 108 if not conn.is_open: 109 return 110 111 log = _create_logger_adapter(conn.info) 112 113 slave = Slave(link=transport.TcpLink(conn), 114 modbus_type=modbus_type, 115 read_cb=read_cb, 116 write_cb=write_cb, 117 write_mask_cb=write_mask_cb) 118 119 try: 120 if slave_cb: 121 await aio.call(slave_cb, slave) 122 123 await slave.wait_closing() 124 125 except Exception as e: 126 log.error("error in slave callback: %s", e, exc_info=e) 127 128 finally: 129 await aio.uncancellable(slave.async_close()) 130 131 server = await tcp.listen(on_connection, addr, 132 bind_connections=True, 133 **kwargs) 134 135 return server
Create TCP server
Closing server closes all active associated slaves.
Arguments:
- modbus_type: modbus type
- addr: local listening host address
- slave_cb: slave callback
- read_cb: read callback
- write_cb: write callback
- write_mask_cb: write mask callback
- kwargs: additional arguments used for creating TCP server
(see
tcp.listen)
async def
create_serial_slave( modbus_type: ModbusType, port: str, *, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], list[int] | Error | Awaitable[list[int] | Error]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Error | None | Awaitable[Error | None]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Error | None | Awaitable[Error | None]]] = None, silent_interval: float = 0.005, **kwargs) -> Slave:
138async def create_serial_slave(modbus_type: common.ModbusType, 139 port: str, 140 *, 141 read_cb: ReadCb | None = None, 142 write_cb: WriteCb | None = None, 143 write_mask_cb: WriteMaskCb | None = None, 144 silent_interval: float = 0.005, 145 **kwargs 146 ) -> 'Slave': 147 """Create serial slave 148 149 Args: 150 modbus_type: modbus type 151 port: port name (see `serial.create`) 152 read_cb: read callback 153 write_cb: write callback 154 write_mask_cb: write mask callback 155 silent_interval: silent interval (see `serial.create`) 156 kwargs: additional arguments used for opening serial connection 157 (see `serial.create`) 158 159 """ 160 endpoint = await serial.create(port, 161 silent_interval=silent_interval, 162 **kwargs) 163 164 try: 165 return Slave(link=transport.SerialLink(endpoint), 166 modbus_type=modbus_type, 167 read_cb=read_cb, 168 write_cb=write_cb, 169 write_mask_cb=write_mask_cb) 170 171 except BaseException: 172 await aio.uncancellable(endpoint.async_close()) 173 raise
Create serial slave
Arguments:
- modbus_type: modbus type
- port: port name (see
serial.create) - read_cb: read callback
- write_cb: write callback
- write_mask_cb: write mask callback
- silent_interval: silent interval (see
serial.create) - kwargs: additional arguments used for opening serial connection
(see
serial.create)
class
Slave(hat.aio.group.Resource):
176class Slave(aio.Resource): 177 """Modbus slave""" 178 179 def __init__(self, 180 link: transport.Link, 181 modbus_type: common.ModbusType, 182 read_cb: ReadCb | None = None, 183 write_cb: WriteCb | None = None, 184 write_mask_cb: WriteMaskCb | None = None): 185 self._modbus_type = modbus_type 186 self._read_cb = read_cb 187 self._write_cb = write_cb 188 self._write_mask_cb = write_mask_cb 189 self._conn = transport.Connection(link) 190 self._log = _create_logger_adapter(self._conn.info) 191 192 self.async_group.spawn(self._receive_loop) 193 194 self._log.debug('slave created') 195 196 @property 197 def async_group(self) -> aio.Group: 198 """Async group""" 199 return self._conn.async_group 200 201 @property 202 def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo: 203 """Connection or endpoint info""" 204 return self._conn.info 205 206 async def _receive_loop(self): 207 self._log.debug("starting slave receive loop") 208 try: 209 while True: 210 try: 211 self._log.debug("waiting for request") 212 req_adu = await self._conn.receive( 213 modbus_type=self._modbus_type, 214 direction=transport.Direction.REQUEST) 215 216 except ConnectionError: 217 break 218 219 except Exception as e: 220 self._log.warning("error receiving request: %s", e, 221 exc_info=e) 222 continue 223 224 device_id = req_adu.device_id 225 req = req_adu.pdu 226 227 try: 228 self._log.debug("processing request (device_id %s): %s", 229 device_id, req) 230 res = await self._process_request(device_id, req) 231 232 except Exception as e: 233 self._log.warning("error processing request: %s", e, 234 exc_info=e) 235 continue 236 237 if device_id == 0: 238 self._log.debug( 239 "skip sending response (broadcast request): %s", res) 240 continue 241 242 self._log.debug("sending response: %s", res) 243 244 if self._modbus_type == common.ModbusType.TCP: 245 res_adu = transport.TcpAdu( 246 transaction_id=req_adu.transaction_id, 247 device_id=req_adu.device_id, 248 pdu=res) 249 250 elif self._modbus_type == common.ModbusType.RTU: 251 res_adu = transport.RtuAdu(device_id=req_adu.device_id, 252 pdu=res) 253 254 elif self._modbus_type == common.ModbusType.ASCII: 255 res_adu = transport.AsciiAdu(device_id=req_adu.device_id, 256 pdu=res) 257 258 else: 259 raise ValueError("invalid modbus type") 260 261 try: 262 await self._conn.send(res_adu) 263 264 except ConnectionError: 265 break 266 267 except Exception as e: 268 self._log.warning("error sending response: %s", e, 269 exc_info=e) 270 continue 271 272 except Exception as e: 273 self._log.error("receive loop error: %s", e, exc_info=e) 274 275 finally: 276 self._log.debug("closing slave receive loop") 277 self.close() 278 279 async def _process_request(self, device_id, req): 280 if isinstance(req, transport.ReadCoilsReq): 281 result = await self._call_read_cb( 282 device_id=device_id, 283 data_type=common.DataType.COIL, 284 start_address=req.address, 285 quantity=req.quantity) 286 287 if isinstance(result, common.Error): 288 return transport.ErrorRes( 289 fc=transport.FunctionCode.READ_COILS, 290 error=result) 291 292 return transport.ReadCoilsRes(values=result) 293 294 if isinstance(req, transport.ReadDiscreteInputsReq): 295 result = await self._call_read_cb( 296 device_id=device_id, 297 data_type=common.DataType.DISCRETE_INPUT, 298 start_address=req.address, 299 quantity=req.quantity) 300 301 if isinstance(result, common.Error): 302 return transport.ErrorRes( 303 fc=transport.FunctionCode.READ_DISCRETE_INPUTS, 304 error=result) 305 306 return transport.ReadDiscreteInputsRes(values=result) 307 308 if isinstance(req, transport.ReadHoldingRegistersReq): 309 result = await self._call_read_cb( 310 device_id=device_id, 311 data_type=common.DataType.HOLDING_REGISTER, 312 start_address=req.address, 313 quantity=req.quantity) 314 315 if isinstance(result, common.Error): 316 return transport.ErrorRes( 317 fc=transport.FunctionCode.READ_HOLDING_REGISTERS, 318 error=result) 319 320 return transport.ReadHoldingRegistersRes(values=result) 321 322 if isinstance(req, transport.ReadInputRegistersReq): 323 result = await self._call_read_cb( 324 device_id=device_id, 325 data_type=common.DataType.INPUT_REGISTER, 326 start_address=req.address, 327 quantity=req.quantity) 328 329 if isinstance(result, common.Error): 330 return transport.ErrorRes( 331 fc=transport.FunctionCode.READ_INPUT_REGISTERS, 332 error=result) 333 334 return transport.ReadInputRegistersRes(values=result) 335 336 if isinstance(req, transport.WriteSingleCoilReq): 337 result = await self._call_write_cb( 338 device_id=device_id, 339 data_type=common.DataType.COIL, 340 start_address=req.address, 341 values=[req.value]) 342 343 if isinstance(result, common.Error): 344 return transport.ErrorRes( 345 fc=transport.FunctionCode.WRITE_SINGLE_COIL, 346 error=result) 347 348 return transport.WriteSingleCoilRes(address=req.address, 349 value=req.value) 350 351 if isinstance(req, transport.WriteSingleRegisterReq): 352 result = await self._call_write_cb( 353 device_id=device_id, 354 data_type=common.DataType.HOLDING_REGISTER, 355 start_address=req.address, 356 values=[req.value]) 357 358 if isinstance(result, common.Error): 359 return transport.ErrorRes( 360 fc=transport.FunctionCode.WRITE_SINGLE_REGISTER, 361 error=result) 362 363 return transport.WriteSingleRegisterRes(address=req.address, 364 value=req.value) 365 366 if isinstance(req, transport.WriteMultipleCoilsReq): 367 result = await self._call_write_cb( 368 device_id=device_id, 369 data_type=common.DataType.COIL, 370 start_address=req.address, 371 values=req.values) 372 373 if isinstance(result, common.Error): 374 return transport.ErrorRes( 375 fc=transport.FunctionCode.WRITE_MULTIPLE_COILS, 376 error=result) 377 378 return transport.WriteMultipleCoilsRes(address=req.address, 379 quantity=len(req.values)) 380 381 if isinstance(req, transport.WriteMultipleRegistersReq): 382 result = await self._call_write_cb( 383 device_id=device_id, 384 data_type=common.DataType.HOLDING_REGISTER, 385 start_address=req.address, 386 values=req.values) 387 388 if isinstance(result, common.Error): 389 return transport.ErrorRes( 390 fc=transport.FunctionCode.WRITE_MULTIPLE_REGISTER, 391 error=result) 392 393 return transport.WriteMultipleRegistersRes( 394 address=req.address, 395 quantity=len(req.values)) 396 397 if isinstance(req, transport.MaskWriteRegisterReq): 398 result = await self._call_write_mask_cb( 399 device_id=device_id, 400 address=req.address, 401 and_mask=req.and_mask, 402 or_mask=req.or_mask) 403 404 if isinstance(result, common.Error): 405 return transport.ErrorRes( 406 fc=transport.FunctionCode.MASK_WRITE_REGISTER, 407 error=result) 408 409 return transport.MaskWriteRegisterRes(address=req.address, 410 and_mask=req.and_mask, 411 or_mask=req.or_mask) 412 413 if isinstance(req, transport.ReadFifoQueueReq): 414 result = await self._call_read_cb( 415 device_id=device_id, 416 data_type=common.DataType.QUEUE, 417 start_address=req.address, 418 quantity=None) 419 420 if isinstance(result, common.Error): 421 return transport.ErrorRes( 422 fc=transport.FunctionCode.READ_FIFO_QUEUE, 423 error=result) 424 425 return transport.ReadFifoQueueRes(values=result) 426 427 return transport.ErrorRes(fc=transport.get_pdu_function_code(req), 428 error=common.Error.INVALID_FUNCTION_CODE) 429 430 async def _call_read_cb(self, device_id, data_type, start_address, 431 quantity): 432 if not self._read_cb: 433 self._log.debug("read callback not defined") 434 return common.Error.FUNCTION_ERROR 435 436 try: 437 return await aio.call(self._read_cb, self, device_id, data_type, 438 start_address, quantity) 439 440 except Exception as e: 441 self._log.warning("error in read callback: %s", e, 442 exc_info=e) 443 return common.Error.FUNCTION_ERROR 444 445 async def _call_write_cb(self, device_id, data_type, start_address, 446 values): 447 if not self._write_cb: 448 self._log.debug("write callback not defined") 449 return common.Error.FUNCTION_ERROR 450 451 try: 452 return await aio.call(self._write_cb, self, device_id, data_type, 453 start_address, values) 454 455 except Exception as e: 456 self._log.warning("error in write callback: %s", e, 457 exc_info=e) 458 return common.Error.FUNCTION_ERROR 459 460 async def _call_write_mask_cb(self, device_id, address, and_mask, 461 or_mask): 462 if not self._write_mask_cb: 463 self._log.debug("write mask callback not defined") 464 return common.Error.FUNCTION_ERROR 465 466 try: 467 return await aio.call(self._write_mask_cb, self, device_id, 468 address, and_mask, or_mask) 469 470 except Exception as e: 471 self._log.warning("error in write mask callback: %s", e, 472 exc_info=e) 473 return common.Error.FUNCTION_ERROR
Modbus slave
Slave( link: hat.drivers.modbus.transport.connection.Link, modbus_type: ModbusType, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], list[int] | Error | Awaitable[list[int] | Error]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Error | None | Awaitable[Error | None]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Error | None | Awaitable[Error | None]]] = None)
179 def __init__(self, 180 link: transport.Link, 181 modbus_type: common.ModbusType, 182 read_cb: ReadCb | None = None, 183 write_cb: WriteCb | None = None, 184 write_mask_cb: WriteMaskCb | None = None): 185 self._modbus_type = modbus_type 186 self._read_cb = read_cb 187 self._write_cb = write_cb 188 self._write_mask_cb = write_mask_cb 189 self._conn = transport.Connection(link) 190 self._log = _create_logger_adapter(self._conn.info) 191 192 self.async_group.spawn(self._receive_loop) 193 194 self._log.debug('slave created')
async_group: hat.aio.group.Group
196 @property 197 def async_group(self) -> aio.Group: 198 """Async group""" 199 return self._conn.async_group
Async group