hat.drivers.serial.native_serial

Implementation based on native serial communication

  1"""Implementation based on native serial communication"""
  2
  3import asyncio
  4import contextlib
  5import logging
  6import sys
  7
  8from hat import aio
  9from hat import util
 10
 11from hat.drivers.serial import common
 12
 13from hat.drivers.serial import _native_serial
 14
 15
 16if sys.platform == 'win32':
 17    raise ImportError('WIP')
 18
 19
 20mlog: logging.Logger = logging.getLogger(__name__)
 21"""Module logger"""
 22
 23
 24async def create(port,
 25                 *,
 26                 name=None,
 27                 baudrate=9600,
 28                 bytesize=common.ByteSize.EIGHTBITS,
 29                 parity=common.Parity.NONE,
 30                 stopbits=common.StopBits.ONE,
 31                 xonxoff=False,
 32                 rtscts=False,
 33                 dsrdtr=False,
 34                 silent_interval=0):
 35    endpoint = Endpoint()
 36    endpoint._silent_interval = silent_interval
 37    endpoint._loop = asyncio.get_running_loop()
 38    endpoint._input_buffer = util.BytesBuffer()
 39    endpoint._input_cv = asyncio.Condition()
 40    endpoint._write_queue = aio.Queue()
 41    endpoint._info = common.EndpointInfo(name=name,
 42                                         port=port)
 43    endpoint._log = common.create_logger(mlog, endpoint._info)
 44    endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info)
 45
 46    endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF,
 47                                             out_buff_size=0xFFFF)
 48
 49    endpoint._close_cb_future = endpoint._loop.create_future()
 50    close_cb = endpoint._create_serial_cb(endpoint._close_cb_future)
 51    endpoint._serial.set_close_cb(close_cb)
 52
 53    endpoint._serial.open(
 54        port=port,
 55        baudrate=baudrate,
 56        bytesize=bytesize.value,
 57        parity=parity.value,
 58        stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE
 59                  else stopbits.value),
 60        xonxoff=xonxoff,
 61        rtscts=rtscts,
 62        dsrdtr=dsrdtr)
 63
 64    endpoint._comm_log.log(common.CommLogAction.OPEN)
 65
 66    endpoint._async_group = aio.Group()
 67    endpoint._async_group.spawn(aio.call_on_done,
 68                                asyncio.shield(endpoint._close_cb_future),
 69                                endpoint.close)
 70    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
 71    endpoint._async_group.spawn(endpoint._read_loop)
 72    endpoint._async_group.spawn(endpoint._write_loop)
 73
 74    return endpoint
 75
 76
 77class Endpoint(common.Endpoint):
 78
 79    @property
 80    def async_group(self):
 81        return self._async_group
 82
 83    @property
 84    def info(self):
 85        return self._info
 86
 87    async def read(self, size):
 88        async with self._input_cv:
 89            while len(self._input_buffer) < size:
 90                if not self.is_open:
 91                    raise ConnectionError()
 92
 93                await self._input_cv.wait()
 94
 95            return self._input_buffer.read(size)
 96
 97    async def write(self, data):
 98        future = self._loop.create_future()
 99        try:
