hat.drivers.modbus

Modbus communication protocol

 1"""Modbus communication protocol"""
 2
 3from hat.drivers.modbus.common import (ModbusType,
 4                                       DataType,
 5                                       Error,
 6                                       apply_mask)
 7from hat.drivers.modbus.master import (create_tcp_master,
 8                                       create_serial_master,
 9                                       Master)
10from hat.drivers.modbus.slave import (SlaveCb,
11                                      ReadCb,
12                                      WriteCb,
13                                      WriteMaskCb,
14                                      create_tcp_server,
15                                      create_serial_slave,
16                                      Slave)
17
18
19__all__ = ['ModbusType',
20           'DataType',
21           'Error',
22           'apply_mask',
23           'create_tcp_master',
24           'create_serial_master',
25           'Master',
26           'SlaveCb',
27           'ReadCb',
28           'WriteCb',
29           'WriteMaskCb',
30           'create_tcp_server',
31           'create_serial_slave',
32           'Slave']
class ModbusType(enum.Enum):
 7class ModbusType(enum.Enum):
 8    TCP = 0
 9    RTU = 1
10    ASCII = 2

An enumeration.

TCP = <ModbusType.TCP: 0>
RTU = <ModbusType.RTU: 1>
ASCII = <ModbusType.ASCII: 2>
class DataType(enum.Enum):
class DataType(enum.Enum):
    """Kind of modbus data addressed by a read or write operation"""
    COIL = 1
    DISCRETE_INPUT = 2
    HOLDING_REGISTER = 3
    INPUT_REGISTER = 4
    QUEUE = 5

An enumeration.

COIL = <DataType.COIL: 1>
DISCRETE_INPUT = <DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER = <DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER = <DataType.INPUT_REGISTER: 4>
QUEUE = <DataType.QUEUE: 5>
class Error(enum.Enum):
class Error(enum.Enum):
    """Error codes reported in place of a successful result"""
    INVALID_FUNCTION_CODE = 0x01
    INVALID_DATA_ADDRESS = 0x02
    INVALID_DATA_VALUE = 0x03
    FUNCTION_ERROR = 0x04
    GATEWAY_PATH_UNAVAILABLE = 0x0a
    GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0b

An enumeration.

INVALID_FUNCTION_CODE = <Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS = <Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE = <Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR = <Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE = <Error.GATEWAY_PATH_UNAVAILABLE: 10>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = <Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 11>
def apply_mask(value: int, and_mask: int, or_mask: int) -> int:
def apply_mask(value: int, and_mask: int, or_mask: int) -> int:
    """Apply mask to value

    Bits set in `and_mask` are kept from `value`; bits cleared in `and_mask`
    are taken from `or_mask`.

    Args:
        value: current value
        and_mask: and mask
        or_mask: or mask

    Returns:
        masked value

    """
    return (value & and_mask) | (or_mask & ~and_mask)

Apply mask to value: bits set in and_mask are kept from value, bits cleared in and_mask are taken from or_mask.

async def create_tcp_master( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, response_timeout: float | None = None, **kwargs) -> Master:
async def create_tcp_master(modbus_type: common.ModbusType,
                            addr: tcp.Address,
                            response_timeout: float | None = None,
                            **kwargs
                            ) -> 'Master':
    """Create TCP master

    Args:
        modbus_type: modbus type
        addr: remote host address
        response_timeout: response timeout in seconds (``None`` disables
            the timeout)
        kwargs: additional arguments used for creating TCP connection
            (see `tcp.connect`)

    Returns:
        master bound to the newly created connection

    """
    conn = await transport.tcp_connect(addr, **kwargs)
    return Master(conn, modbus_type, response_timeout)

Create TCP master

Arguments:
  • modbus_type: modbus type
  • addr: remote host address
  • response_timeout: response timeout in seconds
  • kwargs: additional arguments used for creating TCP connection (see tcp.connect)
async def create_serial_master( modbus_type: ModbusType, port: str, *, silent_interval: float = 0.005, response_timeout: float | None = None, **kwargs) -> Master:
async def create_serial_master(modbus_type: common.ModbusType,
                               port: str, *,
                               silent_interval: float = 0.005,
                               response_timeout: float | None = None,
                               **kwargs
                               ) -> 'Master':
    """Create serial master

    Args:
        modbus_type: modbus type
        port: port name (see `serial.create`)
        silent_interval: silent interval (see `serial.create`)
        response_timeout: response timeout in seconds (``None`` disables
            the timeout)
        kwargs: additional arguments used for opening serial connection
            (see `serial.create`)

    Returns:
        master bound to the newly created connection

    """
    conn = await transport.serial_create(port,
                                         silent_interval=silent_interval,
                                         **kwargs)
    return Master(conn, modbus_type, response_timeout)

Create serial master

Arguments:
  • modbus_type: modbus type
  • port: port name (see serial.create)
  • silent_interval: silent interval (see serial.create)
  • response_timeout: response timeout in seconds
  • kwargs: additional arguments used for opening serial connection (see serial.create)
