hat.drivers.pnetgateway
"""PNET Gateway client driver

Re-exports the public names defined in `hat.drivers.pnetgateway.common`
and `hat.drivers.pnetgateway.client`.

"""

from hat.drivers.pnetgateway.common import (
    Status,
    Quality,
    Source,
    DataType,
    Data,
    Change,
    Command)
from hat.drivers.pnetgateway.client import (
    StatusCb,
    DataCb,
    connect,
    Connection)


# public API of the package, in the same order as the imports above
__all__ = [
    'Status',
    'Quality',
    'Source',
    'DataType',
    'Data',
    'Change',
    'Command',
    'StatusCb',
    'DataCb',
    'connect',
    'Connection']
class
Status(enum.Enum):
class Status(enum.Enum):
    """PNET Gateway server status

    Every member's value is its own name as a string, so a status string
    can be converted with ``Status(text)``.

    """

    DISCONNECTED = 'DISCONNECTED'
    CONNECTING = 'CONNECTING'
    CONNECTED = 'CONNECTED'
DISCONNECTED =
<Status.DISCONNECTED: 'DISCONNECTED'>
CONNECTING =
<Status.CONNECTING: 'CONNECTING'>
CONNECTED =
<Status.CONNECTED: 'CONNECTED'>
class
Quality(enum.Enum):
class Quality(enum.Enum):
    """Quality attribute carried by `Data` and `Change`

    Every member's value is its own name as a string.

    """

    NONE = 'NONE'
    GOOD = 'GOOD'
    UNRELIABLE = 'UNRELIABLE'
    BAD = 'BAD'
NONE =
<Quality.NONE: 'NONE'>
GOOD =
<Quality.GOOD: 'GOOD'>
UNRELIABLE =
<Quality.UNRELIABLE: 'UNRELIABLE'>
BAD =
<Quality.BAD: 'BAD'>
class
Source(enum.Enum):
class Source(enum.Enum):
    """Source attribute carried by `Data` and `Change`

    Every member's value is its own name as a string.

    """

    APPLICATION = 'APPLICATION'
    CALCULATED = 'CALCULATED'
    MANUAL_ENTRY = 'MANUAL_ENTRY'
    NONSPECIFIC = 'NONSPECIFIC'
    PROC_MOD = 'PROC_MOD'
    REMOTE_SRC = 'REMOTE_SRC'
    REMOTE_SRC_INVALID = 'REMOTE_SRC_INVALID'
    SCHEDULED = 'SCHEDULED'
APPLICATION =
<Source.APPLICATION: 'APPLICATION'>
CALCULATED =
<Source.CALCULATED: 'CALCULATED'>
MANUAL_ENTRY =
<Source.MANUAL_ENTRY: 'MANUAL_ENTRY'>
NONSPECIFIC =
<Source.NONSPECIFIC: 'NONSPECIFIC'>
PROC_MOD =
<Source.PROC_MOD: 'PROC_MOD'>
REMOTE_SRC =
<Source.REMOTE_SRC: 'REMOTE_SRC'>
REMOTE_SRC_INVALID =
<Source.REMOTE_SRC_INVALID: 'REMOTE_SRC_INVALID'>
SCHEDULED =
<Source.SCHEDULED: 'SCHEDULED'>
class
DataType(enum.Enum):
class DataType(enum.Enum):
    """Type attribute carried by `Data` (its ``type`` field)

    Every member's value is its own name as a string.

    """

    BLOB = 'BLOB'
    COMMAND = 'COMMAND'
    COUNTER = 'COUNTER'
    EVENT = 'EVENT'
    GROUP = 'GROUP'
    NUMERIC = 'NUMERIC'
    STATES = 'STATES'
    UNKNOWN = 'UNKNOWN'
BLOB =
<DataType.BLOB: 'BLOB'>
COMMAND =
<DataType.COMMAND: 'COMMAND'>
COUNTER =
<DataType.COUNTER: 'COUNTER'>
EVENT =
<DataType.EVENT: 'EVENT'>
GROUP =
<DataType.GROUP: 'GROUP'>
NUMERIC =
<DataType.NUMERIC: 'NUMERIC'>
STATES =
<DataType.STATES: 'STATES'>
UNKNOWN =
<DataType.UNKNOWN: 'UNKNOWN'>
class
Data(typing.NamedTuple):
class Data(typing.NamedTuple):
    """Single data item

    Field order is part of the interface: instances can be built
    positionally as ``Data(key, value, quality, timestamp, type, source)``.

    """

    # identifier of the data item
    key: str
    # current value (any JSON data)
    value: json.Data
    # quality of the value
    quality: Quality
    # NOTE(review): unit/epoch of the timestamp is not visible here - confirm
    timestamp: float
    # kind of the data item
    type: DataType
    # origin of the value
    source: Source
