hat.drivers.serial.py_serial

Implementation based on PySerial

implementation is based on read with timeout for periodical checking if connection is closed by user - better way of canceling active read operation is needed

  1"""Implementation based on PySerial
  2
  3.. warning::
  4
  5    implementation is based on read with timeout for periodical checking
  6    if connection is closed by user - better way of canceling active read
  7    operation is needed
  8
  9"""
 10
 11import asyncio
 12import logging
 13
 14import serial
 15
 16from hat import aio
 17
 18from hat.drivers.serial import common
 19
 20
 21mlog: logging.Logger = logging.getLogger(__name__)
 22"""Module logger"""
 23
 24read_timeout: float = 0.5
 25"""Read timeout"""
 26
 27
 28async def create(port, *,
 29                 baudrate=9600,
 30                 bytesize=common.ByteSize.EIGHTBITS,
 31                 parity=common.Parity.NONE,
 32                 stopbits=common.StopBits.ONE,
 33                 xonxoff=False,
 34                 rtscts=False,
 35                 dsrdtr=False,
 36                 silent_interval=0):
 37    endpoint = Endpoint()
 38    endpoint._port = port
 39    endpoint._silent_interval = silent_interval
 40    endpoint._input_buffer = bytearray()
 41    endpoint._input_cv = asyncio.Condition()
 42    endpoint._write_queue = aio.Queue()
 43    endpoint._executor = aio.Executor(log_exceptions=False)
 44
 45    endpoint._serial = await endpoint._executor.spawn(
 46        serial.Serial,
 47        port=port,
 48        baudrate=baudrate,
 49        bytesize=bytesize.value,
 50        parity=parity.value,
 51        stopbits=stopbits.value,
 52        xonxoff=xonxoff,
 53        rtscts=rtscts,
 54        dsrdtr=dsrdtr,
 55        timeout=read_timeout)
 56
 57    endpoint._async_group = aio.Group()
 58    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
 59    endpoint._async_group.spawn(endpoint._read_loop)
 60    endpoint._async_group.spawn(endpoint._write_loop)
 61
 62    return endpoint
 63
 64
 65class Endpoint(common.Endpoint):
 66
 67    @property
 68    def async_group(self):
 69        return self._async_group
 70
 71    @property
 72    def port(self):
 73        return self._port
 74
 75    async def read(self, size):
 76        async with self._input_cv:
 77            while len(self._input_buffer) < size:
 78                if not self.is_open:
 79                    raise ConnectionError()
 80                await self._input_cv.wait()
 81
 82            if size < 1:
 83                return b''
 84
 85            buffer = memoryview(self._input_buffer)
 86            data, self._input_buffer = buffer[:size], bytearray(buffer[size:])
 87            return data
 88
 89    async def write(self, data):
 90        future = asyncio.Future()
 91        try:
 92            self._write_queue.put_nowait((data, future))
 93            await future
 94
 95        except aio.QueueClosedError:
 96            raise ConnectionError()
 97
 98    async def drain(self):
 99        future = asyncio.Future()
100        try:
101            self._write_queue.put_nowait((None, future))
102            await future
103
104        except aio.QueueClosedError:
105            raise ConnectionError()
106
107    async def reset_input_buffer(self):
108        async with self._input_cv:
109            count = len(self._input_buffer)
110            self._input_buffer = bytearray()
111            return count
112
113    async def _on_close(self):
114        await self._executor.spawn(self._ext_close)
115
116        async with self._input_cv:
117            self._input_cv.notify_all()
118
119        await self._executor.async_close()
120
121    async def _read_loop(self):
122        try:
123            while True:
124                data_head = await self._executor.spawn(self._ext_read, 1)
125                if not data_head:
126                    continue
127
128                data_rest = await self._executor.spawn(self._ext_read, -1)
129
130                async with self._input_cv:
131                    self._input_buffer.extend(data_head)
132                    if data_rest:
133                        self._input_buffer.extend(data_rest)
134
135                    self._input_cv.notify_all()
136
137        except Exception as e:
138            mlog.warning('read loop error: %s', e, exc_info=e)
139
140        finally:
141            self.close()
142
143    async def _write_loop(self):
144        future = None
145        try:
146            while True:
147                data, future = await self._write_queue.get()
148
149                if data is not None:
150                    await self._executor.spawn(self._ext_write, data)
151
152                else:
153                    await self._executor.spawn(self._ext_flush)
154
155                if not future.done():
156                    future.set_result(None)
157
158                await asyncio.sleep(self._silent_interval)
159
160        except Exception as e:
161            mlog.warning('write loop error: %s', e, exc_info=e)
162
163        finally:
164            self.close()
165            self._write_queue.close()
166
167            while True:
168                if future and not future.done():
169                    future.set_exception(ConnectionError())
170                if self._write_queue.empty():
171                    break
172                _, future = self._write_queue.get_nowait()
173
174    def _ext_close(self):
175        self._serial.close()
176
177    def _ext_read(self, n=-1):
178        if n < 0:
179            n = self._serial.in_waiting
180
181        if n < 1:
182            return b''
183
184        return self._serial.read(n)
185
186    def _ext_write(self, data):
187        self._serial.write(data)
188
189    def _ext_flush(self):
190        self._serial.flush()
mlog: logging.Logger = <Logger hat.drivers.serial.py_serial (WARNING)>

Module logger

read_timeout: float = 0.5

Read timeout