class Master(hat.aio.group.Resource):
 62class Master(aio.Resource):
 63    """Modbus master"""
 64
 65    def __init__(self,
 66                 conn: transport.Connection,
 67                 modbus_type: common.ModbusType,
 68                 response_timeout: float | None):
 69        self._conn = conn
 70        self._modbus_type = modbus_type
 71        self._response_timeout = response_timeout
 72        self._send_queue = aio.Queue()
 73
 74        if modbus_type == common.ModbusType.TCP:
 75            self._next_transaction_ids = iter(i % 0x10000
 76                                              for i in itertools.count(1))
 77
 78        self.async_group.spawn(self._send_loop)
 79
 80    @property
 81    def async_group(self) -> aio.Group:
 82        """Async group"""
 83        return self._conn.async_group
 84
 85    @property
 86    def log_prefix(self) -> str:
 87        """Logging prefix"""
 88        return self._conn.log_prefix
 89
 90    async def read(self,
 91                   device_id: int,
 92                   data_type: common.DataType,
 93                   start_address: int,
 94                   quantity: int = 1
 95                   ) -> list[int] | common.Error:
 96        """Read data from modbus device
 97
 98        Argument `quantity` is ignored if `data_type` is `QUEUE`.
 99
100        Args:
101            device_id: slave device identifier
102            data_type: data type
103            start_address: starting modbus data address
104            quantity: number of data values
105
106        Raises:
107            ConnectionError
108            TimeoutError
109
110        """
111        if device_id == 0:
112            raise ValueError('unsupported device id')
113
114        if data_type == common.DataType.COIL:
115            req = transport.ReadCoilsReq(address=start_address,
116                                         quantity=quantity)
117
118        elif data_type == common.DataType.DISCRETE_INPUT:
119            req = transport.ReadDiscreteInputsReq(address=start_address,
120                                                  quantity=quantity)
121
122        elif data_type == common.DataType.HOLDING_REGISTER:
123            req = transport.ReadHoldingRegistersReq(address=start_address,
124                                                    quantity=quantity)
125
126        elif data_type == common.DataType.INPUT_REGISTER:
127            req = transport.ReadInputRegistersReq(address=start_address,
128                                                  quantity=quantity)
129
130        elif data_type == common.DataType.QUEUE:
131            req = transport.ReadFifoQueueReq(address=start_address)
132
133        else:
134            raise ValueError('unsupported data type')
135
136        res = await self._send(device_id, req)
137
138        if isinstance(res, transport.ErrorRes):
139            return res.error
140
141        if isinstance(res, (transport.ReadCoilsRes,
142                            transport.ReadDiscreteInputsRes,
143                            transport.ReadHoldingRegistersRes,
144                            transport.ReadInputRegistersRes)):
145            return res.values[:quantity]
146
147        if isinstance(res, transport.ReadFifoQueueRes):
148            return res.values
149
150        raise ValueError("unsupported response pdu")
151
152    async def write(self,
153                    device_id: int,
154                    data_type: common.DataType,
155                    start_address: int,
156                    values: typing.List[int]
157                    ) -> common.Error | None:
158        """Write data to modbus device
159
160        Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not
161        supported.
162
163        Args:
164            device_id: slave device identifier
165            data_type: data type
166            start_address: starting modbus data address
167            values: values
168
169        Raises:
170            ConnectionError
171            TimeoutError
172
173        """
174        if data_type == common.DataType.COIL:
175            if len(values) == 1:
176                req = transport.WriteSingleCoilReq(address=start_address,
177                                                   value=values[0])
178
179            else:
180                req = transport.WriteMultipleCoilsReq(address=start_address,
181                                                      values=values)
182
183        elif data_type == common.DataType.HOLDING_REGISTER:
184            if len(values) == 1:
185                req = transport.WriteSingleRegisterReq(address=start_address,
186                                                       value=values[0])
187
188            else:
189                req = transport.WriteMultipleRegistersReq(
190                    address=start_address,
191                    values=values)
192
193        else:
194            raise ValueError('unsupported data type')
195
196        res = await self._send(device_id, req)
197
198        if isinstance(res, transport.ErrorRes):
199            return res.error
200
201        if not isinstance(res, (transport.WriteSingleCoilRes,
202                                transport.WriteMultipleCoilsRes,
203                                transport.WriteSingleRegisterRes,
204                                transport.WriteMultipleRegistersRes)):
205            raise ValueError("unsupported response pdu")
206
207        if (res.address != start_address):
208            raise Exception("invalid response pdu address")
209
210        if isinstance(res, (transport.WriteSingleCoilRes,
211                            transport.WriteSingleRegisterRes)):
212            if (res.value != values[0]):
213                raise Exception("invalid response pdu value")
214
215        if isinstance(res, (transport.WriteMultipleCoilsRes,
216                            transport.WriteMultipleRegistersRes)):
217            if (res.quantity != len(values)):
218                raise Exception("invalid response pdu quantity")
219
220    async def write_mask(self,
221                         device_id: int,
222                         address: int,
223                         and_mask: int,
224                         or_mask: int
225                         ) -> common.Error | None:
226        """Write mask to modbus device HOLDING_REGISTER
227
228        Args:
229            device_id: slave device identifier
230            address: modbus data address
231            and_mask: and mask
232            or_mask: or mask
233
234        Raises:
235            ConnectionError
236            TimeoutError
237
238        """
239        req = transport.MaskWriteRegisterReq(address=address,
240                                             and_mask=and_mask,
241                                             or_mask=or_mask)
242
243        res = await self._send(device_id, req)
244
245        if isinstance(res, transport.ErrorRes):
246            return res.error
247
248        if not isinstance(res, transport.MaskWriteRegisterRes):
249            raise ValueError("unsupported response pdu")
250
251        if (res.address != address):
252            raise Exception("invalid response pdu address")
253
254        if (res.and_mask != and_mask):
255            raise Exception("invalid response pdu and mask")
256
257        if (res.or_mask != or_mask):
258            raise Exception("invalid response pdu or mask")
259
260    async def _send(self, device_id, req):
261        if self._modbus_type == common.ModbusType.TCP:
262            req_adu = transport.TcpAdu(
263                transaction_id=next(self._next_transaction_ids),
264                device_id=device_id,
265                pdu=req)
266
267        elif self._modbus_type == common.ModbusType.RTU:
268            req_adu = transport.RtuAdu(device_id=device_id,
269                                       pdu=req)
270
271        elif self._modbus_type == common.ModbusType.ASCII:
272            req_adu = transport.AsciiAdu(device_id=device_id,
273                                         pdu=req)
274
275        else:
276            raise ValueError("unsupported modbus type")
277
278        future = asyncio.Future()
279        try:
280            self._send_queue.put_nowait((req_adu, future))
281            res_adu = await future
282
283        except aio.QueueClosedError:
284            raise ConnectionError()
285
286        return res_adu.pdu
287
288    async def _receive(self, req_adu):
289        while True:
290            res_adu = await self._conn.receive(self._modbus_type,
291                                               transport.Direction.RESPONSE)
292
293            if isinstance(res_adu, transport.TcpAdu):
294                if res_adu.transaction_id != req_adu.transaction_id:
295                    self._log(logging.WARNING,
296                              "discarding response adu: "
297                              "invalid response transaction id")
298                    continue
299
300            if res_adu.device_id != req_adu.device_id:
301                self._log(logging.WARNING,
302                          "discarding response adu: "
303                          "invalid response device id")
304                continue
305
306            req_fc = transport.get_pdu_function_code(req_adu.pdu)
307            res_fc = transport.get_pdu_function_code(res_adu.pdu)
308            if req_fc != res_fc:
309                self._log(logging.WARNING,
310                          "discarding response adu: "
311                          "invalid response function code")
312                continue
313
314            return res_adu
315
316    async def _send_loop(self):
317        self._log(logging.DEBUG, "starting master send loop")
318        future = None
319        try:
320            while self.is_open:
321                # req_adu, future = await self._send_queue.get()
322
323                async with self.async_group.create_subgroup() as subgroup:
324                    subgroup.spawn(self._reset_input_buffer_loop)
325                    self._log(logging.DEBUG,
326                              "started discarding incomming data")
327
328                    while not future or future.done():
329                        req_adu, future = await self._send_queue.get()
330
331                await self._reset_input_buffer()
332                self._log(logging.DEBUG, "stopped discarding incomming data")
333
334                await self._conn.send(req_adu)
335                await self._conn.drain()
336
337                async with self.async_group.create_subgroup(
338                        log_exceptions=False) as subgroup:
339                    receive_task = subgroup.spawn(self._receive, req_adu)
340
341                    await asyncio.wait([receive_task, future],
342                                       timeout=self._response_timeout,
343                                       return_when=asyncio.FIRST_COMPLETED)
344
345                    if future.done():
346                        continue
347
348                    if receive_task.done():
349                        future.set_result(receive_task.result())
350
351                    else:
352                        future.set_exception(TimeoutError())
353
354        except ConnectionError:
355            pass
356
357        except Exception as e:
358            self._log(logging.ERROR, "error in send loop: %s", e, exc_info=e)
359
360        finally:
361            self._log(logging.DEBUG, "stopping master send loop")
362            self.close()
363            self._send_queue.close()
364
365            while True:
366                if future and not future.done():
367                    future.set_exception(ConnectionError())
368                if self._send_queue.empty():
369                    break
370                _, future = self._send_queue.get_nowait()
371
372    async def _reset_input_buffer_loop(self):
373        try:
374            while True:
375                await self._reset_input_buffer()
376
377                await self._conn.read_byte()
378                self._log(logging.DEBUG, "discarded 1 byte from input buffer")
379
380        except ConnectionError:
381            self.close()
382
383        except Exception as e:
384            self._log(logging.ERROR, "error in reset input buffer loop: %s", e,
385                      exc_info=e)
386            self.close()
387
388    async def _reset_input_buffer(self):
389        count = await self._conn.reset_input_buffer()
390        if not count:
391            return
392        self._log(logging.DEBUG, "discarded %s bytes from input buffer", count)
393
394    def _log(self, level, msg, *args, **kwargs):
395        if not mlog.isEnabledFor(level):
396            return
397
398        mlog.log(level, f"{self.log_prefix}: {msg}", *args, **kwargs)

