hat.drivers.serial.native_serial

Implementation based on native serial communication

  1"""Implementation based on native serial communication"""
  2
  3import asyncio
  4import contextlib
  5import logging
  6import sys
  7
  8from hat import aio
  9from hat import util
 10
 11from hat.drivers.serial import common
 12
 13from hat.drivers.serial import _native_serial
 14
 15
 16if sys.platform == 'win32':
 17    raise ImportError('WIP')
 18
 19
 20mlog: logging.Logger = logging.getLogger(__name__)
 21"""Module logger"""
 22
 23
 24async def create(port, *,
 25                 baudrate=9600,
 26                 bytesize=common.ByteSize.EIGHTBITS,
 27                 parity=common.Parity.NONE,
 28                 stopbits=common.StopBits.ONE,
 29                 xonxoff=False,
 30                 rtscts=False,
 31                 dsrdtr=False,
 32                 silent_interval=0):
 33    endpoint = Endpoint()
 34    endpoint._port = port
 35    endpoint._silent_interval = silent_interval
 36    endpoint._loop = asyncio.get_running_loop()
 37    endpoint._input_buffer = util.BytesBuffer()
 38    endpoint._input_cv = asyncio.Condition()
 39    endpoint._write_queue = aio.Queue()
 40
 41    endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF,
 42                                             out_buff_size=0xFFFF)
 43
 44    endpoint._close_cb_future = endpoint._loop.create_future()
 45    close_cb = endpoint._create_serial_cb(endpoint._close_cb_future)
 46    endpoint._serial.set_close_cb(close_cb)
 47
 48    endpoint._serial.open(
 49        port=port,
 50        baudrate=baudrate,
 51        bytesize=bytesize.value,
 52        parity=parity.value,
 53        stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE
 54                  else stopbits.value),
 55        xonxoff=xonxoff,
 56        rtscts=rtscts,
 57        dsrdtr=dsrdtr)
 58
 59    endpoint._async_group = aio.Group()
 60    endpoint._async_group.spawn(aio.call_on_done,
 61                                asyncio.shield(endpoint._close_cb_future),
 62                                endpoint.close)
 63    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
 64    endpoint._async_group.spawn(endpoint._read_loop)
 65    endpoint._async_group.spawn(endpoint._write_loop)
 66
 67    return endpoint
 68
 69
 70class Endpoint(common.Endpoint):
 71
 72    @property
 73    def async_group(self):
 74        return self._async_group
 75
 76    @property
 77    def port(self):
 78        return self._port
 79
 80    async def read(self, size):
 81        async with self._input_cv:
 82            while len(self._input_buffer) < size:
 83                if not self.is_open:
 84                    raise ConnectionError()
 85
 86                await self._input_cv.wait()
 87
 88            return self._input_buffer.read(size)
 89
 90    async def write(self, data):
 91        future = self._loop.create_future()
 92        try:
 93            self._write_queue.put_nowait((data, future))
 94            await future
 95
 96        except aio.QueueClosedError:
 97            raise ConnectionError()
 98
 99    async def drain(self):