async def create( port, *, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
29async def create(port, *,
30                 baudrate=9600,
31                 bytesize=common.ByteSize.EIGHTBITS,
32                 parity=common.Parity.NONE,
33                 stopbits=common.StopBits.ONE,
34                 xonxoff=False,
35                 rtscts=False,
36                 dsrdtr=False,
37                 silent_interval=0):
38    endpoint = Endpoint()
39    endpoint._port = port
40    endpoint._silent_interval = silent_interval
41    endpoint._input_buffer = bytearray()
42    endpoint._input_cv = asyncio.Condition()
43    endpoint._write_queue = aio.Queue()
44    endpoint._executor = aio.Executor(log_exceptions=False)
45
46    endpoint._serial = await endpoint._executor.spawn(
47        serial.Serial,
48        port=port,
49        baudrate=baudrate,
50        bytesize=bytesize.value,
51        parity=parity.value,
52        stopbits=stopbits.value,
53        xonxoff=xonxoff,
54        rtscts=rtscts,
55        dsrdtr=dsrdtr,
56        timeout=read_timeout)
57
58    endpoint._async_group = aio.Group()
59    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
60    endpoint._async_group.spawn(endpoint._read_loop)
61    endpoint._async_group.spawn(endpoint._write_loop)
62
63    return endpoint
class Endpoint(hat.drivers.serial.common.Endpoint):
 66class Endpoint(common.Endpoint):
 67
 68    @property
 69    def async_group(self):
 70        return self._async_group
 71
 72    @property
 73    def port(self):
 74        return self._port
 75
 76    async def read(self, size):
 77        async with self._input_cv:
 78            while len(self._input_buffer) < size:
 79                if not self.is_open:
 80                    raise ConnectionError()
 81                await self._input_cv.wait()
 82
 83            if size < 1:
 84                return b''
 85
 86            buffer = memoryview(self._input_buffer)
 87            data, self._input_buffer = buffer[:size], bytearray(buffer[size:])
 88            return data
 89
 90    async def write(self, data):
 91        future = asyncio.Future()
 92        try:
 93            self._write_queue.put_nowait((data, future))
 94            await future
 95
 96        except aio.QueueClosedError:
 97            raise ConnectionError()
 98
 99    async def drain(self):
100        future = asyncio.Future()
101        try:
102            self._write_queue.put_nowait((None, future))
103            await future
104
105        except aio.QueueClosedError:
106            raise ConnectionError()
107
108    async def reset_input_buffer(self):
109        async with self._input_cv:
110            count = len(self._input_buffer)
111            self._input_buffer = bytearray()
112            return count
113
114    async def _on_close(self):
115        await self._executor.spawn(self._ext_close)
116
117        async with self._input_cv:
118            self._input_cv.notify_all()
119
120        await self._executor.async_close()
121
122    async def _read_loop(self):
123        try:
124            while True:
125                data_head = await self._executor.spawn(self._ext_read, 1)
126                if not data_head:
127                    continue
128
129                data_rest = await self._executor.spawn(self._ext_read, -1)
130
131                async with self._input_cv:
132                    self._input_buffer.extend(data_head)
133                    if data_rest:
134                        self._input_buffer.extend(data_rest)
135
136                    self._input_cv.notify_all()
137
138        except Exception as e:
139            mlog.warning('read loop error: %s', e, exc_info=e)
140
141        finally:
142            self.close()
143
144    async def _write_loop(self):
145        future = None
146        try:
147            while True:
148                data, future = await self._write_queue.get()
149
150                if data is not None:
151                    await self._executor.spawn(self._ext_write, data)
152
153                else:
154                    await self._executor.spawn(self._ext_flush)
155
156                if not future.done():
157                    future.set_result(None)
158
159                await asyncio.sleep(self._silent_interval)
160
161        except Exception as e:
162            mlog.warning('write loop error: %s', e, exc_info=e)
163
164        finally:
165            self.close()
166            self._write_queue.close()
167
168            while True:
169                if future and not future.done():
170                    future.set_exception(ConnectionError())
171                if self._write_queue.empty():
172                    break
173                _, future = self._write_queue.get_nowait()
174
175    def _ext_close(self):
176        self._serial.close()
177
178    def _ext_read(self, n=-1):
179        if n < 0:
180            n = self._serial.in_waiting
181
182        if n < 1:
183            return b''
184
185        return self._serial.read(n)
186
187    def _ext_write(self, data):
188        self._serial.write(data)
189
190    def _ext_flush(self):
191        self._serial.flush()

Serial endpoint

async_group
68    @property
69    def async_group(self):
70        return self._async_group

Group controlling resource's lifetime.

port
72    @property
73    def port(self):
74        return self._port

Port name

async def read(self, size):
76    async def read(self, size):
77        async with self._input_cv:
78            while len(self._input_buffer) < size:
79                if not self.is_open:
80                    raise ConnectionError()
81                await self._input_cv.wait()
82
83            if size < 1:
84                return b''
85
86            buffer = memoryview(self._input_buffer)
87            data, self._input_buffer = buffer[:size], bytearray(buffer[size:])
88            return data

Read

Arguments:
  • size: number of bytes to read
Raises:
  • ConnectionError
async def write(self, data):
90    async def write(self, data):
91        future = asyncio.Future()
92        try:
93            self._write_queue.put_nowait((data, future))
94            await future
95
96        except aio.QueueClosedError:
97            raise ConnectionError()

Write

Raises:
  • ConnectionError
async def drain(self):
 99    async def drain(self):
100        future = asyncio.Future()
101        try:
102            self._write_queue.put_nowait((None, future))
103            await future
104
105        except aio.QueueClosedError:
106            raise ConnectionError()

Drain output buffer

Raises:
  • ConnectionError
async def reset_input_buffer(self):
108    async def reset_input_buffer(self):
109        async with self._input_cv:
110            count = len(self._input_buffer)
111            self._input_buffer = bytearray()
112            return count

Reset input buffer

Returns number of bytes available in buffer immediately before buffer was cleared.

Raises:
  • ConnectionError
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close