hat.drivers.serial

Asyncio serial communication driver

 1"""Asyncio serial communication driver"""
 2
 3from hat.drivers.serial.common import (ByteSize,
 4                                       Parity,
 5                                       StopBits,
 6                                       Endpoint)
 7
 8from hat.drivers.serial import py_serial
 9
10try:
11    from hat.drivers.serial import native_serial
12
13except ImportError:
14    native_serial = None
15
16
17__all__ = ['ByteSize',
18           'Parity',
19           'StopBits',
20           'Endpoint',
21           'create',
22           'py_serial',
23           'native_serial']
24
25
async def create(port: str, *,
                 baudrate: int = 9600,
                 bytesize: ByteSize = ByteSize.EIGHTBITS,
                 parity: Parity = Parity.NONE,
                 stopbits: StopBits = StopBits.ONE,
                 xonxoff: bool = False,
                 rtscts: bool = False,
                 dsrdtr: bool = False,
                 silent_interval: float = 0
                 ) -> Endpoint:
    """Open a serial port and return its endpoint

    The request is forwarded to `native_serial.create` when the native
    implementation was importable, otherwise to `py_serial.create`.

    Args:
        port: port name, dependent on the operating system
            (e.g. `/dev/ttyUSB0`, `COM3`, ...)
        baudrate: baud rate
        bytesize: number of data bits
        parity: parity checking
        stopbits: number of stop bits
        xonxoff: enable software flow control
        rtscts: enable hardware RTS/CTS flow control
        dsrdtr: enable hardware DSR/DTR flow control
        silent_interval: minimum time in seconds between writing two
            consecutive messages

    """
    # every setting is forwarded unchanged to the selected backend
    settings = {'baudrate': baudrate,
                'bytesize': bytesize,
                'parity': parity,
                'stopbits': stopbits,
                'xonxoff': xonxoff,
                'rtscts': rtscts,
                'dsrdtr': dsrdtr,
                'silent_interval': silent_interval}

    # `native_serial` is None when its import failed at module load
    backend = native_serial if native_serial else py_serial
    endpoint = await backend.create(port=port, **settings)
    return endpoint
class ByteSize(enum.Enum):
 9class ByteSize(enum.Enum):
10    FIVEBITS = 5
11    SIXBITS = 6
12    SEVENBITS = 7
13    EIGHTBITS = 8

An enumeration.

FIVEBITS = <ByteSize.FIVEBITS: 5>
SIXBITS = <ByteSize.SIXBITS: 6>
SEVENBITS = <ByteSize.SEVENBITS: 7>
EIGHTBITS = <ByteSize.EIGHTBITS: 8>
Inherited Members
enum.Enum
name
value
class Parity(enum.Enum):
class Parity(enum.Enum):
    """Parity checking mode

    Each member's value is a single-character code.

    """

    NONE = 'N'
    EVEN = 'E'
    ODD = 'O'
    MARK = 'M'
    SPACE = 'S'

An enumeration.

NONE = <Parity.NONE: 'N'>
EVEN = <Parity.EVEN: 'E'>
ODD = <Parity.ODD: 'O'>
MARK = <Parity.MARK: 'M'>
SPACE = <Parity.SPACE: 'S'>
Inherited Members
enum.Enum
name
value
class StopBits(enum.Enum):
class StopBits(enum.Enum):
    """Number of stop bits

    Each member's value is the stop bit count it names.

    """

    ONE = 1
    ONE_POINT_FIVE = 1.5
    TWO = 2

An enumeration.

ONE = <StopBits.ONE: 1>
ONE_POINT_FIVE = <StopBits.ONE_POINT_FIVE: 1.5>
TWO = <StopBits.TWO: 2>
Inherited Members
enum.Enum
name
value
class Endpoint(hat.aio.group.Resource):
class Endpoint(aio.Resource):
    """Abstract serial endpoint

    Every member declared here is abstract and has to be provided by
    a concrete implementation.

    """

    @property
    @abc.abstractmethod
    def port(self) -> str:
        """Name of the port"""

    @abc.abstractmethod
    async def read(self, size: int) -> util.Bytes:
        """Read data

        Args:
            size: number of bytes to read

        Raises:
            ConnectionError

        """

    @abc.abstractmethod
    async def write(self, data: util.Bytes):
        """Write data

        Args:
            data: bytes to write

        Raises:
            ConnectionError

        """

    @abc.abstractmethod
    async def drain(self):
        """Drain the output buffer

        Raises:
            ConnectionError

        """

    @abc.abstractmethod
    async def reset_input_buffer(self) -> int:
        """Reset the input buffer

        Returns the number of bytes that were available in the buffer
        immediately before it was cleared.

        Raises:
            ConnectionError

        """

