hat.drivers.modbus
Modbus communication protocol
1"""Modbus communication protocol""" 2 3from hat.drivers.modbus.common import (DeviceId, 4 DataAddress, 5 DataValues, 6 ModbusType, 7 DataType, 8 Error, 9 Success, 10 ReadReq, 11 ReadRes, 12 WriteReq, 13 WriteRes, 14 WriteMaskReq, 15 WriteMaskRes, 16 Request, 17 Response, 18 apply_mask) 19from hat.drivers.modbus.master import (create_tcp_master, 20 create_serial_master, 21 Master) 22from hat.drivers.modbus.slave import (SlaveCb, 23 RequestCb, 24 create_tcp_server, 25 create_serial_slave, 26 Slave) 27 28 29__all__ = ['DeviceId', 30 'DataAddress', 31 'DataValues', 32 'ModbusType', 33 'DataType', 34 'Error', 35 'Success', 36 'ReadReq', 37 'ReadRes', 38 'WriteReq', 39 'WriteRes', 40 'WriteMaskReq', 41 'WriteMaskRes', 42 'Request', 43 'Response', 44 'apply_mask', 45 'create_tcp_master', 46 'create_serial_master', 47 'Master', 48 'SlaveCb', 49 'RequestCb', 50 'create_tcp_server', 51 'create_serial_slave', 52 'Slave']
Modbus type
@enum.unique
class DataType(enum.Enum):
    """Kind of data addressed by read and write requests"""
    # single bit values
    COIL = 1
    DISCRETE_INPUT = 2
    # register values
    HOLDING_REGISTER = 3
    INPUT_REGISTER = 4
    # FIFO queue (read requests of this type ignore `quantity`)
    QUEUE = 5
Data type
@enum.unique
class Error(enum.Enum):
    """Error reported as a result of request processing

    Member values are the numeric codes associated with each error.

    """
    INVALID_FUNCTION_CODE = 0x01
    INVALID_DATA_ADDRESS = 0x02
    INVALID_DATA_VALUE = 0x03
    FUNCTION_ERROR = 0x04
    # gateway related errors
    GATEWAY_PATH_UNAVAILABLE = 0x0A
    GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0B
Error
Success
class ReadReq(typing.NamedTuple):
    """Read request

    If `data_type` is ``DataType.QUEUE``, `quantity` is ignored.

    """
    # identifier of the device this request is addressed to
    device_id: DeviceId
    # kind of data being read
    data_type: DataType
    # address of the first data element to read
    start_address: DataAddress
    # number of data elements to read (ignored for ``DataType.QUEUE``)
    quantity: int
Read request
If data_type is DataType.QUEUE, quantity is ignored.
Create new instance of ReadReq(device_id, data_type, start_address, quantity)
class WriteReq(typing.NamedTuple):
    """Write request"""
    # identifier of the device this request is addressed to
    device_id: DeviceId
    # kind of data being written
    data_type: DataType
    # address of the first data element to write
    start_address: DataAddress
    # values to write, starting at `start_address`
    values: DataValues
Write request
Create new instance of WriteReq(device_id, data_type, start_address, values)
class WriteMaskReq(typing.NamedTuple):
    """Write mask request"""
    # identifier of the device this request is addressed to
    device_id: DeviceId
    # address of the data element to which masks are applied
    address: DataAddress
    # NOTE(review): masks presumably have the same meaning as the
    # `and_mask`/`or_mask` arguments of `apply_mask` - confirm
    and_mask: int
    or_mask: int
Write mask request
def apply_mask(value: int, and_mask: int, or_mask: int) -> int:
    """Combine `value` with AND/OR masks

    Bits selected by `and_mask` are taken from `value`; all other bits are
    taken from `or_mask`.

    Args:
        value: current value
        and_mask: mask selecting bits preserved from `value`
        or_mask: mask providing bits not selected by `and_mask`

    Returns:
        masked value

    """
    preserved_bits = value & and_mask
    replaced_bits = or_mask & ~and_mask
    return preserved_bits | replaced_bits