100            self._write_queue.put_nowait((data, future))
101            await future
102
103        except aio.QueueClosedError:
104            raise ConnectionError()
105
106    async def drain(self):
107        future = self._loop.create_future()
108        try:
109            self._write_queue.put_nowait((None, future))
110            await future
111
112        except aio.QueueClosedError:
113            raise ConnectionError()
114
115    async def clear_input_buffer(self):
116        async with self._input_cv:
117            return self._input_buffer.clear()
118
119    async def _on_close(self):
120        self._serial.close()
121
122        await self._close_cb_future
123
124        async with self._input_cv:
125            self._input_cv.notify_all()
126
127        self._serial.set_close_cb(None)
128
129        self._comm_log.log(common.CommLogAction.CLOSE)
130
131    async def _read_loop(self):
132        try:
133            while True:
134                with self._create_in_change_future() as change_future:
135
136                    data = self._serial.read()
137
138                    if not data:
139                        await change_future
140                        continue
141
142                self._comm_log.log(common.CommLogAction.RECEIVE, data)
143
144                async with self._input_cv:
145                    self._input_buffer.add(data)
146                    self._input_cv.notify_all()
147
148        except Exception as e:
149            self._log.warning('read loop error: %s', e, exc_info=e)
150
151        finally:
152            self.close()
153
154    async def _write_loop(self):
155        future = None
156        try:
157            while True:
158                data, future = await self._write_queue.get()
159
160                if data is None:
161                    with self._create_drain_future() as drain_future:
162                        self._serial.drain()
163
164                        await drain_future
165
166                else:
167                    data = memoryview(data)
168                    while data:
169                        with self._create_out_change_future() as change_future:
170                            result = self._serial.write(bytes(data))
171                            if result < 0:
172                                raise Exception('write error')
173
174                            self._comm_log.log(common.CommLogAction.SEND,
175                                               data[:result])
176
177                            data = data[result:]
178
179                            if data:
180                                await change_future
181
182                if not future.done():
183                    future.set_result(None)
184
185                await asyncio.sleep(self._silent_interval)
186
187        except Exception as e:
188            self._log.warning('write loop error: %s', e, exc_info=e)
189
190        finally:
191            self.close()
192            self._write_queue.close()
193
194            while True:
195                if future and not future.done():
196                    future.set_exception(ConnectionError())
197                if self._write_queue.empty():
198                    break
199                _, future = self._write_queue.get_nowait()
200
201    @contextlib.contextmanager
202    def _create_in_change_future(self):
203        with self._create_serial_future(self._serial.set_in_change_cb) as f:
204            yield f
205
206    @contextlib.contextmanager
207    def _create_out_change_future(self):
208        with self._create_serial_future(self._serial.set_out_change_cb) as f:
209            yield f
210
211    @contextlib.contextmanager
212    def _create_drain_future(self):
213        with self._create_serial_future(self._serial.set_drain_cb) as f:
214            yield f
215
216    @contextlib.contextmanager
217    def _create_serial_future(self, serial_set_cb):
218        future = self._loop.create_future()
219        cb = self._create_serial_cb(future)
220        serial_set_cb(cb)
221
222        try:
223            yield future
224
225        finally:
226            serial_set_cb(None)
227
228    def _create_serial_cb(self, future):
229
230        # use wrapper to keep reference to self
231        def wrapper():
232            self._loop.call_soon_threadsafe(_try_set_result, future, None)
233
234        return wrapper
235
236
237def _try_set_result(future, result):
238    if not future.done():
239        future.set_result(result)
mlog: logging.Logger = <Logger hat.drivers.serial.native_serial (WARNING)>

Module logger

async def create( port, *, name=None, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
25async def create(port,
26                 *,
27                 name=None,
28                 baudrate=9600,
29                 bytesize=common.ByteSize.EIGHTBITS,
30                 parity=common.Parity.NONE,
31                 stopbits=common.StopBits.ONE,
32                 xonxoff=False,
33                 rtscts=False,
34                 dsrdtr=False,
35                 silent_interval=0):
36    endpoint = Endpoint()
37    endpoint._silent_interval = silent_interval
38    endpoint._loop = asyncio.get_running_loop()
39    endpoint._input_buffer = util.BytesBuffer()
40    endpoint._input_cv = asyncio.Condition()
41    endpoint._write_queue = aio.Queue()
42    endpoint._info = common.EndpointInfo(name=name,
43                                         port=port)
44    endpoint._log = common.create_logger(mlog, endpoint._info)
45    endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info)
46
47    endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF,
48                                             out_buff_size=0xFFFF)
49
50    endpoint._close_cb_future = endpoint._loop.create_future()
51    close_cb = endpoint._create_serial_cb(endpoint._close_cb_future)
52    endpoint._serial.set_close_cb(close_cb)
53
54    endpoint._serial.open(
55        port=port,
56        baudrate=baudrate,
57        bytesize=bytesize.value,
58        parity=parity.value,
59        stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE
60                  else stopbits.value),
61        xonxoff=xonxoff,
62        rtscts=rtscts,
63        dsrdtr=dsrdtr)
64
65    endpoint._comm_log.log(common.CommLogAction.OPEN)
66
67    endpoint._async_group = aio.Group()
68    endpoint._async_group.spawn(aio.call_on_done,
69                                asyncio.shield(endpoint._close_cb_future),
70                                endpoint.close)
71    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
72    endpoint._async_group.spawn(endpoint._read_loop)
73    endpoint._async_group.spawn(endpoint._write_loop)
74
75    return endpoint
class Endpoint(hat.drivers.serial.common.Endpoint):
 78class Endpoint(common.Endpoint):
 79
 80    @property
 81    def async_group(self):
 82        return self._async_group
 83
 84    @property
 85    def info(self):
 86        return self._info
 87
 88    async def read(self, size):
 89        async with self._input_cv:
 90            while len(self._input_buffer) < size:
 91                if not self.is_open:
 92                    raise ConnectionError()
 93
 94                await self._input_cv.wait()
 95
 96            return self._input_buffer.read(size)
 97
 98    async def write(self, data):
 99        future = self._loop.create_future()
