hat.drivers.cotp - Connection oriented transport protocol¶
Asyncio implementation of COTP communication.
Data = tpkt.Data
Address = tpkt.Address
class ConnectionInfo(typing.NamedTuple):
local_addr: Address
local_tsel: typing.Optional[int]
remote_addr: Address
remote_tsel: typing.Optional[int]
ConnectionCb = aio.AsyncCallable[['Connection'], None]
async def connect(addr: Address,
local_tsel: typing.Optional[int] = None,
remote_tsel: typing.Optional[int] = None
) -> 'Connection': ...
async def listen(connection_cb: ConnectionCb,
addr: Address = Address('0.0.0.0', 102)
) -> 'Server': ...
class Server(aio.Resource):
@property
def async_group(self) -> aio.Group: ...
@property
def addresses(self) -> typing.List[Address]: ...
class Connection(aio.Resource):
@property
def async_group(self) -> aio.Group: ...
@property
def info(self) -> ConnectionInfo: ...
async def read(self) -> Data: ...
def write(self, data: bytes): ...
Example usage:
addr = cotp.Address('127.0.0.1', util.get_unused_tcp_port())
conn2_future = asyncio.Future()
srv = await cotp.listen(conn2_future.set_result, addr)
conn1 = await cotp.connect(addr)
conn2 = await conn2_future
# send from conn1 to conn2
data = b'123'
conn1.write(data)
result = await conn2.read()
assert result == data
# send from conn2 to conn1
data = b'321'
conn2.write(data)
result = await conn1.read()
assert result == data
await conn1.async_close()
await conn2.async_close()
await srv.async_close()
API¶
API reference is available as part of generated documentation: