hat.drivers.serial.py_serial

Implementation based on PySerial

Warning: this implementation relies on reads with a timeout to periodically check whether the connection has been closed by the user; a better way of canceling an active read operation is needed.

  1"""Implementation based on PySerial
  2
  3.. warning::
  4
  5    implementation is based on read with timeout for periodical checking
  6    if connection is closed by user - better way of canceling active read
  7    operation is needed
  8
  9"""
 10
 11import asyncio
 12import logging
 13
 14import serial
 15
 16from hat import aio
 17
 18from hat.drivers.serial import common
 19
 20
# Base logger handed to the per-endpoint logger and communication logger
# created for each endpoint
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

# Passed as `timeout` to `serial.Serial` when a port is opened, so it bounds
# how long a single blocking read may last (presumably seconds, following
# pySerial's `timeout` semantics - confirm)
read_timeout: float = 0.5
"""Read timeout"""
 26
 27
 28async def create(port,
 29                 *,
 30                 name=None,
 31                 baudrate=9600,
 32                 bytesize=common.ByteSize.EIGHTBITS,
 33                 parity=common.Parity.NONE,
 34                 stopbits=common.StopBits.ONE,
 35                 xonxoff=False,
 36                 rtscts=False,
 37                 dsrdtr=False,
 38                 silent_interval=0):
 39    endpoint = Endpoint()
 40    endpoint._silent_interval = silent_interval
 41    endpoint._input_buffer = bytearray()
 42    endpoint._input_cv = asyncio.Condition()
 43    endpoint._write_queue = aio.Queue()
 44    endpoint._executor = aio.Executor(log_exceptions=False)
 45    endpoint._info = common.EndpointInfo(name=name,
 46                                         port=port)
 47    endpoint._log = common.create_logger(mlog, endpoint._info)
 48    endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info)
 49
 50    endpoint._serial = await endpoint._executor.spawn(
 51        serial.Serial,
 52        port=port,
 53        baudrate=baudrate,
 54        bytesize=bytesize.value,
 55        parity=parity.value,
 56        stopbits=stopbits.value,
 57        xonxoff=xonxoff,
 58        rtscts=rtscts,
 59        dsrdtr=dsrdtr,
 60        timeout=read_timeout)
 61
 62    endpoint._comm_log.log(common.CommLogAction.OPEN)
 63
 64    endpoint._async_group = aio.Group()
 65    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
 66    endpoint._async_group.spawn(endpoint._read_loop)
 67    endpoint._async_group.spawn(endpoint._write_loop)
 68
 69    return endpoint
 70
 71
 72class Endpoint(common.Endpoint):
 73
 74    @property
 75    def async_group(self):
 76        return self._async_group
 77
 78    @property
 79    def info(self):
 80        return self._info
 81
 82    async def read(self, size):
 83        async with self._input_cv:
 84            while len(self._input_buffer) < size:
 85                if not self.is_open:
 86                    raise ConnectionError()
 87                await self._input_cv.wait()
 88
 89            if size < 1:
 90                return b''
 91
 92            buffer = memoryview(self._input_buffer)
 93            data, self._input_buffer = buffer[:size], bytearray(buffer[size:])
 94            return data
 95
 96    async def write(self, data):
 97        future = asyncio.Future()
 98        try:
 99            self._write_queue.put_nowait((data, future))
