hat.drivers.pnetgateway
1from hat.drivers.pnetgateway.common import (Status, 2 Quality, 3 Source, 4 DataType, 5 Data, 6 Change, 7 Command) 8from hat.drivers.pnetgateway.client import (StatusCb, 9 DataCb, 10 connect, 11 Connection) 12 13 14__all__ = ['Status', 15 'Quality', 16 'Source', 17 'DataType', 18 'Data', 19 'Change', 20 'Command', 21 'StatusCb', 22 'DataCb', 23 'connect', 24 'Connection']
class
Status(enum.Enum):
8class Status(enum.Enum): 9 DISCONNECTED = 'DISCONNECTED' 10 CONNECTING = 'CONNECTING' 11 CONNECTED = 'CONNECTED'
An enumeration.
DISCONNECTED =
<Status.DISCONNECTED: 'DISCONNECTED'>
CONNECTING =
<Status.CONNECTING: 'CONNECTING'>
CONNECTED =
<Status.CONNECTED: 'CONNECTED'>
class
Quality(enum.Enum):
14class Quality(enum.Enum): 15 NONE = 'NONE' 16 GOOD = 'GOOD' 17 UNRELIABLE = 'UNRELIABLE' 18 BAD = 'BAD'
An enumeration.
NONE =
<Quality.NONE: 'NONE'>
GOOD =
<Quality.GOOD: 'GOOD'>
UNRELIABLE =
<Quality.UNRELIABLE: 'UNRELIABLE'>
BAD =
<Quality.BAD: 'BAD'>
class
Source(enum.Enum):
21class Source(enum.Enum): 22 APPLICATION = 'APPLICATION' 23 CALCULATED = 'CALCULATED' 24 MANUAL_ENTRY = 'MANUAL_ENTRY' 25 NONSPECIFIC = 'NONSPECIFIC' 26 PROC_MOD = 'PROC_MOD' 27 REMOTE_SRC = 'REMOTE_SRC' 28 REMOTE_SRC_INVALID = 'REMOTE_SRC_INVALID' 29 SCHEDULED = 'SCHEDULED'
An enumeration.
APPLICATION =
<Source.APPLICATION: 'APPLICATION'>
CALCULATED =
<Source.CALCULATED: 'CALCULATED'>
MANUAL_ENTRY =
<Source.MANUAL_ENTRY: 'MANUAL_ENTRY'>
NONSPECIFIC =
<Source.NONSPECIFIC: 'NONSPECIFIC'>
PROC_MOD =
<Source.PROC_MOD: 'PROC_MOD'>
REMOTE_SRC =
<Source.REMOTE_SRC: 'REMOTE_SRC'>
REMOTE_SRC_INVALID =
<Source.REMOTE_SRC_INVALID: 'REMOTE_SRC_INVALID'>
SCHEDULED =
<Source.SCHEDULED: 'SCHEDULED'>
class
DataType(enum.Enum):
32class DataType(enum.Enum): 33 BLOB = 'BLOB' 34 COMMAND = 'COMMAND' 35 COUNTER = 'COUNTER' 36 EVENT = 'EVENT' 37 GROUP = 'GROUP' 38 NUMERIC = 'NUMERIC' 39 STATES = 'STATES' 40 UNKNOWN = 'UNKNOWN'
An enumeration.
BLOB =
<DataType.BLOB: 'BLOB'>
COMMAND =
<DataType.COMMAND: 'COMMAND'>
COUNTER =
<DataType.COUNTER: 'COUNTER'>
EVENT =
<DataType.EVENT: 'EVENT'>
GROUP =
<DataType.GROUP: 'GROUP'>
NUMERIC =
<DataType.NUMERIC: 'NUMERIC'>
STATES =
<DataType.STATES: 'STATES'>
UNKNOWN =
<DataType.UNKNOWN: 'UNKNOWN'>
class
Data(typing.NamedTuple):
43class Data(typing.NamedTuple): 44 key: str 45 value: json.Data 46 quality: Quality 47 timestamp: float 48 type: DataType 49 source: Source
Data(key, value, quality, timestamp, type, source)
class
Change(typing.NamedTuple):
52class Change(typing.NamedTuple): 53 key: str 54 value: json.Data | None 55 quality: Quality | None 56 timestamp: float | None 57 source: Source | None
Change(key, value, quality, timestamp, source)
class
Command(typing.NamedTuple):
Command(key, value)
StatusCb =
typing.Callable[[Status], typing.Optional[typing.Awaitable[NoneType]]]
DataCb =
typing.Callable[[list[Data]], typing.Optional[typing.Awaitable[NoneType]]]
async def
connect( addr: hat.drivers.tcp.Address, username: str, password: str, status_cb: Callable[[Status], Optional[Awaitable[NoneType]]], data_cb: Callable[[list[Data]], Optional[Awaitable[NoneType]]], subscriptions: list[str] | None = None, **kwargs) -> Connection:
25async def connect(addr: tcp.Address, 26 username: str, 27 password: str, 28 status_cb: StatusCb, 29 data_cb: DataCb, 30 subscriptions: list[str] | None = None, 31 **kwargs 32 ) -> 'Connection': 33 """Connect to PNET Gateway server 34 35 Additional arguments are passed directly to `hat.drivers.tcp.connect`. 36 37 Args: 38 address: PNET Gateway server address 39 username: user name 40 password: password 41 status_cb: status change callback 42 data_cb: data change callback 43 subscriptions: list of data keys for subscriptions 44 45 If subscription is ``None``, client is subscribed to all data changes 46 from the server. 47 48 """ 49 conn = Connection() 50 conn._pnet_status = common.Status.DISCONNECTED 51 conn._data = {} 52 conn._status_cb = status_cb 53 conn._data_cb = data_cb 54 conn._next_ids = itertools.count(0) 55 conn._id_futures = {} 56 57 conn._conn = transport.Transport(await tcp.connect(addr, **kwargs)) 58 59 try: 60 await conn._conn.send({'type': 'authentication_request', 61 'body': {'username': username, 62 'password': password, 63 'subscriptions': subscriptions}}) 64 65 msg = None 66 while msg is None or msg['type'] != 'authentication_response': 67 msg = await conn._conn.receive() 68 69 if not msg['body']['success']: 70 raise Exception('authentication failed') 71 72 conn._pnet_status = common.Status(msg['body']['status']) 73 for i in msg['body']['data']: 74 data = encoder.data_from_json(i) 75 conn._data[data.key] = data 76 77 except BaseException: 78 await aio.uncancellable(conn.async_close()) 79 raise 80 81 conn.async_group.spawn(conn._read_loop) 82 83 return conn
Connect to PNET Gateway server
Additional arguments are passed directly to hat.drivers.tcp.connect
.
Arguments:
- address: PNET Gateway server address
- username: user name
- password: password
- status_cb: status change callback
- data_cb: data change callback
- subscriptions: list of data keys for subscriptions
If subscription is None
, client is subscribed to all data changes
from the server.
class
Connection(hat.aio.group.Resource):
86class Connection(aio.Resource): 87 """PNET Gateway connection 88 89 For creating new connection see :func:`connect` 90 91 """ 92 93 @property 94 def async_group(self) -> aio.Group: 95 """Async group""" 96 return self._conn.async_group 97 98 @property 99 def pnet_status(self) -> common.Status: 100 """PNET Gateway server's status""" 101 return self._pnet_status 102 103 @property 104 def data(self) -> dict[str, common.Data]: 105 """Subscribed data""" 106 return self._data 107 108 async def change_data(self, 109 changes: typing.Iterable[common.Change] 110 ) -> list[bool]: 111 """Send change data request to PNET Gateway server""" 112 return await self._send_with_response( 113 'change_data_request', 114 [encoder.change_to_json(i) for i in changes]) 115 116 async def send_commands(self, 117 commands: typing.Iterable[common.Command] 118 ) -> list[bool]: 119 """Send commands to PNET Gateway""" 120 return await self._send_with_response( 121 'command_request', 122 [encoder.command_to_json(i) for i in commands]) 123 124 async def _send_with_response(self, msg_type, msg_data): 125 if not self.is_open: 126 raise ConnectionError() 127 128 msg_id = next(self._next_ids) 129 msg = {'type': msg_type, 130 'body': {'id': msg_id, 131 'data': msg_data}} 132 133 future = asyncio.Future() 134 self._id_futures[msg_id] = future 135 136 try: 137 await self._conn.send(msg) 138 return await future 139 140 finally: 141 self._id_futures.pop(msg_id) 142 143 async def _read_loop(self): 144 try: 145 while True: 146 msg = await self._conn.receive() 147 148 if msg['type'] == 'change_data_response': 149 self._on_change_data_response(msg['body']) 150 151 elif msg['type'] == 'command_response': 152 self._on_command_response(msg['body']) 153 154 elif msg['type'] == 'data_changed_unsolicited': 155 await self._on_data_changed_unsolicited(msg['body']) 156 157 elif msg['type'] == 'status_changed_unsolicited': 158 await self._on_status_changed_unsolicited(msg['body']) 159 160 except ConnectionError: 161 pass 162 163 except Exception as e: 164 mlog.warning('read loop error: %s', e, exc_info=e) 165 166 finally: 167 self.close() 168 for future in self._id_futures.values(): 169 if not future.done(): 170 future.set_exception(ConnectionError()) 171 172 def _on_change_data_response(self, body): 173 future = self._id_futures.get(body.get('id')) 174 if not future or future.done(): 175 return 176 177 mlog.debug('received change data response for %s', body.get('id')) 178 future.set_result(body['success']) 179 180 def _on_command_response(self, body): 181 future = self._id_futures.get(body.get('id')) 182 if not future or future.done(): 183 return 184 185 mlog.debug('received command response for %s', body.get('id')) 186 future.set_result(body['success']) 187 188 async def _on_data_changed_unsolicited(self, body): 189 if not body['data']: 190 return 191 192 mlog.debug('received data change unsolicited') 193 data = [encoder.data_from_json(i) for i in body['data']] 194 for i in data: 195 self._data[i.key] = i 196 await aio.call(self._data_cb, data) 197 198 async def _on_status_changed_unsolicited(self, body): 199 mlog.debug('received status changed unsolicited') 200 self._pnet_status = common.Status(body['status']) 201 202 self._data = {} 203 for i in body['data']: 204 data = encoder.data_from_json(i) 205 self._data[data.key] = data 206 207 await aio.call(self._status_cb, self._pnet_status)
PNET Gateway connection
For creating new connection see connect()
async_group: hat.aio.group.Group
93 @property 94 def async_group(self) -> aio.Group: 95 """Async group""" 96 return self._conn.async_group
Async group
pnet_status: Status
98 @property 99 def pnet_status(self) -> common.Status: 100 """PNET Gateway server's status""" 101 return self._pnet_status
PNET Gateway server's status
data: dict[str, Data]
103 @property 104 def data(self) -> dict[str, common.Data]: 105 """Subscribed data""" 106 return self._data
Subscribed data
108 async def change_data(self, 109 changes: typing.Iterable[common.Change] 110 ) -> list[bool]: 111 """Send change data request to PNET Gateway server""" 112 return await self._send_with_response( 113 'change_data_request', 114 [encoder.change_to_json(i) for i in changes])
Send change data request to PNET Gateway server