hat.drivers.pnetgateway

 1from hat.drivers.pnetgateway.common import (Status,
 2                                            Quality,
 3                                            Source,
 4                                            DataType,
 5                                            Data,
 6                                            Change,
 7                                            Command)
 8from hat.drivers.pnetgateway.client import (StatusCb,
 9                                            DataCb,
10                                            connect,
11                                            Connection)
12
13
14__all__ = ['Status',
15           'Quality',
16           'Source',
17           'DataType',
18           'Data',
19           'Change',
20           'Command',
21           'StatusCb',
22           'DataCb',
23           'connect',
24           'Connection']
class Status(enum.Enum):
 8class Status(enum.Enum):
 9    DISCONNECTED = 'DISCONNECTED'
10    CONNECTING = 'CONNECTING'
11    CONNECTED = 'CONNECTED'
DISCONNECTED = <Status.DISCONNECTED: 'DISCONNECTED'>
CONNECTING = <Status.CONNECTING: 'CONNECTING'>
CONNECTED = <Status.CONNECTED: 'CONNECTED'>
class Quality(enum.Enum):
@enum.unique
class Quality(enum.Enum):
    """Quality tag carried by `Data.quality` and `Change.quality`.

    Member values are plain strings equal to the member names.

    """

    NONE = 'NONE'
    GOOD = 'GOOD'
    UNRELIABLE = 'UNRELIABLE'
    BAD = 'BAD'
NONE = <Quality.NONE: 'NONE'>
GOOD = <Quality.GOOD: 'GOOD'>
UNRELIABLE = <Quality.UNRELIABLE: 'UNRELIABLE'>
BAD = <Quality.BAD: 'BAD'>
class Source(enum.Enum):
@enum.unique
class Source(enum.Enum):
    """Source tag carried by `Data.source` and `Change.source`.

    Member values are plain strings equal to the member names.

    """

    APPLICATION = 'APPLICATION'
    CALCULATED = 'CALCULATED'
    MANUAL_ENTRY = 'MANUAL_ENTRY'
    NONSPECIFIC = 'NONSPECIFIC'
    PROC_MOD = 'PROC_MOD'
    REMOTE_SRC = 'REMOTE_SRC'
    REMOTE_SRC_INVALID = 'REMOTE_SRC_INVALID'
    SCHEDULED = 'SCHEDULED'
APPLICATION = <Source.APPLICATION: 'APPLICATION'>
CALCULATED = <Source.CALCULATED: 'CALCULATED'>
MANUAL_ENTRY = <Source.MANUAL_ENTRY: 'MANUAL_ENTRY'>
NONSPECIFIC = <Source.NONSPECIFIC: 'NONSPECIFIC'>
PROC_MOD = <Source.PROC_MOD: 'PROC_MOD'>
REMOTE_SRC = <Source.REMOTE_SRC: 'REMOTE_SRC'>
REMOTE_SRC_INVALID = <Source.REMOTE_SRC_INVALID: 'REMOTE_SRC_INVALID'>
SCHEDULED = <Source.SCHEDULED: 'SCHEDULED'>
class DataType(enum.Enum):
@enum.unique
class DataType(enum.Enum):
    """Type tag carried by `Data.type`.

    Member values are plain strings equal to the member names.

    """

    BLOB = 'BLOB'
    COMMAND = 'COMMAND'
    COUNTER = 'COUNTER'
    EVENT = 'EVENT'
    GROUP = 'GROUP'
    NUMERIC = 'NUMERIC'
    STATES = 'STATES'
    UNKNOWN = 'UNKNOWN'
BLOB = <DataType.BLOB: 'BLOB'>
COMMAND = <DataType.COMMAND: 'COMMAND'>
COUNTER = <DataType.COUNTER: 'COUNTER'>
EVENT = <DataType.EVENT: 'EVENT'>
GROUP = <DataType.GROUP: 'GROUP'>
NUMERIC = <DataType.NUMERIC: 'NUMERIC'>
STATES = <DataType.STATES: 'STATES'>
UNKNOWN = <DataType.UNKNOWN: 'UNKNOWN'>
class Data(typing.NamedTuple):
class Data(typing.NamedTuple):
    """Single data point received from the PNET Gateway server.

    Instances are stored in `Connection.data` under their `key` and are
    passed in a list to the data change callback.

    """

    key: str  # identifier; also the dictionary key in `Connection.data`
    value: json.Data  # arbitrary JSON-serializable value
    quality: Quality
    timestamp: float  # NOTE(review): unit/epoch is not visible here - confirm
    type: DataType
    source: Source

Data(key, value, quality, timestamp, type, source)

Data( key: str, value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], quality: Quality, timestamp: float, type: DataType, source: Source)

Create new instance of Data(key, value, quality, timestamp, type, source)

key: str

Alias for field number 0

value: Union[NoneType, bool, int, float, str, List[Data], Dict[str, Data]]

Alias for field number 1

quality: Quality

Alias for field number 2

timestamp: float

Alias for field number 3

type: DataType

Alias for field number 4

source: Source

Alias for field number 5

class Change(typing.NamedTuple):
class Change(typing.NamedTuple):
    """Single entry of a change data request (see `Connection.change_data`).

    Only `key` is mandatory; every other field accepts ``None``.
    NOTE(review): ``None`` presumably means "leave this property unchanged" -
    confirm against the encoder that serializes changes.

    """

    key: str  # identifier of the data point to change
    value: json.Data | None
    quality: Quality | None
    timestamp: float | None
    source: Source | None

Change(key, value, quality, timestamp, source)

Change( key: str, value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], quality: Quality | None, timestamp: float | None, source: Source | None)

Create new instance of Change(key, value, quality, timestamp, source)

key: str

Alias for field number 0

value: Union[NoneType, bool, int, float, str, List[Data], Dict[str, Data]]

Alias for field number 1

quality: Quality | None

Alias for field number 2

timestamp: float | None

Alias for field number 3

source: Source | None

Alias for field number 4

class Command(typing.NamedTuple):
class Command(typing.NamedTuple):
    """Single entry of a command request (see `Connection.send_commands`)."""

    key: str  # identifier the command is addressed to
    value: json.Data  # arbitrary JSON-serializable command value

Command(key, value)

Command( key: str, value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]])

Create new instance of Command(key, value)

key: str

Alias for field number 0

value: Union[NoneType, bool, int, float, str, List[Data], Dict[str, Data]]

Alias for field number 1

# Status change callback: receives the new `Status`; may be a plain function
# or return an awaitable.
StatusCb = typing.Callable[[Status], None | collections.abc.Awaitable[None]]
# Data change callback: receives the list of changed `Data` instances; may be
# a plain function or return an awaitable.
DataCb = typing.Callable[[list[Data]], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, username: str, password: str, status_cb: Callable[[Status], None | Awaitable[None]], data_cb: Callable[[list[Data]], None | Awaitable[None]], subscriptions: list[str] | None = None, **kwargs) -> Connection:
async def connect(addr: tcp.Address,
                  username: str,
                  password: str,
                  status_cb: StatusCb,
                  data_cb: DataCb,
                  subscriptions: list[str] | None = None,
                  **kwargs
                  ) -> 'Connection':
    """Connect to PNET Gateway server

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    Args:
        addr: PNET Gateway server address
        username: user name
        password: password
        status_cb: status change callback
        data_cb: data change callback
        subscriptions: list of data keys for subscriptions

    If `subscriptions` is ``None``, client is subscribed to all data changes
    from the server.

    Returns:
        authenticated connection with its read loop already running

    Raises:
        Exception: if the server reports unsuccessful authentication

    """
    conn = Connection()
    conn._pnet_status = common.Status.DISCONNECTED
    conn._data = {}
    conn._status_cb = status_cb
    conn._data_cb = data_cb
    conn._next_ids = itertools.count(0)
    conn._id_futures = {}

    conn._conn = transport.Transport(await tcp.connect(addr, **kwargs))

    conn._log = _create_logger(conn._conn.info)

    try:
        await conn._conn.send({'type': 'authentication_request',
                               'body': {'username': username,
                                        'password': password,
                                        'subscriptions': subscriptions}})

        # skip any messages that arrive before the authentication response
        msg = None
        while msg is None or msg['type'] != 'authentication_response':
            msg = await conn._conn.receive()

        if not msg['body']['success']:
            raise Exception('authentication failed')

        # the response carries the server status and the initial data set
        conn._pnet_status = common.Status(msg['body']['status'])
        for i in msg['body']['data']:
            data = encoder.data_from_json(i)
            conn._data[data.key] = data

    except BaseException:
        # also covers cancellation: never leak a half-initialized connection
        await aio.uncancellable(conn.async_close())
        raise

    conn.async_group.spawn(conn._read_loop)

    return conn

Connect to PNET Gateway server

Additional arguments are passed directly to hat.drivers.tcp.connect.

Arguments:
  • addr: PNET Gateway server address
  • username: user name
  • password: password
  • status_cb: status change callback
  • data_cb: data change callback
  • subscriptions: list of data keys for subscriptions

If subscriptions is None, client is subscribed to all data changes from the server.

class Connection(hat.aio.group.Resource):
 88class Connection(aio.Resource):
 89    """PNET Gateway connection
 90
 91    For creating new connection see :func:`connect`
 92
 93    """
 94
 95    @property
 96    def async_group(self) -> aio.Group:
 97        """Async group"""
 98        return self._conn.async_group
 99