100        future = self._loop.create_future()
101        try:
102            self._write_queue.put_nowait((None, future))
103            await future
104
105        except aio.QueueClosedError:
106            raise ConnectionError()
107
108    async def reset_input_buffer(self):
109        async with self._input_cv:
110            return self._input_buffer.clear()
111
112    async def _on_close(self):
113        self._serial.close()
114
115        await self._close_cb_future
116
117        async with self._input_cv:
118            self._input_cv.notify_all()
119
120        self._serial.set_close_cb(None)
121
122    async def _read_loop(self):
123        try:
124            while True:
125                with self._create_in_change_future() as change_future:
126
127                    data = self._serial.read()
128
129                    if not data:
130                        await change_future
131                        continue
132
133                async with self._input_cv:
134                    self._input_buffer.add(data)
135                    self._input_cv.notify_all()
136
137        except Exception as e:
138            mlog.warning('read loop error: %s', e, exc_info=e)
139
140        finally:
141            self.close()
142
143    async def _write_loop(self):
144        future = None
145        try:
146            while True:
147                data, future = await self._write_queue.get()
148
149                if data is None:
150                    with self._create_drain_future() as drain_future:
151                        self._serial.drain()
152
153                        await drain_future
154
155                else:
156                    data = memoryview(data)
157                    while data:
158                        with self._create_out_change_future() as change_future:
159                            result = self._serial.write(bytes(data))
160                            if result < 0:
161                                raise Exception('write error')
162
163                            data = data[result:]
164
165                            if data:
166                                await change_future
167
168                if not future.done():
169                    future.set_result(None)
170
171                await asyncio.sleep(self._silent_interval)
172
173        except Exception as e:
174            mlog.warning('write loop error: %s', e, exc_info=e)
175
176        finally:
177            self.close()
178            self._write_queue.close()
179
180            while True:
181                if future and not future.done():
182                    future.set_exception(ConnectionError())
183                if self._write_queue.empty():
184                    break
185                _, future = self._write_queue.get_nowait()
186
187    @contextlib.contextmanager
188    def _create_in_change_future(self):
189        with self._create_serial_future(self._serial.set_in_change_cb) as f:
190            yield f
191
192    @contextlib.contextmanager
193    def _create_out_change_future(self):
194        with self._create_serial_future(self._serial.set_out_change_cb) as f:
195            yield f
196
197    @contextlib.contextmanager
198    def _create_drain_future(self):
199        with self._create_serial_future(self._serial.set_drain_cb) as f:
200            yield f
201
202    @contextlib.contextmanager
203    def _create_serial_future(self, serial_set_cb):
204        future = self._loop.create_future()
205        cb = self._create_serial_cb(future)
206        serial_set_cb(cb)
207
208        try:
209            yield future
210
211        finally:
212            serial_set_cb(None)
213
214    def _create_serial_cb(self, future):
215
216        # use wrapper to keep reference to self
217        def wrapper():
218            self._loop.call_soon_threadsafe(_try_set_result, future, None)
219
220        return wrapper
221
222
223def _try_set_result(future, result):
224    if not future.done():
225        future.set_result(result)
mlog: logging.Logger = <Logger hat.drivers.serial.native_serial (WARNING)>

Module logger

async def create( port, *, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
25async def create(port, *,
26                 baudrate=9600,
27                 bytesize=common.ByteSize.EIGHTBITS,
28                 parity=common.Parity.NONE,
29                 stopbits=common.StopBits.ONE,
30                 xonxoff=False,
31                 rtscts=False,
32                 dsrdtr=False,
33                 silent_interval=0):
34    endpoint = Endpoint()
35    endpoint._port = port
36    endpoint._silent_interval = silent_interval
37    endpoint._loop = asyncio.get_running_loop()
38    endpoint._input_buffer = util.BytesBuffer()
39    endpoint._input_cv = asyncio.Condition()
40    endpoint._write_queue = aio.Queue()
41
42    endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF,
43                                             out_buff_size=0xFFFF)
44
45    endpoint._close_cb_future = endpoint._loop.create_future()
46    close_cb = endpoint._create_serial_cb(endpoint._close_cb_future)
47    endpoint._serial.set_close_cb(close_cb)
48
49    endpoint._serial.open(
50        port=port,
51        baudrate=baudrate,
52        bytesize=bytesize.value,
53        parity=parity.value,
54        stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE
55                  else stopbits.value),
56        xonxoff=xonxoff,
57        rtscts=rtscts,
58        dsrdtr=dsrdtr)
59
60    endpoint._async_group = aio.Group()
61    endpoint._async_group.spawn(aio.call_on_done,
62                                asyncio.shield(endpoint._close_cb_future),
63                                endpoint.close)
64    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
65    endpoint._async_group.spawn(endpoint._read_loop)
66    endpoint._async_group.spawn(endpoint._write_loop)
67
68    return endpoint
class Endpoint(hat.drivers.serial.common.Endpoint):
 71class Endpoint(common.Endpoint):
 72
 73    @property
 74    def async_group(self):
 75        return self._async_group
 76
 77    @property
 78    def port(self):
 79        return self._port
 80
 81    async def read(self, size):
 82        async with self._input_cv:
 83            while len(self._input_buffer) < size:
 84                if not self.is_open:
 85                    raise ConnectionError()
 86
 87                await self._input_cv.wait()
 88
 89            return self._input_buffer.read(size)
 90
 91    async def write(self, data):
 92        future = self._loop.create_future()
 93        try:
 94            self._write_queue.put_nowait((data, future))
 95            await future
 96
 97        except aio.QueueClosedError:
 98            raise ConnectionError()
 99
