hat.drivers.tpkt - Transport Service on top of TCP
Implementation of TPKT based on asyncio.
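For orientation, TPKT (RFC 1006) prefixes each payload with a 4-byte header: version 3, one reserved byte, and a 16-bit big-endian length that counts the header as well as the payload. The following is a minimal sketch of that framing using only the standard library. The names `encode_tpkt` and `decode_tpkt` are illustrative assumptions and are not part of `hat.drivers.tpkt`.

```python
import struct

TPKT_VERSION = 3   # version field defined by RFC 1006
HEADER_SIZE = 4    # version, reserved, 2-byte length
MAX_PAYLOAD = 0xFFFF - HEADER_SIZE


def encode_tpkt(payload: bytes) -> bytes:
    """Prefix payload with a TPKT header (hypothetical helper)."""
    if len(payload) > MAX_PAYLOAD:
        raise ValueError('payload too large for a single TPKT packet')
    # the length field counts the header as well as the payload
    header = struct.pack('>BBH', TPKT_VERSION, 0, HEADER_SIZE + len(payload))
    return header + bytes(payload)


def decode_tpkt(packet: bytes) -> bytes:
    """Return the payload of one complete TPKT packet (hypothetical helper)."""
    if len(packet) < HEADER_SIZE:
        raise ValueError('packet shorter than TPKT header')
    version, _reserved, length = struct.unpack('>BBH', packet[:HEADER_SIZE])
    if version != TPKT_VERSION:
        raise ValueError('unsupported TPKT version')
    if length != len(packet):
        raise ValueError('length field does not match packet size')
    return bytes(packet[HEADER_SIZE:])
```

Because the length is carried in 16 bits and includes the header, a single packet in this sketch holds at most 65531 payload bytes.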
Data = typing.Union[bytes, bytearray, memoryview]

class Address(typing.NamedTuple):
    host: str
    port: int = 102

class ConnectionInfo(typing.NamedTuple):
    local_addr: Address
    remote_addr: Address

ConnectionCb = aio.AsyncCallable[['Connection'], None]

async def connect(addr: Address) -> 'Connection': ...

async def listen(connection_cb: ConnectionCb,
                 addr: Address = Address('0.0.0.0', 102)
                 ) -> 'Server': ...

class Server(aio.Resource):

    @property
    def async_group(self) -> aio.Group: ...

    @property
    def addresses(self) -> typing.List[Address]: ...

class Connection(aio.Resource):

    @property
    def async_group(self) -> aio.Group: ...

    @property
    def info(self) -> ConnectionInfo: ...

    async def read(self) -> Data: ...

    def write(self, data: Data): ...
Example usage:
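# runs inside a coroutine; the address below is an assumed example value
addr = tpkt.Address('127.0.0.1', 10102)

# the server passes each accepted connection to the callback
conn1_future = asyncio.Future()
srv = await tpkt.listen(conn1_future.set_result, addr)
conn2 = await tpkt.connect(addr)
conn1 = await conn1_future

# data written on one end is read on the other
write_data = b'12345'
conn1.write(write_data)
read_data = await conn2.read()
assert write_data == read_data

# close both connections, then the server
await conn1.async_close()
await conn2.async_close()
await srv.async_close()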
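
The example above reads back exactly the bytes that were written, which suggests that each `read` yields one whole payload rather than an arbitrary slice of the TCP stream. A rough sketch of how such message boundaries can be recovered from a byte stream with plain asyncio follows. Here `read_tpkt` and `demo` are hypothetical names built on `asyncio.StreamReader`; this is not the library's implementation.

```python
import asyncio
import struct


async def read_tpkt(reader: asyncio.StreamReader) -> bytes:
    """Read one TPKT packet and return its payload (hypothetical helper)."""
    # 4-byte header: version (3), reserved, big-endian total length
    header = await reader.readexactly(4)
    version, _reserved, length = struct.unpack('>BBH', header)
    if version != 3:
        raise ValueError('unsupported TPKT version')
    if length < 4:
        raise ValueError('invalid TPKT length')
    # the length field includes the header, so the payload is length - 4 bytes
    return await reader.readexactly(length - 4)


async def demo() -> list:
    # feed two packets as one chunk, as TCP may deliver them together
    reader = asyncio.StreamReader()
    reader.feed_data(b'\x03\x00\x00\x07' + b'abc' +
                     b'\x03\x00\x00\x06' + b'de')
    reader.feed_eof()
    return [await read_tpkt(reader), await read_tpkt(reader)]
```

Calling `asyncio.run(demo())` returns the two payloads separately, even though they arrived in a single chunk.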
API
API reference is available as part of generated documentation: