hat.drivers.serial.native_serial
Implementation based on native serial communication
"""Implementation based on native serial communication"""

import asyncio
import contextlib
import functools
import logging
import sys

from hat import aio
from hat import util

from hat.drivers.serial import common

from hat.drivers.serial import _native_serial


# importing this module on Windows fails on purpose (marked as WIP)
if sys.platform == 'win32':
    raise ImportError('WIP')


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create(port, *,
                 baudrate=9600,
                 bytesize=common.ByteSize.EIGHTBITS,
                 parity=common.Parity.NONE,
                 stopbits=common.StopBits.ONE,
                 xonxoff=False,
                 rtscts=False,
                 dsrdtr=False,
                 silent_interval=0):
    """Open native serial port and return endpoint with running loops

    Args:
        port: port identifier passed to the native ``open`` call
        baudrate: baud rate passed to the native ``open`` call
        bytesize: `common.ByteSize` member (its ``value`` is used)
        parity: `common.Parity` member (its ``value`` is used)
        stopbits: `common.StopBits` member; ``ONE_POINT_FIVE`` is passed
            to the native layer as ``2``, otherwise its ``value`` is used
        xonxoff: passed unchanged to the native ``open`` call
        rtscts: passed unchanged to the native ``open`` call
        dsrdtr: passed unchanged to the native ``open`` call
        silent_interval: seconds the write loop sleeps after each
            processed write/drain request

    """
    endpoint = Endpoint()
    endpoint._port = port
    endpoint._silent_interval = silent_interval
    endpoint._loop = asyncio.get_running_loop()
    endpoint._input_buffer = util.BytesBuffer()
    endpoint._input_cv = asyncio.Condition()
    endpoint._write_queue = aio.Queue()

    endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF,
                                             out_buff_size=0xFFFF)

    # close callback is registered before `open` is called; it resolves
    # `_close_cb_future` on the endpoint's event loop
    endpoint._close_cb_future = endpoint._loop.create_future()
    close_cb = endpoint._create_serial_cb(endpoint._close_cb_future)
    endpoint._serial.set_close_cb(close_cb)

    endpoint._serial.open(
        port=port,
        baudrate=baudrate,
        bytesize=bytesize.value,
        parity=parity.value,
        stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE
                  else stopbits.value),
        xonxoff=xonxoff,
        rtscts=rtscts,
        dsrdtr=dsrdtr)

    endpoint._async_group = aio.Group()
    # close the endpoint once the native close callback has fired; the
    # future is shielded so cancelling this task does not cancel it
    endpoint._async_group.spawn(aio.call_on_done,
                                asyncio.shield(endpoint._close_cb_future),
                                endpoint.close)
    # run `_on_close` cleanup when the group is cancelled
    endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close)
    endpoint._async_group.spawn(endpoint._read_loop)
    endpoint._async_group.spawn(endpoint._write_loop)

    return endpoint


