hat.drivers.serial

Asyncio serial communication driver

 1"""Asyncio serial communication driver"""
 2
 3from hat.drivers.serial.common import (ByteSize,
 4                                       Parity,
 5                                       StopBits,
 6                                       EndpointInfo,
 7                                       Endpoint)
 8
 9from hat.drivers.serial import py_serial
10
11try:
12    from hat.drivers.serial import native_serial
13
14except ImportError:
15    native_serial = None
16
17
18__all__ = ['ByteSize',
19           'Parity',
20           'StopBits',
21           'EndpointInfo',
22           'Endpoint',
23           'create',
24           'py_serial',
25           'native_serial']
26
27
28async def create(port: str,
29                 *,
30                 name: str | None = None,
31                 baudrate: int = 9600,
32                 bytesize: ByteSize = ByteSize.EIGHTBITS,
33                 parity: Parity = Parity.NONE,
34                 stopbits: StopBits = StopBits.ONE,
35                 xonxoff: bool = False,
36                 rtscts: bool = False,
37                 dsrdtr: bool = False,
38                 silent_interval: float = 0
39                 ) -> Endpoint:
40    """Open serial port
41
42    Args:
43        port: port name dependent of operating system
44            (e.g. `/dev/ttyUSB0`, `COM3`, ...)
45        name: endpoint name
46        baudrate: baud rate
47        bytesize: number of data bits
48        parity: parity checking
49        stopbits: number of stop bits
50        xonxoff: enable software flow control
51        rtscts: enable hardware RTS/CTS flow control
52        dsrdtr: enable hardware DSR/DTR flow control
53        silent_interval: minimum time in seconds between writing two
54            consecutive messages
55
56    """
57    impl = native_serial or py_serial
58    return await impl.create(port=port,
59                             name=name,
60                             baudrate=baudrate,
61                             bytesize=bytesize,
62                             parity=parity,
63                             stopbits=stopbits,
64                             xonxoff=xonxoff,
65                             rtscts=rtscts,
66                             dsrdtr=dsrdtr,
67                             silent_interval=silent_interval)
class ByteSize(enum.Enum):
15class ByteSize(enum.Enum):
16    FIVEBITS = 5
17    SIXBITS = 6
18    SEVENBITS = 7
19    EIGHTBITS = 8
FIVEBITS = <ByteSize.FIVEBITS: 5>
SIXBITS = <ByteSize.SIXBITS: 6>
SEVENBITS = <ByteSize.SEVENBITS: 7>
EIGHTBITS = <ByteSize.EIGHTBITS: 8>
class Parity(enum.Enum):
22class Parity(enum.Enum):
23    NONE = 'N'
24    EVEN = 'E'
25    ODD = 'O'
26    MARK = 'M'
27    SPACE = 'S'
NONE = <Parity.NONE: 'N'>
EVEN = <Parity.EVEN: 'E'>
ODD = <Parity.ODD: 'O'>
MARK = <Parity.MARK: 'M'>
SPACE = <Parity.SPACE: 'S'>
class StopBits(enum.Enum):
30class StopBits(enum.Enum):
31    ONE = 1
32    ONE_POINT_FIVE = 1.5
33    TWO = 2
ONE = <StopBits.ONE: 1>
ONE_POINT_FIVE = <StopBits.ONE_POINT_FIVE: 1.5>
TWO = <StopBits.TWO: 2>
class EndpointInfo(typing.NamedTuple):
36class EndpointInfo(typing.NamedTuple):
37    name: str | None
38    port: str

EndpointInfo(name, port)

EndpointInfo(name: str | None, port: str)

Create new instance of EndpointInfo(name, port)

name: str | None

Alias for field number 0

port: str

Alias for field number 1

class Endpoint(hat.aio.group.Resource):
41class Endpoint(aio.Resource):
42    """Serial endpoint"""
43
44    @property
45    @abc.abstractmethod
46    def info(self) -> EndpointInfo:
47        """Endpoint info"""
48
49    @abc.abstractmethod
50    async def read(self, size: int) -> util.Bytes:
51        """Read
52
53        Args:
54            size: number of bytes to read
55
56        Raises:
57            ConnectionError
58
59        """
60
61    @abc.abstractmethod
62    async def write(self, data: util.Bytes):
63        """Write
64
65        Raises:
66            ConnectionError
67
68        """
69
70    @abc.abstractmethod
71    async def drain(self):
72        """Drain output buffer
73
74        Raises:
75            ConnectionError
76
77        """
78
79    @abc.abstractmethod
80    async def clear_input_buffer(self) -> int:
81        """Reset input buffer
82
83        Returns number of bytes available in buffer immediately before
84        buffer was cleared.
85
86        Raises:
87            ConnectionError
88
89        """

