hat.drivers.iec60870.link
IEC 60870-5 link layer
1"""IEC 60870-5 link layer""" 2 3from hat.drivers.iec60870.link.common import (Address, 4 AddressSize, 5 Direction, 6 ConnectionInfo, 7 Connection) 8from hat.drivers.iec60870.link.unbalanced import (PollClass2Cb, 9 create_master_link, 10 create_slave_link, 11 MasterLink, 12 SlaveLink) 13from hat.drivers.iec60870.link.balanced import (create_balanced_link, 14 BalancedLink) 15 16 17__all__ = ['Address', 18 'AddressSize', 19 'Direction', 20 'ConnectionInfo', 21 'Connection', 22 'PollClass2Cb', 23 'create_master_link', 24 'create_slave_link', 25 'MasterLink', 26 'SlaveLink', 27 'create_balanced_link', 28 'BalancedLink']
Address =
<class 'int'>
class
AddressSize(enum.Enum):
ZERO =
<AddressSize.ZERO: 0>
ONE =
<AddressSize.ONE: 1>
TWO =
<AddressSize.TWO: 2>
class
Direction(enum.Enum):
B_TO_A =
<Direction.B_TO_A: 0>
A_TO_B =
<Direction.A_TO_B: 1>
class
ConnectionInfo(typing.NamedTuple):
ConnectionInfo(name, port, address)
class
Connection(hat.aio.group.Resource):
80class Connection(aio.Resource): 81 82 @property 83 @abc.abstractmethod 84 def info(self) -> ConnectionInfo: 85 pass 86 87 @abc.abstractmethod 88 async def send(self, 89 data: util.Bytes, 90 sent_cb: aio.AsyncCallable[[], None] | None = None): 91 pass 92 93 @abc.abstractmethod 94 async def receive(self) -> util.Bytes: 95 pass
Resource with lifetime control based on Group.
info: ConnectionInfo
PollClass2Cb =
typing.Callable[[Connection], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]
async def
create_master_link( port: str, address_size: AddressSize, *, silent_interval: float = 0.005, send_queue_size: int = 1024, **kwargs) -> MasterLink:
19async def create_master_link(port: str, 20 address_size: common.AddressSize, 21 *, 22 silent_interval: float = 0.005, 23 send_queue_size: int = 1024, 24 **kwargs 25 ) -> 'MasterLink': 26 """Create unbalanced master link 27 28 Additional arguments are passed directly to 29 `hat.drivers.iec60870.link.endpoint.create`. 30 31 """ 32 if address_size == common.AddressSize.ZERO: 33 raise ValueError('unsupported address size') 34 35 link = MasterLink() 36 link._silent_interval = silent_interval 37 link._loop = asyncio.get_running_loop() 38 link._send_queue = aio.Queue(send_queue_size) 39 link._res_future = None 40 link._broadcast_address = common.get_broadcast_address(address_size) 41 42 link._endpoint = await endpoint.create(port=port, 43 address_size=address_size, 44 direction_valid=False, 45 **kwargs) 46 47 link._log = logger.create_logger(mlog, link._endpoint.info) 48 49 link.async_group.spawn(link._send_loop) 50 link.async_group.spawn(link._receive_loop) 51 52 return link
Create unbalanced master link
Additional arguments are passed directly to
hat.drivers.iec60870.link.endpoint.create.
async def
create_slave_link( port: str, address_size: AddressSize, *, silent_interval: float = 0.005, **kwargs) -> SlaveLink:
21async def create_slave_link(port: str, 22 address_size: common.AddressSize, 23 *, 24 silent_interval: float = 0.005, 25 **kwargs 26 ) -> 'SlaveLink': 27 """Create unbalanced slave link 28 29 Additional arguments are passed directly to 30 `hat.drivers.iec60870.link.endpoint.create`. 31 32 """ 33 if address_size == common.AddressSize.ZERO: 34 raise ValueError('unsupported address size') 35 36 link = SlaveLink() 37 link._silent_interval = silent_interval 38 link._loop = asyncio.get_running_loop() 39 link._conns = {} 40 link._broadcast_address = common.get_broadcast_address(address_size) 41 42 link._endpoint = await endpoint.create(port=port, 43 address_size=address_size, 44 direction_valid=False, 45 **kwargs) 46 47 link._log = logger.create_logger(mlog, link._endpoint.info) 48 49 link.async_group.spawn(link._receive_loop) 50 51 return link
Create unbalanced slave link
Additional arguments are passed directly to
hat.drivers.iec60870.link.endpoint.create.
class
MasterLink(hat.aio.group.Resource):
55class MasterLink(aio.Resource): 56 57 @property 58 def async_group(self): 59 return self._endpoint.async_group 60 61 async def open_connection(self, 62 addr: common.Address, 63 *, 64 name: str | None = None, 65 response_timeout: float = 15, 66 send_retry_count: int = 3, 67 poll_class1_delay: float | None = 1, 68 poll_class2_delay: float | None = None, 69 send_queue_size: int = 1024, 70 receive_queue_size: int = 1024, 71 ) -> common.Connection: 72 if addr >= self._broadcast_address: 73 raise ValueError('unsupported address') 74 75 conn = MasterConnection() 76 conn._send_retry_count = send_retry_count 77 conn._loop = self._loop 78 conn._send_queue = aio.Queue(send_queue_size) 79 conn._receive_queue = aio.Queue(receive_queue_size) 80 conn._access_demand_event = asyncio.Event() 81 conn._async_group = self.async_group.create_subgroup() 82 conn._info = common.ConnectionInfo(name=name, 83 port=self._endpoint.info.port, 84 address=addr) 85 conn._log = logger.create_connection_logger(mlog, conn._info) 86 87 send = functools.partial(self._send, response_timeout) 88 89 try: 90 while True: 91 with contextlib.suppress(asyncio.TimeoutError): 92 # req = common.ReqFrame( 93 # direction=None, 94 # frame_count_bit=False, 95 # frame_count_valid=False, 96 # function=common.ReqFunction.REQ_STATUS, 97 # address=addr, 98 # data=b'') 99 # res = await send(req) 100 101 # if res.function not in [common.ResFunction.ACK, 102 # common.ResFunction.RES_STATUS]: 103 # continue 104 105 req = common.ReqFrame( 106 direction=None, 107 frame_count_bit=False, 108 frame_count_valid=False, 109 function=common.ReqFunction.RESET_LINK, 110 address=addr, 111 data=b'') 112 res = await send(req) 113 114 if (isinstance(res, common.ShortFrame) or 115 (isinstance(res, common.ResFrame) and 116 res.function == common.ResFunction.ACK)): 117 # TODO maybe delay? 118 break 119 120 conn.async_group.spawn(conn._send_loop, send) 121 122 if poll_class1_delay is not None: 123 conn.async_group.spawn(conn._poll_loop_class1, 124 poll_class1_delay) 125 126 if poll_class2_delay is not None: 127 conn.async_group.spawn(conn._poll_loop_class2, 128 poll_class2_delay) 129 130 # TODO spawn status loop if polling is disabled 131 132 except BaseException: 133 await aio.uncancellable(conn.async_close()) 134 raise 135 136 return conn 137 138 async def _send(self, response_timeout, req): 139 future = self._loop.create_future() 140 try: 141 await self._send_queue.put((future, response_timeout, req)) 142 return await future 143 144 except aio.QueueClosedError: 145 raise ConnectionError() 146 147 async def _receive_loop(self): 148 try: 149 while True: 150 msg = await self._endpoint.receive() 151 152 # TODO check msg is response 153 154 if self._res_future and not self._res_future.done(): 155 self._res_future.set_result(msg) 156 157 except ConnectionError: 158 pass 159 160 except Exception as e: 161 self._log.error("read loop error: %s", e, exc_info=e) 162 163 finally: 164 self.close() 165 166 async def _send_loop(self): 167 future = None 168 last_read_time = None 169 170 try: 171 while True: 172 future, response_timeout, req = await self._send_queue.get() 173 if future.done(): 174 continue 175 176 last_read_delta = (time.monotonic() - last_read_time 177 if last_read_time is not None 178 else self._silent_interval) 179 sleep_duration = self._silent_interval - last_read_delta 180 if sleep_duration > 0: 181 await asyncio.sleep(sleep_duration) 182 183 self._res_future = self._loop.create_future() 184 185 self._log.debug("writing request %s", req.function.name) 186 await self._endpoint.send(req) 187 await self._endpoint.drain() 188 189 if (req.address == self._broadcast_address or 190 req.function == common.ReqFunction.DATA_NO_RES): 191 res = None 192 last_read_time = time.monotonic() 193 194 else: 195 try: 196 res = await aio.wait_for(self._res_future, 197 response_timeout) 198 last_read_time = time.monotonic() 199 200 except asyncio.TimeoutError as e: 201 if not future.done(): 202 future.set_exception(e) 203 204 if not future.done(): 205 future.set_result(res) 206 207 self._res_future = None 208 209 except ConnectionError: 210 pass 211 212 except Exception as e: 213 self._log.error("write loop error: %s", e, exc_info=e) 214 215 finally: 216 self.close() 217 self._send_queue.close() 218 219 while True: 220 if future and not future.done(): 221 future.set_exception(ConnectionError()) 222 223 if self._send_queue.empty(): 224 break 225 226 future, _, __ = self._send_queue.get_nowait()
Resource with lifetime control based on Group.
async def
open_connection( self, addr: int, *, name: str | None = None, response_timeout: float = 15, send_retry_count: int = 3, poll_class1_delay: float | None = 1, poll_class2_delay: float | None = None, send_queue_size: int = 1024, receive_queue_size: int = 1024) -> Connection:
61 async def open_connection(self, 62 addr: common.Address, 63 *, 64 name: str | None = None, 65 response_timeout: float = 15, 66 send_retry_count: int = 3, 67 poll_class1_delay: float | None = 1, 68 poll_class2_delay: float | None = None, 69 send_queue_size: int = 1024, 70 receive_queue_size: int = 1024, 71 ) -> common.Connection: 72 if addr >= self._broadcast_address: 73 raise ValueError('unsupported address') 74 75 conn = MasterConnection() 76 conn._send_retry_count = send_retry_count 77 conn._loop = self._loop 78 conn._send_queue = aio.Queue(send_queue_size) 79 conn._receive_queue = aio.Queue(receive_queue_size) 80 conn._access_demand_event = asyncio.Event() 81 conn._async_group = self.async_group.create_subgroup() 82 conn._info = common.ConnectionInfo(name=name, 83 port=self._endpoint.info.port, 84 address=addr) 85 conn._log = logger.create_connection_logger(mlog, conn._info) 86 87 send = functools.partial(self._send, response_timeout) 88 89 try: 90 while True: 91 with contextlib.suppress(asyncio.TimeoutError): 92 # req = common.ReqFrame( 93 # direction=None, 94 # frame_count_bit=False, 95 # frame_count_valid=False, 96 # function=common.ReqFunction.REQ_STATUS, 97 # address=addr, 98 # data=b'') 99 # res = await send(req) 100 101 # if res.function not in [common.ResFunction.ACK, 102 # common.ResFunction.RES_STATUS]: 103 # continue 104 105 req = common.ReqFrame( 106 direction=None, 107 frame_count_bit=False, 108 frame_count_valid=False, 109 function=common.ReqFunction.RESET_LINK, 110 address=addr, 111 data=b'') 112 res = await send(req) 113 114 if (isinstance(res, common.ShortFrame) or 115 (isinstance(res, common.ResFrame) and 116 res.function == common.ResFunction.ACK)): 117 # TODO maybe delay? 118 break 119 120 conn.async_group.spawn(conn._send_loop, send) 121 122 if poll_class1_delay is not None: 123 conn.async_group.spawn(conn._poll_loop_class1, 124 poll_class1_delay) 125 126 if poll_class2_delay is not None: 127 conn.async_group.spawn(conn._poll_loop_class2, 128 poll_class2_delay) 129 130 # TODO spawn status loop if polling is disabled 131 132 except BaseException: 133 await aio.uncancellable(conn.async_close()) 134 raise 135 136 return conn
class
SlaveLink(hat.aio.group.Resource):
54class SlaveLink(aio.Resource): 55 56 @property 57 def async_group(self): 58 return self._endpoint.async_group 59 60 async def open_connection(self, 61 addr: common.Address, 62 *, 63 name: str | None = None, 64 poll_class2_cb: PollClass2Cb | None = None, 65 keep_alive_timeout: float = 30, 66 receive_queue_size: int = 1024, 67 send_queue_size: int = 1024 68 ) -> common.Connection: 69 if addr >= self._broadcast_address: 70 raise ValueError('unsupported address') 71 72 if addr in self._conns: 73 raise Exception('connection already exists') 74 75 conn = SlaveConnection() 76 conn._poll_class2_cb = poll_class2_cb 77 conn._loop = self._loop 78 conn._active_future = self._loop.create_future() 79 conn._frame_count_bit = None 80 conn._res = None 81 conn._keep_alive_event = asyncio.Event() 82 conn._send_queue = aio.Queue(send_queue_size) 83 conn._receive_queue = aio.Queue(receive_queue_size) 84 conn._async_group = self.async_group.create_subgroup() 85 conn._info = common.ConnectionInfo(name=name, 86 port=self._endpoint.info.port, 87 address=addr) 88 conn._log = logger.create_connection_logger(mlog, conn._info) 89 90 conn.async_group.spawn(aio.call_on_cancel, conn._send_queue.close) 91 conn.async_group.spawn(aio.call_on_cancel, conn._receive_queue.close) 92 93 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, addr, 94 None) 95 self._conns[addr] = conn 96 97 conn.async_group.spawn(conn._keep_alive_loop, keep_alive_timeout) 98 99 try: 100 await conn._active_future 101 102 except BaseException: 103 await aio.uncancellable(conn.async_close()) 104 raise 105 106 return conn 107 108 async def _receive_loop(self): 109 try: 110 while True: 111 req = await self._endpoint.receive() 112 last_rw_time = time.monotonic() 113 114 if not isinstance(req, common.ReqFrame): 115 continue 116 117 if req.address == self._broadcast_address: 118 # TODO maybe filter conns that have active_future set 119 conns = list(self._conns.values()) 120 121 elif req.address in self._conns: 122 conns = [self._conns[req.address]] 123 124 else: 125 continue 126 127 for conn in conns: 128 if not conn.is_open: 129 continue 130 131 res, sent_cb = await conn._process(req) 132 133 if (req.address == self._broadcast_address or 134 req.function == common.ReqFunction.DATA_NO_RES): 135 continue 136 137 last_rw_delta = time.monotonic() - last_rw_time 138 sleep_duration = self._silent_interval - last_rw_delta 139 if sleep_duration > 0: 140 await asyncio.sleep(sleep_duration) 141 142 await self._endpoint.send(res) 143 last_rw_time = time.monotonic() 144 145 if sent_cb: 146 await aio.call(sent_cb) 147 148 except ConnectionError: 149 pass 150 151 except Exception as e: 152 self._log.warning("receive loop error: %s", e, exc_info=e) 153 154 finally: 155 self.close()
Resource with lifetime control based on Group.
async def
open_connection( self, addr: int, *, name: str | None = None, poll_class2_cb: Optional[Callable[[Connection], bytes | bytearray | memoryview | None | Awaitable[bytes | bytearray | memoryview | None]]] = None, keep_alive_timeout: float = 30, receive_queue_size: int = 1024, send_queue_size: int = 1024) -> Connection:
60 async def open_connection(self, 61 addr: common.Address, 62 *, 63 name: str | None = None, 64 poll_class2_cb: PollClass2Cb | None = None, 65 keep_alive_timeout: float = 30, 66 receive_queue_size: int = 1024, 67 send_queue_size: int = 1024 68 ) -> common.Connection: 69 if addr >= self._broadcast_address: 70 raise ValueError('unsupported address') 71 72 if addr in self._conns: 73 raise Exception('connection already exists') 74 75 conn = SlaveConnection() 76 conn._poll_class2_cb = poll_class2_cb 77 conn._loop = self._loop 78 conn._active_future = self._loop.create_future() 79 conn._frame_count_bit = None 80 conn._res = None 81 conn._keep_alive_event = asyncio.Event() 82 conn._send_queue = aio.Queue(send_queue_size) 83 conn._receive_queue = aio.Queue(receive_queue_size) 84 conn._async_group = self.async_group.create_subgroup() 85 conn._info = common.ConnectionInfo(name=name, 86 port=self._endpoint.info.port, 87 address=addr) 88 conn._log = logger.create_connection_logger(mlog, conn._info) 89 90 conn.async_group.spawn(aio.call_on_cancel, conn._send_queue.close) 91 conn.async_group.spawn(aio.call_on_cancel, conn._receive_queue.close) 92 93 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, addr, 94 None) 95 self._conns[addr] = conn 96 97 conn.async_group.spawn(conn._keep_alive_loop, keep_alive_timeout) 98 99 try: 100 await conn._active_future 101 102 except BaseException: 103 await aio.uncancellable(conn.async_close()) 104 raise 105 106 return conn
async def
create_balanced_link( port: str, address_size: AddressSize, *, silent_interval: float = 0.005, send_queue_size: int = 1024, **kwargs) -> BalancedLink:
18async def create_balanced_link(port: str, 19 address_size: common.AddressSize, 20 *, 21 silent_interval: float = 0.005, 22 send_queue_size: int = 1024, 23 **kwargs 24 ) -> 'BalancedLink': 25 link = BalancedLink() 26 link._address_size = address_size 27 link._loop = asyncio.get_running_loop() 28 link._conns = {} 29 link._res_future = None 30 link._send_queue = aio.Queue(send_queue_size) 31 32 link._endpoint = await endpoint.create(port=port, 33 address_size=address_size, 34 direction_valid=True, 35 silent_interval=silent_interval, 36 **kwargs) 37 38 link._log = logger.create_logger(mlog, link._endpoint.info) 39 40 link.async_group.spawn(link._send_loop) 41 link.async_group.spawn(link._receive_loop) 42 43 return link
class
BalancedLink(hat.aio.group.Resource):
46class BalancedLink(aio.Resource): 47 48 @property 49 def async_group(self): 50 return self._endpoint.async_group 51 52 async def open_connection(self, 53 direction: common.Direction, 54 addr: common.Address, 55 *, 56 name: str | None = None, 57 response_timeout: float = 15, 58 send_retry_count: int = 3, 59 status_delay: float = 5, 60 receive_queue_size: int = 1024, 61 send_queue_size: int = 1024 62 ) -> common.Connection: 63 if addr >= (1 << (self._address_size.value * 8)): 64 raise ValueError('unsupported address') 65 66 conn_key = direction, addr 67 if conn_key in self._conns: 68 raise Exception('connection already exists') 69 70 conn = BalancedConnection() 71 conn._direction = direction 72 conn._send_retry_count = send_retry_count 73 conn._loop = self._loop 74 conn._send_event = asyncio.Event() 75 conn._receive_queue = aio.Queue(receive_queue_size) 76 conn._send_queue = aio.Queue(send_queue_size) 77 conn._frame_count_bit = None 78 conn._res = None 79 conn._async_group = self.async_group.create_subgroup() 80 conn._info = common.ConnectionInfo(name=name, 81 port=self._endpoint.info.port, 82 address=addr) 83 conn._log = logger.create_connection_logger(mlog, conn._info) 84 85 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, conn_key, 86 None) 87 self._conns[conn_key] = conn 88 89 send = functools.partial(self._send, response_timeout) 90 91 try: 92 while True: 93 with contextlib.suppress(asyncio.TimeoutError): 94 # req = common.ReqFrame( 95 # direction=direction, 96 # frame_count_bit=False, 97 # frame_count_valid=False, 98 # function=common.ReqFunction.REQ_STATUS, 99 # address=addr, 100 # data=b'') 101 # res = await self._send(req) 102 103 # if res.function not in [common.ResFunction.ACK, 104 # common.ResFunction.RES_STATUS]: 105 # continue 106 107 req = common.ReqFrame( 108 direction=direction, 109 frame_count_bit=False, 110 frame_count_valid=False, 111 function=common.ReqFunction.RESET_LINK, 112 address=addr, 113 data=b'') 114 res = await send(req) 115 116 if (isinstance(res, common.ShortFrame) or 117 (isinstance(res, common.ResFrame) and 118 res.function == common.ResFunction.ACK)): 119 break 120 121 conn.async_group.spawn(conn._send_loop, send) 122 conn.async_group.spawn(conn._status_loop, status_delay) 123 124 except BaseException: 125 await aio.uncancellable(conn.async_close()) 126 raise 127 128 return conn 129 130 async def _send(self, response_timeout, req): 131 future = self._loop.create_future() 132 try: 133 await self._send_queue.put((future, response_timeout, req)) 134 return await future 135 136 except aio.QueueClosedError: 137 raise ConnectionError() 138 139 async def _process(self, req): 140 conn_key = _invert_direction(req.direction), req.address 141 conn = self._conns.get(conn_key) 142 if not conn or not conn.is_open: 143 return 144 145 res = await conn._process(req) 146 147 if req.function == common.ReqFunction.DATA_NO_RES: 148 return 149 150 await self._endpoint.send(res) 151 152 async def _receive_loop(self): 153 try: 154 while True: 155 msg = await self._endpoint.receive() 156 157 if isinstance(msg, common.ReqFrame): 158 await self._process(msg) 159 160 elif isinstance(msg, (common.ResFrame, common.ShortFrame)): 161 if self._res_future and not self._res_future.done(): 162 self._res_future.set_result(msg) 163 164 else: 165 raise TypeError('unsupported frame type') 166 167 except ConnectionError: 168 pass 169 170 except Exception as e: 171 self._log.error("receive loop error: %s", e, exc_info=e) 172 173 finally: 174 self.close() 175 176 async def _send_loop(self): 177 future = None 178 179 try: 180 while True: 181 future, response_timeout, req = await self._send_queue.get() 182 if future.done(): 183 continue 184 185 self._res_future = self._loop.create_future() 186 187 self._log.debug("writing request %s", req.function.name) 188 await self._endpoint.send(req) 189 await self._endpoint.drain() 190 191 if req.function == common.ReqFunction.DATA_NO_RES: 192 res = None 193 194 else: 195 try: 196 res = await aio.wait_for(self._res_future, 197 response_timeout) 198 199 except asyncio.TimeoutError as e: 200 if not future.done(): 201 future.set_exception(e) 202 203 if not future.done(): 204 future.set_result(res) 205 206 self._res_future = None 207 208 except ConnectionError: 209 pass 210 211 except Exception as e: 212 self._log.error("send loop error: %s", e, exc_info=e) 213 214 finally: 215 self.close() 216 self._send_queue.close() 217 218 while True: 219 if future and not future.done(): 220 future.set_exception(ConnectionError()) 221 222 if self._send_queue.empty(): 223 break 224 225 future, _, __ = self._send_queue.get_nowait()
Resource with lifetime control based on Group.
async def
open_connection( self, direction: Direction, addr: int, *, name: str | None = None, response_timeout: float = 15, send_retry_count: int = 3, status_delay: float = 5, receive_queue_size: int = 1024, send_queue_size: int = 1024) -> Connection:
52 async def open_connection(self, 53 direction: common.Direction, 54 addr: common.Address, 55 *, 56 name: str | None = None, 57 response_timeout: float = 15, 58 send_retry_count: int = 3, 59 status_delay: float = 5, 60 receive_queue_size: int = 1024, 61 send_queue_size: int = 1024 62 ) -> common.Connection: 63 if addr >= (1 << (self._address_size.value * 8)): 64 raise ValueError('unsupported address') 65 66 conn_key = direction, addr 67 if conn_key in self._conns: 68 raise Exception('connection already exists') 69 70 conn = BalancedConnection() 71 conn._direction = direction 72 conn._send_retry_count = send_retry_count 73 conn._loop = self._loop 74 conn._send_event = asyncio.Event() 75 conn._receive_queue = aio.Queue(receive_queue_size) 76 conn._send_queue = aio.Queue(send_queue_size) 77 conn._frame_count_bit = None 78 conn._res = None 79 conn._async_group = self.async_group.create_subgroup() 80 conn._info = common.ConnectionInfo(name=name, 81 port=self._endpoint.info.port, 82 address=addr) 83 conn._log = logger.create_connection_logger(mlog, conn._info) 84 85 conn.async_group.spawn(aio.call_on_cancel, self._conns.pop, conn_key, 86 None) 87 self._conns[conn_key] = conn 88 89 send = functools.partial(self._send, response_timeout) 90 91 try: 92 while True: 93 with contextlib.suppress(asyncio.TimeoutError): 94 # req = common.ReqFrame( 95 # direction=direction, 96 # frame_count_bit=False, 97 # frame_count_valid=False, 98 # function=common.ReqFunction.REQ_STATUS, 99 # address=addr, 100 # data=b'') 101 # res = await self._send(req) 102 103 # if res.function not in [common.ResFunction.ACK, 104 # common.ResFunction.RES_STATUS]: 105 # continue 106 107 req = common.ReqFrame( 108 direction=direction, 109 frame_count_bit=False, 110 frame_count_valid=False, 111 function=common.ReqFunction.RESET_LINK, 112 address=addr, 113 data=b'') 114 res = await send(req) 115 116 if (isinstance(res, common.ShortFrame) or 117 (isinstance(res, common.ResFrame) and 118 res.function == common.ResFunction.ACK)): 119 break 120 121 conn.async_group.spawn(conn._send_loop, send) 122 conn.async_group.spawn(conn._status_loop, status_delay) 123 124 except BaseException: 125 await aio.uncancellable(conn.async_close()) 126 raise 127 128 return conn