Modbus master

Master( conn: hat.drivers.modbus.transport.connection.Connection, modbus_type: ModbusType, response_timeout: float | None)
65    def __init__(self,
66                 conn: transport.Connection,
67                 modbus_type: common.ModbusType,
68                 response_timeout: float | None):
69        self._conn = conn
70        self._modbus_type = modbus_type
71        self._response_timeout = response_timeout
72        self._send_queue = aio.Queue()
73
74        if modbus_type == common.ModbusType.TCP:
75            self._next_transaction_ids = iter(i % 0x10000
76                                              for i in itertools.count(1))
77
78        self.async_group.spawn(self._send_loop)
async_group: hat.aio.group.Group
80    @property
81    def async_group(self) -> aio.Group:
82        """Async group"""
83        return self._conn.async_group

Async group

log_prefix: str
85    @property
86    def log_prefix(self) -> str:
87        """Logging prefix"""
88        return self._conn.log_prefix

Logging prefix

async def read( self, device_id: int, data_type: DataType, start_address: int, quantity: int = 1) -> list[int] | Error:
 90    async def read(self,
 91                   device_id: int,
 92                   data_type: common.DataType,
 93                   start_address: int,
 94                   quantity: int = 1
 95                   ) -> list[int] | common.Error:
 96        """Read data from modbus device
 97
 98        Argument `quantity` is ignored if `data_type` is `QUEUE`.
 99
100        Args:
101            device_id: slave device identifier
102            data_type: data type
103            start_address: starting modbus data address
104            quantity: number of data values
105
106        Raises:
107            ConnectionError
108            TimeoutError
109
110        """
111        if device_id == 0:
112            raise ValueError('unsupported device id')
113
114        if data_type == common.DataType.COIL:
115            req = transport.ReadCoilsReq(address=start_address,
116                                         quantity=quantity)
117
118        elif data_type == common.DataType.DISCRETE_INPUT:
119            req = transport.ReadDiscreteInputsReq(address=start_address,
120                                                  quantity=quantity)
121
122        elif data_type == common.DataType.HOLDING_REGISTER:
123            req = transport.ReadHoldingRegistersReq(address=start_address,
124                                                    quantity=quantity)
125
126        elif data_type == common.DataType.INPUT_REGISTER:
127            req = transport.ReadInputRegistersReq(address=start_address,
128                                                  quantity=quantity)
129
130        elif data_type == common.DataType.QUEUE:
131            req = transport.ReadFifoQueueReq(address=start_address)
132
133        else:
134            raise ValueError('unsupported data type')
135
136        res = await self._send(device_id, req)
137
138        if isinstance(res, transport.ErrorRes):
139            return res.error
140
141        if isinstance(res, (transport.ReadCoilsRes,
142                            transport.ReadDiscreteInputsRes,
143                            transport.ReadHoldingRegistersRes,
144                            transport.ReadInputRegistersRes)):
145            return res.values[:quantity]
146
147        if isinstance(res, transport.ReadFifoQueueRes):
148            return res.values
149
150        raise ValueError("unsupported response pdu")

