hat.drivers.modbus

Modbus communication protocol

 1"""Modbus communication protocol"""
 2
 3from hat.drivers.modbus.common import (ModbusType,
 4                                       DataType,
 5                                       Error,
 6                                       apply_mask)
 7from hat.drivers.modbus.master import (create_tcp_master,
 8                                       create_serial_master,
 9                                       Master)
10from hat.drivers.modbus.slave import (SlaveCb,
11                                      ReadCb,
12                                      WriteCb,
13                                      WriteMaskCb,
14                                      create_tcp_server,
15                                      create_serial_slave,
16                                      Slave)
17
18
19__all__ = ['ModbusType',
20           'DataType',
21           'Error',
22           'apply_mask',
23           'create_tcp_master',
24           'create_serial_master',
25           'Master',
26           'SlaveCb',
27           'ReadCb',
28           'WriteCb',
29           'WriteMaskCb',
30           'create_tcp_server',
31           'create_serial_slave',
32           'Slave']
class ModbusType(enum.Enum):
class ModbusType(enum.Enum):
    """Modbus framing variant (TCP, RTU or ASCII)"""
    TCP = 0
    RTU = 1
    ASCII = 2
TCP = <ModbusType.TCP: 0>
RTU = <ModbusType.RTU: 1>
ASCII = <ModbusType.ASCII: 2>
class DataType(enum.Enum):
class DataType(enum.Enum):
    """Kind of modbus data addressed by a read or write operation"""
    COIL = 1
    DISCRETE_INPUT = 2
    HOLDING_REGISTER = 3
    INPUT_REGISTER = 4
    QUEUE = 5  # FIFO queue
COIL = <DataType.COIL: 1>
DISCRETE_INPUT = <DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER = <DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER = <DataType.INPUT_REGISTER: 4>
QUEUE = <DataType.QUEUE: 5>
class Error(enum.Enum):
class Error(enum.Enum):
    """Error reported by a modbus device instead of a regular response"""
    # NOTE(review): numeric values look like the standard Modbus exception
    # codes - confirm against the protocol specification
    INVALID_FUNCTION_CODE = 0x01
    INVALID_DATA_ADDRESS = 0x02
    INVALID_DATA_VALUE = 0x03
    FUNCTION_ERROR = 0x04
    GATEWAY_PATH_UNAVAILABLE = 0x0a
    GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0b
INVALID_FUNCTION_CODE = <Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS = <Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE = <Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR = <Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE = <Error.GATEWAY_PATH_UNAVAILABLE: 10>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = <Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 11>
def apply_mask(value: int, and_mask: int, or_mask: int) -> int:
def apply_mask(value: int, and_mask: int, or_mask: int) -> int:
    """Combine value with and/or masks

    Bits set in `and_mask` are taken from `value`; all other bits are taken
    from `or_mask`.

    Args:
        value: current value
        and_mask: mask selecting bits preserved from `value`
        or_mask: mask providing bits not selected by `and_mask`

    """
    preserved_bits = value & and_mask
    replaced_bits = or_mask & ~and_mask
    return preserved_bits | replaced_bits

Apply mask to value

async def create_tcp_master( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, *, response_timeout: float | None = None, **kwargs) -> Master:
async def create_tcp_master(modbus_type: common.ModbusType,
                            addr: tcp.Address,
                            *,
                            response_timeout: float | None = None,
                            **kwargs
                            ) -> 'Master':
    """Connect to remote TCP address and wrap connection in a master

    If the master can not be created, the established TCP connection is
    closed before the exception is propagated.

    Args:
        modbus_type: modbus type
        addr: remote host address
        response_timeout: response timeout in seconds
        kwargs: additional arguments used for creating TCP connection
            (see `tcp.connect`)

    """
    conn = await tcp.connect(addr, **kwargs)

    try:
        link = transport.TcpLink(conn)
        master = Master(link=link,
                        modbus_type=modbus_type,
                        response_timeout=response_timeout)

    except BaseException:
        # do not leak the connection when master creation fails
        await aio.uncancellable(conn.async_close())
        raise

    return master

Create TCP master

Arguments:
  • modbus_type: modbus type
  • addr: remote host address
  • response_timeout: response timeout in seconds
  • kwargs: additional arguments used for creating TCP connection (see tcp.connect)
async def create_serial_master( modbus_type: ModbusType, port: str, *, silent_interval: float = 0.005, response_timeout: float | None = None, **kwargs) -> Master:
async def create_serial_master(modbus_type: common.ModbusType,
                               port: str,
                               *,
                               silent_interval: float = 0.005,
                               response_timeout: float | None = None,
                               **kwargs
                               ) -> 'Master':
    """Open serial endpoint and wrap it in a master

    If the master can not be created, the opened serial endpoint is closed
    before the exception is propagated.

    Args:
        modbus_type: modbus type
        port: port name (see `serial.create`)
        silent_interval: silent interval (see `serial.create`)
        response_timeout: response timeout in seconds
        kwargs: additional arguments used for opening serial connection
            (see `serial.create`)

    """
    endpoint = await serial.create(port,
                                   silent_interval=silent_interval,
                                   **kwargs)

    try:
        link = transport.SerialLink(endpoint)
        master = Master(link=link,
                        modbus_type=modbus_type,
                        response_timeout=response_timeout)

    except BaseException:
        # do not leak the endpoint when master creation fails
        await aio.uncancellable(endpoint.async_close())
        raise

    return master

Create serial master

Arguments:
  • modbus_type: modbus type
  • port: port name (see serial.create)
  • silent_interval: silent interval (see serial.create)
  • response_timeout: response timeout in seconds
  • kwargs: additional arguments used for opening serial connection (see serial.create)
class Master(hat.aio.group.Resource):
class Master(aio.Resource):
    """Modbus master

    Requests created by `read`, `write` and `write_mask` are put into an
    internal queue and processed one at a time by a send loop which is
    spawned in the constructor.

    """

    def __init__(self,
                 link: transport.Link,
                 modbus_type: common.ModbusType,
                 response_timeout: float | None):
        """Create master on top of provided link

        Args:
            link: transport link wrapped into `transport.Connection`
            modbus_type: modbus type
            response_timeout: response timeout in seconds

        """
        self._modbus_type = modbus_type
        self._response_timeout = response_timeout
        self._conn = transport.Connection(link)
        self._send_queue = aio.Queue()
        self._log = _create_logger_adapter(self._conn.info)

        # transaction ids are used only for TCP adus - they start at 1 and
        # wrap around modulo 0x10000
        if modbus_type == common.ModbusType.TCP:
            self._next_transaction_ids = iter(i % 0x10000
                                              for i in itertools.count(1))

        self.async_group.spawn(self._send_loop)

        self._log.debug('master created')

    @property
    def async_group(self) -> aio.Group:
        """Async group (async group of the underlying connection)"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
        """Connection or endpoint info"""
        return self._conn.info

    async def read(self,
                   device_id: int,
                   data_type: common.DataType,
                   start_address: int,
                   quantity: int = 1
                   ) -> list[int] | common.Error:
        """Read data from modbus device

        Argument `quantity` is ignored if `data_type` is `QUEUE`.

        Args:
            device_id: slave device identifier
            data_type: data type
            start_address: starting modbus data address
            quantity: number of data values

        Returns:
            read values or error reported by the device

        Raises:
            ConnectionError
            TimeoutError
            ValueError: device id is 0, data type is not supported or
                response pdu is not supported

        """
        if device_id == 0:
            raise ValueError('unsupported device id')

        if data_type == common.DataType.COIL:
            req = transport.ReadCoilsReq(address=start_address,
                                         quantity=quantity)

        elif data_type == common.DataType.DISCRETE_INPUT:
            req = transport.ReadDiscreteInputsReq(address=start_address,
                                                  quantity=quantity)

        elif data_type == common.DataType.HOLDING_REGISTER:
            req = transport.ReadHoldingRegistersReq(address=start_address,
                                                    quantity=quantity)

        elif data_type == common.DataType.INPUT_REGISTER:
            req = transport.ReadInputRegistersReq(address=start_address,
                                                  quantity=quantity)

        elif data_type == common.DataType.QUEUE:
            req = transport.ReadFifoQueueReq(address=start_address)

        else:
            raise ValueError('unsupported data type')

        res = await self._send(device_id, req)

        if isinstance(res, transport.ErrorRes):
            return res.error

        if isinstance(res, (transport.ReadCoilsRes,
                            transport.ReadDiscreteInputsRes,
                            transport.ReadHoldingRegistersRes,
                            transport.ReadInputRegistersRes)):
            # never return more values than requested
            return res.values[:quantity]

        if isinstance(res, transport.ReadFifoQueueRes):
            return res.values

        raise ValueError("unsupported response pdu")

    async def write(self,
                    device_id: int,
                    data_type: common.DataType,
                    start_address: int,
                    values: list[int]
                    ) -> common.Error | None:
        """Write data to modbus device

        Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not
        supported.

        Single value is written with "write single" request; any other
        number of values is written with "write multiple" request.

        Args:
            device_id: slave device identifier
            data_type: data type
            start_address: starting modbus data address
            values: values

        Returns:
            ``None`` on success or error reported by the device

        Raises:
            ConnectionError
            TimeoutError
            ValueError: data type or response pdu is not supported
            Exception: response does not match the request (address, value
                or quantity)

        """
        if data_type == common.DataType.COIL:
            if len(values) == 1:
                req = transport.WriteSingleCoilReq(address=start_address,
                                                   value=values[0])

            else:
                req = transport.WriteMultipleCoilsReq(address=start_address,
                                                      values=values)

        elif data_type == common.DataType.HOLDING_REGISTER:
            if len(values) == 1:
                req = transport.WriteSingleRegisterReq(address=start_address,
                                                       value=values[0])

            else:
                req = transport.WriteMultipleRegistersReq(
                    address=start_address,
                    values=values)

        else:
            raise ValueError('unsupported data type')

        res = await self._send(device_id, req)

        if isinstance(res, transport.ErrorRes):
            return res.error

        if not isinstance(res, (transport.WriteSingleCoilRes,
                                transport.WriteMultipleCoilsRes,
                                transport.WriteSingleRegisterRes,
                                transport.WriteMultipleRegistersRes)):
            raise ValueError("unsupported response pdu")

        # successful response has to mirror the request
        if (res.address != start_address):
            raise Exception("invalid response pdu address")

        if isinstance(res, (transport.WriteSingleCoilRes,
                            transport.WriteSingleRegisterRes)):
            if (res.value != values[0]):
                raise Exception("invalid response pdu value")

        if isinstance(res, (transport.WriteMultipleCoilsRes,
                            transport.WriteMultipleRegistersRes)):
            if (res.quantity != len(values)):
                raise Exception("invalid response pdu quantity")

    async def write_mask(self,
                         device_id: int,
                         address: int,
                         and_mask: int,
                         or_mask: int
                         ) -> common.Error | None:
        """Write mask to modbus device HOLDING_REGISTER

        Args:
            device_id: slave device identifier
            address: modbus data address
            and_mask: and mask
            or_mask: or mask

        Returns:
            ``None`` on success or error reported by the device

        Raises:
            ConnectionError
            TimeoutError
            ValueError: response pdu is not supported
            Exception: response does not match the request (address, and
                mask or or mask)

        """
        req = transport.MaskWriteRegisterReq(address=address,
                                             and_mask=and_mask,
                                             or_mask=or_mask)

        res = await self._send(device_id, req)

        if isinstance(res, transport.ErrorRes):
            return res.error

        if not isinstance(res, transport.MaskWriteRegisterRes):
            raise ValueError("unsupported response pdu")

        # successful response has to mirror the request
        if (res.address != address):
            raise Exception("invalid response pdu address")

        if (res.and_mask != and_mask):
            raise Exception("invalid response pdu and mask")

        if (res.or_mask != or_mask):
            raise Exception("invalid response pdu or mask")

    async def _send(self, device_id, req):
        # Wrap request pdu into adu matching the modbus type, hand it over to
        # the send loop and wait for the response pdu.
        if self._modbus_type == common.ModbusType.TCP:
            req_adu = transport.TcpAdu(
                transaction_id=next(self._next_transaction_ids),
                device_id=device_id,
                pdu=req)

        elif self._modbus_type == common.ModbusType.RTU:
            req_adu = transport.RtuAdu(device_id=device_id,
                                       pdu=req)

        elif self._modbus_type == common.ModbusType.ASCII:
            req_adu = transport.AsciiAdu(device_id=device_id,
                                         pdu=req)

        else:
            raise ValueError("unsupported modbus type")

        # future is resolved by the send loop (response adu, TimeoutError or
        # ConnectionError)
        future = asyncio.Future()
        try:
            self._send_queue.put_nowait((req_adu, future))
            res_adu = await future

        except aio.QueueClosedError:
            # send queue is closed when the send loop stops
            raise ConnectionError()

        return res_adu.pdu

    async def _receive(self, req_adu):
        # Receive response adus until one matches the request - responses
        # with different transaction id (TCP only), device id or function
        # code are logged and discarded.
        while True:
            res_adu = await self._conn.receive(self._modbus_type,
                                               transport.Direction.RESPONSE)

            if isinstance(res_adu, transport.TcpAdu):
                if res_adu.transaction_id != req_adu.transaction_id:
                    self._log.warning("discarding response adu: "
                                      "invalid response transaction id")
                    continue

            if res_adu.device_id != req_adu.device_id:
                self._log.warning("discarding response adu: "
                                  "invalid response device id")
                continue

            req_fc = transport.get_pdu_function_code(req_adu.pdu)
            res_fc = transport.get_pdu_function_code(res_adu.pdu)
            if req_fc != res_fc:
                self._log.warning("discarding response adu: "
                                  "invalid response function code")
                continue

            return res_adu

    async def _send_loop(self):
        # Process queued requests one at a time: send request, wait for
        # matching response (or timeout) and resolve request's future.
        self._log.debug("starting master send loop")
        future = None
        try:
            while self.is_open:
                # while waiting for the next request, incoming data is
                # discarded by a task running in a temporary subgroup
                # NOTE(review): leaving the `async with` block presumably
                # closes the subgroup and stops that task - confirm against
                # aio.Group

                async with self.async_group.create_subgroup() as subgroup:
                    subgroup.spawn(self._clear_input_buffer_loop)
                    self._log.debug("started discarding incomming data")

                    # skip requests whose future is already done
                    while not future or future.done():
                        req_adu, future = await self._send_queue.get()

                await self._clear_input_buffer()
                self._log.debug("stopped discarding incomming data")

                await self._conn.send(req_adu)
                await self._conn.drain()

                async with self.async_group.create_subgroup(
                        log_exceptions=False) as subgroup:
                    receive_task = subgroup.spawn(self._receive, req_adu)

                    # wait for the response, for the future being resolved
                    # from outside of this loop or for the response timeout
                    await asyncio.wait([receive_task, future],
                                       timeout=self._response_timeout,
                                       return_when=asyncio.FIRST_COMPLETED)

                    # future resolved elsewhere - result is not needed
                    if future.done():
                        continue

                    if receive_task.done():
                        future.set_result(receive_task.result())

                    else:
                        future.set_exception(TimeoutError())

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("error in send loop: %s", e, exc_info=e)

        finally:
            self._log.debug("stopping master send loop")
            self.close()
            self._send_queue.close()

            # fail the current and all still queued requests
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

    async def _clear_input_buffer_loop(self):
        # Keep discarding incoming data: clear input buffer, wait for a
        # single byte and repeat. Any error closes the master.
        try:
            while True:
                await self._clear_input_buffer()

                await self._conn.read_byte()
                self._log.debug("discarded 1 byte from input buffer")

        except ConnectionError:
            self.close()

        except Exception as e:
            self._log.error("error in reset input buffer loop: %s", e,
                            exc_info=e)
            self.close()

    async def _clear_input_buffer(self):
        # Clear connection's input buffer and log number of discarded bytes
        # (nothing is logged when the buffer was empty).
        count = await self._conn.clear_input_buffer()
        if not count:
            return
        self._log.debug("discarded %s bytes from input buffer", count)

Modbus master

Master( link: hat.drivers.modbus.transport.connection.Link, modbus_type: ModbusType, response_timeout: float | None)
 84    def __init__(self,
 85                 link: transport.Link,
 86                 modbus_type: common.ModbusType,
 87                 response_timeout: float | None):
 88        self._modbus_type = modbus_type
 89        self._response_timeout = response_timeout
 90        self._conn = transport.Connection(link)
 91        self._send_queue = aio.Queue()
 92        self._log = _create_logger_adapter(self._conn.info)
 93
 94        if modbus_type == common.ModbusType.TCP:
 95            self._next_transaction_ids = iter(i % 0x10000
 96                                              for i in itertools.count(1))
 97
 98        self.async_group.spawn(self._send_loop)
 99
100        self._log.debug('master created')
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group (async group of the underlying connection)"""
        return self._conn.async_group

Async group

    @property
    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
        """Connection or endpoint info (info of the underlying connection)"""
        return self._conn.info

Connection or endpoint info

async def read( self, device_id: int, data_type: DataType, start_address: int, quantity: int = 1) -> list[int] | Error:
112    async def read(self,
113                   device_id: int,
114                   data_type: common.DataType,
115                   start_address: int,
116                   quantity: int = 1
117                   ) -> list[int] | common.Error:
118        """Read data from modbus device
119
120        Argument `quantity` is ignored if `data_type` is `QUEUE`.
121
122        Args:
123            device_id: slave device identifier
124            data_type: data type
125            start_address: starting modbus data address
126            quantity: number of data values
127
128        Raises:
129            ConnectionError
130            TimeoutError
131
132        """
133        if device_id == 0:
134            raise ValueError('unsupported device id')
135
136        if data_type == common.DataType.COIL:
137            req = transport.ReadCoilsReq(address=start_address,
138                                         quantity=quantity)
139
140        elif data_type == common.DataType.DISCRETE_INPUT:
141            req = transport.ReadDiscreteInputsReq(address=start_address,
142                                                  quantity=quantity)
143
144        elif data_type == common.DataType.HOLDING_REGISTER:
145            req = transport.ReadHoldingRegistersReq(address=start_address,
146                                                    quantity=quantity)
147
148        elif data_type == common.DataType.INPUT_REGISTER:
149            req = transport.ReadInputRegistersReq(address=start_address,
150                                                  quantity=quantity)
151
152        elif data_type == common.DataType.QUEUE:
153            req = transport.ReadFifoQueueReq(address=start_address)
154
155        else:
156            raise ValueError('unsupported data type')
157
158        res = await self._send(device_id, req)
159
160        if isinstance(res, transport.ErrorRes):
161            return res.error
162
163        if isinstance(res, (transport.ReadCoilsRes,
164                            transport.ReadDiscreteInputsRes,
165                            transport.ReadHoldingRegistersRes,
166                            transport.ReadInputRegistersRes)):
167            return res.values[:quantity]
168
169        if isinstance(res, transport.ReadFifoQueueRes):
170            return res.values
171
172        raise ValueError("unsupported response pdu")