100            await future
101
102        except aio.QueueClosedError:
103            raise ConnectionError()
104
105    async def drain(self):
106        future = asyncio.Future()
107        try:
108            self._write_queue.put_nowait((None, future))
109            await future
110
111        except aio.QueueClosedError:
112            raise ConnectionError()
113
114    async def clear_input_buffer(self):
115        async with self._input_cv:
116            count = len(self._input_buffer)
117            self._input_buffer = bytearray()
118            return count
119
120    async def _on_close(self):
121        await self._executor.spawn(self._ext_close)
122
123        async with self._input_cv:
124            self._input_cv.notify_all()
125
126        await self._executor.async_close()
127
128        self._comm_log.log(common.CommLogAction.CLOSE)
129
130    async def _read_loop(self):
131        try:
132            while True:
133                data_head = await self._executor.spawn(self._ext_read, 1)
134                if not data_head:
135                    continue
136
137                data_rest = await self._executor.spawn(self._ext_read, -1)
138
139                self._comm_log.log(common.CommLogAction.RECEIVE, data_head,
140                                   data_rest)
141
142                async with self._input_cv:
143                    self._input_buffer.extend(data_head)
144                    if data_rest:
145                        self._input_buffer.extend(data_rest)
146
147                    self._input_cv.notify_all()
148
149        except Exception as e:
150            self._log.warning('read loop error: %s', e, exc_info=e)
151
152        finally:
153            self.close()
154
155    async def _write_loop(self):
156        future = None
157        try:
158            while True:
159                data, future = await self._write_queue.get()
160
161                if data is not None:
162                    self._comm_log.log(common.CommLogAction.SEND, data)
163
164                    await self._executor.spawn(self._ext_write, data)
165
166                else:
167                    await self._executor.spawn(self._ext_flush)
168
169                if not future.done():
170                    future.set_result(None)
171
172                await asyncio.sleep(self._silent_interval)
173
174        except Exception as e:
175            self._log.warning('write loop error: %s', e, exc_info=e)
176
177        finally:
178            self.close()
179            self._write_queue.close()
180
181            while True:
182                if future and not future.done():
183                    future.set_exception(ConnectionError())
184                if self._write_queue.empty():
185                    break
186                _, future = self._write_queue.get_nowait()
187
188    def _ext_close(self):
189        self._serial.close()
190
191    def _ext_read(self, n=-1):
192        if n < 0:
193            n = self._serial.in_waiting
194
195        if n < 1:
196            return b''
197
198        return self._serial.read(n)
199
200    def _ext_write(self, data):
201        self._serial.write(data)
202
203    def _ext_flush(self):
204        self._serial.flush()
mlog: logging.Logger = <Logger hat.drivers.serial.py_serial (WARNING)>

Module logger

read_timeout: float = 0.5

Read timeout

async def create( port, *, name=None, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
async def create(port,
                 *,
                 name=None,
                 baudrate=9600,
                 bytesize=common.ByteSize.EIGHTBITS,
                 parity=common.Parity.NONE,
                 stopbits=common.StopBits.ONE,
                 xonxoff=False,
                 rtscts=False,
                 dsrdtr=False,
                 silent_interval=0):
    """Open serial port and create endpoint wrapping it

    The blocking `serial.Serial` constructor is run through the endpoint's
    executor. Once the port is open, the close handler, read loop and write
    loop are spawned in the endpoint's async group.

    Args:
        port: passed as `port` to `serial.Serial` and stored in endpoint info
        name: endpoint name stored in endpoint info
        baudrate: forwarded unchanged to `serial.Serial`
        bytesize: enum member whose `value` is forwarded to `serial.Serial`
        parity: enum member whose `value` is forwarded to `serial.Serial`
        stopbits: enum member whose `value` is forwarded to `serial.Serial`
        xonxoff: forwarded unchanged to `serial.Serial`
        rtscts: forwarded unchanged to `serial.Serial`
        dsrdtr: forwarded unchanged to `serial.Serial`
        silent_interval: delay passed to `asyncio.sleep` by the write loop
            after each processed write/drain request

    Returns:
        endpoint with opened port and running read/write loops

    Raises:
        Exception: any exception raised while opening the port is
            propagated after the executor is closed

    """
    endpoint = Endpoint()
    endpoint._silent_interval = silent_interval
    endpoint._input_buffer = bytearray()
    endpoint._input_cv = asyncio.Condition()
    endpoint._write_queue = aio.Queue()
    endpoint._executor = aio.Executor(log_exceptions=False)
    endpoint._info = common.EndpointInfo(name=name,
                                         port=port)
    endpoint._log = common.create_logger(mlog, endpoint._info)
    endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info)

    try:
        endpoint._serial = await endpoint._executor.spawn(
            serial.Serial,
            port=port,
            baudrate=baudrate,
            bytesize=bytesize.value,
            parity=parity.value,
            stopbits=stopbits.value,
            xonxoff=xonxoff,
            rtscts=rtscts,
            dsrdtr=dsrdtr,
            timeout=read_timeout)

    except Exception:
        # the endpoint is never returned when opening fails, so this is the
        # only place where its executor can still be released
        await endpoint._executor.async_close()
        raise

    endpoint._comm_log.log(common.CommLogAction.OPEN)

    # async group is created only after the port is open - closing the
    # group triggers `_on_close`, which expects `_serial` to exist
    endpoint._async_group = aio.Group()
    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
    endpoint._async_group.spawn(endpoint._read_loop)
    endpoint._async_group.spawn(endpoint._write_loop)

    return endpoint
class Endpoint(hat.drivers.serial.common.Endpoint):
 73class Endpoint(common.Endpoint):
 74
 75    @property
 76    def async_group(self):
 77        return self._async_group
 78
 79    @property
 80    def info(self):
 81        return self._info
 82
 83    async def read(self, size):
 84        async with self._input_cv:
 85            while len(self._input_buffer) < size:
 86                if not self.is_open:
 87                    raise ConnectionError()
 88                await self._input_cv.wait()
 89
 90            if size < 1:
 91                return b''
 92
 93            buffer = memoryview(self._input_buffer)
 94            data, self._input_buffer = buffer[:size], bytearray(buffer[size:])
 95            return data
 96
 97    async def write(self, data):
 98        future = asyncio.Future()
 99        try:
