hat.drivers.serial.native_serial
Implementation based on native serial communication
1"""Implementation based on native serial communication""" 2 3import asyncio 4import contextlib 5import logging 6import sys 7 8from hat import aio 9from hat import util 10 11from hat.drivers.serial import common 12 13from hat.drivers.serial import _native_serial 14 15 16if sys.platform == 'win32': 17 raise ImportError('WIP') 18 19 20mlog: logging.Logger = logging.getLogger(__name__) 21"""Module logger""" 22 23 24async def create(port, 25 *, 26 name=None, 27 baudrate=9600, 28 bytesize=common.ByteSize.EIGHTBITS, 29 parity=common.Parity.NONE, 30 stopbits=common.StopBits.ONE, 31 xonxoff=False, 32 rtscts=False, 33 dsrdtr=False, 34 silent_interval=0): 35 endpoint = Endpoint() 36 endpoint._silent_interval = silent_interval 37 endpoint._loop = asyncio.get_running_loop() 38 endpoint._input_buffer = util.BytesBuffer() 39 endpoint._input_cv = asyncio.Condition() 40 endpoint._write_queue = aio.Queue() 41 endpoint._info = common.EndpointInfo(name=name, 42 port=port) 43 endpoint._log = common.create_logger(mlog, endpoint._info) 44 endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info) 45 46 endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF, 47 out_buff_size=0xFFFF) 48 49 endpoint._close_cb_future = endpoint._loop.create_future() 50 close_cb = endpoint._create_serial_cb(endpoint._close_cb_future) 51 endpoint._serial.set_close_cb(close_cb) 52 53 endpoint._serial.open( 54 port=port, 55 baudrate=baudrate, 56 bytesize=bytesize.value, 57 parity=parity.value, 58 stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE 59 else stopbits.value), 60 xonxoff=xonxoff, 61 rtscts=rtscts, 62 dsrdtr=dsrdtr) 63 64 endpoint._comm_log.log(common.CommLogAction.OPEN) 65 66 endpoint._async_group = aio.Group() 67 endpoint._async_group.spawn(aio.call_on_done, 68 asyncio.shield(endpoint._close_cb_future), 69 endpoint.close) 70 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 71 endpoint._async_group.spawn(endpoint._read_loop) 72 endpoint._async_group.spawn(endpoint._write_loop) 73 74 return endpoint 75 76 77class Endpoint(common.Endpoint): 78 79 @property 80 def async_group(self): 81 return self._async_group 82 83 @property 84 def info(self): 85 return self._info 86 87 async def read(self, size): 88 async with self._input_cv: 89 while len(self._input_buffer) < size: 90 if not self.is_open: 91 raise ConnectionError() 92 93 await self._input_cv.wait() 94 95 return self._input_buffer.read(size) 96 97 async def write(self, data): 98 future = self._loop.create_future() 99 try: 100 self._write_queue.put_nowait((data, future)) 101 await future 102 103 except aio.QueueClosedError: 104 raise ConnectionError() 105 106 async def drain(self): 107 future = self._loop.create_future() 108 try: 109 self._write_queue.put_nowait((None, future)) 110 await future 111 112 except aio.QueueClosedError: 113 raise ConnectionError() 114 115 async def clear_input_buffer(self): 116 async with self._input_cv: 117 return self._input_buffer.clear() 118 119 async def _on_close(self): 120 self._serial.close() 121 122 await self._close_cb_future 123 124 async with self._input_cv: 125 self._input_cv.notify_all() 126 127 self._serial.set_close_cb(None) 128 129 self._comm_log.log(common.CommLogAction.CLOSE) 130 131 async def _read_loop(self): 132 try: 133 while True: 134 with self._create_in_change_future() as change_future: 135 136 data = self._serial.read() 137 138 if not data: 139 await change_future 140 continue 141 142 self._comm_log.log(common.CommLogAction.RECEIVE, data) 143 144 async with self._input_cv: 145 self._input_buffer.add(data) 146 self._input_cv.notify_all() 147 148 except Exception as e: 149 self._log.warning('read loop error: %s', e, exc_info=e) 150 151 finally: 152 self.close() 153 154 async def _write_loop(self): 155 future = None 156 try: 157 while True: 158 data, future = await self._write_queue.get() 159 160 if data is None: 161 with self._create_drain_future() as drain_future: 162 self._serial.drain() 163 164 await drain_future 165 166 else: 167 data = memoryview(data) 168 while data: 169 with self._create_out_change_future() as change_future: 170 result = self._serial.write(bytes(data)) 171 if result < 0: 172 raise Exception('write error') 173 174 self._comm_log.log(common.CommLogAction.SEND, 175 data[:result]) 176 177 data = data[result:] 178 179 if data: 180 await change_future 181 182 if not future.done(): 183 future.set_result(None) 184 185 await asyncio.sleep(self._silent_interval) 186 187 except Exception as e: 188 self._log.warning('write loop error: %s', e, exc_info=e) 189 190 finally: 191 self.close() 192 self._write_queue.close() 193 194 while True: 195 if future and not future.done(): 196 future.set_exception(ConnectionError()) 197 if self._write_queue.empty(): 198 break 199 _, future = self._write_queue.get_nowait() 200 201 @contextlib.contextmanager 202 def _create_in_change_future(self): 203 with self._create_serial_future(self._serial.set_in_change_cb) as f: 204 yield f 205 206 @contextlib.contextmanager 207 def _create_out_change_future(self): 208 with self._create_serial_future(self._serial.set_out_change_cb) as f: 209 yield f 210 211 @contextlib.contextmanager 212 def _create_drain_future(self): 213 with self._create_serial_future(self._serial.set_drain_cb) as f: 214 yield f 215 216 @contextlib.contextmanager 217 def _create_serial_future(self, serial_set_cb): 218 future = self._loop.create_future() 219 cb = self._create_serial_cb(future) 220 serial_set_cb(cb) 221 222 try: 223 yield future 224 225 finally: 226 serial_set_cb(None) 227 228 def _create_serial_cb(self, future): 229 230 # use wrapper to keep reference to self 231 def wrapper(): 232 self._loop.call_soon_threadsafe(_try_set_result, future, None) 233 234 return wrapper 235 236 237def _try_set_result(future, result): 238 if not future.done(): 239 future.set_result(result)
Module logger
async def
create( port, *, name=None, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
25async def create(port, 26 *, 27 name=None, 28 baudrate=9600, 29 bytesize=common.ByteSize.EIGHTBITS, 30 parity=common.Parity.NONE, 31 stopbits=common.StopBits.ONE, 32 xonxoff=False, 33 rtscts=False, 34 dsrdtr=False, 35 silent_interval=0): 36 endpoint = Endpoint() 37 endpoint._silent_interval = silent_interval 38 endpoint._loop = asyncio.get_running_loop() 39 endpoint._input_buffer = util.BytesBuffer() 40 endpoint._input_cv = asyncio.Condition() 41 endpoint._write_queue = aio.Queue() 42 endpoint._info = common.EndpointInfo(name=name, 43 port=port) 44 endpoint._log = common.create_logger(mlog, endpoint._info) 45 endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info) 46 47 endpoint._serial = _native_serial.Serial(in_buff_size=0xFFFF, 48 out_buff_size=0xFFFF) 49 50 endpoint._close_cb_future = endpoint._loop.create_future() 51 close_cb = endpoint._create_serial_cb(endpoint._close_cb_future) 52 endpoint._serial.set_close_cb(close_cb) 53 54 endpoint._serial.open( 55 port=port, 56 baudrate=baudrate, 57 bytesize=bytesize.value, 58 parity=parity.value, 59 stopbits=(2 if stopbits == common.StopBits.ONE_POINT_FIVE 60 else stopbits.value), 61 xonxoff=xonxoff, 62 rtscts=rtscts, 63 dsrdtr=dsrdtr) 64 65 endpoint._comm_log.log(common.CommLogAction.OPEN) 66 67 endpoint._async_group = aio.Group() 68 endpoint._async_group.spawn(aio.call_on_done, 69 asyncio.shield(endpoint._close_cb_future), 70 endpoint.close) 71 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 72 endpoint._async_group.spawn(endpoint._read_loop) 73 endpoint._async_group.spawn(endpoint._write_loop) 74 75 return endpoint
class
Endpoint(hat.drivers.serial.common.Endpoint):
78class Endpoint(common.Endpoint): 79 80 @property 81 def async_group(self): 82 return self._async_group 83 84 @property 85 def info(self): 86 return self._info 87 88 async def read(self, size): 89 async with self._input_cv: 90 while len(self._input_buffer) < size: 91 if not self.is_open: 92 raise ConnectionError() 93 94 await self._input_cv.wait() 95 96 return self._input_buffer.read(size) 97 98 async def write(self, data): 99 future = self._loop.create_future() 100 try: 101 self._write_queue.put_nowait((data, future)) 102 await future 103 104 except aio.QueueClosedError: 105 raise ConnectionError() 106 107 async def drain(self): 108 future = self._loop.create_future() 109 try: 110 self._write_queue.put_nowait((None, future)) 111 await future 112 113 except aio.QueueClosedError: 114 raise ConnectionError() 115 116 async def clear_input_buffer(self): 117 async with self._input_cv: 118 return self._input_buffer.clear() 119 120 async def _on_close(self): 121 self._serial.close() 122 123 await self._close_cb_future 124 125 async with self._input_cv: 126 self._input_cv.notify_all() 127 128 self._serial.set_close_cb(None) 129 130 self._comm_log.log(common.CommLogAction.CLOSE) 131 132 async def _read_loop(self): 133 try: 134 while True: 135 with self._create_in_change_future() as change_future: 136 137 data = self._serial.read() 138 139 if not data: 140 await change_future 141 continue 142 143 self._comm_log.log(common.CommLogAction.RECEIVE, data) 144 145 async with self._input_cv: 146 self._input_buffer.add(data) 147 self._input_cv.notify_all() 148 149 except Exception as e: 150 self._log.warning('read loop error: %s', e, exc_info=e) 151 152 finally: 153 self.close() 154 155 async def _write_loop(self): 156 future = None 157 try: 158 while True: 159 data, future = await self._write_queue.get() 160 161 if data is None: 162 with self._create_drain_future() as drain_future: 163 self._serial.drain() 164 165 await drain_future 166 167 else: 168 data = memoryview(data) 169 while data: 170 with self._create_out_change_future() as change_future: 171 result = self._serial.write(bytes(data)) 172 if result < 0: 173 raise Exception('write error') 174 175 self._comm_log.log(common.CommLogAction.SEND, 176 data[:result]) 177 178 data = data[result:] 179 180 if data: 181 await change_future 182 183 if not future.done(): 184 future.set_result(None) 185 186 await asyncio.sleep(self._silent_interval) 187 188 except Exception as e: 189 self._log.warning('write loop error: %s', e, exc_info=e) 190 191 finally: 192 self.close() 193 self._write_queue.close() 194 195 while True: 196 if future and not future.done(): 197 future.set_exception(ConnectionError()) 198 if self._write_queue.empty(): 199 break 200 _, future = self._write_queue.get_nowait() 201 202 @contextlib.contextmanager 203 def _create_in_change_future(self): 204 with self._create_serial_future(self._serial.set_in_change_cb) as f: 205 yield f 206 207 @contextlib.contextmanager 208 def _create_out_change_future(self): 209 with self._create_serial_future(self._serial.set_out_change_cb) as f: 210 yield f 211 212 @contextlib.contextmanager 213 def _create_drain_future(self): 214 with self._create_serial_future(self._serial.set_drain_cb) as f: 215 yield f 216 217 @contextlib.contextmanager 218 def _create_serial_future(self, serial_set_cb): 219 future = self._loop.create_future() 220 cb = self._create_serial_cb(future) 221 serial_set_cb(cb) 222 223 try: 224 yield future 225 226 finally: 227 serial_set_cb(None) 228 229 def _create_serial_cb(self, future): 230 231 # use wrapper to keep reference to self 232 def wrapper(): 233 self._loop.call_soon_threadsafe(_try_set_result, future, None) 234 235 return wrapper
Serial endpoint
async def
read(self, size):
88 async def read(self, size): 89 async with self._input_cv: 90 while len(self._input_buffer) < size: 91 if not self.is_open: 92 raise ConnectionError() 93 94 await self._input_cv.wait() 95 96 return self._input_buffer.read(size)
Read
Arguments:
- size: number of bytes to read
Raises:
- ConnectionError
async def
write(self, data):
98 async def write(self, data): 99 future = self._loop.create_future() 100 try: 101 self._write_queue.put_nowait((data, future)) 102 await future 103 104 except aio.QueueClosedError: 105 raise ConnectionError()
Write
Raises:
- ConnectionError