hat.drivers.serial
Asyncio serial communication driver
"""Asyncio serial communication driver"""

from hat.drivers.serial.common import (ByteSize,
                                       Parity,
                                       StopBits,
                                       Endpoint)

from hat.drivers.serial import py_serial

try:
    from hat.drivers.serial import native_serial

except ImportError:
    # native implementation is optional - `create` falls back to `py_serial`
    native_serial = None


__all__ = ['ByteSize',
           'Parity',
           'StopBits',
           'Endpoint',
           'create',
           'py_serial',
           'native_serial']


async def create(port: str, *,
                 baudrate: int = 9600,
                 bytesize: ByteSize = ByteSize.EIGHTBITS,
                 parity: Parity = Parity.NONE,
                 stopbits: StopBits = StopBits.ONE,
                 xonxoff: bool = False,
                 rtscts: bool = False,
                 dsrdtr: bool = False,
                 silent_interval: float = 0
                 ) -> Endpoint:
    """Open serial port

    Endpoint creation is delegated to `native_serial` when that module was
    successfully imported, otherwise to `py_serial`.

    Args:
        port: port name dependent on operating system
            (e.g. `/dev/ttyUSB0`, `COM3`, ...)
        baudrate: baud rate
        bytesize: number of data bits
        parity: parity checking
        stopbits: number of stop bits
        xonxoff: enable software flow control
        rtscts: enable hardware RTS/CTS flow control
        dsrdtr: enable hardware DSR/DTR flow control
        silent_interval: minimum time in seconds between writing two
            consecutive messages

    """
    backend = native_serial or py_serial

    # all settings are forwarded unchanged to the selected implementation
    settings = {'baudrate': baudrate,
                'bytesize': bytesize,
                'parity': parity,
                'stopbits': stopbits,
                'xonxoff': xonxoff,
                'rtscts': rtscts,
                'dsrdtr': dsrdtr,
                'silent_interval': silent_interval}

    return await backend.create(port=port, **settings)
class
ByteSize(enum.Enum):
An enumeration.
FIVEBITS =
<ByteSize.FIVEBITS: 5>
SIXBITS =
<ByteSize.SIXBITS: 6>
SEVENBITS =
<ByteSize.SEVENBITS: 7>
EIGHTBITS =
<ByteSize.EIGHTBITS: 8>
class
Parity(enum.Enum):
An enumeration.
NONE =
<Parity.NONE: 'N'>
EVEN =
<Parity.EVEN: 'E'>
ODD =
<Parity.ODD: 'O'>
MARK =
<Parity.MARK: 'M'>
SPACE =
<Parity.SPACE: 'S'>
class
StopBits(enum.Enum):
An enumeration.
ONE =
<StopBits.ONE: 1>
ONE_POINT_FIVE =
<StopBits.ONE_POINT_FIVE: 1.5>
TWO =
<StopBits.TWO: 2>
class
Endpoint(hat.aio.group.Resource):
class Endpoint(aio.Resource):
    """Serial endpoint

    Abstract interface of an opened serial port.

    """

    @property
    @abc.abstractmethod
    def port(self) -> str:
        """Port name"""

    @abc.abstractmethod
    async def read(self, size: int) -> util.Bytes:
        """Read

        Args:
            size: number of bytes to read

        Raises:
            ConnectionError

        """

    @abc.abstractmethod
    async def write(self, data: util.Bytes):
        """Write

        Args:
            data: bytes to write

        Raises:
            ConnectionError

        """

    @abc.abstractmethod
    async def drain(self):
        """Drain output buffer

        Raises:
            ConnectionError

        """

    @abc.abstractmethod
    async def reset_input_buffer(self) -> int:
        """Reset input buffer

        Returns number of bytes that were available in buffer immediately
        before buffer was cleared.

        Raises:
            ConnectionError

        """
Serial endpoint
@abc.abstractmethod
async def
read(self, size: int) -> bytes | bytearray | memoryview:
@abc.abstractmethod
async def read(self, size: int) -> util.Bytes:
    """Read

    Args:
        size: number of bytes to read

    Raises:
        ConnectionError

    """
Read
Arguments:
- size: number of bytes to read
Raises:
- ConnectionError
@abc.abstractmethod
async def
write(self, data: bytes | bytearray | memoryview):
@abc.abstractmethod
async def write(self, data: util.Bytes):
    """Write

    Args:
        data: bytes to write

    Raises:
        ConnectionError

    """
Write
Raises:
- ConnectionError
@abc.abstractmethod
async def
drain(self):
@abc.abstractmethod
async def drain(self):
    """Drain output buffer

    Raises:
        ConnectionError

    """
Drain output buffer
Raises:
- ConnectionError
@abc.abstractmethod
async def
reset_input_buffer(self) -> int:
@abc.abstractmethod
async def reset_input_buffer(self) -> int:
    """Reset input buffer

    Returns number of bytes that were available in buffer immediately
    before buffer was cleared.

    Raises:
        ConnectionError

    """
Reset input buffer
Returns number of bytes available in buffer immediately before buffer was cleared.
Raises:
- ConnectionError
async def
create( port: str, *, baudrate: int = 9600, bytesize: ByteSize = <ByteSize.EIGHTBITS: 8>, parity: Parity = <Parity.NONE: 'N'>, stopbits: StopBits = <StopBits.ONE: 1>, xonxoff: bool = False, rtscts: bool = False, dsrdtr: bool = False, silent_interval: float = 0) -> Endpoint:
async def create(port: str, *,
                 baudrate: int = 9600,
                 bytesize: ByteSize = ByteSize.EIGHTBITS,
                 parity: Parity = Parity.NONE,
                 stopbits: StopBits = StopBits.ONE,
                 xonxoff: bool = False,
                 rtscts: bool = False,
                 dsrdtr: bool = False,
                 silent_interval: float = 0
                 ) -> Endpoint:
    """Open serial port

    Endpoint creation is delegated to `native_serial` when it is available,
    otherwise to `py_serial`.

    Args:
        port: port name dependent on operating system
            (e.g. `/dev/ttyUSB0`, `COM3`, ...)
        baudrate: baud rate
        bytesize: number of data bits
        parity: parity checking
        stopbits: number of stop bits
        xonxoff: enable software flow control
        rtscts: enable hardware RTS/CTS flow control
        dsrdtr: enable hardware DSR/DTR flow control
        silent_interval: minimum time in seconds between writing two
            consecutive messages

    """
    backend = native_serial or py_serial

    # all settings are forwarded unchanged to the selected implementation
    settings = {'baudrate': baudrate,
                'bytesize': bytesize,
                'parity': parity,
                'stopbits': stopbits,
                'xonxoff': xonxoff,
                'rtscts': rtscts,
                'dsrdtr': dsrdtr,
                'silent_interval': silent_interval}

    return await backend.create(port=port, **settings)
Open serial port
Arguments:
- port: port name dependent on operating system (e.g. `/dev/ttyUSB0`, `COM3`, ...)
- baudrate: baud rate
- bytesize: number of data bits
- parity: parity checking
- stopbits: number of stop bits
- xonxoff: enable software flow control
- rtscts: enable hardware RTS/CTS flow control
- dsrdtr: enable hardware DSR/DTR flow control
- silent_interval: minimum time in seconds between writing two consecutive messages