hat.drivers.serial.native_serial

Implementation based on native serial communication

  1"""Implementation based on native serial communication"""
  2
  3import asyncio
  4import contextlib
  5import functools
  6import logging
  7import sys
  8
  9from hat import aio
 10from hat import util
 11
 12from hat.drivers.serial import common
 13
 14from hat.drivers.serial import _native_serial
 15
 16
 17if sys.platform == 'win32':
 18    raise ImportError('WIP')
 19
 20
 21mlog: logging.Logger = logging.getLogger(__name__)
 22"""Module logger"""
 23
 24
 25async def create(port, *,
 26                 baudrate=9600,
 27                 bytesize=common.ByteSize.EIGHTBITS,
 28                 parity=common.Parity.NONE,
 29                 stopbits=common.StopBits.ONE,
 30                 xonxoff=False,
 31                 rtscts=False,
 32                 dsrdtr=False,
 33                 silent_interval=0):
 34    endpoint = Endpoint()
 35    endpoint._port = port
 36    endpoint._silent_interval = silent_interval
 37    endpoint._loop = asyncio.get_running_loop()
 38    endpoint._input_buffer = util.BytesBuffer()
 39    endpoint._input_cv = asyncio.Condition()
 40    endpoint._write_queue = aio.Queue()
 41
 42    endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF,
 43                                             out_buff_size=0xFFFF)
 44
 45    endpoint._close_cb_future = endpoint._loop.create_future()
 46    close_cb = endpoint._create_serial_cb(endpoint._close_cb_future)
 47    endpoint._serial.set_close_cb(close_cb)
 48
 49    endpoint._serial.open(
 50        port=port,
 51        baudrate=baudrate,
 52        bytesize=bytesize.value,
 53        parity=parity.value,
 54        stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE
 55                  else stopbits.value),
 56        xonxoff=xonxoff,
 57        rtscts=rtscts,
 58        dsrdtr=dsrdtr)
 59
 60    endpoint._async_group = aio.Group()
 61    endpoint._async_group.spawn(aio.call_on_done,
 62                                asyncio.shield(endpoint._close_cb_future),
 63                                endpoint.close)
 64    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
 65    endpoint._async_group.spawn(endpoint._read_loop)
 66    endpoint._async_group.spawn(endpoint._write_loop)
 67
 68    return endpoint
 69
 70
 71class Endpoint(common.Endpoint):
 72
 73    @property
 74    def async_group(self):
 75        return self._async_group
 76
 77    @property
 78    def port(self):
 79        return self._port
 80
 81    async def read(self, size):
 82        async with self._input_cv:
 83            while len(self._input_buffer) < size:
 84                if not self.is_open:
 85                    raise ConnectionError()
 86
 87                await self._input_cv.wait()
 88
 89            return self._input_buffer.read(size)
 90
 91    async def write(self, data):
 92        future = self._loop.create_future()
 93        try:
 94            self._write_queue.put_nowait((data, future))
 95            await future
 96
 97        except aio.QueueClosedError:
 98            raise ConnectionError()
 99