100            self._write_queue.put_nowait((data, future))
101            await future
102
103        except aio.QueueClosedError:
104            raise ConnectionError()
105
106    async def drain(self):
107        future = asyncio.Future()
108        try:
109            self._write_queue.put_nowait((None, future))
110            await future
111
112        except aio.QueueClosedError:
113            raise ConnectionError()
114
115    async def clear_input_buffer(self):
116        async with self._input_cv:
117            count = len(self._input_buffer)
118            self._input_buffer = bytearray()
119            return count
120
121    async def _on_close(self):
122        await self._executor.spawn(self._ext_close)
123
124        async with self._input_cv:
125            self._input_cv.notify_all()
126
127        await self._executor.async_close()
128
129        self._comm_log.log(common.CommLogAction.CLOSE)
130
131    async def _read_loop(self):
132        try:
133            while True:
134                data_head = await self._executor.spawn(self._ext_read, 1)
135                if not data_head:
136                    continue
137
138                data_rest = await self._executor.spawn(self._ext_read, -1)
139
140                self._comm_log.log(common.CommLogAction.RECEIVE, data_head,
141                                   data_rest)
142
143                async with self._input_cv:
144                    self._input_buffer.extend(data_head)
145                    if data_rest:
146                        self._input_buffer.extend(data_rest)
147
148                    self._input_cv.notify_all()
149
150        except Exception as e:
151            self._log.warning('read loop error: %s', e, exc_info=e)
152
153        finally:
154            self.close()
155
156    async def _write_loop(self):
157        future = None
158        try:
159            while True:
160                data, future = await self._write_queue.get()
161
162                if data is not None:
163                    self._comm_log.log(common.CommLogAction.SEND, data)
164
165                    await self._executor.spawn(self._ext_write, data)
166
167                else:
168                    await self._executor.spawn(self._ext_flush)
169
170                if not future.done():
171                    future.set_result(None)
172
173                await asyncio.sleep(self._silent_interval)
174
175        except Exception as e:
176            self._log.warning('write loop error: %s', e, exc_info=e)
177
178        finally:
179            self.close()
180            self._write_queue.close()
181
182            while True:
183                if future and not future.done():
184                    future.set_exception(ConnectionError())
185                if self._write_queue.empty():
186                    break
187                _, future = self._write_queue.get_nowait()
188
189    def _ext_close(self):
190        self._serial.close()
191
192    def _ext_read(self, n=-1):
193        if n < 0:
194            n = self._serial.in_waiting
195
196        if n < 1:
197            return b''
198
199        return self._serial.read(n)
200
201    def _ext_write(self, data):
202        self._serial.write(data)
203
204    def _ext_flush(self):
205        self._serial.flush()

Serial endpoint

async_group
75    @property
76    def async_group(self):
77        return self._async_group

Group controlling resource's lifetime.

info
79    @property
80    def info(self):
81        return self._info

Endpoint info

async def read(self, size):
83    async def read(self, size):
84        async with self._input_cv:
85            while len(self._input_buffer) < size:
86                if not self.is_open:
87                    raise ConnectionError()
88                await self._input_cv.wait()
89
90            if size < 1:
91                return b''
92
93            buffer = memoryview(self._input_buffer)
94            data, self._input_buffer = buffer[:size], bytearray(buffer[size:])
95            return data

Read

Arguments:
  • size: number of bytes to read
Raises:
  • ConnectionError
async def write(self, data):
 97    async def write(self, data):
 98        future = asyncio.Future()
 99        try:
100            self._write_queue.put_nowait((data, future))
101            await future
102
103        except aio.QueueClosedError:
104            raise ConnectionError()

Write

Raises:
  • ConnectionError
async def drain(self):
106    async def drain(self):
107        future = asyncio.Future()
108        try:
109            self._write_queue.put_nowait((None, future))
110            await future
111
112        except aio.QueueClosedError:
113            raise ConnectionError()

Drain output buffer

Raises:
  • ConnectionError
async def clear_input_buffer(self):
115    async def clear_input_buffer(self):
116        async with self._input_cv:
117            count = len(self._input_buffer)
118            self._input_buffer = bytearray()
119            return count

Reset input buffer

Returns the number of bytes available in the buffer immediately before the buffer was cleared.

Raises:
  • ConnectionError