hat.drivers.serial.native_serial
Implementation based on native serial communication
1"""Implementation based on native serial communication""" 2 3import asyncio 4import contextlib 5import logging 6import sys 7 8from hat import aio 9from hat import util 10 11from hat.drivers.serial import common 12 13from hat.drivers.serial import _native_serial 14 15 16if sys.platform == 'win32': 17 raise ImportError('WIP') 18 19 20mlog: logging.Logger = logging.getLogger(__name__) 21"""Module logger""" 22 23 24async def create(port, *, 25 baudrate=9600, 26 bytesize=common.ByteSize.EIGHTBITS, 27 parity=common.Parity.NONE, 28 stopbits=common.StopBits.ONE, 29 xonxoff=False, 30 rtscts=False, 31 dsrdtr=False, 32 silent_interval=0): 33 endpoint = Endpoint() 34 endpoint._port = port 35 endpoint._silent_interval = silent_interval 36 endpoint._loop = asyncio.get_running_loop() 37 endpoint._input_buffer = util.BytesBuffer() 38 endpoint._input_cv = asyncio.Condition() 39 endpoint._write_queue = aio.Queue() 40 41 endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF, 42 out_buff_size=0xFFFF) 43 44 endpoint._close_cb_future = endpoint._loop.create_future() 45 close_cb = endpoint._create_serial_cb(endpoint._close_cb_future) 46 endpoint._serial.set_close_cb(close_cb) 47 48 endpoint._serial.open( 49 port=port, 50 baudrate=baudrate, 51 bytesize=bytesize.value, 52 parity=parity.value, 53 stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE 54 else stopbits.value), 55 xonxoff=xonxoff, 56 rtscts=rtscts, 57 dsrdtr=dsrdtr) 58 59 endpoint._async_group = aio.Group() 60 endpoint._async_group.spawn(aio.call_on_done, 61 asyncio.shield(endpoint._close_cb_future), 62 endpoint.close) 63 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 64 endpoint._async_group.spawn(endpoint._read_loop) 65 endpoint._async_group.spawn(endpoint._write_loop) 66 67 return endpoint 68 69 70class Endpoint(common.Endpoint): 71 72 @property 73 def async_group(self): 74 return self._async_group 75 76 @property 77 def port(self): 78 return self._port 79 80 async def read(self, size): 81 async with self._input_cv: 82 while len(self._input_buffer) < size: 83 if not self.is_open: 84 raise ConnectionError() 85 86 await self._input_cv.wait() 87 88 return self._input_buffer.read(size) 89 90 async def write(self, data): 91 future = self._loop.create_future() 92 try: 93 self._write_queue.put_nowait((data, future)) 94 await future 95 96 except aio.QueueClosedError: 97 raise ConnectionError() 98 99 async def drain(self): 100 future = self._loop.create_future() 101 try: 102 self._write_queue.put_nowait((None, future)) 103 await future 104 105 except aio.QueueClosedError: 106 raise ConnectionError() 107 108 async def reset_input_buffer(self): 109 async with self._input_cv: 110 return self._input_buffer.clear() 111 112 async def _on_close(self): 113 self._serial.close() 114 115 await self._close_cb_future 116 117 async with self._input_cv: 118 self._input_cv.notify_all() 119 120 self._serial.set_close_cb(None) 121 122 async def _read_loop(self): 123 try: 124 while True: 125 with self._create_in_change_future() as change_future: 126 127 data = self._serial.read() 128 129 if not data: 130 await change_future 131 continue 132 133 async with self._input_cv: 134 self._input_buffer.add(data) 135 self._input_cv.notify_all() 136 137 except Exception as e: 138 mlog.warning('read loop error: %s', e, exc_info=e) 139 140 finally: 141 self.close() 142 143 async def _write_loop(self): 144 future = None 145 try: 146 while True: 147 data, future = await self._write_queue.get() 148 149 if data is None: 150 with self._create_drain_future() as drain_future: 151 self._serial.drain() 152 153 await drain_future 154 155 else: 156 data = memoryview(data) 157 while data: 158 with self._create_out_change_future() as change_future: 159 result = self._serial.write(bytes(data)) 160 if result < 0: 161 raise Exception('write error') 162 163 data = data[result:] 164 165 if data: 166 await change_future 167 168 if not future.done(): 169 future.set_result(None) 170 171 await asyncio.sleep(self._silent_interval) 172 173 except Exception as e: 174 mlog.warning('write loop error: %s', e, exc_info=e) 175 176 finally: 177 self.close() 178 self._write_queue.close() 179 180 while True: 181 if future and not future.done(): 182 future.set_exception(ConnectionError()) 183 if self._write_queue.empty(): 184 break 185 _, future = self._write_queue.get_nowait() 186 187 @contextlib.contextmanager 188 def _create_in_change_future(self): 189 with self._create_serial_future(self._serial.set_in_change_cb) as f: 190 yield f 191 192 @contextlib.contextmanager 193 def _create_out_change_future(self): 194 with self._create_serial_future(self._serial.set_out_change_cb) as f: 195 yield f 196 197 @contextlib.contextmanager 198 def _create_drain_future(self): 199 with self._create_serial_future(self._serial.set_drain_cb) as f: 200 yield f 201 202 @contextlib.contextmanager 203 def _create_serial_future(self, serial_set_cb): 204 future = self._loop.create_future() 205 cb = self._create_serial_cb(future) 206 serial_set_cb(cb) 207 208 try: 209 yield future 210 211 finally: 212 serial_set_cb(None) 213 214 def _create_serial_cb(self, future): 215 216 # use wrapper to keep reference to self 217 def wrapper(): 218 self._loop.call_soon_threadsafe(_try_set_result, future, None) 219 220 return wrapper 221 222 223def _try_set_result(future, result): 224 if not future.done(): 225 future.set_result(result)
Module logger
async def
create( port, *, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
25async def create(port, *, 26 baudrate=9600, 27 bytesize=common.ByteSize.EIGHTBITS, 28 parity=common.Parity.NONE, 29 stopbits=common.StopBits.ONE, 30 xonxoff=False, 31 rtscts=False, 32 dsrdtr=False, 33 silent_interval=0): 34 endpoint = Endpoint() 35 endpoint._port = port 36 endpoint._silent_interval = silent_interval 37 endpoint._loop = asyncio.get_running_loop() 38 endpoint._input_buffer = util.BytesBuffer() 39 endpoint._input_cv = asyncio.Condition() 40 endpoint._write_queue = aio.Queue() 41 42 endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF, 43 out_buff_size=0xFFFF) 44 45 endpoint._close_cb_future = endpoint._loop.create_future() 46 close_cb = endpoint._create_serial_cb(endpoint._close_cb_future) 47 endpoint._serial.set_close_cb(close_cb) 48 49 endpoint._serial.open( 50 port=port, 51 baudrate=baudrate, 52 bytesize=bytesize.value, 53 parity=parity.value, 54 stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE 55 else stopbits.value), 56 xonxoff=xonxoff, 57 rtscts=rtscts, 58 dsrdtr=dsrdtr) 59 60 endpoint._async_group = aio.Group() 61 endpoint._async_group.spawn(aio.call_on_done, 62 asyncio.shield(endpoint._close_cb_future), 63 endpoint.close) 64 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 65 endpoint._async_group.spawn(endpoint._read_loop) 66 endpoint._async_group.spawn(endpoint._write_loop) 67 68 return endpoint
class
Endpoint(hat.drivers.serial.common.Endpoint):
71class Endpoint(common.Endpoint): 72 73 @property 74 def async_group(self): 75 return self._async_group 76 77 @property 78 def port(self): 79 return self._port 80 81 async def read(self, size): 82 async with self._input_cv: 83 while len(self._input_buffer) < size: 84 if not self.is_open: 85 raise ConnectionError() 86 87 await self._input_cv.wait() 88 89 return self._input_buffer.read(size) 90 91 async def write(self, data): 92 future = self._loop.create_future() 93 try: 94 self._write_queue.put_nowait((data, future)) 95 await future 96 97 except aio.QueueClosedError: 98 raise ConnectionError() 99 100 async def drain(self): 101 future = self._loop.create_future() 102 try: 103 self._write_queue.put_nowait((None, future)) 104 await future 105 106 except aio.QueueClosedError: 107 raise ConnectionError() 108 109 async def reset_input_buffer(self): 110 async with self._input_cv: 111 return self._input_buffer.clear() 112 113 async def _on_close(self): 114 self._serial.close() 115 116 await self._close_cb_future 117 118 async with self._input_cv: 119 self._input_cv.notify_all() 120 121 self._serial.set_close_cb(None) 122 123 async def _read_loop(self): 124 try: 125 while True: 126 with self._create_in_change_future() as change_future: 127 128 data = self._serial.read() 129 130 if not data: 131 await change_future 132 continue 133 134 async with self._input_cv: 135 self._input_buffer.add(data) 136 self._input_cv.notify_all() 137 138 except Exception as e: 139 mlog.warning('read loop error: %s', e, exc_info=e) 140 141 finally: 142 self.close() 143 144 async def _write_loop(self): 145 future = None 146 try: 147 while True: 148 data, future = await self._write_queue.get() 149 150 if data is None: 151 with self._create_drain_future() as drain_future: 152 self._serial.drain() 153 154 await drain_future 155 156 else: 157 data = memoryview(data) 158 while data: 159 with self._create_out_change_future() as change_future: 160 result = self._serial.write(bytes(data)) 161 if result < 0: 162 raise Exception('write error') 163 164 data = data[result:] 165 166 if data: 167 await change_future 168 169 if not future.done(): 170 future.set_result(None) 171 172 await asyncio.sleep(self._silent_interval) 173 174 except Exception as e: 175 mlog.warning('write loop error: %s', e, exc_info=e) 176 177 finally: 178 self.close() 179 self._write_queue.close() 180 181 while True: 182 if future and not future.done(): 183 future.set_exception(ConnectionError()) 184 if self._write_queue.empty(): 185 break 186 _, future = self._write_queue.get_nowait() 187 188 @contextlib.contextmanager 189 def _create_in_change_future(self): 190 with self._create_serial_future(self._serial.set_in_change_cb) as f: 191 yield f 192 193 @contextlib.contextmanager 194 def _create_out_change_future(self): 195 with self._create_serial_future(self._serial.set_out_change_cb) as f: 196 yield f 197 198 @contextlib.contextmanager 199 def _create_drain_future(self): 200 with self._create_serial_future(self._serial.set_drain_cb) as f: 201 yield f 202 203 @contextlib.contextmanager 204 def _create_serial_future(self, serial_set_cb): 205 future = self._loop.create_future() 206 cb = self._create_serial_cb(future) 207 serial_set_cb(cb) 208 209 try: 210 yield future 211 212 finally: 213 serial_set_cb(None) 214 215 def _create_serial_cb(self, future): 216 217 # use wrapper to keep reference to self 218 def wrapper(): 219 self._loop.call_soon_threadsafe(_try_set_result, future, None) 220 221 return wrapper
Serial endpoint
async def
read(self, size):
81 async def read(self, size): 82 async with self._input_cv: 83 while len(self._input_buffer) < size: 84 if not self.is_open: 85 raise ConnectionError() 86 87 await self._input_cv.wait() 88 89 return self._input_buffer.read(size)
Read
Arguments:
- size: number of bytes to read
Raises:
- ConnectionError
async def
write(self, data):
91 async def write(self, data): 92 future = self._loop.create_future() 93 try: 94 self._write_queue.put_nowait((data, future)) 95 await future 96 97 except aio.QueueClosedError: 98 raise ConnectionError()
Write
Raises:
- ConnectionError