hat.drivers.serial
Asyncio serial communication driver
1"""Asyncio serial communication driver""" 2 3from hat.drivers.serial.common import (ByteSize, 4 Parity, 5 StopBits, 6 EndpointInfo, 7 Endpoint) 8 9from hat.drivers.serial import py_serial 10 11try: 12 from hat.drivers.serial import native_serial 13 14except ImportError: 15 native_serial = None 16 17 18__all__ = ['ByteSize', 19 'Parity', 20 'StopBits', 21 'EndpointInfo', 22 'Endpoint', 23 'create', 24 'py_serial', 25 'native_serial'] 26 27 28async def create(port: str, 29 *, 30 name: str | None = None, 31 baudrate: int = 9600, 32 bytesize: ByteSize = ByteSize.EIGHTBITS, 33 parity: Parity = Parity.NONE, 34 stopbits: StopBits = StopBits.ONE, 35 xonxoff: bool = False, 36 rtscts: bool = False, 37 dsrdtr: bool = False, 38 silent_interval: float = 0 39 ) -> Endpoint: 40 """Open serial port 41 42 Args: 43 port: port name dependent of operating system 44 (e.g. `/dev/ttyUSB0`, `COM3`, ...) 45 name: endpoint name 46 baudrate: baud rate 47 bytesize: number of data bits 48 parity: parity checking 49 stopbits: number of stop bits 50 xonxoff: enable software flow control 51 rtscts: enable hardware RTS/CTS flow control 52 dsrdtr: enable hardware DSR/DTR flow control 53 silent_interval: minimum time in seconds between writing two 54 consecutive messages 55 56 """ 57 impl = native_serial or py_serial 58 return await impl.create(port=port, 59 name=name, 60 baudrate=baudrate, 61 bytesize=bytesize, 62 parity=parity, 63 stopbits=stopbits, 64 xonxoff=xonxoff, 65 rtscts=rtscts, 66 dsrdtr=dsrdtr, 67 silent_interval=silent_interval)
class
ByteSize(enum.Enum):
FIVEBITS =
<ByteSize.FIVEBITS: 5>
SIXBITS =
<ByteSize.SIXBITS: 6>
SEVENBITS =
<ByteSize.SEVENBITS: 7>
EIGHTBITS =
<ByteSize.EIGHTBITS: 8>
class
Parity(enum.Enum):
NONE =
<Parity.NONE: 'N'>
EVEN =
<Parity.EVEN: 'E'>
ODD =
<Parity.ODD: 'O'>
MARK =
<Parity.MARK: 'M'>
SPACE =
<Parity.SPACE: 'S'>
class
StopBits(enum.Enum):
ONE =
<StopBits.ONE: 1>
ONE_POINT_FIVE =
<StopBits.ONE_POINT_FIVE: 1.5>
TWO =
<StopBits.TWO: 2>
class
EndpointInfo(typing.NamedTuple):
EndpointInfo(name, port)
class
Endpoint(hat.aio.group.Resource):
41class Endpoint(aio.Resource): 42 """Serial endpoint""" 43 44 @property 45 @abc.abstractmethod 46 def info(self) -> EndpointInfo: 47 """Endpoint info""" 48 49 @abc.abstractmethod 50 async def read(self, size: int) -> util.Bytes: 51 """Read 52 53 Args: 54 size: number of bytes to read 55 56 Raises: 57 ConnectionError 58 59 """ 60 61 @abc.abstractmethod 62 async def write(self, data: util.Bytes): 63 """Write 64 65 Raises: 66 ConnectionError 67 68 """ 69 70 @abc.abstractmethod 71 async def drain(self): 72 """Drain output buffer 73 74 Raises: 75 ConnectionError 76 77 """ 78 79 @abc.abstractmethod 80 async def clear_input_buffer(self) -> int: 81 """Reset input buffer 82 83 Returns number of bytes available in buffer immediately before 84 buffer was cleared. 85 86 Raises: 87 ConnectionError 88 89 """
Serial endpoint
@abc.abstractmethod
async def
read(self, size: int) -> bytes | bytearray | memoryview:
49 @abc.abstractmethod 50 async def read(self, size: int) -> util.Bytes: 51 """Read 52 53 Args: 54 size: number of bytes to read 55 56 Raises: 57 ConnectionError 58 59 """
Read
Arguments:
- size: number of bytes to read
Raises:
- ConnectionError
@abc.abstractmethod
async def
write(self, data: bytes | bytearray | memoryview):
61 @abc.abstractmethod 62 async def write(self, data: util.Bytes): 63 """Write 64 65 Raises: 66 ConnectionError 67 68 """
Write
Raises:
- ConnectionError
@abc.abstractmethod
async def
drain(self):
70 @abc.abstractmethod 71 async def drain(self): 72 """Drain output buffer 73 74 Raises: 75 ConnectionError 76 77 """
Drain output buffer
Raises:
- ConnectionError
@abc.abstractmethod
async def
clear_input_buffer(self) -> int:
79 @abc.abstractmethod 80 async def clear_input_buffer(self) -> int: 81 """Reset input buffer 82 83 Returns number of bytes available in buffer immediately before 84 buffer was cleared. 85 86 Raises: 87 ConnectionError 88 89 """
Reset input buffer
Returns number of bytes available in buffer immediately before buffer was cleared.
Raises:
- ConnectionError
async def
create( port: str, *, name: str | None = None, baudrate: int = 9600, bytesize: ByteSize = <ByteSize.EIGHTBITS: 8>, parity: Parity = <Parity.NONE: 'N'>, stopbits: StopBits = <StopBits.ONE: 1>, xonxoff: bool = False, rtscts: bool = False, dsrdtr: bool = False, silent_interval: float = 0) -> Endpoint:
29async def create(port: str, 30 *, 31 name: str | None = None, 32 baudrate: int = 9600, 33 bytesize: ByteSize = ByteSize.EIGHTBITS, 34 parity: Parity = Parity.NONE, 35 stopbits: StopBits = StopBits.ONE, 36 xonxoff: bool = False, 37 rtscts: bool = False, 38 dsrdtr: bool = False, 39 silent_interval: float = 0 40 ) -> Endpoint: 41 """Open serial port 42 43 Args: 44 port: port name dependent of operating system 45 (e.g. `/dev/ttyUSB0`, `COM3`, ...) 46 name: endpoint name 47 baudrate: baud rate 48 bytesize: number of data bits 49 parity: parity checking 50 stopbits: number of stop bits 51 xonxoff: enable software flow control 52 rtscts: enable hardware RTS/CTS flow control 53 dsrdtr: enable hardware DSR/DTR flow control 54 silent_interval: minimum time in seconds between writing two 55 consecutive messages 56 57 """ 58 impl = native_serial or py_serial 59 return await impl.create(port=port, 60 name=name, 61 baudrate=baudrate, 62 bytesize=bytesize, 63 parity=parity, 64 stopbits=stopbits, 65 xonxoff=xonxoff, 66 rtscts=rtscts, 67 dsrdtr=dsrdtr, 68 silent_interval=silent_interval)
Open serial port
Arguments:
- port: port name dependent of operating system
(e.g.
/dev/ttyUSB0,COM3, ...) - name: endpoint name
- baudrate: baud rate
- bytesize: number of data bits
- parity: parity checking
- stopbits: number of stop bits
- xonxoff: enable software flow control
- rtscts: enable hardware RTS/CTS flow control
- dsrdtr: enable hardware DSR/DTR flow control
- silent_interval: minimum time in seconds between writing two consecutive messages