Read data from modbus device

Argument quantity is ignored if data_type is QUEUE.

Arguments:
  • device_id: slave device identifier
  • data_type: data type
  • start_address: starting modbus data address
  • quantity: number of data values
Raises:
  • ConnectionError
  • TimeoutError
async def write( self, device_id: int, data_type: DataType, start_address: int, values: List[int]) -> Error | None:
152    async def write(self,
153                    device_id: int,
154                    data_type: common.DataType,
155                    start_address: int,
156                    values: typing.List[int]
157                    ) -> common.Error | None:
158        """Write data to modbus device
159
160        Data types `DISCRETE_INPUT`, `INPUT_REGISTER` and `QUEUE` are not
161        supported.
162
163        Args:
164            device_id: slave device identifier
165            data_type: data type
166            start_address: starting modbus data address
167            values: values
168
169        Raises:
170            ConnectionError
171            TimeoutError
172
173        """
174        if data_type == common.DataType.COIL:
175            if len(values) == 1:
176                req = transport.WriteSingleCoilReq(address=start_address,
177                                                   value=values[0])
178
179            else:
180                req = transport.WriteMultipleCoilsReq(address=start_address,
181                                                      values=values)
182
183        elif data_type == common.DataType.HOLDING_REGISTER:
184            if len(values) == 1:
185                req = transport.WriteSingleRegisterReq(address=start_address,
186                                                       value=values[0])
187
188            else:
189                req = transport.WriteMultipleRegistersReq(
190                    address=start_address,
191                    values=values)
192
193        else:
194            raise ValueError('unsupported data type')
195
196        res = await self._send(device_id, req)
197
198        if isinstance(res, transport.ErrorRes):
199            return res.error
200
201        if not isinstance(res, (transport.WriteSingleCoilRes,
202                                transport.WriteMultipleCoilsRes,
203                                transport.WriteSingleRegisterRes,
204                                transport.WriteMultipleRegistersRes)):
205            raise ValueError("unsupported response pdu")
206
207        if (res.address != start_address):
208            raise Exception("invalid response pdu address")
209
210        if isinstance(res, (transport.WriteSingleCoilRes,
211                            transport.WriteSingleRegisterRes)):
212            if (res.value != values[0]):
213                raise Exception("invalid response pdu value")
214
215        if isinstance(res, (transport.WriteMultipleCoilsRes,
216                            transport.WriteMultipleRegistersRes)):
217            if (res.quantity != len(values)):
218                raise Exception("invalid response pdu quantity")

