hat.drivers.iec60870.link
IEC 60870-5 link layer
1"""IEC 60870-5 link layer""" 2 3from hat.drivers.iec60870.link.common import (Address, 4 AddressSize, 5 Connection, 6 Direction) 7from hat.drivers.iec60870.link.unbalanced import (PollClass2Cb, 8 create_master_link, 9 create_slave_link, 10 MasterLink, 11 SlaveLink) 12from hat.drivers.iec60870.link.balanced import (create_balanced_link, 13 BalancedLink) 14 15 16__all__ = ['Address', 17 'AddressSize', 18 'Connection', 19 'Direction', 20 'PollClass2Cb', 21 'create_master_link', 22 'create_slave_link', 23 'MasterLink', 24 'SlaveLink', 25 'create_balanced_link', 26 'BalancedLink']
Address =
<class 'int'>
class
AddressSize(enum.Enum):
ZERO =
<AddressSize.ZERO: 0>
ONE =
<AddressSize.ONE: 1>
TWO =
<AddressSize.TWO: 2>
class
Connection(hat.aio.group.Resource):
82class Connection(aio.Resource): 83 84 @property 85 @abc.abstractmethod 86 def address(self) -> Address: 87 pass 88 89 @abc.abstractmethod 90 async def send(self, 91 data: util.Bytes, 92 sent_cb: aio.AsyncCallable[[], None] | None = None): 93 pass 94 95 @abc.abstractmethod 96 async def receive(self) -> util.Bytes: 97 pass
Resource with lifetime control based on Group
.
class
Direction(enum.Enum):
B_TO_A =
<Direction.B_TO_A: 0>
A_TO_B =
<Direction.A_TO_B: 1>
PollClass2Cb =
typing.Callable[[Connection], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]
async def
create_master_link( port: str, address_size: AddressSize, *, silent_interval: float = 0.005, send_queue_size: int = 1024, **kwargs) -> MasterLink:
18async def create_master_link(port: str, 19 address_size: common.AddressSize, 20 *, 21 silent_interval: float = 0.005, 22 send_queue_size: int = 1024, 23 **kwargs 24 ) -> 'MasterLink': 25 """Create unbalanced master link 26 27 Additional arguments are passed directly to 28 `hat.drivers.iec60870.link.endpoint.create`. 29 30 """ 31 if address_size == common.AddressSize.ZERO: 32 raise ValueError('unsupported address size') 33 34 link = MasterLink() 35 link._silent_interval = silent_interval 36 link._loop = asyncio.get_running_loop() 37 link._send_queue = aio.Queue(send_queue_size) 38 link._res_future = None 39 link._broadcast_address = common.get_broadcast_address(address_size) 40 41 link._endpoint = await endpoint.create(port=port, 42 address_size=address_size, 43 direction_valid=False, 44 **kwargs) 45 46 link.async_group.spawn(link._send_loop) 47 link.async_group.spawn(link._receive_loop) 48 49 return link
Create unbalanced master link
Additional arguments are passed directly to
hat.drivers.iec60870.link.endpoint.create
.
async def
create_slave_link( port: str, address_size: AddressSize, *, silent_interval: float = 0.005, **kwargs) -> SlaveLink:
20async def create_slave_link(port: str, 21 address_size: common.AddressSize, 22 *, 23 silent_interval: float = 0.005, 24 **kwargs 25 ) -> 'SlaveLink': 26 """Create unbalanced slave link 27 28 Additional arguments are passed directly to 29 `hat.drivers.iec60870.link.endpoint.create`. 30 31 """ 32 if address_size == common.AddressSize.ZERO: 33 raise ValueError('unsupported address size') 34 35 link = SlaveLink() 36 link._silent_interval = silent_interval 37 link._loop = asyncio.get_running_loop() 38 link._conns = {} 39 link._broadcast_address = common.get_broadcast_address(address_size) 40 41 link._endpoint = await endpoint.create(port=port, 42 address_size=address_size, 43 direction_valid=False, 44 **kwargs) 45 46 link.async_group.spawn(link._receive_loop) 47 48 return link
Create unbalanced slave link
Additional arguments are passed directly to
hat.drivers.iec60870.link.endpoint.create
.
class
MasterLink(hat.aio.group.Resource):
52class MasterLink(aio.Resource): 53 54 @property 55 def async_group(self): 56 return self._endpoint.async_group 57 58 async def open_connection(self, 59 addr: common.Address, 60 *, 61 response_timeout: float = 15, 62 send_retry_count: int = 3, 63 poll_class1_delay: float | None = 1, 64 poll_class2_delay: float | None = None, 65 send_queue_size: int = 1024, 66 receive_queue_size: int = 1024, 67 ) -> common.Connection: 68 if addr >= self._broadcast_address: 69 raise ValueError('unsupported address') 70 71 conn = _MasterConnection() 72 conn._addr = addr 73 conn._send_retry_count = send_retry_count 74 conn._loop = self._loop 75 conn._send_queue = aio.Queue(send_queue_size) 76 conn._receive_queue = aio.Queue(receive_queue_size) 77 conn._access_demand_event = asyncio.Event() 78 conn._async_group = self.async_group.create_subgroup() 79 80 send = functools.partial(self._send, response_timeout) 81 82 try: 83 while True: 84 with contextlib.suppress(asyncio.TimeoutError): 85 # req = common.ReqFrame( 86 # direction=None, 87 # frame_count_bit=False, 88 # frame_count_valid=False, 89 # function=common.ReqFunction.REQ_STATUS, 90 # address=addr, 91 # data=b'') 92 # res = await send(req) 93 94 # if res.function not in [common.ResFunction.ACK, 95 # common.ResFunction.RES_STATUS]: 96 # continue 97 98 req = common.ReqFrame( 99 direction=None, 100 frame_count_bit=False, 101 frame_count_valid=False, 102 function=common.ReqFunction.RESET_LINK, 103 address=addr, 104 data=b'') 105 res = await send(req) 106 107 if (isinstance(res, common.ShortFrame) or 108 (isinstance(res, common.ResFrame) and 109 res.function == common.ResFunction.ACK)): 110 # TODO maybe delay? 111 break 112 113 conn.async_group.spawn(conn._send_loop, send) 114 115 if poll_class1_delay is not None: 116 conn.async_group.spawn(conn._poll_loop_class1, 117 poll_class1_delay) 118 119 if poll_class2_delay is not None: 120 conn.async_group.spawn(conn._poll_loop_class2, 121 poll_class2_delay) 122 123 # TODO spawn status loop if polling is disabled 124 125 except BaseException: 126 await aio.uncancellable(conn.async_close()) 127 raise 128 129 return conn 130 131 async def _send(self, response_timeout, req): 132 future = self._loop.create_future() 133 try: 134 await self._send_queue.put((future, response_timeout, req)) 135 return await future 136 137 except aio.QueueClosedError: 138 raise ConnectionError() 139 140 async def _receive_loop(self): 141 try: 142 while True: 143 msg = await self._endpoint.receive() 144 145 # TODO check msg is response 146 147 if self._res_future and not self._res_future.done(): 148 self._res_future.set_result(msg) 149 150 except ConnectionError: 151 pass 152 153 except Exception as e: 154 mlog.error("read loop error: %s", e, exc_info=e) 155 156 finally: 157 self.close() 158 159 async def _send_loop(self): 160 future = None 161 last_read_time = None 162 163 try: 164 while True: 165 future, response_timeout, req = await self._send_queue.get() 166 if future.done(): 167 continue 168 169 last_read_delta = (time.monotonic() - last_read_time 170 if last_read_time is not None 171 else self._silent_interval) 172 sleep_duration = self._silent_interval - last_read_delta 173 if sleep_duration > 0: 174 await asyncio.sleep(sleep_duration) 175 176 self._res_future = self._loop.create_future() 177 178 mlog.debug("writing request %s", req.function.name) 179 await self._endpoint.send(req) 180 await self._endpoint.drain() 181 182 if (req.address == self._broadcast_address or 183 req.function == common.ReqFunction.DATA_NO_RES): 184 res = None 185 last_read_time = time.monotonic() 186 187 else: 188 try: 189 res = await aio.wait_for(self._res_future, 190 response_timeout) 191 last_read_time = time.monotonic() 192 193 except asyncio.TimeoutError as e: 194 if not future.done(): 195 future.set_exception(e) 196 197 if not future.done(): 198 future.set_result(res) 199 200 self._res_future = None 201 202 except ConnectionError: 203 pass 204 205 except Exception as e: 206 mlog.error("write loop error: %s", e, exc_info=e) 207 208 finally: 209 self.close() 210 self._send_queue.close() 211 212 while True: 213 if future and not future.done(): 214 future.set_exception(ConnectionError()) 215 216 if self._send_queue.empty(): 217 break 218 219 future, _, __ = self._send_queue.get_nowait()
Resource with lifetime control based on Group
.
async def
open_connection( self, addr: int, *, response_timeout: float = 15, send_retry_count: int = 3, poll_class1_delay: float | None = 1, poll_class2_delay: float | None = None, send_queue_size: int = 1024, receive_queue_size: int = 1024) -> Connection:
58 async def open_connection(self, 59 addr: common.Address, 60 *, 61 response_timeout: float = 15, 62 send_retry_count: int = 3, 63 poll_class1_delay: float | None = 1, 64 poll_class2_delay: float | None = None, 65 send_queue_size: int = 1024, 66 receive_queue_size: int = 1024, 67 ) -> common.Connection: 68 if addr >= self._broadcast_address: 69 raise ValueError('unsupported address') 70 71 conn = _MasterConnection() 72 conn._addr = addr 73 conn._send_retry_count = send_retry_count 74 conn._loop = self._loop 75 conn._send_queue = aio.Queue(send_queue_size) 76 conn._receive_queue = aio.Queue(receive_queue_size) 77 conn._access_demand_event = asyncio.Event() 78 conn._async_group = self.async_group.create_subgroup() 79 80 send = functools.partial(self._send, response_timeout) 81 82 try: 83 while True: 84 with contextlib.suppress(asyncio.TimeoutError): 85 # req = common.ReqFrame( 86 # direction=None, 87 # frame_count_bit=False, 88 # frame_count_valid=False, 89 # function=common.ReqFunction.REQ_STATUS, 90 # address=addr, 91 # data=b'') 92 # res = await send(req) 93 94 # if res.function not in [common.ResFunction.ACK, 95 # common.ResFunction.RES_STATUS]: 96 # continue 97 98 req = common.ReqFrame( 99 direction=None, 100 frame_count_bit=False, 101 frame_count_valid=False, 102 function=common.ReqFunction.RESET_LINK, 103 address=addr, 104 data=b'') 105 res = await send(req) 106 107 if (isinstance(res, common.ShortFrame) or 108 (isinstance(res, common.ResFrame) and 109 res.function == common.ResFunction.ACK)): 110 # TODO maybe delay? 111 break 112 113 conn.async_group.spawn(conn._send_loop, send) 114 115 if poll_class1_delay is not None: 116 conn.async_group.spawn(conn._poll_loop_class1, 117 poll_class1_delay) 118 119 if poll_class2_delay is not None: 120 conn.async_group.spawn(conn._poll_loop_class2, 121 poll_class2_delay) 122 123 # TODO spawn status loop if polling is disabled 124 125 except BaseException: 126 await aio.uncancellable(conn.async_close()) 127 raise 128 129 return conn
class
SlaveLink(hat.aio.group.Resource):
51class SlaveLink(aio.Resource): 52 53 @property 54 def async_group(self): 55 return self._endpoint.async_group 56 57 async def open_connection(self, 58 addr: common.Address, 59 *, 60 poll_class2_cb: PollClass2Cb | None = None, 61 keep_alive_timeout: float = 30, 62 receive_queue_size: int = 1024, 63 send_queue_size: int = 1024 64 ) -> common.Connection: 65 if addr >= self._broadcast_address: 66 raise ValueError('unsupported address') 67 68 if addr in self._conns: 69 raise Exception('connection already exists') 70 71 conn = _SlaveConnection() 72 conn._addr = addr 73 conn._poll_class2_cb = poll_class2_cb 74 conn._loop = self._loop 75 conn._active_future = self._loop.create_future() 76 conn._frame_count_bit = None 77 conn._res = None 78 conn._keep_alive_event = asyncio.Event() 79 conn._send_queue = aio.Queue(send_queue_size) 80 conn._receive_queue = aio.Queue(receive_queue_size) 81 conn._async_group = self.async_group.create_subgroup() 82 83 conn.async_group.spawn(aio.call_on_cancel, conn._send_queue.close) 84 conn.async_group.spawn(aio.call_on_cancel, conn._receive_queue.close) 85 86 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, addr, 87 None) 88 self._conns[addr] = conn 89 90 conn.async_group.spawn(conn._keep_alive_loop, keep_alive_timeout) 91 92 try: 93 await conn._active_future 94 95 except BaseException: 96 await aio.uncancellable(conn.async_close()) 97 raise 98 99 return conn 100 101 async def _receive_loop(self): 102 try: 103 while True: 104 req = await self._endpoint.receive() 105 last_rw_time = time.monotonic() 106 107 if not isinstance(req, common.ReqFrame): 108 continue 109 110 if req.address == self._broadcast_address: 111 # TODO maybe filter conns that have active_future set 112 conns = list(self._conns.values()) 113 114 elif req.address in self._conns: 115 conns = [self._conns[req.address]] 116 117 else: 118 continue 119 120 for conn in conns: 121 if not conn.is_open: 122 continue 123 124 res, sent_cb = await conn._process(req) 125 126 if (req.address == self._broadcast_address or 127 req.function == common.ReqFunction.DATA_NO_RES): 128 continue 129 130 last_rw_delta = time.monotonic() - last_rw_time 131 sleep_duration = self._silent_interval - last_rw_delta 132 if sleep_duration > 0: 133 await asyncio.sleep(sleep_duration) 134 135 await self._endpoint.send(res) 136 last_rw_time = time.monotonic() 137 138 if sent_cb: 139 await aio.call(sent_cb) 140 141 except ConnectionError: 142 pass 143 144 except Exception as e: 145 mlog.warning("receive loop error: %s", e, exc_info=e) 146 147 finally: 148 self.close()
Resource with lifetime control based on Group
.
async def
open_connection( self, addr: int, *, poll_class2_cb: Optional[Callable[[Connection], bytes | bytearray | memoryview | None | Awaitable[bytes | bytearray | memoryview | None]]] = None, keep_alive_timeout: float = 30, receive_queue_size: int = 1024, send_queue_size: int = 1024) -> Connection:
57 async def open_connection(self, 58 addr: common.Address, 59 *, 60 poll_class2_cb: PollClass2Cb | None = None, 61 keep_alive_timeout: float = 30, 62 receive_queue_size: int = 1024, 63 send_queue_size: int = 1024 64 ) -> common.Connection: 65 if addr >= self._broadcast_address: 66 raise ValueError('unsupported address') 67 68 if addr in self._conns: 69 raise Exception('connection already exists') 70 71 conn = _SlaveConnection() 72 conn._addr = addr 73 conn._poll_class2_cb = poll_class2_cb 74 conn._loop = self._loop 75 conn._active_future = self._loop.create_future() 76 conn._frame_count_bit = None 77 conn._res = None 78 conn._keep_alive_event = asyncio.Event() 79 conn._send_queue = aio.Queue(send_queue_size) 80 conn._receive_queue = aio.Queue(receive_queue_size) 81 conn._async_group = self.async_group.create_subgroup() 82 83 conn.async_group.spawn(aio.call_on_cancel, conn._send_queue.close) 84 conn.async_group.spawn(aio.call_on_cancel, conn._receive_queue.close) 85 86 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, addr, 87 None) 88 self._conns[addr] = conn 89 90 conn.async_group.spawn(conn._keep_alive_loop, keep_alive_timeout) 91 92 try: 93 await conn._active_future 94 95 except BaseException: 96 await aio.uncancellable(conn.async_close()) 97 raise 98 99 return conn
async def
create_balanced_link( port: str, address_size: AddressSize, *, silent_interval: float = 0.005, send_queue_size: int = 1024, **kwargs) -> BalancedLink:
17async def create_balanced_link(port: str, 18 address_size: common.AddressSize, 19 *, 20 silent_interval: float = 0.005, 21 send_queue_size: int = 1024, 22 **kwargs 23 ) -> 'BalancedLink': 24 link = BalancedLink() 25 link._address_size = address_size 26 link._loop = asyncio.get_running_loop() 27 link._conns = {} 28 link._res_future = None 29 link._send_queue = aio.Queue(send_queue_size) 30 31 link._endpoint = await endpoint.create(port=port, 32 address_size=address_size, 33 direction_valid=True, 34 silent_interval=silent_interval, 35 **kwargs) 36 37 link.async_group.spawn(link._send_loop) 38 link.async_group.spawn(link._receive_loop) 39 40 return link
class
BalancedLink(hat.aio.group.Resource):
43class BalancedLink(aio.Resource): 44 45 @property 46 def async_group(self): 47 return self._endpoint.async_group 48 49 async def open_connection(self, 50 direction: common.Direction, 51 addr: common.Address, 52 *, 53 response_timeout: float = 15, 54 send_retry_count: int = 3, 55 status_delay: float = 5, 56 receive_queue_size: int = 1024, 57 send_queue_size: int = 1024 58 ) -> common.Connection: 59 if addr >= (1 << (self._address_size.value * 8)): 60 raise ValueError('unsupported address') 61 62 conn_key = direction, addr 63 if conn_key in self._conns: 64 raise Exception('connection already exists') 65 66 conn = _BalancedConnection() 67 conn._direction = direction 68 conn._addr = addr 69 conn._send_retry_count = send_retry_count 70 conn._loop = self._loop 71 conn._send_event = asyncio.Event() 72 conn._receive_queue = aio.Queue(receive_queue_size) 73 conn._send_queue = aio.Queue(send_queue_size) 74 conn._frame_count_bit = None 75 conn._res = None 76 conn._async_group = self.async_group.create_subgroup() 77 78 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, conn_key, 79 None) 80 self._conns[conn_key] = conn 81 82 send = functools.partial(self._send, response_timeout) 83 84 try: 85 while True: 86 with contextlib.suppress(asyncio.TimeoutError): 87 # req = common.ReqFrame( 88 # direction=direction, 89 # frame_count_bit=False, 90 # frame_count_valid=False, 91 # function=common.ReqFunction.REQ_STATUS, 92 # address=addr, 93 # data=b'') 94 # res = await self._send(req) 95 96 # if res.function not in [common.ResFunction.ACK, 97 # common.ResFunction.RES_STATUS]: 98 # continue 99 100 req = common.ReqFrame( 101 direction=direction, 102 frame_count_bit=False, 103 frame_count_valid=False, 104 function=common.ReqFunction.RESET_LINK, 105 address=addr, 106 data=b'') 107 res = await send(req) 108 109 if (isinstance(res, common.ShortFrame) or 110 (isinstance(res, common.ResFrame) and 111 res.function == common.ResFunction.ACK)): 112 break 113 114 conn.async_group.spawn(conn._send_loop, send) 115 conn.async_group.spawn(conn._status_loop, status_delay) 116 117 except BaseException: 118 await aio.uncancellable(conn.async_close()) 119 raise 120 121 return conn 122 123 async def _send(self, response_timeout, req): 124 future = self._loop.create_future() 125 try: 126 await self._send_queue.put((future, response_timeout, req)) 127 return await future 128 129 except aio.QueueClosedError: 130 raise ConnectionError() 131 132 async def _process(self, req): 133 conn_key = _invert_direction(req.direction), req.address 134 conn = self._conns.get(conn_key) 135 if not conn or not conn.is_open: 136 return 137 138 res = await conn._process(req) 139 140 if req.function == common.ReqFunction.DATA_NO_RES: 141 return 142 143 await self._endpoint.send(res) 144 145 async def _receive_loop(self): 146 try: 147 while True: 148 msg = await self._endpoint.receive() 149 150 if isinstance(msg, common.ReqFrame): 151 await self._process(msg) 152 153 elif isinstance(msg, (common.ResFrame, common.ShortFrame)): 154 if self._res_future and not self._res_future.done(): 155 self._res_future.set_result(msg) 156 157 else: 158 raise TypeError('unsupported frame type') 159 160 except ConnectionError: 161 pass 162 163 except Exception as e: 164 mlog.error("receive loop error: %s", e, exc_info=e) 165 166 finally: 167 self.close() 168 169 async def _send_loop(self): 170 future = None 171 172 try: 173 while True: 174 future, response_timeout, req = await self._send_queue.get() 175 if future.done(): 176 continue 177 178 self._res_future = self._loop.create_future() 179 180 mlog.debug("writing request %s", req.function.name) 181 await self._endpoint.send(req) 182 await self._endpoint.drain() 183 184 if req.function == common.ReqFunction.DATA_NO_RES: 185 res = None 186 187 else: 188 try: 189 res = await aio.wait_for(self._res_future, 190 response_timeout) 191 192 except asyncio.TimeoutError as e: 193 if not future.done(): 194 future.set_exception(e) 195 196 if not future.done(): 197 future.set_result(res) 198 199 self._res_future = None 200 201 except ConnectionError: 202 pass 203 204 except Exception as e: 205 mlog.error("send loop error: %s", e, exc_info=e) 206 207 finally: 208 self.close() 209 self._send_queue.close() 210 211 while True: 212 if future and not future.done(): 213 future.set_exception(ConnectionError()) 214 215 if self._send_queue.empty(): 216 break 217 218 future, _, __ = self._send_queue.get_nowait()
Resource with lifetime control based on Group
.
async def
open_connection( self, direction: Direction, addr: int, *, response_timeout: float = 15, send_retry_count: int = 3, status_delay: float = 5, receive_queue_size: int = 1024, send_queue_size: int = 1024) -> Connection:
49 async def open_connection(self, 50 direction: common.Direction, 51 addr: common.Address, 52 *, 53 response_timeout: float = 15, 54 send_retry_count: int = 3, 55 status_delay: float = 5, 56 receive_queue_size: int = 1024, 57 send_queue_size: int = 1024 58 ) -> common.Connection: 59 if addr >= (1 << (self._address_size.value * 8)): 60 raise ValueError('unsupported address') 61 62 conn_key = direction, addr 63 if conn_key in self._conns: 64 raise Exception('connection already exists') 65 66 conn = _BalancedConnection() 67 conn._direction = direction 68 conn._addr = addr 69 conn._send_retry_count = send_retry_count 70 conn._loop = self._loop 71 conn._send_event = asyncio.Event() 72 conn._receive_queue = aio.Queue(receive_queue_size) 73 conn._send_queue = aio.Queue(send_queue_size) 74 conn._frame_count_bit = None 75 conn._res = None 76 conn._async_group = self.async_group.create_subgroup() 77 78 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, conn_key, 79 None) 80 self._conns[conn_key] = conn 81 82 send = functools.partial(self._send, response_timeout) 83 84 try: 85 while True: 86 with contextlib.suppress(asyncio.TimeoutError): 87 # req = common.ReqFrame( 88 # direction=direction, 89 # frame_count_bit=False, 90 # frame_count_valid=False, 91 # function=common.ReqFunction.REQ_STATUS, 92 # address=addr, 93 # data=b'') 94 # res = await self._send(req) 95 96 # if res.function not in [common.ResFunction.ACK, 97 # common.ResFunction.RES_STATUS]: 98 # continue 99 100 req = common.ReqFrame( 101 direction=direction, 102 frame_count_bit=False, 103 frame_count_valid=False, 104 function=common.ReqFunction.RESET_LINK, 105 address=addr, 106 data=b'') 107 res = await send(req) 108 109 if (isinstance(res, common.ShortFrame) or 110 (isinstance(res, common.ResFrame) and 111 res.function == common.ResFunction.ACK)): 112 break 113 114 conn.async_group.spawn(conn._send_loop, send) 115 conn.async_group.spawn(conn._status_loop, status_delay) 116 117 except BaseException: 118 await aio.uncancellable(conn.async_close()) 119 raise 120 121 return conn