Apply mask to value
async def create_tcp_master(modbus_type: common.ModbusType,
                            addr: tcp.Address,
                            *,
                            response_timeout: float | None = None,
                            **kwargs
                            ) -> 'Master':
    """Connect to remote TCP address and create master on top of it

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    If master creation fails, established connection is closed before
    exception is propagated.

    Args:
        modbus_type: modbus type
        addr: remote host address
        response_timeout: response timeout in seconds

    """
    conn = await tcp.connect(addr, **kwargs)

    try:
        link = transport.TcpLink(conn)
        master = Master(link=link,
                        modbus_type=modbus_type,
                        response_timeout=response_timeout)

    except BaseException:
        # do not leak connection (cleanup is shielded from cancellation)
        await aio.uncancellable(conn.async_close())
        raise

    return master
Create TCP master
Additional arguments are passed directly to hat.drivers.tcp.connect.
Arguments:
- modbus_type: modbus type
- addr: remote host address
- response_timeout: response timeout in seconds
async def create_serial_master(modbus_type: common.ModbusType,
                               port: str,
                               *,
                               silent_interval: float = 0.005,
                               response_timeout: float | None = None,
                               **kwargs
                               ) -> 'Master':
    """Open serial endpoint and create master on top of it

    Additional arguments are passed directly to `hat.drivers.serial.create`.

    If master creation fails, opened endpoint is closed before exception is
    propagated.

    Args:
        modbus_type: modbus type
        port: port name (see `hat.drivers.serial.create`)
        silent_interval: silent interval (see `hat.drivers.serial.create`)
        response_timeout: response timeout in seconds

    """
    endpoint = await serial.create(port,
                                   silent_interval=silent_interval,
                                   **kwargs)

    try:
        link = transport.SerialLink(endpoint)
        master = Master(link=link,
                        modbus_type=modbus_type,
                        response_timeout=response_timeout)

    except BaseException:
        # do not leak endpoint (cleanup is shielded from cancellation)
        await aio.uncancellable(endpoint.async_close())
        raise

    return master