Data(key, value, quality, timestamp, type, source)
class
Change(typing.NamedTuple):
class Change(typing.NamedTuple):
    """Requested change of a single data item

    Only `key` is mandatory; every other field may be ``None``.

    NOTE(review): presumably ``None`` means "leave this attribute
    unchanged" - confirm against the encoder and the server protocol.

    """

    # identifier of the data item to change
    key: str
    # new value, or None
    value: json.Data | None
    # new quality, or None
    quality: Quality | None
    # new timestamp, or None
    timestamp: float | None
    # new source, or None
    source: Source | None
Change(key, value, quality, timestamp, source)
class
Command(typing.NamedTuple):
Command(key, value)
StatusCb =
typing.Callable[[Status], None | collections.abc.Awaitable[None]]
DataCb =
typing.Callable[[list[Data]], None | collections.abc.Awaitable[None]]
async def
connect( addr: hat.drivers.tcp.Address, username: str, password: str, status_cb: Callable[[Status], None | Awaitable[None]], data_cb: Callable[[list[Data]], None | Awaitable[None]], subscriptions: list[str] | None = None, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  username: str,
                  password: str,
                  status_cb: StatusCb,
                  data_cb: DataCb,
                  subscriptions: list[str] | None = None,
                  **kwargs
                  ) -> 'Connection':
    """Connect to PNET Gateway server and authenticate

    A TCP connection is opened (additional keyword arguments are passed
    directly to `hat.drivers.tcp.connect`), an authentication request is
    sent and the authentication response is awaited. Messages of any
    other type received before that response are discarded. On success,
    the connection's status and data are initialized from the response
    and the connection's read loop is started.

    If authentication fails, or waiting for the response is interrupted
    in any way (including cancellation), the connection is closed before
    the exception propagates.

    Args:
        addr: PNET Gateway server address
        username: user name
        password: password
        status_cb: status change callback
        data_cb: data change callback
        subscriptions: list of data keys for subscriptions; if ``None``,
            client is subscribed to all data changes from the server

    Raises:
        Exception: server reported unsuccessful authentication

    """
    conn = Connection()
    conn._status_cb = status_cb
    conn._data_cb = data_cb
    conn._pnet_status = common.Status.DISCONNECTED
    conn._data = {}
    conn._id_futures = {}
    conn._next_ids = itertools.count(0)

    tcp_conn = await tcp.connect(addr, **kwargs)
    conn._conn = transport.Transport(tcp_conn)

    conn._log = _create_logger(conn._conn.info)

    auth_request = {'type': 'authentication_request',
                    'body': {'username': username,
                             'password': password,
                             'subscriptions': subscriptions}}

    try:
        await conn._conn.send(auth_request)

        # skip everything until the authentication response arrives
        while True:
            response = await conn._conn.receive()
            if response['type'] == 'authentication_response':
                break

        body = response['body']
        if not body['success']:
            raise Exception('authentication failed')

        conn._pnet_status = common.Status(body['status'])
        for item in body['data']:
            entry = encoder.data_from_json(item)
            conn._data[entry.key] = entry

    except BaseException:
        # BaseException so that cancellation also closes the transport
        await aio.uncancellable(conn.async_close())
        raise

    conn.async_group.spawn(conn._read_loop)

    return conn