Read data from modbus device

Argument quantity is ignored if data_type is QUEUE.

Arguments:
  • device_id: slave device identifier
  • data_type: data type
  • start_address: starting modbus data address
  • quantity: number of data values
Raises:
  • ConnectionError
  • TimeoutError
async def write( self, device_id: int, data_type: DataType, start_address: int, values: List[int]) -> Error | None:
174    async def write(self,
175                    device_id: int,
176                    data_type: common.DataType,
177                    start_address: int,
178                    values: typing.List[int]
179                    ) -> common.Error | None:
180        """Write data to modbus device
181
182        Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not
183        supported.
184
185        Args:
186            device_id: slave device identifier
187            data_type: data type
188            start_address: starting modbus data address
189            values: values
190
191        Raises:
192            ConnectionError
193            TimeoutError
194
195        """
196        if data_type == common.DataType.COIL:
197            if len(values) == 1:
198                req = transport.WriteSingleCoilReq(address=start_address,
199                                                   value=values[0])
200
201            else:
202                req = transport.WriteMultipleCoilsReq(address=start_address,
203                                                      values=values)
204
205        elif data_type == common.DataType.HOLDING_REGISTER:
206            if len(values) == 1:
207                req = transport.WriteSingleRegisterReq(address=start_address,
208                                                       value=values[0])
209
210            else:
211                req = transport.WriteMultipleRegistersReq(
212                    address=start_address,
213                    values=values)
214
215        else:
216            raise ValueError('unsupported data type')
217
218        res = await self._send(device_id, req)
219
220        if isinstance(res, transport.ErrorRes):
221            return res.error
222
223        if not isinstance(res, (transport.WriteSingleCoilRes,
224                                transport.WriteMultipleCoilsRes,
225                                transport.WriteSingleRegisterRes,
226                                transport.WriteMultipleRegistersRes)):
227            raise ValueError("unsupported response pdu")
228
229        if (res.address != start_address):
230            raise Exception("invalid response pdu address")
231
232        if isinstance(res, (transport.WriteSingleCoilRes,
233                            transport.WriteSingleRegisterRes)):
234            if (res.value != values[0]):
235                raise Exception("invalid response pdu value")
236
237        if isinstance(res, (transport.WriteMultipleCoilsRes,
238                            transport.WriteMultipleRegistersRes)):
239            if (res.quantity != len(values)):
240                raise Exception("invalid response pdu quantity")