Write data to modbus device

Data types DISCRETE_INPUT, INPUT_REGISTER and QUEUE are not supported.

Arguments:
  • device_id: slave device identifier
  • data_type: data type
  • start_address: starting modbus data address
  • values: values
Raises:
  • ConnectionError
  • TimeoutError
async def write_mask( self, device_id: int, address: int, and_mask: int, or_mask: int) -> Error | None:
220    async def write_mask(self,
221                         device_id: int,
222                         address: int,
223                         and_mask: int,
224                         or_mask: int
225                         ) -> common.Error | None:
226        """Write mask to modbus device HOLDING_REGISTER
227
228        Args:
229            device_id: slave device identifier
230            address: modbus data address
231            and_mask: and mask
232            or_mask: or mask
233
234        Raises:
235            ConnectionError
236            TimeoutError
237
238        """
239        req = transport.MaskWriteRegisterReq(address=address,
240                                             and_mask=and_mask,
241                                             or_mask=or_mask)
242
243        res = await self._send(device_id, req)
244
245        if isinstance(res, transport.ErrorRes):
246            return res.error
247
248        if not isinstance(res, transport.MaskWriteRegisterRes):
249            raise ValueError("unsupported response pdu")
250
251        if (res.address != address):
252            raise Exception("invalid response pdu address")
253
254        if (res.and_mask != and_mask):
255            raise Exception("invalid response pdu and mask")
256
257        if (res.or_mask != or_mask):
258            raise Exception("invalid response pdu or mask")

Apply an AND/OR mask to a single HOLDING_REGISTER of a modbus device

Arguments:
  • device_id: slave device identifier
  • address: modbus data address
  • and_mask: and mask
  • or_mask: or mask
Raises:
  • ConnectionError
  • TimeoutError
SlaveCb = typing.Callable[[ForwardRef('Slave')], typing.Optional[typing.Awaitable[NoneType]]]
ReadCb = typing.Callable[[ForwardRef('Slave'), int, DataType, int, int | None], typing.Union[list[int], Error, typing.Awaitable[list[int] | Error]]]
WriteCb = typing.Callable[[ForwardRef('Slave'), int, DataType, int, list[int]], typing.Union[Error, NoneType, typing.Awaitable[Error | None]]]
WriteMaskCb = typing.Callable[[ForwardRef('Slave'), int, int, int, int], typing.Union[Error, NoneType, typing.Awaitable[Error | None]]]
async def create_tcp_server( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, slave_cb: Optional[Callable[[Slave], Optional[Awaitable[NoneType]]]] = None, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], Union[list[int], Error, Awaitable[list[int] | Error]]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Union[Error, NoneType, Awaitable[Error | None]]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Union[Error, NoneType, Awaitable[Error | None]]]] = None, **kwargs) -> hat.drivers.tcp.Server:
 81async def create_tcp_server(modbus_type: common.ModbusType,
 82                            addr: tcp.Address,
 83                            slave_cb: typing.Optional[SlaveCb] = None,
 84                            read_cb: typing.Optional[ReadCb] = None,
 85                            write_cb: typing.Optional[WriteCb] = None,
 86                            write_mask_cb: typing.Optional[WriteMaskCb] = None,
 87                            **kwargs
 88                            ) -> tcp.Server:
 89    """Create TCP server
 90
 91    Closing server closes all active associated slaves.
 92
 93    Args:
 94        modbus_type: modbus type
 95        addr: local listening host address
 96        slave_cb: slave callback
 97        read_cb: read callback
 98        write_cb: write callback
 99        write_mask_cb: write mask callback
100        kwargs: additional arguments used for creating TCP server
101            (see `tcp.listen`)
102
103    """
104
105    async def on_connection(conn):
106        slave = Slave(conn=conn,
107                      modbus_type=modbus_type,
108                      read_cb=read_cb,
109                      write_cb=write_cb,
110                      write_mask_cb=write_mask_cb)
111
112        try:
113            if slave_cb:
114                await aio.call(slave_cb, slave)
115
116            await slave.wait_closing()
117
118        except Exception as e:
119            if mlog.isEnabledFor(logging.ERROR):
120                mlog.error(f"tcp local ({addr.host}:{addr.port}): "
121                           f"error in slave callback: %s", e, exc_info=e)
122
123        finally:
124            slave.close()
125
126    return await transport.tcp_listen(on_connection, addr, **kwargs)

