hat.drivers.serial.py_serial
Implementation based on PySerial
implementation is based on read with timeout for periodical checking if connection is closed by user - better way of canceling active read operation is needed
1"""Implementation based on PySerial 2 3.. warning:: 4 5 implementation is based on read with timeout for periodical checking 6 if connection is closed by user - better way of canceling active read 7 operation is needed 8 9""" 10 11import asyncio 12import logging 13 14import serial 15 16from hat import aio 17 18from hat.drivers.serial import common 19 20 21mlog: logging.Logger = logging.getLogger(__name__) 22"""Module logger""" 23 24read_timeout: float = 0.5 25"""Read timeout""" 26 27 28async def create(port, 29 *, 30 name=None, 31 baudrate=9600, 32 bytesize=common.ByteSize.EIGHTBITS, 33 parity=common.Parity.NONE, 34 stopbits=common.StopBits.ONE, 35 xonxoff=False, 36 rtscts=False, 37 dsrdtr=False, 38 silent_interval=0): 39 endpoint = Endpoint() 40 endpoint._silent_interval = silent_interval 41 endpoint._input_buffer = bytearray() 42 endpoint._input_cv = asyncio.Condition() 43 endpoint._write_queue = aio.Queue() 44 endpoint._executor = aio.Executor(log_exceptions=False) 45 endpoint._info = common.EndpointInfo(name=name, 46 port=port) 47 endpoint._log = common.create_logger(mlog, endpoint._info) 48 endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info) 49 50 endpoint._serial = await endpoint._executor.spawn( 51 serial.Serial, 52 port=port, 53 baudrate=baudrate, 54 bytesize=bytesize.value, 55 parity=parity.value, 56 stopbits=stopbits.value, 57 xonxoff=xonxoff, 58 rtscts=rtscts, 59 dsrdtr=dsrdtr, 60 timeout=read_timeout) 61 62 endpoint._comm_log.log(common.CommLogAction.OPEN) 63 64 endpoint._async_group = aio.Group() 65 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 66 endpoint._async_group.spawn(endpoint._read_loop) 67 endpoint._async_group.spawn(endpoint._write_loop) 68 69 return endpoint 70 71 72class Endpoint(common.Endpoint): 73 74 @property 75 def async_group(self): 76 return self._async_group 77 78 @property 79 def info(self): 80 return self._info 81 82 async def read(self, size): 83 async with self._input_cv: 84 while len(self._input_buffer) < size: 85 if not self.is_open: 86 raise ConnectionError() 87 await self._input_cv.wait() 88 89 if size < 1: 90 return b'' 91 92 buffer = memoryview(self._input_buffer) 93 data, self._input_buffer = buffer[:size], bytearray(buffer[size:]) 94 return data 95 96 async def write(self, data): 97 future = asyncio.Future() 98 try: 99 self._write_queue.put_nowait((data, future)) 100 await future 101 102 except aio.QueueClosedError: 103 raise ConnectionError() 104 105 async def drain(self): 106 future = asyncio.Future() 107 try: 108 self._write_queue.put_nowait((None, future)) 109 await future 110 111 except aio.QueueClosedError: 112 raise ConnectionError() 113 114 async def clear_input_buffer(self): 115 async with self._input_cv: 116 count = len(self._input_buffer) 117 self._input_buffer = bytearray() 118 return count 119 120 async def _on_close(self): 121 await self._executor.spawn(self._ext_close) 122 123 async with self._input_cv: 124 self._input_cv.notify_all() 125 126 await self._executor.async_close() 127 128 self._comm_log.log(common.CommLogAction.CLOSE) 129 130 async def _read_loop(self): 131 try: 132 while True: 133 data_head = await self._executor.spawn(self._ext_read, 1) 134 if not data_head: 135 continue 136 137 data_rest = await self._executor.spawn(self._ext_read, -1) 138 139 self._comm_log.log(common.CommLogAction.RECEIVE, data_head, 140 data_rest) 141 142 async with self._input_cv: 143 self._input_buffer.extend(data_head) 144 if data_rest: 145 self._input_buffer.extend(data_rest) 146 147 self._input_cv.notify_all() 148 149 except Exception as e: 150 self._log.warning('read loop error: %s', e, exc_info=e) 151 152 finally: 153 self.close() 154 155 async def _write_loop(self): 156 future = None 157 try: 158 while True: 159 data, future = await self._write_queue.get() 160 161 if data is not None: 162 self._comm_log.log(common.CommLogAction.SEND, data) 163 164 await self._executor.spawn(self._ext_write, data) 165 166 else: 167 await self._executor.spawn(self._ext_flush) 168 169 if not future.done(): 170 future.set_result(None) 171 172 await asyncio.sleep(self._silent_interval) 173 174 except Exception as e: 175 self._log.warning('write loop error: %s', e, exc_info=e) 176 177 finally: 178 self.close() 179 self._write_queue.close() 180 181 while True: 182 if future and not future.done(): 183 future.set_exception(ConnectionError()) 184 if self._write_queue.empty(): 185 break 186 _, future = self._write_queue.get_nowait() 187 188 def _ext_close(self): 189 self._serial.close() 190 191 def _ext_read(self, n=-1): 192 if n < 0: 193 n = self._serial.in_waiting 194 195 if n < 1: 196 return b'' 197 198 return self._serial.read(n) 199 200 def _ext_write(self, data): 201 self._serial.write(data) 202 203 def _ext_flush(self): 204 self._serial.flush()
Module logger
read_timeout: float =
0.5
Read timeout
async def
create( port, *, name=None, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
29async def create(port, 30 *, 31 name=None, 32 baudrate=9600, 33 bytesize=common.ByteSize.EIGHTBITS, 34 parity=common.Parity.NONE, 35 stopbits=common.StopBits.ONE, 36 xonxoff=False, 37 rtscts=False, 38 dsrdtr=False, 39 silent_interval=0): 40 endpoint = Endpoint() 41 endpoint._silent_interval = silent_interval 42 endpoint._input_buffer = bytearray() 43 endpoint._input_cv = asyncio.Condition() 44 endpoint._write_queue = aio.Queue() 45 endpoint._executor = aio.Executor(log_exceptions=False) 46 endpoint._info = common.EndpointInfo(name=name, 47 port=port) 48 endpoint._log = common.create_logger(mlog, endpoint._info) 49 endpoint._comm_log = common.CommunicationLogger(mlog, endpoint._info) 50 51 endpoint._serial = await endpoint._executor.spawn( 52 serial.Serial, 53 port=port, 54 baudrate=baudrate, 55 bytesize=bytesize.value, 56 parity=parity.value, 57 stopbits=stopbits.value, 58 xonxoff=xonxoff, 59 rtscts=rtscts, 60 dsrdtr=dsrdtr, 61 timeout=read_timeout) 62 63 endpoint._comm_log.log(common.CommLogAction.OPEN) 64 65 endpoint._async_group = aio.Group() 66 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 67 endpoint._async_group.spawn(endpoint._read_loop) 68 endpoint._async_group.spawn(endpoint._write_loop) 69 70 return endpoint
class
Endpoint(hat.drivers.serial.common.Endpoint):
73class Endpoint(common.Endpoint): 74 75 @property 76 def async_group(self): 77 return self._async_group 78 79 @property 80 def info(self): 81 return self._info 82 83 async def read(self, size): 84 async with self._input_cv: 85 while len(self._input_buffer) < size: 86 if not self.is_open: 87 raise ConnectionError() 88 await self._input_cv.wait() 89 90 if size < 1: 91 return b'' 92 93 buffer = memoryview(self._input_buffer) 94 data, self._input_buffer = buffer[:size], bytearray(buffer[size:]) 95 return data 96 97 async def write(self, data): 98 future = asyncio.Future() 99 try: 100 self._write_queue.put_nowait((data, future)) 101 await future 102 103 except aio.QueueClosedError: 104 raise ConnectionError() 105 106 async def drain(self): 107 future = asyncio.Future() 108 try: 109 self._write_queue.put_nowait((None, future)) 110 await future 111 112 except aio.QueueClosedError: 113 raise ConnectionError() 114 115 async def clear_input_buffer(self): 116 async with self._input_cv: 117 count = len(self._input_buffer) 118 self._input_buffer = bytearray() 119 return count 120 121 async def _on_close(self): 122 await self._executor.spawn(self._ext_close) 123 124 async with self._input_cv: 125 self._input_cv.notify_all() 126 127 await self._executor.async_close() 128 129 self._comm_log.log(common.CommLogAction.CLOSE) 130 131 async def _read_loop(self): 132 try: 133 while True: 134 data_head = await self._executor.spawn(self._ext_read, 1) 135 if not data_head: 136 continue 137 138 data_rest = await self._executor.spawn(self._ext_read, -1) 139 140 self._comm_log.log(common.CommLogAction.RECEIVE, data_head, 141 data_rest) 142 143 async with self._input_cv: 144 self._input_buffer.extend(data_head) 145 if data_rest: 146 self._input_buffer.extend(data_rest) 147 148 self._input_cv.notify_all() 149 150 except Exception as e: 151 self._log.warning('read loop error: %s', e, exc_info=e) 152 153 finally: 154 self.close() 155 156 async def _write_loop(self): 157 future = None 158 try: 159 while True: 160 data, future = await self._write_queue.get() 161 162 if data is not None: 163 self._comm_log.log(common.CommLogAction.SEND, data) 164 165 await self._executor.spawn(self._ext_write, data) 166 167 else: 168 await self._executor.spawn(self._ext_flush) 169 170 if not future.done(): 171 future.set_result(None) 172 173 await asyncio.sleep(self._silent_interval) 174 175 except Exception as e: 176 self._log.warning('write loop error: %s', e, exc_info=e) 177 178 finally: 179 self.close() 180 self._write_queue.close() 181 182 while True: 183 if future and not future.done(): 184 future.set_exception(ConnectionError()) 185 if self._write_queue.empty(): 186 break 187 _, future = self._write_queue.get_nowait() 188 189 def _ext_close(self): 190 self._serial.close() 191 192 def _ext_read(self, n=-1): 193 if n < 0: 194 n = self._serial.in_waiting 195 196 if n < 1: 197 return b'' 198 199 return self._serial.read(n) 200 201 def _ext_write(self, data): 202 self._serial.write(data) 203 204 def _ext_flush(self): 205 self._serial.flush()
Serial endpoint
async def
read(self, size):
83 async def read(self, size): 84 async with self._input_cv: 85 while len(self._input_buffer) < size: 86 if not self.is_open: 87 raise ConnectionError() 88 await self._input_cv.wait() 89 90 if size < 1: 91 return b'' 92 93 buffer = memoryview(self._input_buffer) 94 data, self._input_buffer = buffer[:size], bytearray(buffer[size:]) 95 return data
Read
Arguments:
- size: number of bytes to read
Raises:
- ConnectionError
async def
write(self, data):
97 async def write(self, data): 98 future = asyncio.Future() 99 try: 100 self._write_queue.put_nowait((data, future)) 101 await future 102 103 except aio.QueueClosedError: 104 raise ConnectionError()
Write
Raises:
- ConnectionError
async def
drain(self):
106 async def drain(self): 107 future = asyncio.Future() 108 try: 109 self._write_queue.put_nowait((None, future)) 110 await future 111 112 except aio.QueueClosedError: 113 raise ConnectionError()
Drain output buffer
Raises:
- ConnectionError
async def
clear_input_buffer(self):
115 async def clear_input_buffer(self): 116 async with self._input_cv: 117 count = len(self._input_buffer) 118 self._input_buffer = bytearray() 119 return count
Reset input buffer
Returns number of bytes available in buffer immediately before buffer was cleared.
Raises:
- ConnectionError