Serial endpoint

port: str
33    @property
34    @abc.abstractmethod
35    def port(self) -> str:
36        """Port name"""

Port name

@abc.abstractmethod
async def read(self, size: int) -> bytes | bytearray | memoryview:
38    @abc.abstractmethod
39    async def read(self, size: int) -> util.Bytes:
40        """Read
41
42        Args:
43            size: number of bytes to read
44
45        Raises:
46            ConnectionError
47
48        """

Read

Arguments:
  • size: number of bytes to read
Raises:
  • ConnectionError
@abc.abstractmethod
async def write(self, data: bytes | bytearray | memoryview):
50    @abc.abstractmethod
51    async def write(self, data: util.Bytes):
52        """Write
53
54        Raises:
55            ConnectionError
56
57        """

Write

Raises:
  • ConnectionError
@abc.abstractmethod
async def drain(self):
59    @abc.abstractmethod
60    async def drain(self):
61        """Drain output buffer
62
63        Raises:
64            ConnectionError
65
66        """

Drain output buffer

Raises:
  • ConnectionError
@abc.abstractmethod
async def reset_input_buffer(self) -> int:
68    @abc.abstractmethod
69    async def reset_input_buffer(self) -> int:
70        """Reset input buffer
71
72        Returns number of bytes available in buffer immediately before
73        buffer was cleared.
74
75        Raises:
76            ConnectionError
77
78        """

Reset input buffer

Returns the number of bytes available in the buffer immediately before it was cleared.

Raises:
  • ConnectionError
Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
async def create( port: str, *, baudrate: int = 9600, bytesize: ByteSize = <ByteSize.EIGHTBITS: 8>, parity: Parity = <Parity.NONE: 'N'>, stopbits: StopBits = <StopBits.ONE: 1>, xonxoff: bool = False, rtscts: bool = False, dsrdtr: bool = False, silent_interval: float = 0) -> Endpoint:
async def create(port: str, *,
                 baudrate: int = 9600,
                 bytesize: ByteSize = ByteSize.EIGHTBITS,
                 parity: Parity = Parity.NONE,
                 stopbits: StopBits = StopBits.ONE,
                 xonxoff: bool = False,
                 rtscts: bool = False,
                 dsrdtr: bool = False,
                 silent_interval: float = 0
                 ) -> Endpoint:
    """Open a serial port and return its endpoint

    The request is forwarded to `native_serial.create` when the native
    implementation was importable, otherwise to `py_serial.create`.

    Args:
        port: port name, dependent on the operating system
            (e.g. `/dev/ttyUSB0`, `COM3`, ...)
        baudrate: baud rate
        bytesize: number of data bits
        parity: parity checking
        stopbits: number of stop bits
        xonxoff: enable software flow control
        rtscts: enable hardware RTS/CTS flow control
        dsrdtr: enable hardware DSR/DTR flow control
        silent_interval: minimum time in seconds between writing two
            consecutive messages

    """
    # every setting is forwarded unchanged to the selected backend
    settings = {'baudrate': baudrate,
                'bytesize': bytesize,
                'parity': parity,
                'stopbits': stopbits,
                'xonxoff': xonxoff,
                'rtscts': rtscts,
                'dsrdtr': dsrdtr,
                'silent_interval': silent_interval}

    # `native_serial` is None when its import failed at module load
    backend = native_serial if native_serial else py_serial
    endpoint = await backend.create(port=port, **settings)
    return endpoint

Open serial port

Arguments:
  • port: port name, dependent on the operating system (e.g. /dev/ttyUSB0, COM3, ...)
  • baudrate: baud rate
  • bytesize: number of data bits
  • parity: parity checking
  • stopbits: number of stop bits
  • xonxoff: enable software flow control
  • rtscts: enable hardware RTS/CTS flow control
  • dsrdtr: enable hardware DSR/DTR flow control
  • silent_interval: minimum time in seconds between writing two consecutive messages