100    @property
101    def pnet_status(self) -> common.Status:
102        """PNET Gateway server's status"""
103        return self._pnet_status
104
105    @property
106    def data(self) -> dict[str, common.Data]:
107        """Subscribed data"""
108        return self._data
109
110    async def change_data(self,
111                          changes: typing.Iterable[common.Change]
112                          ) -> list[bool]:
113        """Send change data request to PNET Gateway server"""
114        return await self._send_with_response(
115            'change_data_request',
116            [encoder.change_to_json(i) for i in changes])
117
118    async def send_commands(self,
119                            commands: typing.Iterable[common.Command]
120                            ) -> list[bool]:
121        """Send commands to PNET Gateway"""
122        return await self._send_with_response(
123            'command_request',
124            [encoder.command_to_json(i) for i in commands])
125
126    async def _send_with_response(self, msg_type, msg_data):
127        if not self.is_open:
128            raise ConnectionError()
129
130        msg_id = next(self._next_ids)
131        msg = {'type': msg_type,
132               'body': {'id': msg_id,
133                        'data': msg_data}}
134
135        future = asyncio.Future()
136        self._id_futures[msg_id] = future
137
138        try:
139            await self._conn.send(msg)
140            return await future
141
142        finally:
143            self._id_futures.pop(msg_id)
144
145    async def _read_loop(self):
146        try:
147            while True:
148                msg = await self._conn.receive()
149
150                if msg['type'] == 'change_data_response':
151                    self._on_change_data_response(msg['body'])
152
153                elif msg['type'] == 'command_response':
154                    self._on_command_response(msg['body'])
155
156                elif msg['type'] == 'data_changed_unsolicited':
157                    await self._on_data_changed_unsolicited(msg['body'])
158
159                elif msg['type'] == 'status_changed_unsolicited':
160                    await self._on_status_changed_unsolicited(msg['body'])
161
162        except ConnectionError:
163            pass
164
165        except Exception as e:
166            self._log.warning('read loop error: %s', e, exc_info=e)
167
168        finally:
169            self.close()
170            for future in self._id_futures.values():
171                if not future.done():
172                    future.set_exception(ConnectionError())
173
174    def _on_change_data_response(self, body):
175        future = self._id_futures.get(body.get('id'))
176        if not future or future.done():
177            return
178
179        self._log.debug('received change data response for %s', body.get('id'))
180        future.set_result(body['success'])
181
182    def _on_command_response(self, body):
183        future = self._id_futures.get(body.get('id'))
184        if not future or future.done():
185            return
186
187        self._log.debug('received command response for %s', body.get('id'))
188        future.set_result(body['success'])
189
190    async def _on_data_changed_unsolicited(self, body):
191        if not body['data']:
192            return
193
194        self._log.debug('received data change unsolicited')
195        data = [encoder.data_from_json(i) for i in body['data']]
196        for i in data:
197            self._data[i.key] = i
198        await aio.call(self._data_cb, data)
199
200    async def _on_status_changed_unsolicited(self, body):
201        self._log.debug('received status changed unsolicited')
202        self._pnet_status = common.Status(body['status'])
203
204        self._data = {}
205        for i in body['data']:
206            data = encoder.data_from_json(i)
207            self._data[data.key] = data
208
209        await aio.call(self._status_cb, self._pnet_status)

PNET Gateway connection

For creating new connection see connect()

async_group: hat.aio.group.Group
95    @property
96    def async_group(self) -> aio.Group:
97        """Async group"""
98        return self._conn.async_group

Async group

pnet_status: Status
100    @property
101    def pnet_status(self) -> common.Status:
102        """PNET Gateway server's status"""
103        return self._pnet_status

PNET Gateway server's status

data: dict[str, Data]
105    @property
106    def data(self) -> dict[str, common.Data]:
107        """Subscribed data"""
108        return self._data

Subscribed data

async def change_data( self, changes: Iterable[Change]) -> list[bool]:
110    async def change_data(self,
111                          changes: typing.Iterable[common.Change]
112                          ) -> list[bool]:
113        """Send change data request to PNET Gateway server"""
114        return await self._send_with_response(
115            'change_data_request',
116            [encoder.change_to_json(i) for i in changes])

Send change data request to PNET Gateway server

async def send_commands( self, commands: Iterable[Command]) -> list[bool]:
118    async def send_commands(self,
119                            commands: typing.Iterable[common.Command]
120                            ) -> list[bool]:
121        """Send commands to PNET Gateway"""
122        return await self._send_with_response(
123            'command_request',
124            [encoder.command_to_json(i) for i in commands])

Send commands to PNET Gateway