Create TCP server

Closing server closes all active associated slaves.

Arguments:
  • modbus_type: modbus type
  • addr: local listening host address
  • slave_cb: slave callback
  • read_cb: read callback
  • write_cb: write callback
  • write_mask_cb: write mask callback
  • kwargs: additional arguments used for creating TCP server (see tcp.listen)
async def create_serial_slave( modbus_type: ModbusType, port: str, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], Union[list[int], Error, Awaitable[list[int] | Error]]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Union[Error, NoneType, Awaitable[Error | None]]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Union[Error, NoneType, Awaitable[Error | None]]]] = None, silent_interval: float = 0.005, **kwargs) -> Slave:
async def create_serial_slave(modbus_type: common.ModbusType,
                              port: str,
                              read_cb: typing.Optional[ReadCb] = None,
                              write_cb: typing.Optional[WriteCb] = None,
                              write_mask_cb: typing.Optional[WriteMaskCb] = None,  # NOQA
                              silent_interval: float = 0.005,
                              **kwargs
                              ) -> 'Slave':
    """Create serial slave

    Args:
        modbus_type: modbus type
        port: port name (see `serial.create`)
        read_cb: read callback
        write_cb: write callback
        write_mask_cb: write mask callback
        silent_interval: silent interval (see `serial.create`)
        kwargs: additional arguments used for opening serial connection
            (see `serial.create`)

    Returns:
        slave bound to the newly created connection

    """
    conn = await transport.serial_create(port,
                                         silent_interval=silent_interval,
                                         **kwargs)
    return Slave(conn=conn,
                 modbus_type=modbus_type,
                 read_cb=read_cb,
                 write_cb=write_cb,
                 write_mask_cb=write_mask_cb)

Create serial slave

Arguments:
  • modbus_type: modbus type
  • port: port name (see serial.create)
  • read_cb: read callback
  • write_cb: write callback
  • write_mask_cb: write mask callback
  • silent_interval: silent interval (see serial.create)
  • kwargs: additional arguments used for opening serial connection (see serial.create)