Write data to modbus device

Data types DISCRETE_INPUT, INPUT_REGISTER and QUEUE are not supported.

Arguments:
  • device_id: slave device identifier
  • data_type: data type
  • start_address: starting modbus data address
  • values: values
Raises:
  • ConnectionError
  • TimeoutError
async def write_mask( self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None:
242    async def write_mask(self,
243                         device_id: int,
244                         address: int,
245                         and_mask: int,
246                         or_mask: int
247                         ) -> common.Error | None:
248        """Write mask to modbus device HOLDING_REGISTER
249
250        Args:
251            device_id: slave device identifier
252            address: modbus data address
253            and_mask: and mask
254            or_mask: or mask
255
256        Raises:
257            ConnectionError
258            TimeoutError
259
260        """
261        req = transport.MaskWriteRegisterReq(address=address,
262                                             and_mask=and_mask,
263                                             or_mask=or_mask)
264
265        res = await self._send(device_id, req)
266
267        if isinstance(res, transport.ErrorRes):
268            return res.error
269
270        if not isinstance(res, transport.MaskWriteRegisterRes):
271            raise ValueError("unsupported response pdu")
272
273        if (res.address != address):
274            raise Exception("invalid response pdu address")
275
276        if (res.and_mask != and_mask):
277            raise Exception("invalid response pdu and mask")
278
279        if (res.or_mask != or_mask):
280            raise Exception("invalid response pdu or mask")

Write mask to modbus device HOLDING_REGISTER

Arguments:
  • device_id: slave device identifier
  • address: modbus data address
  • and_mask: and mask
  • or_mask: or mask
Raises:
  • ConnectionError
  • TimeoutError
SlaveCb = typing.Callable[[ForwardRef('Slave')], None | collections.abc.Awaitable[None]]
ReadCb = typing.Callable[[ForwardRef('Slave'), int, DataType, int, int | None], list[int] | Error | collections.abc.Awaitable[list[int] | Error]]
WriteCb = typing.Callable[[ForwardRef('Slave'), int, DataType, int, list[int]], Error | None | collections.abc.Awaitable[Error | None]]
WriteMaskCb = typing.Callable[[ForwardRef('Slave'), int, int, int, int], Error | None | collections.abc.Awaitable[Error | None]]
async def create_tcp_server( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, *, slave_cb: Optional[Callable[[Slave], None | Awaitable[None]]] = None, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], list[int] | Error | Awaitable[list[int] | Error]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Error | None | Awaitable[Error | None]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Error | None | Awaitable[Error | None]]] = None, **kwargs) -> hat.drivers.tcp.Server:
 82async def create_tcp_server(modbus_type: common.ModbusType,
 83                            addr: tcp.Address,
 84                            *,
 85                            slave_cb: SlaveCb | None = None,
 86                            read_cb: ReadCb | None = None,
 87                            write_cb: WriteCb | None = None,
 88                            write_mask_cb: WriteMaskCb | None = None,
 89                            **kwargs
 90                            ) -> tcp.Server:
 91    """Create TCP server
 92
 93    Closing server closes all active associated slaves.
 94
 95    Args:
 96        modbus_type: modbus type
 97        addr: local listening host address
 98        slave_cb: slave callback
 99        read_cb: read callback
100        write_cb: write callback
101        write_mask_cb: write mask callback
102        kwargs: additional arguments used for creating TCP server
103            (see `tcp.listen`)
104
105    """
106
107    async def on_connection(conn):
108        if not conn.is_open:
109            return
110
111        log = _create_logger_adapter(conn.info)
112
113        slave = Slave(link=transport.TcpLink(conn),
114                      modbus_type=modbus_type,
115                      read_cb=read_cb,
116                      write_cb=write_cb,
117                      write_mask_cb=write_mask_cb)
118
119        try:
120            if slave_cb:
121                await aio.call(slave_cb, slave)
122
123            await slave.wait_closing()
124
125        except Exception as e:
126            log.error("error in slave callback: %s", e, exc_info=e)
127
128        finally:
129            await aio.uncancellable(slave.async_close())
130
131    server = await tcp.listen(on_connection, addr,
132                              bind_connections=True,
133                              **kwargs)
134
135    return server

Create TCP server

Closing server closes all active associated slaves.

Arguments:
  • modbus_type: modbus type
  • addr: local listening host address
  • slave_cb: slave callback
  • read_cb: read callback
  • write_cb: write callback
  • write_mask_cb: write mask callback
  • kwargs: additional arguments used for creating TCP server (see tcp.listen)
async def create_serial_slave( modbus_type: ModbusType, port: str, *, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], list[int] | Error | Awaitable[list[int] | Error]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Error | None | Awaitable[Error | None]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Error | None | Awaitable[Error | None]]] = None, silent_interval: float = 0.005, **kwargs) -> Slave:
async def create_serial_slave(modbus_type: common.ModbusType,
                              port: str,
                              *,
                              read_cb: ReadCb | None = None,
                              write_cb: WriteCb | None = None,
                              write_mask_cb: WriteMaskCb | None = None,
                              silent_interval: float = 0.005,
                              **kwargs
                              ) -> 'Slave':
    """Open serial endpoint and wrap it in a slave

    If the slave can not be created, the opened serial endpoint is closed
    before the exception is propagated.

    Args:
        modbus_type: modbus type
        port: port name (see `serial.create`)
        read_cb: read callback
        write_cb: write callback
        write_mask_cb: write mask callback
        silent_interval: silent interval (see `serial.create`)
        kwargs: additional arguments used for opening serial connection
            (see `serial.create`)

    """
    endpoint = await serial.create(port,
                                   silent_interval=silent_interval,
                                   **kwargs)

    try:
        link = transport.SerialLink(endpoint)
        slave = Slave(link=link,
                      modbus_type=modbus_type,
                      read_cb=read_cb,
                      write_cb=write_cb,
                      write_mask_cb=write_mask_cb)

    except BaseException:
        # do not leak the endpoint when slave creation fails
        await aio.uncancellable(endpoint.async_close())
        raise

    return slave

Create serial slave

Arguments:
  • modbus_type: modbus type
  • port: port name (see serial.create)
  • read_cb: read callback
  • write_cb: write callback
  • write_mask_cb: write mask callback
  • silent_interval: silent interval (see serial.create)
  • kwargs: additional arguments used for opening serial connection (see serial.create)
class Slave(hat.aio.group.Resource):
176class Slave(aio.Resource):
177    """Modbus slave"""
178
179    def __init__(self,
180                 link: transport.Link,
181                 modbus_type: common.ModbusType,
182                 read_cb: ReadCb | None = None,
183                 write_cb: WriteCb | None = None,
184                 write_mask_cb: WriteMaskCb | None = None):
185        self._modbus_type = modbus_type
186        self._read_cb = read_cb
187        self._write_cb = write_cb
188        self._write_mask_cb = write_mask_cb
189        self._conn = transport.Connection(link)
190        self._log = _create_logger_adapter(self._conn.info)
191
192        self.async_group.spawn(self._receive_loop)
193
194        self._log.debug('slave created')
195
196    @property
197    def async_group(self) -> aio.Group:
198        """Async group"""
199        return self._conn.async_group
200
201    @property
202    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
203        """Connection or endpoint info"""
204        return self._conn.info
205
206    async def _receive_loop(self):
207        self._log.debug("starting slave receive loop")
208        try:
209            while True:
210                try:
211                    self._log.debug("waiting for request")
212                    req_adu = await self._conn.receive(
213                        modbus_type=self._modbus_type,
214                        direction=transport.Direction.REQUEST)
215
216                except ConnectionError:
217                    break
218
219                except Exception as e:
220                    self._log.warning("error receiving request: %s", e,
221                                      exc_info=e)
222                    continue
223
224                device_id = req_adu.device_id
225                req = req_adu.pdu
226
227                try:
228                    self._log.debug("processing request (device_id %s): %s",
229                                    device_id, req)
230                    res = await self._process_request(device_id, req)
231
232                except Exception as e:
233                    self._log.warning("error processing request: %s", e,
234                                      exc_info=e)
235                    continue
236
237                if device_id == 0:
238                    self._log.debug(
239                        "skip sending response (broadcast request): %s", res)
240                    continue
241
242                self._log.debug("sending response: %s", res)
243
244                if self._modbus_type == common.ModbusType.TCP:
245                    res_adu = transport.TcpAdu(
246                        transaction_id=req_adu.transaction_id,
247                        device_id=req_adu.device_id,
248                        pdu=res)
249
250                elif self._modbus_type == common.ModbusType.RTU:
251                    res_adu = transport.RtuAdu(device_id=req_adu.device_id,
252                                               pdu=res)
253
254                elif self._modbus_type == common.ModbusType.ASCII:
255                    res_adu = transport.AsciiAdu(device_id=req_adu.device_id,
256                                                 pdu=res)
257
258                else:
259                    raise ValueError("invalid modbus type")
260
261                try:
262                    await self._conn.send(res_adu)
263
264                except ConnectionError:
265                    break
266
267                except Exception as e:
268                    self._log.warning("error sending response: %s", e,
269                                      exc_info=e)
270                    continue
271
272        except Exception as e:
273            self._log.error("receive loop error: %s", e, exc_info=e)
274
275        finally:
276            self._log.debug("closing slave receive loop")
277            self.close()
278
279    async def _process_request(self, device_id, req):
280        if isinstance(req, transport.ReadCoilsReq):
281            result = await self._call_read_cb(
282                device_id=device_id,
283                data_type=common.DataType.COIL,
284                start_address=req.address,
285                quantity=req.quantity)
286
287            if isinstance(result, common.Error):
288                return transport.ErrorRes(
289                    fc=transport.FunctionCode.READ_COILS,
290                    error=result)
291
292            return transport.ReadCoilsRes(values=result)
293
294        if isinstance(req, transport.ReadDiscreteInputsReq):
295            result = await self._call_read_cb(
296                device_id=device_id,
297                data_type=common.DataType.DISCRETE_INPUT,
298                start_address=req.address,
299                quantity=req.quantity)
300
301            if isinstance(result, common.Error):
302                return transport.ErrorRes(
303                    fc=transport.FunctionCode.READ_DISCRETE_INPUTS,
304                    error=result)
305
306            return transport.ReadDiscreteInputsRes(values=result)
307
308        if isinstance(req, transport.ReadHoldingRegistersReq):
309            result = await self._call_read_cb(
310                device_id=device_id,
311                data_type=common.DataType.HOLDING_REGISTER,
312                start_address=req.address,
313                quantity=req.quantity)
314
315            if isinstance(result, common.Error):
316                return transport.ErrorRes(
317                    fc=transport.FunctionCode.READ_HOLDING_REGISTERS,
318                    error=result)
319
320            return transport.ReadHoldingRegistersRes(values=result)
321
322        if isinstance(req, transport.ReadInputRegistersReq):
323            result = await self._call_read_cb(
324                device_id=device_id,
325                data_type=common.DataType.INPUT_REGISTER,
326                start_address=req.address,
327                quantity=req.quantity)
328
329            if isinstance(result, common.Error):
330                return transport.ErrorRes(
331                    fc=transport.FunctionCode.READ_INPUT_REGISTERS,
332                    error=result)
333
334            return transport.ReadInputRegistersRes(values=result)
335
336        if isinstance(req, transport.WriteSingleCoilReq):
337            result = await self._call_write_cb(
338                device_id=device_id,
339                data_type=common.DataType.COIL,
340                start_address=req.address,
341                values=[req.value])
342
343            if isinstance(result, common.Error):
344                return transport.ErrorRes(
345                    fc=transport.FunctionCode.WRITE_SINGLE_COIL,
346                    error=result)
347
348            return transport.WriteSingleCoilRes(address=req.address,
349                                                value=req.value)
350
351        if isinstance(req, transport.WriteSingleRegisterReq):
352            result = await self._call_write_cb(
353                device_id=device_id,
354                data_type=common.DataType.HOLDING_REGISTER,
355                start_address=req.address,
356                values=[req.value])
357
358            if isinstance(result, common.Error):
359                return transport.ErrorRes(
360                    fc=transport.FunctionCode.WRITE_SINGLE_REGISTER,
361                    error=result)
362
363            return transport.WriteSingleRegisterRes(address=req.address,
364                                                    value=req.value)
365
366        if isinstance(req, transport.WriteMultipleCoilsReq):
367            result = await self._call_write_cb(
368                device_id=device_id,
369                data_type=common.DataType.COIL,
370                start_address=req.address,
371                values=req.values)
372
373            if isinstance(result, common.Error):
374                return transport.ErrorRes(
375                    fc=transport.FunctionCode.WRITE_MULTIPLE_COILS,
376                    error=result)
377
378            return transport.WriteMultipleCoilsRes(address=req.address,
379                                                   quantity=len(req.values))
380
381        if isinstance(req, transport.WriteMultipleRegistersReq):
382            result = await self._call_write_cb(
383                device_id=device_id,
384                data_type=common.DataType.HOLDING_REGISTER,
385                start_address=req.address,
386                values=req.values)
387
388            if isinstance(result, common.Error):
389                return transport.ErrorRes(
390                    fc=transport.FunctionCode.WRITE_MULTIPLE_REGISTER,
391                    error=result)
392
393            return transport.WriteMultipleRegistersRes(
394                address=req.address,
395                quantity=len(req.values))
396
397        if isinstance(req, transport.MaskWriteRegisterReq):
398            result = await self._call_write_mask_cb(
399                device_id=device_id,
400                address=req.address,
401                and_mask=req.and_mask,
402                or_mask=req.or_mask)
403
404            if isinstance(result, common.Error):
405                return transport.ErrorRes(
406                    fc=transport.FunctionCode.MASK_WRITE_REGISTER,
407                    error=result)
408
409            return transport.MaskWriteRegisterRes(address=req.address,
410                                                  and_mask=req.and_mask,
411                                                  or_mask=req.or_mask)
412
413        if isinstance(req, transport.ReadFifoQueueReq):
414            result = await self._call_read_cb(
415                device_id=device_id,
416                data_type=common.DataType.QUEUE,
417                start_address=req.address,
418                quantity=None)
419
420            if isinstance(result, common.Error):
421                return transport.ErrorRes(
422                    fc=transport.FunctionCode.READ_FIFO_QUEUE,
423                    error=result)
424
425            return transport.ReadFifoQueueRes(values=result)
426
427        return transport.ErrorRes(fc=transport.get_pdu_function_code(req),
428                                  error=common.Error.INVALID_FUNCTION_CODE)
429
430    async def _call_read_cb(self, device_id, data_type, start_address,
431                            quantity):
432        if not self._read_cb:
433            self._log.debug("read callback not defined")
434            return common.Error.FUNCTION_ERROR
435
436        try:
437            return await aio.call(self._read_cb, self, device_id, data_type,
438                                  start_address, quantity)
439
440        except Exception as e:
441            self._log.warning("error in read callback: %s", e,
442                              exc_info=e)
443            return common.Error.FUNCTION_ERROR
444
445    async def _call_write_cb(self, device_id, data_type, start_address,
446                             values):
447        if not self._write_cb:
448            self._log.debug("write callback not defined")
449            return common.Error.FUNCTION_ERROR
450
451        try:
452            return await aio.call(self._write_cb, self, device_id, data_type,
453                                  start_address, values)
454
455        except Exception as e:
456            self._log.warning("error in write callback: %s", e,
457                              exc_info=e)
458            return common.Error.FUNCTION_ERROR
459
460    async def _call_write_mask_cb(self, device_id, address, and_mask,
461                                  or_mask):
462        if not self._write_mask_cb:
463            self._log.debug("write mask callback not defined")
464            return common.Error.FUNCTION_ERROR
465
466        try:
467            return await aio.call(self._write_mask_cb, self, device_id,
468                                  address, and_mask, or_mask)
469
470        except Exception as e:
471            self._log.warning("error in write mask callback: %s", e,
472                              exc_info=e)
473            return common.Error.FUNCTION_ERROR

Modbus slave

Slave(link: hat.drivers.modbus.transport.connection.Link, modbus_type: ModbusType, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], list[int] | Error | Awaitable[list[int] | Error]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Error | None | Awaitable[Error | None]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Error | None | Awaitable[Error | None]]] = None)
179    def __init__(self,
180                 link: transport.Link,
181                 modbus_type: common.ModbusType,
182                 read_cb: ReadCb | None = None,
183                 write_cb: WriteCb | None = None,
184                 write_mask_cb: WriteMaskCb | None = None):
185        self._modbus_type = modbus_type
186        self._read_cb = read_cb
187        self._write_cb = write_cb
188        self._write_mask_cb = write_mask_cb
189        self._conn = transport.Connection(link)
190        self._log = _create_logger_adapter(self._conn.info)
191
192        self.async_group.spawn(self._receive_loop)
193
194        self._log.debug('slave created')
async_group: hat.aio.group.Group
196    @property
197    def async_group(self) -> aio.Group:
198        """Async group"""
199        return self._conn.async_group

Async group

201    @property
202    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
203        """Connection or endpoint info"""
204        return self._conn.info

Connection or endpoint info