hat.drivers.iec60870.link

IEC 60870-5 link layer

 1"""IEC 60870-5 link layer"""
 2
 3from hat.drivers.iec60870.link.common import (Address,
 4                                              AddressSize,
 5                                              Connection,
 6                                              Direction)
 7from hat.drivers.iec60870.link.unbalanced import (PollClass2Cb,
 8                                                  create_master_link,
 9                                                  create_slave_link,
10                                                  MasterLink,
11                                                  SlaveLink)
12from hat.drivers.iec60870.link.balanced import (create_balanced_link,
13                                                BalancedLink)
14
15
16__all__ = ['Address',
17           'AddressSize',
18           'Connection',
19           'Direction',
20           'PollClass2Cb',
21           'create_master_link',
22           'create_slave_link',
23           'MasterLink',
24           'SlaveLink',
25           'create_balanced_link',
26           'BalancedLink']
Address = <class 'int'>
class AddressSize(enum.Enum):
19class AddressSize(enum.Enum):
20    ZERO = 0
21    ONE = 1
22    TWO = 2
ZERO = <AddressSize.ZERO: 0>
ONE = <AddressSize.ONE: 1>
TWO = <AddressSize.TWO: 2>
class Connection(hat.aio.group.Resource):
82class Connection(aio.Resource):
83
84    @property
85    @abc.abstractmethod
86    def address(self) -> Address:
87        pass
88
89    @abc.abstractmethod
90    async def send(self,
91                   data: util.Bytes,
92                   sent_cb: aio.AsyncCallable[[], None] | None = None):
93        pass
94
95    @abc.abstractmethod
96    async def receive(self) -> util.Bytes:
97        pass

Resource with lifetime control based on Group.

address: int
84    @property
85    @abc.abstractmethod
86    def address(self) -> Address:
87        pass
@abc.abstractmethod
async def send(self, data: bytes | bytearray | memoryview, sent_cb: Optional[Callable[[], None | Awaitable[None]]] = None):
89    @abc.abstractmethod
90    async def send(self,
91                   data: util.Bytes,
92                   sent_cb: aio.AsyncCallable[[], None] | None = None):
93        pass
@abc.abstractmethod
async def receive(self) -> bytes | bytearray | memoryview:
95    @abc.abstractmethod
96    async def receive(self) -> util.Bytes:
97        pass
class Direction(enum.Enum):
14class Direction(enum.Enum):
15    B_TO_A = 0
16    A_TO_B = 1
B_TO_A = <Direction.B_TO_A: 0>
A_TO_B = <Direction.A_TO_B: 1>
PollClass2Cb = typing.Callable[[Connection], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]