hat.drivers.iec60870.link

IEC 60870-5 link layer

 1  """IEC 60870-5 link layer"""
 2
 3  from hat.drivers.iec60870.link.common import (Address,
 4                                                AddressSize,
 5                                                Direction,
 6                                                ConnectionInfo,
 7                                                Connection)
 8  from hat.drivers.iec60870.link.unbalanced import (PollClass2Cb,
 9                                                    create_master_link,
10                                                    create_slave_link,
11                                                    MasterLink,
12                                                    SlaveLink)
13  from hat.drivers.iec60870.link.balanced import (create_balanced_link,
14                                                  BalancedLink)
15
16
17  __all__ = ['Address',
18             'AddressSize',
19             'Direction',
20             'ConnectionInfo',
21             'Connection',
22             'PollClass2Cb',
23             'create_master_link',
24             'create_slave_link',
25             'MasterLink',
26             'SlaveLink',
27             'create_balanced_link',
28             'BalancedLink']
Address = <class 'int'>
class AddressSize(enum.Enum):
16  class AddressSize(enum.Enum):
17      ZERO = 0
18      ONE = 1
19      TWO = 2
ZERO = <AddressSize.ZERO: 0>
ONE = <AddressSize.ONE: 1>
TWO = <AddressSize.TWO: 2>
class Direction(enum.Enum):
22  class Direction(enum.Enum):
23      B_TO_A = 0
24      A_TO_B = 1
B_TO_A = <Direction.B_TO_A: 0>
A_TO_B = <Direction.A_TO_B: 1>
class ConnectionInfo(typing.NamedTuple):
74  class ConnectionInfo(typing.NamedTuple):
75      name: str | None
76      port: str
77      address: Address

ConnectionInfo(name, port, address)

ConnectionInfo(name: str | None, port: str, address: int)

Create new instance of ConnectionInfo(name, port, address)

name: str | None

Alias for field number 0

port: str

Alias for field number 1

address: int

Alias for field number 2

class Connection(hat.aio.group.Resource):
80  class Connection(aio.Resource):
81
82      @property
83      @abc.abstractmethod
84      def info(self) -> ConnectionInfo:
85          pass
86
87      @abc.abstractmethod
88      async def send(self,
89                     data: util.Bytes,
90                     sent_cb: aio.AsyncCallable[[], None] | None = None):
91          pass
92
93      @abc.abstractmethod
94      async def receive(self) -> util.Bytes:
95          pass

Resource with lifetime control based on Group.

info: ConnectionInfo
82      @property
83      @abc.abstractmethod
84      def info(self) -> ConnectionInfo:
85          pass
@abc.abstractmethod
async def send(self, data: bytes | bytearray | memoryview, sent_cb: Optional[Callable[[], None | Awaitable[None]]] = None):
87      @abc.abstractmethod
88      async def send(self,
89                     data: util.Bytes,
90                     sent_cb: aio.AsyncCallable[[], None] | None = None):
91          pass
@abc.abstractmethod
async def receive(self) -> bytes | bytearray | memoryview:
93      @abc.abstractmethod
94      async def receive(self) -> util.Bytes:
95          pass
PollClass2Cb = typing.Callable[[Connection], bytes | bytearray | memoryview | None | collections.abc.Awaitable[bytes | bytearray | memoryview | None]]