hat.drivers.modbus

Modbus communication protocol

 1"""Modbus communication protocol"""
 2
 3from hat.drivers.modbus.common import (DeviceId,
 4                                       DataAddress,
 5                                       DataValues,
 6                                       ModbusType,
 7                                       DataType,
 8                                       Error,
 9                                       Success,
10                                       ReadReq,
11                                       ReadRes,
12                                       WriteReq,
13                                       WriteRes,
14                                       WriteMaskReq,
15                                       WriteMaskRes,
16                                       Request,
17                                       Response,
18                                       apply_mask)
19from hat.drivers.modbus.master import (create_tcp_master,
20                                       create_serial_master,
21                                       Master)
22from hat.drivers.modbus.slave import (SlaveCb,
23                                      RequestCb,
24                                      create_tcp_server,
25                                      create_serial_slave,
26                                      Slave)
27
28
# Public API of the package: every name imported above from the ``common``,
# ``master`` and ``slave`` submodules is re-exported here.
__all__ = ['DeviceId',
           'DataAddress',
           'DataValues',
           'ModbusType',
           'DataType',
           'Error',
           'Success',
           'ReadReq',
           'ReadRes',
           'WriteReq',
           'WriteRes',
           'WriteMaskReq',
           'WriteMaskRes',
           'Request',
           'Response',
           'apply_mask',
           'create_tcp_master',
           'create_serial_master',
           'Master',
           'SlaveCb',
           'RequestCb',
           'create_tcp_server',
           'create_serial_slave',
           'Slave']
DeviceId = <class 'int'>
DataAddress = <class 'int'>
DataValues = collections.abc.Sequence[int]
class ModbusType(enum.Enum):
class ModbusType(enum.Enum):
    """Modbus type

    Selects which ADU type `Master` wraps request PDUs in.

    """
    TCP = 0  # requests are sent as `transport.TcpAdu` (with transaction id)
    RTU = 1  # requests are sent as `transport.RtuAdu`
    ASCII = 2  # requests are sent as `transport.AsciiAdu`

Modbus type

TCP = <ModbusType.TCP: 0>
RTU = <ModbusType.RTU: 1>
ASCII = <ModbusType.ASCII: 2>
class DataType(enum.Enum):
class DataType(enum.Enum):
    """Data type

    Kind of data addressed by a read or write request.

    """
    COIL = 1  # accepted by `Master.send` for read and write requests
    DISCRETE_INPUT = 2  # accepted by `Master.send` for read requests only
    HOLDING_REGISTER = 3  # accepted by `Master.send` for read and write
    INPUT_REGISTER = 4  # accepted by `Master.send` for read requests only
    QUEUE = 5  # FIFO queue read; `ReadReq.quantity` is ignored

Data type

COIL = <DataType.COIL: 1>
DISCRETE_INPUT = <DataType.DISCRETE_INPUT: 2>
HOLDING_REGISTER = <DataType.HOLDING_REGISTER: 3>
INPUT_REGISTER = <DataType.INPUT_REGISTER: 4>
QUEUE = <DataType.QUEUE: 5>
class Error(enum.Enum):
class Error(enum.Enum):
    """Error

    Error that can be returned as a response instead of read values or
    `Success` (see `ReadRes`, `WriteRes` and `WriteMaskRes`).

    """
    # NOTE(review): numeric values presumably mirror the Modbus exception
    # codes - confirm against the protocol specification
    INVALID_FUNCTION_CODE = 0x01
    INVALID_DATA_ADDRESS = 0x02
    INVALID_DATA_VALUE = 0x03
    FUNCTION_ERROR = 0x04
    GATEWAY_PATH_UNAVAILABLE = 0x0a
    GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0b

Error

INVALID_FUNCTION_CODE = <Error.INVALID_FUNCTION_CODE: 1>
INVALID_DATA_ADDRESS = <Error.INVALID_DATA_ADDRESS: 2>
INVALID_DATA_VALUE = <Error.INVALID_DATA_VALUE: 3>
FUNCTION_ERROR = <Error.FUNCTION_ERROR: 4>
GATEWAY_PATH_UNAVAILABLE = <Error.GATEWAY_PATH_UNAVAILABLE: 10>
GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = <Error.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 11>
class Success(typing.NamedTuple):
class Success(typing.NamedTuple):
    """Success

    Field-less marker used as the response to a successful write or write
    mask request (see `WriteRes` and `WriteMaskRes`).

    """

Success

Success()

Create new instance of Success()

class ReadReq(typing.NamedTuple):
class ReadReq(typing.NamedTuple):
    """Read request

    If `data_type` is ``DataType.QUEUE``, `quantity` is ignored.

    """
    device_id: DeviceId  # identifier of the addressed device
    data_type: DataType  # kind of data to read
    start_address: DataAddress  # address of the first item to read
    quantity: int  # number of items to read (unused for queue reads)

Read request

If data_type is DataType.QUEUE, quantity is ignored.

ReadReq( device_id: int, data_type: DataType, start_address: int, quantity: int)

Create new instance of ReadReq(device_id, data_type, start_address, quantity)

device_id: int

Alias for field number 0

data_type: DataType

Alias for field number 1

start_address: int

Alias for field number 2

quantity: int

Alias for field number 3

ReadRes = collections.abc.Sequence[int] | Error
class WriteReq(typing.NamedTuple):
class WriteReq(typing.NamedTuple):
    """Write request

    `Master.send` accepts only ``DataType.COIL`` and
    ``DataType.HOLDING_REGISTER`` as `data_type`.

    """
    device_id: DeviceId  # identifier of the addressed device
    data_type: DataType  # kind of data to write
    start_address: DataAddress  # address of the first item to write
    values: DataValues  # values written starting at `start_address`

Write request

WriteReq( device_id: int, data_type: DataType, start_address: int, values: Sequence[int])

Create new instance of WriteReq(device_id, data_type, start_address, values)

device_id: int

Alias for field number 0

data_type: DataType

Alias for field number 1

start_address: int

Alias for field number 2

values: Sequence[int]

Alias for field number 3

WriteRes = Success | Error
class WriteMaskReq(typing.NamedTuple):
class WriteMaskReq(typing.NamedTuple):
    """Write mask request"""
    device_id: DeviceId  # identifier of the addressed device
    address: DataAddress  # address of the item the masks are applied to
    # NOTE(review): masks are presumably combined with the current value as
    # implemented by `apply_mask` - confirm against the slave implementation
    and_mask: int
    or_mask: int

Write mask request

WriteMaskReq(device_id: int, address: int, and_mask: int, or_mask: int)

Create new instance of WriteMaskReq(device_id, address, and_mask, or_mask)

device_id: int

Alias for field number 0

address: int

Alias for field number 1

and_mask: int

Alias for field number 2

or_mask: int

Alias for field number 3

WriteMaskRes = Success | Error
Request = ReadReq | WriteReq | WriteMaskReq
Response = collections.abc.Sequence[int] | Error | Success
def apply_mask(value: int, and_mask: int, or_mask: int) -> int:
def apply_mask(value: int, and_mask: int, or_mask: int) -> int:
    """Apply mask to value

    Bits of `value` selected by `and_mask` are kept unchanged; every other
    bit is taken from the corresponding bit of `or_mask`.

    Args:
        value: value to which masks are applied
        and_mask: mask selecting bits of `value` that are preserved
        or_mask: mask providing bits not selected by `and_mask`

    Returns:
        masked value

    """
    preserved_bits = value & and_mask
    replaced_bits = or_mask & ~and_mask
    return preserved_bits | replaced_bits

Apply mask to value

async def create_tcp_master( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, *, response_timeout: float | None = None, **kwargs) -> Master:
async def create_tcp_master(modbus_type: common.ModbusType,
                            addr: tcp.Address,
                            *,
                            response_timeout: float | None = None,
                            **kwargs
                            ) -> 'Master':
    """Create TCP master

    A new TCP connection to `addr` is established and wrapped in a `Master`.
    If the master can not be created, the connection is closed before the
    exception is propagated.

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    Args:
        modbus_type: modbus type
        addr: remote host address
        response_timeout: response timeout in seconds

    """
    connection = await tcp.connect(addr, **kwargs)

    try:
        master = Master(link=transport.TcpLink(connection),
                        modbus_type=modbus_type,
                        response_timeout=response_timeout)

    except BaseException:
        # do not leak the connection, even if closing is interrupted
        await aio.uncancellable(connection.async_close())
        raise

    return master

Create TCP master

Additional arguments are passed directly to hat.drivers.tcp.connect.

Arguments:
  • modbus_type: modbus type
  • addr: remote host address
  • response_timeout: response timeout in seconds
async def create_serial_master( modbus_type: ModbusType, port: str, *, silent_interval: float = 0.005, response_timeout: float | None = None, **kwargs) -> Master:
async def create_serial_master(modbus_type: common.ModbusType,
                               port: str,
                               *,
                               silent_interval: float = 0.005,
                               response_timeout: float | None = None,
                               **kwargs
                               ) -> 'Master':
    """Create serial master

    A new serial endpoint is created and wrapped in a `Master`. If the
    master can not be created, the endpoint is closed before the exception
    is propagated.

    Additional arguments are passed directly to `hat.drivers.serial.create`.

    Args:
        modbus_type: modbus type
        port: port name (see `hat.drivers.serial.create`)
        silent_interval: silent interval (see `hat.drivers.serial.create`)
        response_timeout: response timeout in seconds

    """
    serial_endpoint = await serial.create(port,
                                          silent_interval=silent_interval,
                                          **kwargs)

    try:
        master = Master(link=transport.SerialLink(serial_endpoint),
                        modbus_type=modbus_type,
                        response_timeout=response_timeout)

    except BaseException:
        # do not leak the endpoint, even if closing is interrupted
        await aio.uncancellable(serial_endpoint.async_close())
        raise

    return master

Create serial master

Additional arguments are passed directly to hat.drivers.serial.create.

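Arguments:
  • modbus_type: modbus type
  • port: port name (see hat.drivers.serial.create)
  • silent_interval: silent interval (see hat.drivers.serial.create)
  • response_timeout: response timeout in seconds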
class Master(hat.aio.group.Resource):
class Master(aio.Resource):
    """Modbus master

    Requests passed to `send` are queued and processed one at a time by a
    background send loop which is spawned in the constructor. Each request
    is sent over a single `transport.Connection` and paired with the first
    response matching its transaction id (TCP only), device id and function
    code.

    """

    def __init__(self,
                 link: transport.Link,
                 modbus_type: common.ModbusType,
                 response_timeout: float | None):
        """Create master on top of an already established link

        The running event loop is captured and the send loop is spawned
        immediately, so an event loop must be running when this is called.

        Args:
            link: transport link used for communication
            modbus_type: modbus type
            response_timeout: response timeout in seconds

        """
        self._modbus_type = modbus_type
        self._response_timeout = response_timeout
        self._conn = transport.Connection(link)
        # pending (request adu, response future) pairs consumed by _send_loop
        self._send_queue = aio.Queue()
        self._loop = asyncio.get_running_loop()
        self._log = _create_logger_adapter(self._conn.info)

        if modbus_type == common.ModbusType.TCP:
            # transaction ids are needed only for TCP ADUs; they start at 1
            # and wrap around modulo 0x10000
            self._next_transaction_ids = iter(i % 0x10000
                                              for i in itertools.count(1))

        self.async_group.spawn(self._send_loop)

        self._log.debug('master created')

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
        """Connection or endpoint info"""
        return self._conn.info

    async def send(self, req: common.Request) -> common.Response:
        """Send request and wait for response

        The request is translated to the matching transport PDU, sent to
        `req.device_id` and the received response PDU is checked against the
        request before it is translated back.

        Args:
            req: request

        Returns:
            response - error reported in the response PDU, read values (at
            most `req.quantity` of them, except for ``DataType.QUEUE``
            reads which are returned as received) or `common.Success` for
            write and write mask requests

        Raises:
            ConnectionError
            TimeoutError
            ValueError: unsupported data type or unexpected response pdu
            TypeError: unsupported request
            Exception: response pdu does not echo the requested address,
                value, quantity or masks

        """
        # translate request to transport pdu
        if isinstance(req, common.ReadReq):
            if req.data_type == common.DataType.COIL:
                req_pdu = transport.ReadCoilsReq(address=req.start_address,
                                                 quantity=req.quantity)

            elif req.data_type == common.DataType.DISCRETE_INPUT:
                req_pdu = transport.ReadDiscreteInputsReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.HOLDING_REGISTER:
                req_pdu = transport.ReadHoldingRegistersReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.INPUT_REGISTER:
                req_pdu = transport.ReadInputRegistersReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.QUEUE:
                req_pdu = transport.ReadFifoQueueReq(address=req.start_address)

            else:
                raise ValueError('unsupported data type')

        elif isinstance(req, common.WriteReq):
            # exactly one value selects the "single" write pdu; any other
            # number of values selects the "multiple" write pdu
            if req.data_type == common.DataType.COIL:
                if len(req.values) == 1:
                    req_pdu = transport.WriteSingleCoilReq(
                        address=req.start_address,
                        value=req.values[0])

                else:
                    req_pdu = transport.WriteMultipleCoilsReq(
                        address=req.start_address,
                        values=req.values)

            elif req.data_type == common.DataType.HOLDING_REGISTER:
                if len(req.values) == 1:
                    req_pdu = transport.WriteSingleRegisterReq(
                        address=req.start_address,
                        value=req.values[0])

                else:
                    req_pdu = transport.WriteMultipleRegistersReq(
                        address=req.start_address,
                        values=req.values)

            else:
                raise ValueError('unsupported data type')

        elif isinstance(req, common.WriteMaskReq):
            req_pdu = transport.MaskWriteRegisterReq(address=req.address,
                                                     and_mask=req.and_mask,
                                                     or_mask=req.or_mask)

        else:
            raise TypeError('unsupported request')

        res_pdu = await self._send(req.device_id, req_pdu)

        # error responses are returned to the caller, not raised
        if isinstance(res_pdu, transport.ErrorRes):
            return res_pdu.error

        # validate response pdu against the request and translate it back
        if isinstance(req, common.ReadReq):
            if isinstance(res_pdu, (transport.ReadCoilsRes,
                                    transport.ReadDiscreteInputsRes,
                                    transport.ReadHoldingRegistersRes,
                                    transport.ReadInputRegistersRes)):
                # never return more values than requested
                return res_pdu.values[:req.quantity]

            if isinstance(res_pdu, transport.ReadFifoQueueRes):
                return res_pdu.values

            raise ValueError("unsupported response pdu")

        elif isinstance(req, common.WriteReq):
            if not isinstance(res_pdu, (transport.WriteSingleCoilRes,
                                        transport.WriteMultipleCoilsRes,
                                        transport.WriteSingleRegisterRes,
                                        transport.WriteMultipleRegistersRes)):
                raise ValueError("unsupported response pdu")

            if (res_pdu.address != req.start_address):
                raise Exception("invalid response pdu address")

            if isinstance(res_pdu, (transport.WriteSingleCoilRes,
                                    transport.WriteSingleRegisterRes)):
                if (res_pdu.value != req.values[0]):
                    raise Exception("invalid response pdu value")

            if isinstance(res_pdu, (transport.WriteMultipleCoilsRes,
                                    transport.WriteMultipleRegistersRes)):
                if (res_pdu.quantity != len(req.values)):
                    raise Exception("invalid response pdu quantity")

            return common.Success()

        elif isinstance(req, common.WriteMaskReq):
            if not isinstance(res_pdu, transport.MaskWriteRegisterRes):
                raise ValueError("unsupported response pdu")

            if (res_pdu.address != req.address):
                raise Exception("invalid response pdu address")

            if (res_pdu.and_mask != req.and_mask):
                raise Exception("invalid response pdu and mask")

            if (res_pdu.or_mask != req.or_mask):
                raise Exception("invalid response pdu or mask")

            return common.Success()

        else:
            raise TypeError('unsupported request')

    async def _send(self, device_id, req_pdu):
        """Wrap request pdu in an adu, queue it and wait for response pdu

        Raises:
            ValueError: unsupported modbus type
            ConnectionError: send queue is closed

        """
        if self._modbus_type == common.ModbusType.TCP:
            req_adu = transport.TcpAdu(
                transaction_id=next(self._next_transaction_ids),
                device_id=device_id,
                pdu=req_pdu)

        elif self._modbus_type == common.ModbusType.RTU:
            req_adu = transport.RtuAdu(device_id=device_id,
                                       pdu=req_pdu)

        elif self._modbus_type == common.ModbusType.ASCII:
            req_adu = transport.AsciiAdu(device_id=device_id,
                                         pdu=req_pdu)

        else:
            raise ValueError("unsupported modbus type")

        # the future is resolved by _send_loop with response adu or exception
        future = self._loop.create_future()
        try:
            self._send_queue.put_nowait((req_adu, future))
            res_adu = await future

        except aio.QueueClosedError:
            raise ConnectionError()

        return res_adu.pdu

    async def _receive(self, req_adu):
        """Receive response adus until one matches `req_adu`

        Responses with a different transaction id (TCP adus only), device id
        or function code are logged and discarded.

        """
        while True:
            res_adu = await self._conn.receive(self._modbus_type,
                                               transport.Direction.RESPONSE)

            if isinstance(res_adu, transport.TcpAdu):
                if res_adu.transaction_id != req_adu.transaction_id:
                    self._log.warning("discarding response adu: "
                                      "invalid response transaction id")
                    continue

            if res_adu.device_id != req_adu.device_id:
                self._log.warning("discarding response adu: "
                                  "invalid response device id")
                continue

            req_fc = transport.get_pdu_function_code(req_adu.pdu)
            res_fc = transport.get_pdu_function_code(res_adu.pdu)
            if req_fc != res_fc:
                self._log.warning("discarding response adu: "
                                  "invalid response function code")
                continue

            return res_adu

    async def _send_loop(self):
        """Process queued requests one at a time while the master is open

        For each request, the request adu is sent and its future is resolved
        with the matching response adu or with `TimeoutError`. On exit, the
        master and the send queue are closed and all unresolved futures are
        failed with `ConnectionError`.

        """
        self._log.debug("starting master send loop")
        future = None
        try:
            while self.is_open:
                # the next request is awaited inside of the subgroup so that
                # incoming data is discarded while the master is idle

                async with self.async_group.create_subgroup() as subgroup:
                    subgroup.spawn(self._clear_input_buffer_loop)
                    self._log.debug("started discarding incomming data")

                    # skip requests whose future is already done
                    while not future or future.done():
                        req_adu, future = await self._send_queue.get()

                await self._clear_input_buffer()
                self._log.debug("stopped discarding incomming data")

                await self._conn.send(req_adu)
                await self._conn.drain()

                async with self.async_group.create_subgroup(
                        log_exceptions=False) as subgroup:
                    receive_task = subgroup.spawn(self._receive, req_adu)

                    # wait for response, for the future being resolved
                    # from elsewhere or for the response timeout
                    await asyncio.wait([receive_task, future],
                                       timeout=self._response_timeout,
                                       return_when=asyncio.FIRST_COMPLETED)

                    if future.done():
                        continue

                    if receive_task.done():
                        # result() re-raises exception raised by _receive
                        future.set_result(receive_task.result())

                    else:
                        future.set_exception(TimeoutError())

        except ConnectionError:
            pass

        except Exception as e:
            self._log.error("error in send loop: %s", e, exc_info=e)

        finally:
            self._log.debug("stopping master send loop")
            self.close()
            self._send_queue.close()

            # fail the current request and all requests left in the queue
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._send_queue.empty():
                    break
                _, future = self._send_queue.get_nowait()

    async def _clear_input_buffer_loop(self):
        """Discard incoming data until cancelled

        The master is closed if discarding fails with any exception.

        """
        try:
            while True:
                await self._clear_input_buffer()

                await self._conn.read_byte()
                self._log.debug("discarded 1 byte from input buffer")

        except ConnectionError:
            self.close()

        except Exception as e:
            self._log.error("error in reset input buffer loop: %s", e,
                            exc_info=e)
            self.close()

    async def _clear_input_buffer(self):
        """Clear connection's input buffer and log the discarded byte count"""
        count = await self._conn.clear_input_buffer()
        if not count:
            return
        self._log.debug("discarded %s bytes from input buffer", count)

Modbus master

Master( link: hat.drivers.modbus.transport.connection.Link, modbus_type: ModbusType, response_timeout: float | None)
    def __init__(self,
                 link: transport.Link,
                 modbus_type: common.ModbusType,
                 response_timeout: float | None):
        """Create master on top of an already established link

        The running event loop is captured and the send loop is spawned
        immediately, so an event loop must be running when this is called.

        Args:
            link: transport link used for communication
            modbus_type: modbus type
            response_timeout: response timeout in seconds

        """
        self._modbus_type = modbus_type
        self._response_timeout = response_timeout
        self._conn = transport.Connection(link)
        # pending (request adu, response future) pairs consumed by _send_loop
        self._send_queue = aio.Queue()
        self._loop = asyncio.get_running_loop()
        self._log = _create_logger_adapter(self._conn.info)

        if modbus_type == common.ModbusType.TCP:
            # transaction ids are needed only for TCP ADUs; they start at 1
            # and wrap around modulo 0x10000
            self._next_transaction_ids = iter(i % 0x10000
                                              for i in itertools.count(1))

        self.async_group.spawn(self._send_loop)

        self._log.debug('master created')
async_group: hat.aio.group.Group
102    @property
103    def async_group(self) -> aio.Group:
104        """Async group"""
105        return self._conn.async_group

Async group

107    @property
108    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
109        """Connection or endpoint info"""
110        return self._conn.info

Connection or endpoint info

async def send( self, req: ReadReq | WriteReq | WriteMaskReq) -> Sequence[int] | Error | Success:
    async def send(self, req: common.Request) -> common.Response:
        """Send request and wait for response

        The request is translated to the matching transport PDU, sent to
        `req.device_id` and the received response PDU is checked against the
        request before it is translated back.

        Args:
            req: request

        Returns:
            response - error reported in the response PDU, read values (at
            most `req.quantity` of them, except for ``DataType.QUEUE``
            reads which are returned as received) or `common.Success` for
            write and write mask requests

        Raises:
            ConnectionError
            TimeoutError
            ValueError: unsupported data type or unexpected response pdu
            TypeError: unsupported request
            Exception: response pdu does not echo the requested address,
                value, quantity or masks

        """
        # translate request to transport pdu
        if isinstance(req, common.ReadReq):
            if req.data_type == common.DataType.COIL:
                req_pdu = transport.ReadCoilsReq(address=req.start_address,
                                                 quantity=req.quantity)

            elif req.data_type == common.DataType.DISCRETE_INPUT:
                req_pdu = transport.ReadDiscreteInputsReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.HOLDING_REGISTER:
                req_pdu = transport.ReadHoldingRegistersReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.INPUT_REGISTER:
                req_pdu = transport.ReadInputRegistersReq(
                    address=req.start_address,
                    quantity=req.quantity)

            elif req.data_type == common.DataType.QUEUE:
                req_pdu = transport.ReadFifoQueueReq(address=req.start_address)

            else:
                raise ValueError('unsupported data type')

        elif isinstance(req, common.WriteReq):
            # exactly one value selects the "single" write pdu; any other
            # number of values selects the "multiple" write pdu
            if req.data_type == common.DataType.COIL:
                if len(req.values) == 1:
                    req_pdu = transport.WriteSingleCoilReq(
                        address=req.start_address,
                        value=req.values[0])

                else:
                    req_pdu = transport.WriteMultipleCoilsReq(
                        address=req.start_address,
                        values=req.values)

            elif req.data_type == common.DataType.HOLDING_REGISTER:
                if len(req.values) == 1:
                    req_pdu = transport.WriteSingleRegisterReq(
                        address=req.start_address,
                        value=req.values[0])

                else:
                    req_pdu = transport.WriteMultipleRegistersReq(
                        address=req.start_address,
                        values=req.values)

            else:
                raise ValueError('unsupported data type')

        elif isinstance(req, common.WriteMaskReq):
            req_pdu = transport.MaskWriteRegisterReq(address=req.address,
                                                     and_mask=req.and_mask,
                                                     or_mask=req.or_mask)

        else:
            raise TypeError('unsupported request')

        res_pdu = await self._send(req.device_id, req_pdu)

        # error responses are returned to the caller, not raised
        if isinstance(res_pdu, transport.ErrorRes):
            return res_pdu.error

        # validate response pdu against the request and translate it back
        if isinstance(req, common.ReadReq):
            if isinstance(res_pdu, (transport.ReadCoilsRes,
                                    transport.ReadDiscreteInputsRes,
                                    transport.ReadHoldingRegistersRes,
                                    transport.ReadInputRegistersRes)):
                # never return more values than requested
                return res_pdu.values[:req.quantity]

            if isinstance(res_pdu, transport.ReadFifoQueueRes):
                return res_pdu.values

            raise ValueError("unsupported response pdu")

        elif isinstance(req, common.WriteReq):
            if not isinstance(res_pdu, (transport.WriteSingleCoilRes,
                                        transport.WriteMultipleCoilsRes,
                                        transport.WriteSingleRegisterRes,
                                        transport.WriteMultipleRegistersRes)):
                raise ValueError("unsupported response pdu")

            if (res_pdu.address != req.start_address):
                raise Exception("invalid response pdu address")

            if isinstance(res_pdu, (transport.WriteSingleCoilRes,
                                    transport.WriteSingleRegisterRes)):
                if (res_pdu.value != req.values[0]):
                    raise Exception("invalid response pdu value")

            if isinstance(res_pdu, (transport.WriteMultipleCoilsRes,
                                    transport.WriteMultipleRegistersRes)):
                if (res_pdu.quantity != len(req.values)):
                    raise Exception("invalid response pdu quantity")

            return common.Success()

        elif isinstance(req, common.WriteMaskReq):
            if not isinstance(res_pdu, transport.MaskWriteRegisterRes):
                raise ValueError("unsupported response pdu")

            if (res_pdu.address != req.address):
                raise Exception("invalid response pdu address")

            if (res_pdu.and_mask != req.and_mask):
                raise Exception("invalid response pdu and mask")

            if (res_pdu.or_mask != req.or_mask):
                raise Exception("invalid response pdu or mask")

            return common.Success()

        else:
            raise TypeError('unsupported request')

Send request and wait for response

Arguments:
  • req: request
Returns:

response

Raises:
  • ConnectionError
  • TimeoutError
SlaveCb = typing.Callable[[ForwardRef('Slave')], None | collections.abc.Awaitable[None]]
RequestCb = typing.Callable[[ForwardRef('Slave'), ReadReq | WriteReq | WriteMaskReq], collections.abc.Sequence[int] | Error | Success | None | collections.abc.Awaitable[collections.abc.Sequence[int] | Error | Success | None]]
async def create_tcp_server( modbus_type: ModbusType, addr: hat.drivers.tcp.Address, *, slave_cb: Optional[Callable[[Slave], None | Awaitable[None]]] = None, request_cb: Optional[Callable[[Slave, ReadReq | WriteReq | WriteMaskReq], Sequence[int] | Error | Success | None | Awaitable[Sequence[int] | Error | Success | None]]] = None, **kwargs) -> hat.drivers.tcp.Server:
async def create_tcp_server(modbus_type: common.ModbusType,
                            addr: tcp.Address,
                            *,
                            slave_cb: SlaveCb | None = None,
                            request_cb: RequestCb | None = None,
                            **kwargs
                            ) -> tcp.Server:
    """Create TCP server

    For each accepted connection, a new `Slave` is created and, if provided,
    `slave_cb` is called with it. The slave is closed once `slave_cb` fails
    or the slave starts closing.

    Closing server closes all active associated slaves.

    Additional arguments are passed directly to `hat.drivers.tcp.listen`
    (`bind_connections` is set by this coroutine).

    Args:
        modbus_type: modbus type
        addr: local listening host address
        slave_cb: slave callback
        request_cb: request callback

    """

    async def on_connection(conn):
        # connection could have been closed before this callback was run
        if not conn.is_open:
            return

        log = _create_logger_adapter(conn.info)

        slave = Slave(link=transport.TcpLink(conn),
                      modbus_type=modbus_type,
                      request_cb=request_cb)

        try:
            if slave_cb:
                await aio.call(slave_cb, slave)

            # keep this callback running for the slave's whole lifetime
            await slave.wait_closing()

        except Exception as e:
            log.error("error in slave callback: %s", e, exc_info=e)

        finally:
            await aio.uncancellable(slave.async_close())

    return await tcp.listen(on_connection, addr,
                            bind_connections=True,
                            **kwargs)

Create TCP server

Closing server closes all active associated slaves.

Additional arguments are passed directly to hat.drivers.tcp.listen (bind_connections is set by this coroutine).

Arguments:
  • modbus_type: modbus type
  • addr: local listening host address
  • slave_cb: slave callback
  • request_cb: request callback
async def create_serial_slave( modbus_type: ModbusType, port: str, *, request_cb: Optional[Callable[[Slave, ReadReq | WriteReq | WriteMaskReq], Sequence[int] | Error | Success | None | Awaitable[Sequence[int] | Error | Success | None]]] = None, silent_interval: float = 0.005, **kwargs) -> Slave:
 77async def create_serial_slave(modbus_type: common.ModbusType,
 78                              port: str,
 79                              *,
 80                              request_cb: RequestCb | None = None,
 81                              silent_interval: float = 0.005,
 82                              **kwargs
 83                              ) -> 'Slave':
 84    """Create serial slave
 85
 86    Additional arguments are passed directly to `hat.drivers.serial.create`.
 87
 88    Args:
 89        modbus_type: modbus type
 90        port: port name (see `hat.drivers.serial.create`)
 91        request_cb: request callback
 92        silent_interval: silent interval (see `serial.create`)
 93
 94    """
 95    endpoint = await serial.create(port,
 96                                   silent_interval=silent_interval,
 97                                   **kwargs)
 98
 99    try:
100        return Slave(link=transport.SerialLink(endpoint),
101                     modbus_type=modbus_type,
102                     request_cb=request_cb)
103
104    except BaseException:
105        await aio.uncancellable(endpoint.async_close())
106        raise

Create serial slave

Additional arguments are passed directly to hat.drivers.serial.create.

Arguments:
  • modbus_type: modbus type
  • port: port name (see hat.drivers.serial.create)
  • request_cb: request callback
  • silent_interval: silent interval (see serial.create)
class Slave(hat.aio.group.Resource):
class Slave(aio.Resource):
    """Modbus slave

    Wraps provided link in a `transport.Connection` and, for as long as
    the connection is usable, receives requests and answers them based on
    the result of `request_cb`.

    Args:
        link: transport link used for communication
        modbus_type: modbus type
        request_cb: request callback (if ``None``, requests are received but
            no response is sent)

    """

    def __init__(self,
                 link: transport.Link,
                 modbus_type: common.ModbusType,
                 request_cb: RequestCb | None = None):
        conn = transport.Connection(link)

        self._modbus_type = modbus_type
        self._request_cb = request_cb
        self._conn = conn
        self._log = _create_logger_adapter(conn.info)

        # receive loop runs as part of slave's async group
        self.async_group.spawn(self._receive_loop)

        self._log.debug('slave created')

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying transport connection"""
        conn = self._conn
        return conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
        """Connection or endpoint info of the underlying transport"""
        conn = self._conn
        return conn.info

    async def _receive_loop(self):
        """Serve requests until connection error, then close the slave"""
        log = self._log
        log.debug("starting slave receive loop")

        try:
            while True:
                try:
                    log.debug("waiting for request")
                    request_adu = await self._conn.receive(
                        modbus_type=self._modbus_type,
                        direction=transport.Direction.REQUEST)

                except ConnectionError:
                    break

                except Exception as e:
                    # faulty request is dropped, next one is awaited
                    log.warning("error receiving request: %s", e, exc_info=e)
                    continue

                device_id = request_adu.device_id
                request_pdu = request_adu.pdu

                try:
                    log.debug("processing request (device_id %s): %s",
                              device_id, request_pdu)
                    response_pdu = await self._process_request(device_id,
                                                               request_pdu)

                except Exception as e:
                    # failed processing results in no response
                    log.warning("error processing request: %s", e, exc_info=e)
                    continue

                if response_pdu is None:
                    log.debug("skip sending response")
                    continue

                log.debug("sending response: %s", response_pdu)

                # invalid modbus type raises and terminates the loop
                response_adu = self._create_response_adu(request_adu,
                                                         response_pdu)

                try:
                    await self._conn.send(response_adu)

                except ConnectionError:
                    break

                except Exception as e:
                    log.warning("error sending response: %s", e, exc_info=e)

        except Exception as e:
            log.error("receive loop error: %s", e, exc_info=e)

        finally:
            log.debug("closing slave receive loop")
            self.close()

    def _create_response_adu(self, req_adu, res_pdu):
        """Wrap response PDU in ADU matching slave's modbus type"""
        modbus_type = self._modbus_type

        if modbus_type == common.ModbusType.TCP:
            # TCP response repeats transaction id of the request
            return transport.TcpAdu(transaction_id=req_adu.transaction_id,
                                    device_id=req_adu.device_id,
                                    pdu=res_pdu)

        if modbus_type == common.ModbusType.RTU:
            return transport.RtuAdu(device_id=req_adu.device_id,
                                    pdu=res_pdu)

        if modbus_type == common.ModbusType.ASCII:
            return transport.AsciiAdu(device_id=req_adu.device_id,
                                      pdu=res_pdu)

        raise ValueError("invalid modbus type")

    async def _process_request(self, device_id, req_pdu):
        """Pass request to request callback and create response PDU

        Returns ``None`` if request callback is not set or if it returns
        ``None``. Raises `TypeError` in case of unsupported request PDU.

        """
        if self._request_cb is None:
            return None

        req = self._create_request(device_id, req_pdu)
        res = await aio.call(self._request_cb, self, req)

        if res is None:
            return None

        if isinstance(res, common.Error):
            function_code = transport.get_pdu_function_code(req_pdu)
            return transport.ErrorRes(fc=function_code,
                                      error=res)

        return self._create_response_pdu(req_pdu, res)

    def _create_request(self, device_id, req_pdu):
        """Translate request PDU to request passed to request callback"""
        if isinstance(req_pdu, transport.ReadCoilsReq):
            return common.ReadReq(device_id=device_id,
                                  data_type=common.DataType.COIL,
                                  start_address=req_pdu.address,
                                  quantity=req_pdu.quantity)

        if isinstance(req_pdu, transport.ReadDiscreteInputsReq):
            return common.ReadReq(device_id=device_id,
                                  data_type=common.DataType.DISCRETE_INPUT,
                                  start_address=req_pdu.address,
                                  quantity=req_pdu.quantity)

        if isinstance(req_pdu, transport.ReadHoldingRegistersReq):
            return common.ReadReq(device_id=device_id,
                                  data_type=common.DataType.HOLDING_REGISTER,
                                  start_address=req_pdu.address,
                                  quantity=req_pdu.quantity)

        if isinstance(req_pdu, transport.ReadInputRegistersReq):
            return common.ReadReq(device_id=device_id,
                                  data_type=common.DataType.INPUT_REGISTER,
                                  start_address=req_pdu.address,
                                  quantity=req_pdu.quantity)

        if isinstance(req_pdu, transport.WriteSingleCoilReq):
            # single value writes are reported as one element sequence
            return common.WriteReq(device_id=device_id,
                                   data_type=common.DataType.COIL,
                                   start_address=req_pdu.address,
                                   values=[req_pdu.value])

        if isinstance(req_pdu, transport.WriteSingleRegisterReq):
            return common.WriteReq(device_id=device_id,
                                   data_type=common.DataType.HOLDING_REGISTER,
                                   start_address=req_pdu.address,
                                   values=[req_pdu.value])

        if isinstance(req_pdu, transport.WriteMultipleCoilsReq):
            return common.WriteReq(device_id=device_id,
                                   data_type=common.DataType.COIL,
                                   start_address=req_pdu.address,
                                   values=req_pdu.values)

        if isinstance(req_pdu, transport.WriteMultipleRegistersReq):
            return common.WriteReq(device_id=device_id,
                                   data_type=common.DataType.HOLDING_REGISTER,
                                   start_address=req_pdu.address,
                                   values=req_pdu.values)

        if isinstance(req_pdu, transport.MaskWriteRegisterReq):
            return common.WriteMaskReq(device_id=device_id,
                                       address=req_pdu.address,
                                       and_mask=req_pdu.and_mask,
                                       or_mask=req_pdu.or_mask)

        if isinstance(req_pdu, transport.ReadFifoQueueReq):
            # queue read request PDU provides only address
            return common.ReadReq(device_id=device_id,
                                  data_type=common.DataType.QUEUE,
                                  start_address=req_pdu.address,
                                  quantity=0)

        raise TypeError('unsupported request')

    def _create_response_pdu(self, req_pdu, res):
        """Create successful response PDU based on request callback result

        Read responses carry `res` as values; write responses are built
        only from data available in the request PDU.

        """
        if isinstance(req_pdu, transport.ReadCoilsReq):
            return transport.ReadCoilsRes(values=res)

        if isinstance(req_pdu, transport.ReadDiscreteInputsReq):
            return transport.ReadDiscreteInputsRes(values=res)

        if isinstance(req_pdu, transport.ReadHoldingRegistersReq):
            return transport.ReadHoldingRegistersRes(values=res)

        if isinstance(req_pdu, transport.ReadInputRegistersReq):
            return transport.ReadInputRegistersRes(values=res)

        if isinstance(req_pdu, transport.WriteSingleCoilReq):
            return transport.WriteSingleCoilRes(address=req_pdu.address,
                                                value=req_pdu.value)

        if isinstance(req_pdu, transport.WriteSingleRegisterReq):
            return transport.WriteSingleRegisterRes(address=req_pdu.address,
                                                    value=req_pdu.value)

        if isinstance(req_pdu, transport.WriteMultipleCoilsReq):
            written = len(req_pdu.values)
            return transport.WriteMultipleCoilsRes(address=req_pdu.address,
                                                   quantity=written)

        if isinstance(req_pdu, transport.WriteMultipleRegistersReq):
            written = len(req_pdu.values)
            return transport.WriteMultipleRegistersRes(
                address=req_pdu.address,
                quantity=written)

        if isinstance(req_pdu, transport.MaskWriteRegisterReq):
            return transport.MaskWriteRegisterRes(address=req_pdu.address,
                                                  and_mask=req_pdu.and_mask,
                                                  or_mask=req_pdu.or_mask)

        if isinstance(req_pdu, transport.ReadFifoQueueReq):
            return transport.ReadFifoQueueRes(values=res)

        raise TypeError('unsupported request')

Modbus slave

Slave( link: hat.drivers.modbus.transport.connection.Link, modbus_type: ModbusType, request_cb: Optional[Callable[[Slave, ReadReq | WriteReq | WriteMaskReq], Sequence[int] | Error | Success | None | Awaitable[Sequence[int] | Error | Success | None]]] = None)
112    def __init__(self,
113                 link: transport.Link,
114                 modbus_type: common.ModbusType,
115                 request_cb: RequestCb | None = None):
116        self._modbus_type = modbus_type
117        self._request_cb = request_cb
118        self._conn = transport.Connection(link)
119        self._log = _create_logger_adapter(self._conn.info)
120
121        self.async_group.spawn(self._receive_loop)
122
123        self._log.debug('slave created')
async_group: hat.aio.group.Group
125    @property
126    def async_group(self) -> aio.Group:
127        """Async group"""
128        return self._conn.async_group

Async group

130    @property
131    def info(self) -> tcp.ConnectionInfo | serial.EndpointInfo:
132        """Connection or endpoint info"""
133        return self._conn.info

Connection or endpoint info