Create serial master
Additional arguments are passed directly to hat.drivers.serial.create.
Arguments:
- modbus_type: modbus type
- port: port name (see hat.drivers.serial.create)
- silent_interval: silent interval (see hat.drivers.serial.create)
- response_timeout: response timeout in seconds
class Master(aio.Resource):
    """Modbus master

    Requests passed to `send` are queued and processed one at a time by a
    background send loop spawned in the connection's async group. While no
    request is being processed, incoming data is discarded.

    """

    def __init__(self,
                 link: transport.Link,
                 modbus_type: common.ModbusType,
                 response_timeout: float | None):
        self._modbus_type = modbus_type
        self._response_timeout = response_timeout
        self._conn = transport.Connection(link)
        # queue of (request adu, response future) pairs
        self._send_queue = aio.Queue()
        self._loop = asyncio.get_running_loop()
        self._log = _create_logger_adapter(self._conn.info)

        # transaction ids are used only by TCP adus; sequence starts at 1
        # and wraps around after 0xffff (1, 2, ..., 0xffff, 0, 1, ...)
        if modbus_type == common.ModbusType.TCP:
            self._next_transaction_ids = iter(i % 0x10000
                                              for i in itertools.count(1))

        self.async_group.spawn(self._send_loop)

        self._log.debug('master created')

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
        """Connection or endpoint info"""
        return self._conn.info

    async def send(self, req: common.Request) -> common.Response:
        """Send request and wait for response

        Request is translated to transport pdu, sent to `req.device_id` and
        received response pdu is translated back. Error responses are
        returned (not raised) as error carried by response pdu.

        Args:
            req: request

        Returns:
            response

        Raises:
            ConnectionError
            TimeoutError
            ValueError: unsupported data type or unexpected response pdu
            TypeError: unsupported request type
            Exception: write response does not match written data

        """
        # translate request to transport pdu
        if isinstance(req, common.ReadReq):
            if req.data_type == common.DataType.COIL:
                req_pdu = transport.ReadCoilsReq(address=req.start_address,
                                                 quantity=req.quantity)

            elif req.data_type == common.DataType.DISCRETE_INPUT:
                req_pdu = transport.ReadDiscreteInputsReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.HOLDING_REGISTER:
                req_pdu = transport.ReadHoldingRegistersReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.INPUT_REGISTER:
                req_pdu = transport.ReadInputRegistersReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.QUEUE:
                # quantity is not part of fifo queue read request
                req_pdu = transport.ReadFifoQueueReq(address=req.start_address)

            else:
                raise ValueError('unsupported data type')

        elif isinstance(req, common.WriteReq):
            # exactly one value is written with "single" request variant,
            # any other number of values with "multiple" variant
            if req.data_type == common.DataType.COIL:
                if len(req.values) == 1:
                    req_pdu = transport.WriteSingleCoilReq(
                        address=req.start_address,
                        value=req.values[0])

                else:
                    req_pdu = transport.WriteMultipleCoilsReq(
                        address=req.start_address,
                        values=req.values)

            elif req.data_type == common.DataType.HOLDING_REGISTER:
                if len(req.values) == 1:
                    req_pdu = transport.WriteSingleRegisterReq(
                        address=req.start_address,
                        value=req.values[0])

                else:
                    req_pdu = transport.WriteMultipleRegistersReq(
                        address=req.start_address,
                        values=req.values)

            else:
                raise ValueError('unsupported data type')

        elif isinstance(req, common.WriteMaskReq):
            req_pdu = transport.MaskWriteRegisterReq(address=req.address,
                                                     and_mask=req.and_mask,
                                                     or_mask=req.or_mask)

        else:
            raise TypeError('unsupported request')

        res_pdu = await self._send(req.device_id, req_pdu)

        if isinstance(res_pdu, transport.ErrorRes):
            return res_pdu.error

        # translate response pdu (and validate that write responses echo
        # request content)
        if isinstance(req, common.ReadReq):
            if isinstance(res_pdu, (transport.ReadCoilsRes,
                                    transport.ReadDiscreteInputsRes,
                                    transport.ReadHoldingRegistersRes,
                                    transport.ReadInputRegistersRes)):
                # no more than requested number of values is returned
                return res_pdu.values[:req.quantity]

            if isinstance(res_pdu, transport.ReadFifoQueueRes):
                return res_pdu.values

            raise ValueError("unsupported response pdu")

        elif isinstance(req, common.WriteReq):
            if not isinstance(res_pdu, (transport.WriteSingleCoilRes,
                                        transport.WriteMultipleCoilsRes,
                                        transport.WriteSingleRegisterRes,
                                        transport.WriteMultipleRegistersRes)):
                raise ValueError("unsupported response pdu")

            if (res_pdu.address != req.start_address):
                raise Exception("invalid response pdu address")

            if isinstance(res_pdu, (transport.WriteSingleCoilRes,
                                    transport.WriteSingleRegisterRes)):
                if (res_pdu.value != req.values[0]):
                    raise Exception("invalid response pdu value")

            if isinstance(res_pdu, (transport.WriteMultipleCoilsRes,
                                    transport.WriteMultipleRegistersRes)):
                if (res_pdu.quantity != len(req.values)):
                    raise Exception("invalid response pdu quantity")

            return common.Success()

        elif isinstance(req, common.WriteMaskReq):
            if not isinstance(res_pdu, transport.MaskWriteRegisterRes):
                raise ValueError("unsupported response pdu")

            if (res_pdu.address != req.address):
                raise Exception("invalid response pdu address")

            if (res_pdu.and_mask != req.and_mask):
                raise Exception("invalid response pdu and mask")

            if (res_pdu.or_mask != req.or_mask):
                raise Exception("invalid response pdu or mask")

            return common.Success()

        else:
            raise TypeError('unsupported request')

    async def _send(self, device_id, req_pdu):
        # Wrap request pdu into adu matching configured modbus type, queue
        # it for send loop and wait for response adu; returns response pdu.
        if self._modbus_type == common.ModbusType.TCP:
            req_adu = transport.TcpAdu(
                transaction_id=next(self._next_transaction_ids),
                device_id=device_id,
                pdu=req_pdu)

        elif self._modbus_type == common.ModbusType.RTU:
            req_adu = transport.RtuAdu(device_id=device_id,
                                       pdu=req_pdu)

        elif self._modbus_type == common.ModbusType.ASCII:
            req_adu = transport.AsciiAdu(device_id=device_id,
                                         pdu=req_pdu)

        else:
            raise ValueError("unsupported modbus type")

        # future is resolved by send loop (response adu, TimeoutError or
        # ConnectionError)
        future = self._loop.create_future()
        try:
            self._send_queue.put_nowait((req_adu, future))
            res_adu = await future

        except aio.QueueClosedError:
            # send loop closes queue once it stops
            raise ConnectionError()

        return res_adu.pdu

    async def _receive(self, req_adu):
        # Receive response adus until one matching `req_adu` arrives;
        # responses which do not match are logged and discarded.
        while True:
            res_adu = await self._conn.receive(self._modbus_type,
                                               transport.Direction.RESPONSE)

            # transaction id is checked only for tcp adus
            if isinstance(res_adu, transport.TcpAdu):
                if res_adu.transaction_id != req_adu.transaction_id:
                    self._log.warning("discarding response adu: "
                                      "invalid response transaction id")
                    continue

            if res_adu.device_id != req_adu.device_id:
                self._log.warning("discarding response adu: "
                                  "invalid response device id")
                continue

            req_fc = transport.get_pdu_function_code(req_adu.pdu)
            res_fc = transport.get_pdu_function_code(res_adu.pdu)
            if req_fc != res_fc:
                self._log.warning("discarding response adu: "
                                  "invalid response function code")
                continue

            return res_adu

    async def _send_loop(self):
        # Process queued requests one at a time: send request adu and
        # resolve its future with matching response adu or TimeoutError.
        self._log.debug("starting master send loop")
        # future of most recently dequeued request (None until first request)
        future = None
        try:
            while self.is_open:
                # req_adu, future = await self._send_queue.get()

                # while waiting for next request, discard all incoming data
                # NOTE(review): relies on subgroup (and discarding loop)
                # being closed on context exit - confirm in hat.aio
                async with self.async_group.create_subgroup() as subgroup:
                    subgroup.spawn(self._clear_input_buffer_loop)
                    self._log.debug("started discarding incomming data")

                    # skip requests whose future is already done
                    # (presumably cancelled by caller while queued)
                    while not future or future.done():
                        req_adu, future = await self._send_queue.get()

                await self._clear_input_buffer()
                self._log.debug("stopped discarding incomming data")

                await self._conn.send(req_adu)
                await self._conn.drain()

                async with self.async_group.create_subgroup(
                        log_exceptions=False) as subgroup:
                    receive_task = subgroup.spawn(self._receive, req_adu)

                    # wait for response, completion of request future or
                    # response timeout - whichever happens first
                    await asyncio.wait([receive_task, future],
                                       timeout=self._response_timeout,
                                       return_when=asyncio.FIRST_COMPLETED)

                    # future was completed outside of this loop (presumably
                    # cancelled by caller) - response is not needed
                    if future.done():
                        continue

                    if receive_task.done():
                        # exception raised by receive task propagates from
                        # `result()` and stops send loop
                        future.set_result(receive_task.result())

                    else:
                        future.set_exception(TimeoutError())

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("error in send loop: %s", e, exc_info=e)

        finally:
            self._log.debug("stopping master send loop")
            self.close()
            self._send_queue.close()

            # fail current request and all requests remaining in queue
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

    async def _clear_input_buffer_loop(self):
        # Continuously discard incoming data; closes master on any error.
        try:
            while True:
                await self._clear_input_buffer()

                await self._conn.read_byte()
                self._log.debug("discarded 1 byte from input buffer")

        except ConnectionError:
            self.close()

        except Exception as e:
            self._log.error("error in reset input buffer loop: %s", e,
                            exc_info=e)
            self.close()

    async def _clear_input_buffer(self):
        # Clear connection's input buffer and log number of discarded bytes
        # (nothing is logged if count is falsy).
        count = await self._conn.clear_input_buffer()
        if not count:
            return
        self._log.debug("discarded %s bytes from input buffer", count)