class Endpoint(common.Endpoint):
    """Serial endpoint backed by `_native_serial.Serial`

    Instances are populated by `create`, which assigns all private
    attributes used by the methods below.

    """

    @property
    def async_group(self):
        """Async group in which `create` spawns the endpoint's tasks"""
        return self._async_group

    @property
    def port(self):
        """Port identifier given to `create`"""
        return self._port

    async def read(self, size):
        """Wait until `size` bytes are buffered and return them

        Raises:
            ConnectionError: endpoint is not open while fewer than `size`
                bytes are buffered

        """
        async with self._input_cv:
            while len(self._input_buffer) < size:
                if not self.is_open:
                    raise ConnectionError()

                # woken by `_read_loop` (new data) or `_on_close`
                await self._input_cv.wait()

            return self._input_buffer.read(size)

    async def write(self, data):
        """Queue `data` for the write loop and wait for its completion

        Raises:
            ConnectionError: write queue is closed, or the write loop
                failed the request while terminating

        """
        future = self._loop.create_future()
        try:
            self._write_queue.put_nowait((data, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Queue a drain request and wait for its completion

        A drain request is a queue entry whose data is ``None``.

        Raises:
            ConnectionError: write queue is closed, or the write loop
                failed the request while terminating

        """
        future = self._loop.create_future()
        try:
            self._write_queue.put_nowait((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def reset_input_buffer(self):
        """Clear input buffer and return result of the buffer's ``clear``"""
        async with self._input_cv:
            return self._input_buffer.clear()

    async def _on_close(self):
        """Close native serial object and wake blocked readers"""
        self._serial.close()

        # wait for the native close callback registered in `create`
        await self._close_cb_future

        # wake readers so they re-check `is_open`
        async with self._input_cv:
            self._input_cv.notify_all()

        self._serial.set_close_cb(None)

    async def _read_loop(self):
        """Move data from native serial object into the input buffer"""
        try:
            while True:
                # in-change callback is registered before `read` is called
                # and stays registered while waiting for new data
                with self._create_in_change_future() as change_future:

                    data = self._serial.read()

                    if not data:
                        await change_future
                        continue

                async with self._input_cv:
                    self._input_buffer.add(data)
                    self._input_cv.notify_all()

        except Exception as e:
            mlog.warning('read loop error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _write_loop(self):
        """Process queued write and drain requests one at a time"""
        future = None
        try:
            while True:
                data, future = await self._write_queue.get()

                if data is None:
                    # drain request - wait for native drain callback
                    with self._create_drain_future() as drain_future:
                        self._serial.drain()

                        await drain_future

                else:
                    data = memoryview(data)
                    while data:
                        with self._create_out_change_future() as change_future:
                            # `result` is used as number of bytes consumed;
                            # negative value is treated as an error
                            result = self._serial.write(bytes(data))
                            if result < 0:
                                raise Exception('write error')

                            data = data[result:]

                            # remaining data - wait for out-change callback
                            if data:
                                await change_future

                # request future may already be done (e.g. cancelled waiter)
                if not future.done():
                    future.set_result(None)

                await asyncio.sleep(self._silent_interval)

        except Exception as e:
            mlog.warning('write loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._write_queue.close()

            # fail the request in progress and all still queued requests
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._write_queue.empty():
                    break
                _, future = self._write_queue.get_nowait()

    @contextlib.contextmanager
    def _create_in_change_future(self):
        """Context yielding future resolved by the in-change callback"""
        with self._create_serial_future(self._serial.set_in_change_cb) as f:
            yield f

    @contextlib.contextmanager
    def _create_out_change_future(self):
        """Context yielding future resolved by the out-change callback"""
        with self._create_serial_future(self._serial.set_out_change_cb) as f:
            yield f

    @contextlib.contextmanager
    def _create_drain_future(self):
        """Context yielding future resolved by the drain callback"""
        with self._create_serial_future(self._serial.set_drain_cb) as f:
            yield f

    @contextlib.contextmanager
    def _create_serial_future(self, serial_set_cb):
        """Register callback resolving a new future; unregister on exit

        Args:
            serial_set_cb: callback setter of the native serial object,
                called with the callback on entry and with ``None`` on exit

        """
        future = self._loop.create_future()
        cb = self._create_serial_cb(future)
        serial_set_cb(cb)

        try:
            yield future

        finally:
            serial_set_cb(None)

    def _create_serial_cb(self, future):
        """Create callback scheduling ``_try_set_result(future, None)``

        Scheduling goes through ``call_soon_threadsafe`` of the endpoint's
        loop - presumably because the native object invokes callbacks from
        another thread (TODO confirm).

        """
        return functools.partial(self._loop.call_soon_threadsafe,
                                 _try_set_result, future, None)


def _try_set_result(future, result):
    """Set `result` on `future` unless the future is already done"""
    if not future.done():
        future.set_result(result)
Module logger
async def
create( port, *, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
async def create(port, *,
                 baudrate=9600,
                 bytesize=common.ByteSize.EIGHTBITS,
                 parity=common.Parity.NONE,
                 stopbits=common.StopBits.ONE,
                 xonxoff=False,
                 rtscts=False,
                 dsrdtr=False,
                 silent_interval=0):
    """Open a native serial port and return its running endpoint.

    Args:
        port: port identifier handed to the native ``open`` call
        baudrate: baud rate handed to the native ``open`` call
        bytesize: `common.ByteSize` member; its ``value`` is used
        parity: `common.Parity` member; its ``value`` is used
        stopbits: `common.StopBits` member; ``ONE_POINT_FIVE`` is handed
            to the native layer as ``2``, otherwise its ``value`` is used
        xonxoff: forwarded unchanged to the native ``open`` call
        rtscts: forwarded unchanged to the native ``open`` call
        dsrdtr: forwarded unchanged to the native ``open`` call
        silent_interval: seconds the write loop sleeps after each
            processed write/drain request

    Returns:
        `Endpoint` whose close handlers, read loop and write loop are
        already spawned in its async group

    """
    ep = Endpoint()
    ep._port = port
    ep._silent_interval = silent_interval

    loop = asyncio.get_running_loop()
    ep._loop = loop
    ep._input_buffer = util.BytesBuffer()
    ep._input_cv = asyncio.Condition()
    ep._write_queue = aio.Queue()

    serial = _native_serial.Serial(in_buff_size=0xFFFF,
                                   out_buff_size=0xFFFF)
    ep._serial = serial

    # the close callback has to be in place before the port is opened
    closed = loop.create_future()
    ep._close_cb_future = closed
    serial.set_close_cb(ep._create_serial_cb(closed))

    native_bytesize = bytesize.value
    native_parity = parity.value
    if stopbits == common.StopBits.ONE_POINT_FIVE:
        native_stopbits = 2
    else:
        native_stopbits = stopbits.value

    serial.open(port=port,
                baudrate=baudrate,
                bytesize=native_bytesize,
                parity=native_parity,
                stopbits=native_stopbits,
                xonxoff=xonxoff,
                rtscts=rtscts,
                dsrdtr=dsrdtr)

    group = aio.Group()
    ep._async_group = group

    # native close notification closes the endpoint; the shield keeps the
    # shared future alive when this helper task is cancelled
    group.spawn(aio.call_on_done, asyncio.shield(closed), ep.close)
    # group cancellation triggers the endpoint's cleanup
    group.spawn(aio.call_on_cancel, ep._on_close)
    group.spawn(ep._read_loop)
    group.spawn(ep._write_loop)

    return ep
class
Endpoint(hat.drivers.serial.common.Endpoint):
class Endpoint(common.Endpoint):
    """Serial endpoint backed by `_native_serial.Serial`

    Instances are populated by `create`, which assigns all private
    attributes used by the methods below.

    """

    @property
    def async_group(self):
        """Async group in which `create` spawns the endpoint's tasks"""
        return self._async_group

    @property
    def port(self):
        """Port identifier given to `create`"""
        return self._port

    async def read(self, size):
        """Wait until `size` bytes are buffered and return them

        Raises:
            ConnectionError: endpoint is not open while fewer than `size`
                bytes are buffered

        """
        async with self._input_cv:
            while len(self._input_buffer) < size:
                if not self.is_open:
                    raise ConnectionError()

                # woken by `_read_loop` (new data) or `_on_close`
                await self._input_cv.wait()

            return self._input_buffer.read(size)

    async def write(self, data):
        """Queue `data` for the write loop and wait for its completion

        Raises:
            ConnectionError: write queue is closed, or the write loop
                failed the request while terminating

        """
        future = self._loop.create_future()
        try:
            self._write_queue.put_nowait((data, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def drain(self):
        """Queue a drain request and wait for its completion

        A drain request is a queue entry whose data is ``None``.

        Raises:
            ConnectionError: write queue is closed, or the write loop
                failed the request while terminating

        """
        future = self._loop.create_future()
        try:
            self._write_queue.put_nowait((None, future))
            await future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def reset_input_buffer(self):
        """Clear input buffer and return result of the buffer's ``clear``"""
        async with self._input_cv:
            return self._input_buffer.clear()

    async def _on_close(self):
        """Close native serial object and wake blocked readers"""
        self._serial.close()

        # wait for the native close callback registered in `create`
        await self._close_cb_future

        # wake readers so they re-check `is_open`
        async with self._input_cv:
            self._input_cv.notify_all()

        self._serial.set_close_cb(None)

    async def _read_loop(self):
        """Move data from native serial object into the input buffer"""
        try:
            while True:
                # in-change callback is registered before `read` is called
                # and stays registered while waiting for new data
                with self._create_in_change_future() as change_future:

                    data = self._serial.read()

                    if not data:
                        await change_future
                        continue

                async with self._input_cv:
                    self._input_buffer.add(data)
                    self._input_cv.notify_all()

        except Exception as e:
            mlog.warning('read loop error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _write_loop(self):
        """Process queued write and drain requests one at a time"""
        future = None
        try:
            while True:
                data, future = await self._write_queue.get()

                if data is None:
                    # drain request - wait for native drain callback
                    with self._create_drain_future() as drain_future:
                        self._serial.drain()

                        await drain_future

                else:
                    data = memoryview(data)
                    while data:
                        with self._create_out_change_future() as change_future:
                            # `result` is used as number of bytes consumed;
                            # negative value is treated as an error
                            result = self._serial.write(bytes(data))
                            if result < 0:
                                raise Exception('write error')

                            data = data[result:]

                            # remaining data - wait for out-change callback
                            if data:
                                await change_future

                # request future may already be done (e.g. cancelled waiter)
                if not future.done():
                    future.set_result(None)

                await asyncio.sleep(self._silent_interval)

        except Exception as e:
            mlog.warning('write loop error: %s', e, exc_info=e)

        finally:
            self.close()
            self._write_queue.close()

            # fail the request in progress and all still queued requests
            while True:
                if future and not future.done():
                    future.set_exception(ConnectionError())
                if self._write_queue.empty():
                    break
                _, future = self._write_queue.get_nowait()

    @contextlib.contextmanager
    def _create_in_change_future(self):
        """Context yielding future resolved by the in-change callback"""
        with self._create_serial_future(self._serial.set_in_change_cb) as f:
            yield f

    @contextlib.contextmanager
    def _create_out_change_future(self):
        """Context yielding future resolved by the out-change callback"""
        with self._create_serial_future(self._serial.set_out_change_cb) as f:
            yield f

    @contextlib.contextmanager
    def _create_drain_future(self):
        """Context yielding future resolved by the drain callback"""
        with self._create_serial_future(self._serial.set_drain_cb) as f:
            yield f

    @contextlib.contextmanager
    def _create_serial_future(self, serial_set_cb):
        """Register callback resolving a new future; unregister on exit

        Args:
            serial_set_cb: callback setter of the native serial object,
                called with the callback on entry and with ``None`` on exit

        """
        future = self._loop.create_future()
        cb = self._create_serial_cb(future)
        serial_set_cb(cb)

        try:
            yield future

        finally:
            serial_set_cb(None)

    def _create_serial_cb(self, future):
        """Create callback scheduling ``_try_set_result(future, None)``

        Scheduling goes through ``call_soon_threadsafe`` of the endpoint's
        loop - presumably because the native object invokes callbacks from
        another thread (TODO confirm).

        """
        return functools.partial(self._loop.call_soon_threadsafe,
                                 _try_set_result, future, None)
Serial endpoint
async def
read(self, size):
async def read(self, size):
    """Return `size` bytes from the input buffer, waiting if necessary.

    The input condition is held for the whole call. While fewer than
    `size` bytes are buffered, the call waits on the condition until it
    is notified and then checks again.

    Args:
        size: number of bytes to read

    Raises:
        ConnectionError: the endpoint is not open while fewer than `size`
            bytes are buffered

    """
    input_cv = self._input_cv
    async with input_cv:
        while True:
            # enough data buffered - hand it out even if already closed
            if len(self._input_buffer) >= size:
                return self._input_buffer.read(size)

            if not self.is_open:
                raise ConnectionError()

            await input_cv.wait()
Read `size` bytes, waiting until at least that many bytes are available in the input buffer
Arguments:
- size: number of bytes to read
Raises:
- ConnectionError: the endpoint is not open while fewer than `size` bytes are buffered
async def
write(self, data):
async def write(self, data):
    """Queue `data` for writing and wait until the request is completed.

    The request is a ``(data, future)`` pair placed on the write queue;
    the call returns once that future is resolved.

    Raises:
        ConnectionError: the write queue is closed, or the request's
            future was failed with this error

    """
    completion = self._loop.create_future()
    request = (data, completion)
    try:
        self._write_queue.put_nowait(request)
        await completion

    except aio.QueueClosedError:
        raise ConnectionError()
Write data: queue it for the write loop and wait until the whole request has been processed
Raises:
- ConnectionError: the write queue is closed, or the write loop terminated before completing the request