Connect to PNET Gateway server
Additional arguments are passed directly to hat.drivers.tcp.connect.
Arguments:
- addr: PNET Gateway server address
- username: user name
- password: password
- status_cb: status change callback
- data_cb: data change callback
- subscriptions: list of data keys for subscriptions
If subscriptions is None, the client is subscribed to all data changes
from the server.
class
Connection(hat.aio.group.Resource):
88class Connection(aio.Resource): 89 """PNET Gateway connection 90 91 For creating new connection see :func:`connect` 92 93 """ 94 95 @property 96 def async_group(self) -> aio.Group: 97 """Async group""" 98 return self._conn.async_group 99 100 @property 101 def pnet_status(self) -> common.Status: 102 """PNET Gateway server's status""" 103 return self._pnet_status 104 105 @property 106 def data(self) -> dict[str, common.Data]: 107 """Subscribed data""" 108 return self._data 109 110 async def change_data(self, 111 changes: typing.Iterable[common.Change] 112 ) -> list[bool]: 113 """Send change data request to PNET Gateway server""" 114 return await self._send_with_response( 115 'change_data_request', 116 [encoder.change_to_json(i) for i in changes]) 117 118 async def send_commands(self, 119 commands: typing.Iterable[common.Command] 120 ) -> list[bool]: 121 """Send commands to PNET Gateway""" 122 return await self._send_with_response( 123 'command_request', 124 [encoder.command_to_json(i) for i in commands]) 125 126 async def _send_with_response(self, msg_type, msg_data): 127 if not self.is_open: 128 raise ConnectionError() 129 130 msg_id = next(self._next_ids) 131 msg = {'type': msg_type, 132 'body': {'id': msg_id, 133 'data': msg_data}} 134 135 future = asyncio.Future() 136 self._id_futures[msg_id] = future 137 138 try: 139 await self._conn.send(msg) 140 return await future 141 142 finally: 143 self._id_futures.pop(msg_id) 144 145 async def _read_loop(self): 146 try: 147 while True: 148 msg = await self._conn.receive() 149 150 if msg['type'] == 'change_data_response': 151 self._on_change_data_response(msg['body']) 152 153 elif msg['type'] == 'command_response': 154 self._on_command_response(msg['body']) 155 156 elif msg['type'] == 'data_changed_unsolicited': 157 await self._on_data_changed_unsolicited(msg['body']) 158 159 elif msg['type'] == 'status_changed_unsolicited': 160 await self._on_status_changed_unsolicited(msg['body']) 161 162 except ConnectionError: 163 
pass 164 165 except Exception as e: 166 self._log.warning('read loop error: %s', e, exc_info=e) 167 168 finally: 169 self.close() 170 for future in self._id_futures.values(): 171 if not future.done(): 172 future.set_exception(ConnectionError()) 173 174 def _on_change_data_response(self, body): 175 future = self._id_futures.get(body.get('id')) 176 if not future or future.done(): 177 return 178 179 self._log.debug('received change data response for %s', body.get('id')) 180 future.set_result(body['success']) 181 182 def _on_command_response(self, body): 183 future = self._id_futures.get(body.get('id')) 184 if not future or future.done(): 185 return 186 187 self._log.debug('received command response for %s', body.get('id')) 188 future.set_result(body['success']) 189 190 async def _on_data_changed_unsolicited(self, body): 191 if not body['data']: 192 return 193 194 self._log.debug('received data change unsolicited') 195 data = [encoder.data_from_json(i) for i in body['data']] 196 for i in data: 197 self._data[i.key] = i 198 await aio.call(self._data_cb, data) 199 200 async def _on_status_changed_unsolicited(self, body): 201 self._log.debug('received status changed unsolicited') 202 self._pnet_status = common.Status(body['status']) 203 204 self._data = {} 205 for i in body['data']: 206 data = encoder.data_from_json(i) 207 self._data[data.key] = data 208 209 await aio.call(self._status_cb, self._pnet_status)
PNET Gateway connection
For creating new connection see connect()
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group

    This is the async group of the underlying transport connection
    (``self._conn``).

    """
    return self._conn.async_group
Async group
pnet_status: Status
@property
def pnet_status(self) -> common.Status:
    """PNET Gateway server's status

    Value most recently stored in ``self._pnet_status``.

    """
    return self._pnet_status
PNET Gateway server's status
data: dict[str, Data]
@property
def data(self) -> dict[str, common.Data]:
    """Subscribed data

    The internal dictionary itself is returned, not a copy.

    """
    return self._data
Subscribed data
async def change_data(self,
                      changes: typing.Iterable[common.Change]
                      ) -> list[bool]:
    """Send change data request to PNET Gateway server

    Every change is encoded with `encoder.change_to_json` and the
    resulting list is sent as a ``'change_data_request'`` message; the
    awaited result of `_send_with_response` is returned.

    """
    encoded_changes = [encoder.change_to_json(change) for change in changes]
    return await self._send_with_response('change_data_request',
                                          encoded_changes)
Send change data request to PNET Gateway server