Modbus master
def __init__(self,
             link: transport.Link,
             modbus_type: common.ModbusType,
             response_timeout: float | None):
    """Create master on top of `link` and start its send loop

    Args:
        link: transport link wrapped in `transport.Connection`
        modbus_type: modbus type
        response_timeout: response timeout in seconds

    """
    conn = transport.Connection(link)

    self._modbus_type = modbus_type
    self._response_timeout = response_timeout
    self._conn = conn
    self._send_queue = aio.Queue()
    self._loop = asyncio.get_running_loop()
    self._log = _create_logger_adapter(conn.info)

    # only TCP uses transaction ids: 1, 2, ..., 0xffff, 0, 1, ...
    if modbus_type == common.ModbusType.TCP:
        transaction_ids = (counter % 0x10000
                           for counter in itertools.count(1))
        self._next_transaction_ids = transaction_ids

    self.async_group.spawn(self._send_loop)

    self._log.debug('master created')
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying transport connection"""
    conn = self._conn
    return conn.async_group
Async group
@property
def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
    """Info of the underlying transport connection (connection or endpoint)"""
    conn = self._conn
    return conn.info
Connection or endpoint info
async def send(self, req: common.Request) -> common.Response:
    """Send request and wait for response

    Request is translated to transport pdu, sent to `req.device_id` and
    received response pdu is translated back. Error responses are returned
    (not raised) as error carried by response pdu.

    Args:
        req: request

    Returns:
        response

    Raises:
        ConnectionError
        TimeoutError
        ValueError: unsupported data type or unexpected response pdu
        TypeError: unsupported request type
        Exception: write response does not match written data

    """
    # request -> transport pdu
    if isinstance(req, common.ReadReq):
        data_type = req.data_type
        address = req.start_address

        if data_type == common.DataType.COIL:
            req_pdu = transport.ReadCoilsReq(
                address=address,
                quantity=req.quantity)

        elif data_type == common.DataType.DISCRETE_INPUT:
            req_pdu = transport.ReadDiscreteInputsReq(
                address=address,
                quantity=req.quantity)

        elif data_type == common.DataType.HOLDING_REGISTER:
            req_pdu = transport.ReadHoldingRegistersReq(
                address=address,
                quantity=req.quantity)

        elif data_type == common.DataType.INPUT_REGISTER:
            req_pdu = transport.ReadInputRegistersReq(
                address=address,
                quantity=req.quantity)

        elif data_type == common.DataType.QUEUE:
            # quantity is not part of fifo queue read request
            req_pdu = transport.ReadFifoQueueReq(address=address)

        else:
            raise ValueError('unsupported data type')

    elif isinstance(req, common.WriteReq):
        data_type = req.data_type
        address = req.start_address
        values = req.values

        if data_type == common.DataType.COIL:
            single_req_cls = transport.WriteSingleCoilReq
            multiple_req_cls = transport.WriteMultipleCoilsReq

        elif data_type == common.DataType.HOLDING_REGISTER:
            single_req_cls = transport.WriteSingleRegisterReq
            multiple_req_cls = transport.WriteMultipleRegistersReq

        else:
            raise ValueError('unsupported data type')

        # exactly one value -> "single" variant, otherwise "multiple"
        if len(values) == 1:
            req_pdu = single_req_cls(address=address, value=values[0])

        else:
            req_pdu = multiple_req_cls(address=address, values=values)

    elif isinstance(req, common.WriteMaskReq):
        req_pdu = transport.MaskWriteRegisterReq(address=req.address,
                                                 and_mask=req.and_mask,
                                                 or_mask=req.or_mask)

    else:
        raise TypeError('unsupported request')

    res_pdu = await self._send(req.device_id, req_pdu)

    # error responses are returned to caller as-is
    if isinstance(res_pdu, transport.ErrorRes):
        return res_pdu.error

    # transport pdu -> response
    if isinstance(req, common.ReadReq):
        read_res_types = (transport.ReadCoilsRes,
                          transport.ReadDiscreteInputsRes,
                          transport.ReadHoldingRegistersRes,
                          transport.ReadInputRegistersRes)

        if isinstance(res_pdu, read_res_types):
            # no more than requested number of values is returned
            return res_pdu.values[:req.quantity]

        if isinstance(res_pdu, transport.ReadFifoQueueRes):
            return res_pdu.values

        raise ValueError("unsupported response pdu")

    if isinstance(req, common.WriteReq):
        single_res_types = (transport.WriteSingleCoilRes,
                            transport.WriteSingleRegisterRes)
        multiple_res_types = (transport.WriteMultipleCoilsRes,
                              transport.WriteMultipleRegistersRes)

        if not isinstance(res_pdu, single_res_types + multiple_res_types):
            raise ValueError("unsupported response pdu")

        # write response has to echo request content
        if res_pdu.address != req.start_address:
            raise Exception("invalid response pdu address")

        if (isinstance(res_pdu, single_res_types) and
                res_pdu.value != req.values[0]):
            raise Exception("invalid response pdu value")

        if (isinstance(res_pdu, multiple_res_types) and
                res_pdu.quantity != len(req.values)):
            raise Exception("invalid response pdu quantity")

        return common.Success()

    if isinstance(req, common.WriteMaskReq):
        if not isinstance(res_pdu, transport.MaskWriteRegisterRes):
            raise ValueError("unsupported response pdu")

        # mask write response has to echo request content
        if res_pdu.address != req.address:
            raise Exception("invalid response pdu address")

        if res_pdu.and_mask != req.and_mask:
            raise Exception("invalid response pdu and mask")

        if res_pdu.or_mask != req.or_mask:
            raise Exception("invalid response pdu or mask")

        return common.Success()

    raise TypeError('unsupported request')
Send request and wait for response
Arguments:
- req: request
Returns:
response
Raises:
- ConnectionError
- TimeoutError
async def create_tcp_server(modbus_type: common.ModbusType,
                            addr: tcp.Address,
                            *,
                            slave_cb: SlaveCb | None = None,
                            request_cb: RequestCb | None = None,
                            **kwargs
                            ) -> tcp.Server:
    """Create TCP server which creates slave for each accepted connection

    If `slave_cb` is set, it is called with each newly created slave.
    Exceptions raised while serving a connection are logged and associated
    slave is closed.

    Closing server closes all active associated slaves.

    Additional arguments are passed directly to `hat.drivers.tcp.listen`
    (`bind_connections` is set by this coroutine).

    Args:
        modbus_type: modbus type
        addr: local listening host address
        slave_cb: slave callback
        request_cb: request callback

    """

    async def serve_connection(conn):
        # connection already closed - nothing to serve
        if not conn.is_open:
            return

        log = _create_logger_adapter(conn.info)

        link = transport.TcpLink(conn)
        slave = Slave(link=link,
                      modbus_type=modbus_type,
                      request_cb=request_cb)

        try:
            if slave_cb:
                await aio.call(slave_cb, slave)

            # keep serving until slave starts closing
            await slave.wait_closing()

        except Exception as e:
            log.error("error in slave callback: %s", e, exc_info=e)

        finally:
            await aio.uncancellable(slave.async_close())

    return await tcp.listen(serve_connection, addr,
                            bind_connections=True,
                            **kwargs)
Create TCP server
Closing server closes all active associated slaves.
Additional arguments are passed directly to hat.drivers.tcp.listen
(bind_connections is set by this coroutine).
Arguments:
- modbus_type: modbus type
- addr: local listening host address
- slave_cb: slave callback
- request_cb: request callback
async def create_serial_slave(modbus_type: common.ModbusType,
                              port: str,
                              *,
                              request_cb: RequestCb | None = None,
                              silent_interval: float = 0.005,
                              **kwargs
                              ) -> 'Slave':
    """Open serial endpoint and create slave on top of it

    Additional arguments are passed directly to `hat.drivers.serial.create`.

    If slave creation fails, opened endpoint is closed before exception is
    propagated.

    Args:
        modbus_type: modbus type
        port: port name (see `hat.drivers.serial.create`)
        request_cb: request callback
        silent_interval: silent interval (see `serial.create`)

    """
    endpoint = await serial.create(port,
                                   silent_interval=silent_interval,
                                   **kwargs)

    try:
        link = transport.SerialLink(endpoint)
        slave = Slave(link=link,
                      modbus_type=modbus_type,
                      request_cb=request_cb)

    except BaseException:
        # do not leak endpoint (cleanup is shielded from cancellation)
        await aio.uncancellable(endpoint.async_close())
        raise

    return slave
Create serial slave
Additional arguments are passed directly to hat.drivers.serial.create.
Arguments:
- modbus_type: modbus type
- port: port name (see hat.drivers.serial.create)
- request_cb: request callback
- silent_interval: silent interval (see serial.create)
class Slave(aio.Resource):
    """Modbus slave

    Received requests are translated to `common` requests and passed to
    `request_cb`. Result of `request_cb` is translated to response which is
    sent back. If `request_cb` is not set or it returns ``None``, response
    is not sent.

    """

    def __init__(self,
                 link: transport.Link,
                 modbus_type: common.ModbusType,
                 request_cb: RequestCb | None = None):
        self._modbus_type = modbus_type
        self._request_cb = request_cb
        self._conn = transport.Connection(link)
        self._log = _create_logger_adapter(self._conn.info)

        self.async_group.spawn(self._receive_loop)

        self._log.debug('slave created')

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
        """Connection or endpoint info"""
        return self._conn.info

    async def _receive_loop(self):
        # Receive requests, process them and send responses until
        # connection is lost. Errors other than ConnectionError raised while
        # receiving, processing or sending are logged and loop continues
        # with next request.
        self._log.debug("starting slave receive loop")
        try:
            while True:
                try:
                    self._log.debug("waiting for request")
                    req_adu = await self._conn.receive(
                        modbus_type=self._modbus_type,
                        direction=transport.Direction.REQUEST)

                except ConnectionError:
                    break

                except Exception as e:
                    self._log.warning("error receiving request: %s", e,
                                      exc_info=e)
                    continue

                device_id = req_adu.device_id
                req_pdu = req_adu.pdu

                try:
                    self._log.debug("processing request (device_id %s): %s",
                                    device_id, req_pdu)
                    res_pdu = await self._process_request(device_id, req_pdu)

                except Exception as e:
                    self._log.warning("error processing request: %s", e,
                                      exc_info=e)
                    continue

                # no request callback or callback returned None
                if res_pdu is None:
                    self._log.debug("skip sending response")
                    continue

                self._log.debug("sending response: %s", res_pdu)

                # response adu mirrors addressing of request adu
                if self._modbus_type == common.ModbusType.TCP:
                    res_adu = transport.TcpAdu(
                        transaction_id=req_adu.transaction_id,
                        device_id=req_adu.device_id,
                        pdu=res_pdu)

                elif self._modbus_type == common.ModbusType.RTU:
                    res_adu = transport.RtuAdu(device_id=req_adu.device_id,
                                               pdu=res_pdu)

                elif self._modbus_type == common.ModbusType.ASCII:
                    res_adu = transport.AsciiAdu(device_id=req_adu.device_id,
                                                 pdu=res_pdu)

                else:
                    # not handled inside loop - stops receive loop
                    raise ValueError("invalid modbus type")

                try:
                    await self._conn.send(res_adu)

                except ConnectionError:
                    break

                except Exception as e:
                    self._log.warning("error sending response: %s", e,
                                      exc_info=e)
                    continue

        except Exception as e:
            self._log.error("receive loop error: %s", e, exc_info=e)

        finally:
            self._log.debug("closing slave receive loop")
            self.close()

    async def _process_request(self, device_id, req_pdu):
        # Translate request pdu to `common` request, call request callback
        # and translate its result to response pdu. Returns None if response
        # should not be sent. Raises TypeError for unsupported request pdu.
        if self._request_cb is None:
            return

        if isinstance(req_pdu, transport.ReadCoilsReq):
            req = common.ReadReq(device_id=device_id,
                                 data_type=common.DataType.COIL,
                                 start_address=req_pdu.address,
                                 quantity=req_pdu.quantity)

        elif isinstance(req_pdu, transport.ReadDiscreteInputsReq):
            req = common.ReadReq(device_id=device_id,
                                 data_type=common.DataType.DISCRETE_INPUT,
                                 start_address=req_pdu.address,
                                 quantity=req_pdu.quantity)

        elif isinstance(req_pdu, transport.ReadHoldingRegistersReq):
            req = common.ReadReq(device_id=device_id,
                                 data_type=common.DataType.HOLDING_REGISTER,
                                 start_address=req_pdu.address,
                                 quantity=req_pdu.quantity)

        elif isinstance(req_pdu, transport.ReadInputRegistersReq):
            req = common.ReadReq(device_id=device_id,
                                 data_type=common.DataType.INPUT_REGISTER,
                                 start_address=req_pdu.address,
                                 quantity=req_pdu.quantity)

        elif isinstance(req_pdu, transport.WriteSingleCoilReq):
            # single writes are represented as write of one-element list
            req = common.WriteReq(device_id=device_id,
                                  data_type=common.DataType.COIL,
                                  start_address=req_pdu.address,
                                  values=[req_pdu.value])

        elif isinstance(req_pdu, transport.WriteSingleRegisterReq):
            req = common.WriteReq(device_id=device_id,
                                  data_type=common.DataType.HOLDING_REGISTER,
                                  start_address=req_pdu.address,
                                  values=[req_pdu.value])

        elif isinstance(req_pdu, transport.WriteMultipleCoilsReq):
            req = common.WriteReq(device_id=device_id,
                                  data_type=common.DataType.COIL,
                                  start_address=req_pdu.address,
                                  values=req_pdu.values)

        elif isinstance(req_pdu, transport.WriteMultipleRegistersReq):
            req = common.WriteReq(device_id=device_id,
                                  data_type=common.DataType.HOLDING_REGISTER,
                                  start_address=req_pdu.address,
                                  values=req_pdu.values)

        elif isinstance(req_pdu, transport.MaskWriteRegisterReq):
            req = common.WriteMaskReq(device_id=device_id,
                                      address=req_pdu.address,
                                      and_mask=req_pdu.and_mask,
                                      or_mask=req_pdu.or_mask)

        elif isinstance(req_pdu, transport.ReadFifoQueueReq):
            # fifo queue read request has no quantity - 0 is used as
            # placeholder
            req = common.ReadReq(device_id=device_id,
                                 data_type=common.DataType.QUEUE,
                                 start_address=req_pdu.address,
                                 quantity=0)

        else:
            raise TypeError('unsupported request')

        res = await aio.call(self._request_cb, self, req)

        # callback decided not to respond
        if res is None:
            return

        if isinstance(res, common.Error):
            return transport.ErrorRes(
                fc=transport.get_pdu_function_code(req_pdu),
                error=res)

        # for read requests callback result is used as response values;
        # write responses are built from request content only
        if isinstance(req_pdu, transport.ReadCoilsReq):
            return transport.ReadCoilsRes(values=res)

        if isinstance(req_pdu, transport.ReadDiscreteInputsReq):
            return transport.ReadDiscreteInputsRes(values=res)

        if isinstance(req_pdu, transport.ReadHoldingRegistersReq):
            return transport.ReadHoldingRegistersRes(values=res)

        if isinstance(req_pdu, transport.ReadInputRegistersReq):
            return transport.ReadInputRegistersRes(values=res)

        if isinstance(req_pdu, transport.WriteSingleCoilReq):
            return transport.WriteSingleCoilRes(address=req_pdu.address,
                                                value=req_pdu.value)

        if isinstance(req_pdu, transport.WriteSingleRegisterReq):
            return transport.WriteSingleRegisterRes(address=req_pdu.address,
                                                    value=req_pdu.value)

        if isinstance(req_pdu, transport.WriteMultipleCoilsReq):
            return transport.WriteMultipleCoilsRes(
                address=req_pdu.address,
                quantity=len(req_pdu.values))

        if isinstance(req_pdu, transport.WriteMultipleRegistersReq):
            return transport.WriteMultipleRegistersRes(
                address=req_pdu.address,
                quantity=len(req_pdu.values))

        if isinstance(req_pdu, transport.MaskWriteRegisterReq):
            return transport.MaskWriteRegisterRes(address=req_pdu.address,
                                                  and_mask=req_pdu.and_mask,
                                                  or_mask=req_pdu.or_mask)

        if isinstance(req_pdu, transport.ReadFifoQueueReq):
            return transport.ReadFifoQueueRes(values=res)

        raise TypeError('unsupported request')
Modbus slave
def __init__(self,
             link: transport.Link,
             modbus_type: common.ModbusType,
             request_cb: RequestCb | None = None):
    """Create slave on top of `link` and start its receive loop

    Args:
        link: transport link wrapped in `transport.Connection`
        modbus_type: modbus type
        request_cb: request callback

    """
    conn = transport.Connection(link)

    self._modbus_type = modbus_type
    self._request_cb = request_cb
    self._conn = conn
    self._log = _create_logger_adapter(conn.info)

    self.async_group.spawn(self._receive_loop)

    self._log.debug('slave created')
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying transport connection"""
    conn = self._conn
    return conn.async_group
Async group