100    async def drain(self):
101        future = self._loop.create_future()
102        try:
103            self._write_queue.put_nowait((None, future))
104            await future
105
106        except aio.QueueClosedError:
107            raise ConnectionError()
108
109    async def reset_input_buffer(self):
110        async with self._input_cv:
111            return self._input_buffer.clear()
112
113    async def _on_close(self):
114        self._serial.close()
115
116        await self._close_cb_future
117
118        async with self._input_cv:
119            self._input_cv.notify_all()
120
121        self._serial.set_close_cb(None)
122
123    async def _read_loop(self):
124        try:
125            while True:
126                with self._create_in_change_future() as change_future:
127
128                    data = self._serial.read()
129
130                    if not data:
131                        await change_future
132                        continue
133
134                async with self._input_cv:
135                    self._input_buffer.add(data)
136                    self._input_cv.notify_all()
137
138        except Exception as e:
139            mlog.warning('read loop error: %s', e, exc_info=e)
140
141        finally:
142            self.close()
143
144    async def _write_loop(self):
145        future = None
146        try:
147            while True:
148                data, future = await self._write_queue.get()
149
150                if data is None:
151                    with self._create_drain_future() as drain_future:
152                        self._serial.drain()
153
154                        await drain_future
155
156                else:
157                    data = memoryview(data)
158                    while data:
159                        with self._create_out_change_future() as change_future:
160                            result = self._serial.write(bytes(data))
161                            if result < 0:
162                                raise Exception('write error')
163
164                            data = data[result:]
165
166                            if data:
167                                await change_future
168
169                if not future.done():
170                    future.set_result(None)
171
172                await asyncio.sleep(self._silent_interval)
173
174        except Exception as e:
175            mlog.warning('write loop error: %s', e, exc_info=e)
176
177        finally:
178            self.close()
179            self._write_queue.close()
180
181            while True:
182                if future and not future.done():
183                    future.set_exception(ConnectionError())
184                if self._write_queue.empty():
185                    break
186                _, future = self._write_queue.get_nowait()
187
188    @contextlib.contextmanager
189    def _create_in_change_future(self):
190        with self._create_serial_future(self._serial.set_in_change_cb) as f:
191            yield f
192
193    @contextlib.contextmanager
194    def _create_out_change_future(self):
195        with self._create_serial_future(self._serial.set_out_change_cb) as f:
196            yield f
197
198    @contextlib.contextmanager
199    def _create_drain_future(self):
200        with self._create_serial_future(self._serial.set_drain_cb) as f:
201            yield f
202
203    @contextlib.contextmanager
204    def _create_serial_future(self, serial_set_cb):
205        future = self._loop.create_future()
206        cb = self._create_serial_cb(future)
207        serial_set_cb(cb)
208
209        try:
210            yield future
211
212        finally:
213            serial_set_cb(None)
214
215    def _create_serial_cb(self, future):
216        return functools.partial(self._loop.call_soon_threadsafe,
217                                 _try_set_result, future, None)
218
219
def _try_set_result(future, result):
    """Set `result` on `future` unless the future is already done.

    A future that already holds a result or an exception, or that was
    cancelled, is left untouched.

    """
    if future.done():
        return

    future.set_result(result)
mlog: logging.Logger = <Logger hat.drivers.serial.native_serial (WARNING)>

Module logger

async def create( port, *, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
26async def create(port, *,
27                 baudrate=9600,
28                 bytesize=common.ByteSize.EIGHTBITS,
29                 parity=common.Parity.NONE,
30                 stopbits=common.StopBits.ONE,
31                 xonxoff=False,
32                 rtscts=False,
33                 dsrdtr=False,
34                 silent_interval=0):
35    endpoint = Endpoint()
36    endpoint._port = port
37    endpoint._silent_interval = silent_interval
38    endpoint._loop = asyncio.get_running_loop()
39    endpoint._input_buffer = util.BytesBuffer()
40    endpoint._input_cv = asyncio.Condition()
41    endpoint._write_queue = aio.Queue()
42
43    endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF,
44                                             out_buff_size=0xFFFF)
45
46    endpoint._close_cb_future = endpoint._loop.create_future()
47    close_cb = endpoint._create_serial_cb(endpoint._close_cb_future)
48    endpoint._serial.set_close_cb(close_cb)
49
50    endpoint._serial.open(
51        port=port,
52        baudrate=baudrate,
53        bytesize=bytesize.value,
54        parity=parity.value,
55        stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE
56                  else stopbits.value),
57        xonxoff=xonxoff,
58        rtscts=rtscts,
59        dsrdtr=dsrdtr)
60
61    endpoint._async_group = aio.Group()
62    endpoint._async_group.spawn(aio.call_on_done,
63                                asyncio.shield(endpoint._close_cb_future),
64                                endpoint.close)
65    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
66    endpoint._async_group.spawn(endpoint._read_loop)
67    endpoint._async_group.spawn(endpoint._write_loop)
68
69    return endpoint
class Endpoint(hat.drivers.serial.common.Endpoint):
 72class Endpoint(common.Endpoint):
 73
 74    @property
 75    def async_group(self):
 76        return self._async_group
 77
 78    @property
 79    def port(self):
 80        return self._port
 81
 82    async def read(self, size):
 83        async with self._input_cv:
 84            while len(self._input_buffer) < size:
 85                if not self.is_open:
 86                    raise ConnectionError()
 87
 88                await self._input_cv.wait()
 89
 90            return self._input_buffer.read(size)
 91
 92    async def write(self, data):
 93        future = self._loop.create_future()
 94        try:
 95            self._write_queue.put_nowait((data, future))
 96            await future
 97
 98        except aio.QueueClosedError:
 99            raise ConnectionError()