100        try:
101            self._write_queue.put_nowait((data, future))
102            await future
103
104        except aio.QueueClosedError:
105            raise ConnectionError()
106
107    async def drain(self):
108        future = self._loop.create_future()
109        try:
110            self._write_queue.put_nowait((None, future))
111            await future
112
113        except aio.QueueClosedError:
114            raise ConnectionError()
115
116    async def clear_input_buffer(self):
117        async with self._input_cv:
118            return self._input_buffer.clear()
119
120    async def _on_close(self):
121        self._serial.close()
122
123        await self._close_cb_future
124
125        async with self._input_cv:
126            self._input_cv.notify_all()
127
128        self._serial.set_close_cb(None)
129
130        self._comm_log.log(common.CommLogAction.CLOSE)
131
132    async def _read_loop(self):
133        try:
134            while True:
135                with self._create_in_change_future() as change_future:
136
137                    data = self._serial.read()
138
139                    if not data:
140                        await change_future
141                        continue
142
143                self._comm_log.log(common.CommLogAction.RECEIVE, data)
144
145                async with self._input_cv:
146                    self._input_buffer.add(data)
147                    self._input_cv.notify_all()
148
149        except Exception as e:
150            self._log.warning('read loop error: %s', e, exc_info=e)
151
152        finally:
153            self.close()
154
155    async def _write_loop(self):
156        future = None
157        try:
158            while True:
159                data, future = await self._write_queue.get()
160
161                if data is None:
162                    with self._create_drain_future() as drain_future:
163                        self._serial.drain()
164
165                        await drain_future
166
167                else:
168                    data = memoryview(data)
169                    while data:
170                        with self._create_out_change_future() as change_future:
171                            result = self._serial.write(bytes(data))
172                            if result < 0:
173                                raise Exception('write error')
174
175                            self._comm_log.log(common.CommLogAction.SEND,
176                                               data[:result])
177
178                            data = data[result:]
179
180                            if data:
181                                await change_future
182
183                if not future.done():
184                    future.set_result(None)
185
186                await asyncio.sleep(self._silent_interval)
187
188        except Exception as e:
189            self._log.warning('write loop error: %s', e, exc_info=e)
190
191        finally:
192            self.close()
193            self._write_queue.close()
194
195            while True:
196                if future and not future.done():
197                    future.set_exception(ConnectionError())
198                if self._write_queue.empty():
199                    break
200                _, future = self._write_queue.get_nowait()
201
202    @contextlib.contextmanager
203    def _create_in_change_future(self):
204        with self._create_serial_future(self._serial.set_in_change_cb) as f:
205            yield f
206
207    @contextlib.contextmanager
208    def _create_out_change_future(self):
209        with self._create_serial_future(self._serial.set_out_change_cb) as f:
210            yield f
211
212    @contextlib.contextmanager
213    def _create_drain_future(self):
214        with self._create_serial_future(self._serial.set_drain_cb) as f:
215            yield f
216
217    @contextlib.contextmanager
218    def _create_serial_future(self, serial_set_cb):
219        future = self._loop.create_future()
220        cb = self._create_serial_cb(future)
221        serial_set_cb(cb)
222
223        try:
224            yield future
225
226        finally:
227            serial_set_cb(None)
228
229    def _create_serial_cb(self, future):
230
231        # use wrapper to keep reference to self
232        def wrapper():
233            self._loop.call_soon_threadsafe(_try_set_result, future, None)
234
235        return wrapper

Serial endpoint

async_group
80    @property
81    def async_group(self):
82        return self._async_group

Group controlling resource's lifetime.

info
84    @property
85    def info(self):
86        return self._info

Endpoint info

async def read(self, size):
88    async def read(self, size):
89        async with self._input_cv:
90            while len(self._input_buffer) < size:
91                if not self.is_open:
92                    raise ConnectionError()
93
94                await self._input_cv.wait()
95
96            return self._input_buffer.read(size)

Read

Arguments:
  • size: number of bytes to read
Raises:
  • ConnectionError
async def write(self, data):
 98    async def write(self, data):
 99        future = self._loop.create_future()
100        try:
101            self._write_queue.put_nowait((data, future))
102            await future
103
104        except aio.QueueClosedError:
105            raise ConnectionError()

Write

Raises:
  • ConnectionError
async def drain(self):
107    async def drain(self):
108        future = self._loop.create_future()
109        try:
110            self._write_queue.put_nowait((None, future))
111            await future
112
113        except aio.QueueClosedError:
114            raise ConnectionError()

Drain output buffer

Raises:
  • ConnectionError
async def clear_input_buffer(self):
116    async def clear_input_buffer(self):
117        async with self._input_cv:
118            return self._input_buffer.clear()

Reset input buffer

Returns number of bytes available in buffer immediately before buffer was cleared.

Raises:
  • ConnectionError