Serial endpoint

info: EndpointInfo
44    @property
45    @abc.abstractmethod
46    def info(self) -> EndpointInfo:
47        """Endpoint info"""

Endpoint info

@abc.abstractmethod
async def read(self, size: int) -> bytes | bytearray | memoryview:
49    @abc.abstractmethod
50    async def read(self, size: int) -> util.Bytes:
51        """Read
52
53        Args:
54            size: number of bytes to read
55
56        Raises:
57            ConnectionError
58
59        """

Read

Arguments:
  • size: number of bytes to read
Raises:
  • ConnectionError
@abc.abstractmethod
async def write(self, data: bytes | bytearray | memoryview):
61    @abc.abstractmethod
62    async def write(self, data: util.Bytes):
63        """Write
64
65        Raises:
66            ConnectionError
67
68        """

Write

Raises:
  • ConnectionError
@abc.abstractmethod
async def drain(self):
70    @abc.abstractmethod
71    async def drain(self):
72        """Drain output buffer
73
74        Raises:
75            ConnectionError
76
77        """

Drain output buffer

Raises:
  • ConnectionError
@abc.abstractmethod
async def clear_input_buffer(self) -> int:
79    @abc.abstractmethod
80    async def clear_input_buffer(self) -> int:
81        """Reset input buffer
82
83        Returns number of bytes available in buffer immediately before
84        buffer was cleared.
85
86        Raises:
87            ConnectionError
88
89        """

Reset input buffer

Returns number of bytes available in buffer immediately before buffer was cleared.

Raises:
  • ConnectionError
async def create( port: str, *, name: str | None = None, baudrate: int = 9600, bytesize: ByteSize = <ByteSize.EIGHTBITS: 8>, parity: Parity = <Parity.NONE: 'N'>, stopbits: StopBits = <StopBits.ONE: 1>, xonxoff: bool = False, rtscts: bool = False, dsrdtr: bool = False, silent_interval: float = 0) -> Endpoint:
29async def create(port: str,
30                 *,
31                 name: str | None = None,
32                 baudrate: int = 9600,
33                 bytesize: ByteSize = ByteSize.EIGHTBITS,
34                 parity: Parity = Parity.NONE,
35                 stopbits: StopBits = StopBits.ONE,
36                 xonxoff: bool = False,
37                 rtscts: bool = False,
38                 dsrdtr: bool = False,
39                 silent_interval: float = 0
40                 ) -> Endpoint:
41    """Open serial port
42
43    Args:
44        port: port name dependent of operating system
45            (e.g. `/dev/ttyUSB0`, `COM3`, ...)
46        name: endpoint name
47        baudrate: baud rate
48        bytesize: number of data bits
49        parity: parity checking
50        stopbits: number of stop bits
51        xonxoff: enable software flow control
52        rtscts: enable hardware RTS/CTS flow control
53        dsrdtr: enable hardware DSR/DTR flow control
54        silent_interval: minimum time in seconds between writing two
55            consecutive messages
56
57    """
58    impl = native_serial or py_serial
59    return await impl.create(port=port,
60                             name=name,
61                             baudrate=baudrate,
62                             bytesize=bytesize,
63                             parity=parity,
64                             stopbits=stopbits,
65                             xonxoff=xonxoff,
66                             rtscts=rtscts,
67                             dsrdtr=dsrdtr,
68                             silent_interval=silent_interval)

Open serial port

Arguments:
  • port: port name dependent of operating system (e.g. /dev/ttyUSB0, COM3, ...)
  • name: endpoint name
  • baudrate: baud rate
  • bytesize: number of data bits
  • parity: parity checking
  • stopbits: number of stop bits
  • xonxoff: enable software flow control
  • rtscts: enable hardware RTS/CTS flow control
  • dsrdtr: enable hardware DSR/DTR flow control
  • silent_interval: minimum time in seconds between writing two consecutive messages