class Slave(hat.aio.group.Resource):
class Slave(aio.Resource):
    """Modbus slave

    Serves requests received over a single transport connection. Each
    request is translated into a call of the matching user callback and the
    callback result is sent back as response. Requests addressed to device
    id ``0`` are processed but never answered (they are logged as broadcast
    requests).

    If a required callback is not provided or raises an exception, the
    request is answered with `Error.FUNCTION_ERROR`. Requests not recognized
    by the slave are answered with `Error.INVALID_FUNCTION_CODE`.

    Args:
        conn: transport connection used for receiving requests and sending
            responses
        modbus_type: modbus type
        read_cb: read callback
        write_cb: write callback
        write_mask_cb: write mask callback

    """

    def __init__(self,
                 conn: transport.Connection,
                 modbus_type: common.ModbusType,
                 read_cb: typing.Optional[ReadCb] = None,
                 write_cb: typing.Optional[WriteCb] = None,
                 write_mask_cb: typing.Optional[WriteMaskCb] = None):
        self._conn = conn
        self._modbus_type = modbus_type
        (self._read_cb,
         self._write_cb,
         self._write_mask_cb) = read_cb, write_cb, write_mask_cb

        # receive loop runs inside the connection's async group
        group = self.async_group
        group.spawn(self._receive_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group (shared with the underlying connection)"""
        conn = self._conn
        return conn.async_group

    @property
    def log_prefix(self) -> str:
        """Logging prefix (taken from the underlying connection)"""
        conn = self._conn
        return conn.log_prefix

    async def _receive_loop(self):
        """Receive requests, process them and send responses

        Loop ends when receiving or sending raises `ConnectionError` or when
        an unexpected exception escapes the loop body. Slave is closed once
        the loop is done.
        """
        self._log(logging.DEBUG, "starting slave receive loop")
        try:
            while True:
                try:
                    self._log(logging.DEBUG, "waiting for request")
                    request_adu = await self._conn.receive(
                        modbus_type=self._modbus_type,
                        direction=transport.Direction.REQUEST)

                except ConnectionError:
                    break

                except Exception as exc:
                    # failed receive is not fatal - wait for next request
                    self._log(logging.WARNING, "error receiving request: %s",
                              exc, exc_info=exc)
                    continue

                device_id = request_adu.device_id
                request = request_adu.pdu

                try:
                    self._log(logging.DEBUG,
                              "processing request (device_id %s): %s",
                              device_id, request)
                    response = await self._process_request(device_id, request)

                except Exception as exc:
                    # no response is sent for request which failed processing
                    self._log(logging.WARNING, "error processing request: %s",
                              exc, exc_info=exc)
                    continue

                if device_id == 0:
                    self._log(logging.DEBUG,
                              "skip sending response (broadcast request): %s",
                              response)
                    continue

                self._log(logging.DEBUG, "sending response: %s", response)
                response_adu = self._create_response_adu(request_adu,
                                                         response)

                try:
                    await self._conn.send(response_adu)

                except ConnectionError:
                    break

                except Exception as exc:
                    self._log(logging.WARNING, "error sending response: %s",
                              exc, exc_info=exc)

        except Exception as exc:
            self._log(logging.ERROR, "receive loop error: %s", exc,
                      exc_info=exc)

        finally:
            self._log(logging.DEBUG, "closing slave receive loop")
            self.close()

    def _create_response_adu(self, req_adu, res):
        """Wrap response PDU into ADU matching slave's modbus type

        Device id (and transaction id in case of TCP) is copied from the
        request ADU.

        Raises:
            ValueError: modbus type is not TCP, RTU or ASCII
        """
        modbus_type = self._modbus_type

        if modbus_type == common.ModbusType.TCP:
            return transport.TcpAdu(transaction_id=req_adu.transaction_id,
                                    device_id=req_adu.device_id,
                                    pdu=res)

        if modbus_type == common.ModbusType.RTU:
            return transport.RtuAdu(device_id=req_adu.device_id,
                                    pdu=res)

        if modbus_type == common.ModbusType.ASCII:
            return transport.AsciiAdu(device_id=req_adu.device_id,
                                      pdu=res)

        raise ValueError("invalid modbus type")

    async def _process_request(self, device_id, req):
        """Create response PDU for a single request PDU

        Callback result which is instance of `common.Error` is converted to
        `transport.ErrorRes` carrying function code of the request; any other
        result produces regular response. Request of unsupported type results
        in `transport.ErrorRes` with `Error.INVALID_FUNCTION_CODE`.
        """
        if isinstance(req, transport.ReadCoilsReq):
            outcome = await self._call_read_cb(device_id,
                                               common.DataType.COIL,
                                               req.address,
                                               req.quantity)
            if not isinstance(outcome, common.Error):
                return transport.ReadCoilsRes(values=outcome)

            return transport.ErrorRes(fc=transport.FunctionCode.READ_COILS,
                                      error=outcome)

        if isinstance(req, transport.ReadDiscreteInputsReq):
            outcome = await self._call_read_cb(device_id,
                                               common.DataType.DISCRETE_INPUT,
                                               req.address,
                                               req.quantity)
            if not isinstance(outcome, common.Error):
                return transport.ReadDiscreteInputsRes(values=outcome)

            return transport.ErrorRes(
                fc=transport.FunctionCode.READ_DISCRETE_INPUTS,
                error=outcome)

        if isinstance(req, transport.ReadHoldingRegistersReq):
            outcome = await self._call_read_cb(
                device_id,
                common.DataType.HOLDING_REGISTER,
                req.address,
                req.quantity)
            if not isinstance(outcome, common.Error):
                return transport.ReadHoldingRegistersRes(values=outcome)

            return transport.ErrorRes(
                fc=transport.FunctionCode.READ_HOLDING_REGISTERS,
                error=outcome)

        if isinstance(req, transport.ReadInputRegistersReq):
            outcome = await self._call_read_cb(device_id,
                                               common.DataType.INPUT_REGISTER,
                                               req.address,
                                               req.quantity)
            if not isinstance(outcome, common.Error):
                return transport.ReadInputRegistersRes(values=outcome)

            return transport.ErrorRes(
                fc=transport.FunctionCode.READ_INPUT_REGISTERS,
                error=outcome)

        if isinstance(req, transport.WriteSingleCoilReq):
            # single value write is passed to callback as one element list
            outcome = await self._call_write_cb(device_id,
                                                common.DataType.COIL,
                                                req.address,
                                                [req.value])
            if not isinstance(outcome, common.Error):
                return transport.WriteSingleCoilRes(address=req.address,
                                                    value=req.value)

            return transport.ErrorRes(
                fc=transport.FunctionCode.WRITE_SINGLE_COIL,
                error=outcome)

        if isinstance(req, transport.WriteSingleRegisterReq):
            outcome = await self._call_write_cb(
                device_id,
                common.DataType.HOLDING_REGISTER,
                req.address,
                [req.value])
            if not isinstance(outcome, common.Error):
                return transport.WriteSingleRegisterRes(address=req.address,
                                                        value=req.value)

            return transport.ErrorRes(
                fc=transport.FunctionCode.WRITE_SINGLE_REGISTER,
                error=outcome)

        if isinstance(req, transport.WriteMultipleCoilsReq):
            outcome = await self._call_write_cb(device_id,
                                                common.DataType.COIL,
                                                req.address,
                                                req.values)
            if not isinstance(outcome, common.Error):
                return transport.WriteMultipleCoilsRes(
                    address=req.address,
                    quantity=len(req.values))

            return transport.ErrorRes(
                fc=transport.FunctionCode.WRITE_MULTIPLE_COILS,
                error=outcome)

        if isinstance(req, transport.WriteMultipleRegistersReq):
            outcome = await self._call_write_cb(
                device_id,
                common.DataType.HOLDING_REGISTER,
                req.address,
                req.values)
            if not isinstance(outcome, common.Error):
                return transport.WriteMultipleRegistersRes(
                    address=req.address,
                    quantity=len(req.values))

            return transport.ErrorRes(
                fc=transport.FunctionCode.WRITE_MULTIPLE_REGISTER,
                error=outcome)

        if isinstance(req, transport.MaskWriteRegisterReq):
            outcome = await self._call_write_mask_cb(device_id,
                                                     req.address,
                                                     req.and_mask,
                                                     req.or_mask)
            if not isinstance(outcome, common.Error):
                return transport.MaskWriteRegisterRes(address=req.address,
                                                      and_mask=req.and_mask,
                                                      or_mask=req.or_mask)

            return transport.ErrorRes(
                fc=transport.FunctionCode.MASK_WRITE_REGISTER,
                error=outcome)

        if isinstance(req, transport.ReadFifoQueueReq):
            # queue read has no quantity - callback receives None instead
            outcome = await self._call_read_cb(device_id,
                                               common.DataType.QUEUE,
                                               req.address,
                                               None)
            if not isinstance(outcome, common.Error):
                return transport.ReadFifoQueueRes(values=outcome)

            return transport.ErrorRes(
                fc=transport.FunctionCode.READ_FIFO_QUEUE,
                error=outcome)

        return transport.ErrorRes(fc=transport.get_pdu_function_code(req),
                                  error=common.Error.INVALID_FUNCTION_CODE)

    async def _call_read_cb(self, device_id, data_type, start_address,
                            quantity):
        """Call read callback

        Returns callback result, or `Error.FUNCTION_ERROR` if callback is not
        set or raises an exception.
        """
        callback = self._read_cb
        if not callback:
            self._log(logging.DEBUG, "read callback not defined")
            return common.Error.FUNCTION_ERROR

        try:
            return await aio.call(callback, self, device_id, data_type,
                                  start_address, quantity)

        except Exception as exc:
            self._log(logging.WARNING, "error in read callback: %s", exc,
                      exc_info=exc)
            return common.Error.FUNCTION_ERROR

    async def _call_write_cb(self, device_id, data_type, start_address,
                             values):
        """Call write callback

        Returns callback result, or `Error.FUNCTION_ERROR` if callback is not
        set or raises an exception.
        """
        callback = self._write_cb
        if not callback:
            self._log(logging.DEBUG, "write callback not defined")
            return common.Error.FUNCTION_ERROR

        try:
            return await aio.call(callback, self, device_id, data_type,
                                  start_address, values)

        except Exception as exc:
            self._log(logging.WARNING, "error in write callback: %s", exc,
                      exc_info=exc)
            return common.Error.FUNCTION_ERROR

    async def _call_write_mask_cb(self, device_id, address, and_mask,
                                  or_mask):
        """Call write mask callback

        Returns callback result, or `Error.FUNCTION_ERROR` if callback is not
        set or raises an exception.
        """
        callback = self._write_mask_cb
        if not callback:
            self._log(logging.DEBUG, "write mask callback not defined")
            return common.Error.FUNCTION_ERROR

        try:
            return await aio.call(callback, self, device_id, address,
                                  and_mask, or_mask)

        except Exception as exc:
            self._log(logging.WARNING, "error in write mask callback: %s",
                      exc, exc_info=exc)
            return common.Error.FUNCTION_ERROR

    def _log(self, level, msg, *args, **kwargs):
        """Log message prefixed with `log_prefix` using module logger"""
        if mlog.isEnabledFor(level):
            mlog.log(level, f"{self.log_prefix}: {msg}", *args, **kwargs)

Modbus slave

Slave( conn: hat.drivers.modbus.transport.connection.Connection, modbus_type: ModbusType, read_cb: Optional[Callable[[Slave, int, DataType, int, int | None], Union[list[int], Error, Awaitable[list[int] | Error]]]] = None, write_cb: Optional[Callable[[Slave, int, DataType, int, list[int]], Union[Error, NoneType, Awaitable[Error | None]]]] = None, write_mask_cb: Optional[Callable[[Slave, int, int, int, int], Union[Error, NoneType, Awaitable[Error | None]]]] = None)
163    def __init__(self,
164                 conn: transport.Connection,
165                 modbus_type: common.ModbusType,
166                 read_cb: typing.Optional[ReadCb] = None,
167                 write_cb: typing.Optional[WriteCb] = None,
168                 write_mask_cb: typing.Optional[WriteMaskCb] = None):
169        self._conn = conn
170        self._modbus_type = modbus_type
171        self._read_cb = read_cb
172        self._write_cb = write_cb
173        self._write_mask_cb = write_mask_cb
174
175        self.async_group.spawn(self._receive_loop)
async_group: hat.aio.group.Group
177    @property
178    def async_group(self) -> aio.Group:
179        """Async group"""
180        return self._conn.async_group

Async group

log_prefix: str
182    @property
183    def log_prefix(self) -> str:
184        """Logging prefix"""
185        return self._conn.log_prefix

Logging prefix