100    async def drain(self):
101        future = self._loop.create_future()
102        try:
103            self._write_queue.put_nowait((None, future))
104            await future
105
106        except aio.QueueClosedError:
107            raise ConnectionError()
108
109    async def reset_input_buffer(self):
110        async with self._input_cv:
111            return self._input_buffer.clear()
112
113    async def _on_close(self):
114        self._serial.close()
115
116        await self._close_cb_future
117
118        async with self._input_cv:
119            self._input_cv.notify_all()
120
121        self._serial.set_close_cb(None)
122
123    async def _read_loop(self):
124        try:
125            while True:
126                with self._create_in_change_future() as change_future:
127
128                    data = self._serial.read()
129
130                    if not data:
131                        await change_future
132                        continue
133
134                async with self._input_cv:
135                    self._input_buffer.add(data)
136                    self._input_cv.notify_all()
137
138        except Exception as e:
139            mlog.warning('read loop error: %s', e, exc_info=e)
140
141        finally:
142            self.close()
143
144    async def _write_loop(self):
145        future = None
146        try:
147            while True:
148                data, future = await self._write_queue.get()
149
150                if data is None:
151                    with self._create_drain_future() as drain_future:
152                        self._serial.drain()
153
154                        await drain_future
155
156                else:
157                    data = memoryview(data)
158                    while data:
159                        with self._create_out_change_future() as change_future:
160                            result = self._serial.write(bytes(data))
161                            if result < 0:
162                                raise Exception('write error')
163
164                            data = data[result:]
165
166                            if data:
167                                await change_future
168
169                if not future.done():
170                    future.set_result(None)
171
172                await asyncio.sleep(self._silent_interval)
173
174        except Exception as e:
175            mlog.warning('write loop error: %s', e, exc_info=e)
176
177        finally:
178            self.close()
179            self._write_queue.close()
180
181            while True:
182                if future and not future.done():
183                    future.set_exception(ConnectionError())
184                if self._write_queue.empty():
185                    break
186                _, future = self._write_queue.get_nowait()
187
188    @contextlib.contextmanager
189    def _create_in_change_future(self):
190        with self._create_serial_future(self._serial.set_in_change_cb) as f:
191            yield f
192
193    @contextlib.contextmanager
194    def _create_out_change_future(self):
195        with self._create_serial_future(self._serial.set_out_change_cb) as f:
196            yield f
197
198    @contextlib.contextmanager
199    def _create_drain_future(self):
200        with self._create_serial_future(self._serial.set_drain_cb) as f:
201            yield f
202
203    @contextlib.contextmanager
204    def _create_serial_future(self, serial_set_cb):
205        future = self._loop.create_future()
206        cb = self._create_serial_cb(future)
207        serial_set_cb(cb)
208
209        try:
210            yield future
211
212        finally:
213            serial_set_cb(None)
214
215    def _create_serial_cb(self, future):
216
217        # use wrapper to keep reference to self
218        def wrapper():
219            self._loop.call_soon_threadsafe(_try_set_result, future, None)
220
221        return wrapper

Serial endpoint

async_group
73    @property
74    def async_group(self):
75        return self._async_group

Group controlling resource's lifetime.

port
77    @property
78    def port(self):
79        return self._port

Port name

async def read(self, size):
81    async def read(self, size):
82        async with self._input_cv:
83            while len(self._input_buffer) < size:
84                if not self.is_open:
85                    raise ConnectionError()
86
87                await self._input_cv.wait()
88
89            return self._input_buffer.read(size)

Read

Arguments:
  • size: number of bytes to read
Raises:
  • ConnectionError
async def write(self, data):
91    async def write(self, data):
92        future = self._loop.create_future()
93        try:
94            self._write_queue.put_nowait((data, future))
95            await future
96
97        except aio.QueueClosedError:
98            raise ConnectionError()

Write

Raises:
  • ConnectionError
async def drain(self):
100    async def drain(self):
101        future = self._loop.create_future()
102        try:
103            self._write_queue.put_nowait((None, future))
104            await future
105
106        except aio.QueueClosedError:
107            raise ConnectionError()

Drain output buffer

Raises:
  • ConnectionError
async def reset_input_buffer(self):
109    async def reset_input_buffer(self):
110        async with self._input_cv:
111            return self._input_buffer.clear()

Reset input buffer

Returns number of bytes available in buffer immediately before buffer was cleared.

Raises:
  • ConnectionError