hat.drivers.serial.py_serial
Implementation based on PySerial
implementation is based on read with timeout for periodical checking if connection is closed by user - better way of canceling active read operation is needed
1"""Implementation based on PySerial 2 3.. warning:: 4 5 implementation is based on read with timeout for periodical checking 6 if connection is closed by user - better way of canceling active read 7 operation is needed 8 9""" 10 11import asyncio 12import logging 13 14import serial 15 16from hat import aio 17 18from hat.drivers.serial import common 19 20 21mlog: logging.Logger = logging.getLogger(__name__) 22"""Module logger""" 23 24read_timeout: float = 0.5 25"""Read timeout""" 26 27 28async def create(port, *, 29 baudrate=9600, 30 bytesize=common.ByteSize.EIGHTBITS, 31 parity=common.Parity.NONE, 32 stopbits=common.StopBits.ONE, 33 xonxoff=False, 34 rtscts=False, 35 dsrdtr=False, 36 silent_interval=0): 37 endpoint = Endpoint() 38 endpoint._port = port 39 endpoint._silent_interval = silent_interval 40 endpoint._input_buffer = bytearray() 41 endpoint._input_cv = asyncio.Condition() 42 endpoint._write_queue = aio.Queue() 43 endpoint._executor = aio.Executor(log_exceptions=False) 44 45 endpoint._serial = await endpoint._executor.spawn( 46 serial.Serial, 47 port=port, 48 baudrate=baudrate, 49 bytesize=bytesize.value, 50 parity=parity.value, 51 stopbits=stopbits.value, 52 xonxoff=xonxoff, 53 rtscts=rtscts, 54 dsrdtr=dsrdtr, 55 timeout=read_timeout) 56 57 endpoint._async_group = aio.Group() 58 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 59 endpoint._async_group.spawn(endpoint._read_loop) 60 endpoint._async_group.spawn(endpoint._write_loop) 61 62 return endpoint 63 64 65class Endpoint(common.Endpoint): 66 67 @property 68 def async_group(self): 69 return self._async_group 70 71 @property 72 def port(self): 73 return self._port 74 75 async def read(self, size): 76 async with self._input_cv: 77 while len(self._input_buffer) < size: 78 if not self.is_open: 79 raise ConnectionError() 80 await self._input_cv.wait() 81 82 if size < 1: 83 return b'' 84 85 buffer = memoryview(self._input_buffer) 86 data, self._input_buffer = buffer[:size], bytearray(buffer[size:]) 87 return data 88 89 async def write(self, data): 90 future = asyncio.Future() 91 try: 92 self._write_queue.put_nowait((data, future)) 93 await future 94 95 except aio.QueueClosedError: 96 raise ConnectionError() 97 98 async def drain(self): 99 future = asyncio.Future() 100 try: 101 self._write_queue.put_nowait((None, future)) 102 await future 103 104 except aio.QueueClosedError: 105 raise ConnectionError() 106 107 async def reset_input_buffer(self): 108 async with self._input_cv: 109 count = len(self._input_buffer) 110 self._input_buffer = bytearray() 111 return count 112 113 async def _on_close(self): 114 await self._executor.spawn(self._ext_close) 115 116 async with self._input_cv: 117 self._input_cv.notify_all() 118 119 await self._executor.async_close() 120 121 async def _read_loop(self): 122 try: 123 while True: 124 data_head = await self._executor.spawn(self._ext_read, 1) 125 if not data_head: 126 continue 127 128 data_rest = await self._executor.spawn(self._ext_read, -1) 129 130 async with self._input_cv: 131 self._input_buffer.extend(data_head) 132 if data_rest: 133 self._input_buffer.extend(data_rest) 134 135 self._input_cv.notify_all() 136 137 except Exception as e: 138 mlog.warning('read loop error: %s', e, exc_info=e) 139 140 finally: 141 self.close() 142 143 async def _write_loop(self): 144 future = None 145 try: 146 while True: 147 data, future = await self._write_queue.get() 148 149 if data is not None: 150 await self._executor.spawn(self._ext_write, data) 151 152 else: 153 await self._executor.spawn(self._ext_flush) 154 155 if not future.done(): 156 future.set_result(None) 157 158 await asyncio.sleep(self._silent_interval) 159 160 except Exception as e: 161 mlog.warning('write loop error: %s', e, exc_info=e) 162 163 finally: 164 self.close() 165 self._write_queue.close() 166 167 while True: 168 if future and not future.done(): 169 future.set_exception(ConnectionError()) 170 if self._write_queue.empty(): 171 break 172 _, future = self._write_queue.get_nowait() 173 174 def _ext_close(self): 175 self._serial.close() 176 177 def _ext_read(self, n=-1): 178 if n < 0: 179 n = self._serial.in_waiting 180 181 if n < 1: 182 return b'' 183 184 return self._serial.read(n) 185 186 def _ext_write(self, data): 187 self._serial.write(data) 188 189 def _ext_flush(self): 190 self._serial.flush()
Module logger
read_timeout: float =
0.5
Read timeout
async def
create( port, *, baudrate=9600, bytesize=<ByteSize.EIGHTBITS: 8>, parity=<Parity.NONE: 'N'>, stopbits=<StopBits.ONE: 1>, xonxoff=False, rtscts=False, dsrdtr=False, silent_interval=0):
29async def create(port, *, 30 baudrate=9600, 31 bytesize=common.ByteSize.EIGHTBITS, 32 parity=common.Parity.NONE, 33 stopbits=common.StopBits.ONE, 34 xonxoff=False, 35 rtscts=False, 36 dsrdtr=False, 37 silent_interval=0): 38 endpoint = Endpoint() 39 endpoint._port = port 40 endpoint._silent_interval = silent_interval 41 endpoint._input_buffer = bytearray() 42 endpoint._input_cv = asyncio.Condition() 43 endpoint._write_queue = aio.Queue() 44 endpoint._executor = aio.Executor(log_exceptions=False) 45 46 endpoint._serial = await endpoint._executor.spawn( 47 serial.Serial, 48 port=port, 49 baudrate=baudrate, 50 bytesize=bytesize.value, 51 parity=parity.value, 52 stopbits=stopbits.value, 53 xonxoff=xonxoff, 54 rtscts=rtscts, 55 dsrdtr=dsrdtr, 56 timeout=read_timeout) 57 58 endpoint._async_group = aio.Group() 59 endpoint._async_group.spawn(aio.call_on_cancel, endpoint._on_close) 60 endpoint._async_group.spawn(endpoint._read_loop) 61 endpoint._async_group.spawn(endpoint._write_loop) 62 63 return endpoint
class
Endpoint(hat.drivers.serial.common.Endpoint):
66class Endpoint(common.Endpoint): 67 68 @property 69 def async_group(self): 70 return self._async_group 71 72 @property 73 def port(self): 74 return self._port 75 76 async def read(self, size): 77 async with self._input_cv: 78 while len(self._input_buffer) < size: 79 if not self.is_open: 80 raise ConnectionError() 81 await self._input_cv.wait() 82 83 if size < 1: 84 return b'' 85 86 buffer = memoryview(self._input_buffer) 87 data, self._input_buffer = buffer[:size], bytearray(buffer[size:]) 88 return data 89 90 async def write(self, data): 91 future = asyncio.Future() 92 try: 93 self._write_queue.put_nowait((data, future)) 94 await future 95 96 except aio.QueueClosedError: 97 raise ConnectionError() 98 99 async def drain(self): 100 future = asyncio.Future() 101 try: 102 self._write_queue.put_nowait((None, future)) 103 await future 104 105 except aio.QueueClosedError: 106 raise ConnectionError() 107 108 async def reset_input_buffer(self): 109 async with self._input_cv: 110 count = len(self._input_buffer) 111 self._input_buffer = bytearray() 112 return count 113 114 async def _on_close(self): 115 await self._executor.spawn(self._ext_close) 116 117 async with self._input_cv: 118 self._input_cv.notify_all() 119 120 await self._executor.async_close() 121 122 async def _read_loop(self): 123 try: 124 while True: 125 data_head = await self._executor.spawn(self._ext_read, 1) 126 if not data_head: 127 continue 128 129 data_rest = await self._executor.spawn(self._ext_read, -1) 130 131 async with self._input_cv: 132 self._input_buffer.extend(data_head) 133 if data_rest: 134 self._input_buffer.extend(data_rest) 135 136 self._input_cv.notify_all() 137 138 except Exception as e: 139 mlog.warning('read loop error: %s', e, exc_info=e) 140 141 finally: 142 self.close() 143 144 async def _write_loop(self): 145 future = None 146 try: 147 while True: 148 data, future = await self._write_queue.get() 149 150 if data is not None: 151 await self._executor.spawn(self._ext_write, data) 152 153 else: 154 await self._executor.spawn(self._ext_flush) 155 156 if not future.done(): 157 future.set_result(None) 158 159 await asyncio.sleep(self._silent_interval) 160 161 except Exception as e: 162 mlog.warning('write loop error: %s', e, exc_info=e) 163 164 finally: 165 self.close() 166 self._write_queue.close() 167 168 while True: 169 if future and not future.done(): 170 future.set_exception(ConnectionError()) 171 if self._write_queue.empty(): 172 break 173 _, future = self._write_queue.get_nowait() 174 175 def _ext_close(self): 176 self._serial.close() 177 178 def _ext_read(self, n=-1): 179 if n < 0: 180 n = self._serial.in_waiting 181 182 if n < 1: 183 return b'' 184 185 return self._serial.read(n) 186 187 def _ext_write(self, data): 188 self._serial.write(data) 189 190 def _ext_flush(self): 191 self._serial.flush()
Serial endpoint
async def
read(self, size):
76 async def read(self, size): 77 async with self._input_cv: 78 while len(self._input_buffer) < size: 79 if not self.is_open: 80 raise ConnectionError() 81 await self._input_cv.wait() 82 83 if size < 1: 84 return b'' 85 86 buffer = memoryview(self._input_buffer) 87 data, self._input_buffer = buffer[:size], bytearray(buffer[size:]) 88 return data
Read
Arguments:
- size: number of bytes to read
Raises:
- ConnectionError
async def
write(self, data):
90 async def write(self, data): 91 future = asyncio.Future() 92 try: 93 self._write_queue.put_nowait((data, future)) 94 await future 95 96 except aio.QueueClosedError: 97 raise ConnectionError()
Write
Raises:
- ConnectionError
async def
drain(self):
99 async def drain(self): 100 future = asyncio.Future() 101 try: 102 self._write_queue.put_nowait((None, future)) 103 await future 104 105 except aio.QueueClosedError: 106 raise ConnectionError()
Drain output buffer
Raises:
- ConnectionError
async def
reset_input_buffer(self):
108 async def reset_input_buffer(self): 109 async with self._input_cv: 110 count = len(self._input_buffer) 111 self._input_buffer = bytearray() 112 return count
Reset input buffer
Returns number of bytes available in buffer immediately before buffer was cleared.
Raises:
- ConnectionError