100
101    async def drain(self):
102        future = self._loop.create_future()
103        try:
104            self._write_queue.put_nowait((None, future))
105            await future
106
107        except aio.QueueClosedError:
108            raise ConnectionError()
109
110    async def reset_input_buffer(self):
111        async with self._input_cv:
112            return self._input_buffer.clear()
113
114    async def _on_close(self):
115        self._serial.close()
116
117        await self._close_cb_future
118
119        async with self._input_cv:
120            self._input_cv.notify_all()
121
122        self._serial.set_close_cb(None)
123
124    async def _read_loop(self):
125        try:
126            while True:
127                with self._create_in_change_future() as change_future:
128
129                    data = self._serial.read()
130
131                    if not data:
132                        await change_future
133                        continue
134
135                async with self._input_cv:
136                    self._input_buffer.add(data)
137                    self._input_cv.notify_all()
138
139        except Exception as e:
140            mlog.warning('read loop error: %s', e, exc_info=e)
141
142        finally:
143            self.close()
144
145    async def _write_loop(self):
146        future = None
147        try:
148            while True:
149                data, future = await self._write_queue.get()
150
151                if data is None:
152                    with self._create_drain_future() as drain_future:
153                        self._serial.drain()
154
155                        await drain_future
156
157                else:
158                    data = memoryview(data)
159                    while data:
160                        with self._create_out_change_future() as change_future:
161                            result = self._serial.write(bytes(data))
162                            if result < 0:
163                                raise Exception('write error')
164
165                            data = data[result:]
166
167                            if data:
168                                await change_future
169
170                if not future.done():
171                    future.set_result(None)
172
173                await asyncio.sleep(self._silent_interval)
174
175        except Exception as e:
176            mlog.warning('write loop error: %s', e, exc_info=e)
177
178        finally:
179            self.close()
180            self._write_queue.close()
181
182            while True:
183                if future and not future.done():
184                    future.set_exception(ConnectionError())
185                if self._write_queue.empty():
186                    break
187                _, future = self._write_queue.get_nowait()
188
189    @contextlib.contextmanager
190    def _create_in_change_future(self):
191        with self._create_serial_future(self._serial.set_in_change_cb) as f:
192            yield f
193
194    @contextlib.contextmanager
195    def _create_out_change_future(self):
196        with self._create_serial_future(self._serial.set_out_change_cb) as f:
197            yield f
198
199    @contextlib.contextmanager
200    def _create_drain_future(self):
201        with self._create_serial_future(self._serial.set_drain_cb) as f:
202            yield f
203
204    @contextlib.contextmanager
205    def _create_serial_future(self, serial_set_cb):
206        future = self._loop.create_future()
207        cb = self._create_serial_cb(future)
208        serial_set_cb(cb)
209
210        try:
211            yield future
212
213        finally:
214            serial_set_cb(None)
215
216    def _create_serial_cb(self, future):
217        return functools.partial(self._loop.call_soon_threadsafe,
218                                 _try_set_result, future, None)

Serial endpoint

async_group
74    @property
75    def async_group(self):
76        return self._async_group

Group controlling resource's lifetime.

port
78    @property
79    def port(self):
80        return self._port

Port name

async def read(self, size):
82    async def read(self, size):
83        async with self._input_cv:
84            while len(self._input_buffer) < size:
85                if not self.is_open:
86                    raise ConnectionError()
87
88                await self._input_cv.wait()
89
90            return self._input_buffer.read(size)

Read

Arguments:
  • size: number of bytes to read
Raises:
  • ConnectionError
async def write(self, data):
92    async def write(self, data):
93        future = self._loop.create_future()
94        try:
95            self._write_queue.put_nowait((data, future))
96            await future
97
98        except aio.QueueClosedError:
99            raise ConnectionError()

Write

Raises:
  • ConnectionError
async def drain(self):
101    async def drain(self):
102        future = self._loop.create_future()
103        try:
104            self._write_queue.put_nowait((None, future))
105            await future
106
107        except aio.QueueClosedError:
108            raise ConnectionError()

Drain output buffer

Raises:
  • ConnectionError
async def reset_input_buffer(self):
110    async def reset_input_buffer(self):
111        async with self._input_cv:
112            return self._input_buffer.clear()

Reset input buffer

Returns the number of bytes that were available in the buffer immediately before it was cleared.

Raises:
  • ConnectionError
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close