hat.drivers.pnetgateway

 1from hat.drivers.pnetgateway.common import (Status,
 2                                            Quality,
 3                                            Source,
 4                                            DataType,
 5                                            Data,
 6                                            Change,
 7                                            Command)
 8from hat.drivers.pnetgateway.client import (StatusCb,
 9                                            DataCb,
10                                            connect,
11                                            Connection)
12
13
14__all__ = ['Status',
15           'Quality',
16           'Source',
17           'DataType',
18           'Data',
19           'Change',
20           'Command',
21           'StatusCb',
22           'DataCb',
23           'connect',
24           'Connection']
class Status(enum.Enum):
 8class Status(enum.Enum):
 9    DISCONNECTED = 'DISCONNECTED'
10    CONNECTING = 'CONNECTING'
11    CONNECTED = 'CONNECTED'

An enumeration.

DISCONNECTED = <Status.DISCONNECTED: 'DISCONNECTED'>
CONNECTING = <Status.CONNECTING: 'CONNECTING'>
CONNECTED = <Status.CONNECTED: 'CONNECTED'>
class Quality(enum.Enum):
14class Quality(enum.Enum):
15    NONE = 'NONE'
16    GOOD = 'GOOD'
17    UNRELIABLE = 'UNRELIABLE'
18    BAD = 'BAD'

An enumeration.

NONE = <Quality.NONE: 'NONE'>
GOOD = <Quality.GOOD: 'GOOD'>
UNRELIABLE = <Quality.UNRELIABLE: 'UNRELIABLE'>
BAD = <Quality.BAD: 'BAD'>
class Source(enum.Enum):
21class Source(enum.Enum):
22    APPLICATION = 'APPLICATION'
23    CALCULATED = 'CALCULATED'
24    MANUAL_ENTRY = 'MANUAL_ENTRY'
25    NONSPECIFIC = 'NONSPECIFIC'
26    PROC_MOD = 'PROC_MOD'
27    REMOTE_SRC = 'REMOTE_SRC'
28    REMOTE_SRC_INVALID = 'REMOTE_SRC_INVALID'
29    SCHEDULED = 'SCHEDULED'

An enumeration.

APPLICATION = <Source.APPLICATION: 'APPLICATION'>
CALCULATED = <Source.CALCULATED: 'CALCULATED'>
MANUAL_ENTRY = <Source.MANUAL_ENTRY: 'MANUAL_ENTRY'>
NONSPECIFIC = <Source.NONSPECIFIC: 'NONSPECIFIC'>
PROC_MOD = <Source.PROC_MOD: 'PROC_MOD'>
REMOTE_SRC = <Source.REMOTE_SRC: 'REMOTE_SRC'>
REMOTE_SRC_INVALID = <Source.REMOTE_SRC_INVALID: 'REMOTE_SRC_INVALID'>
SCHEDULED = <Source.SCHEDULED: 'SCHEDULED'>
class DataType(enum.Enum):
32class DataType(enum.Enum):
33    BLOB = 'BLOB'
34    COMMAND = 'COMMAND'
35    COUNTER = 'COUNTER'
36    EVENT = 'EVENT'
37    GROUP = 'GROUP'
38    NUMERIC = 'NUMERIC'
39    STATES = 'STATES'
40    UNKNOWN = 'UNKNOWN'

An enumeration.

BLOB = <DataType.BLOB: 'BLOB'>
COMMAND = <DataType.COMMAND: 'COMMAND'>
COUNTER = <DataType.COUNTER: 'COUNTER'>
EVENT = <DataType.EVENT: 'EVENT'>
GROUP = <DataType.GROUP: 'GROUP'>
NUMERIC = <DataType.NUMERIC: 'NUMERIC'>
STATES = <DataType.STATES: 'STATES'>
UNKNOWN = <DataType.UNKNOWN: 'UNKNOWN'>
class Data(typing.NamedTuple):
43class Data(typing.NamedTuple):
44    key: str
45    value: json.Data
46    quality: Quality
47    timestamp: float
48    type: DataType
49    source: Source

Data(key, value, quality, timestamp, type, source)

Data( key: str, value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], quality: Quality, timestamp: float, type: DataType, source: Source)

Create new instance of Data(key, value, quality, timestamp, type, source)

key: str

Alias for field number 0

value: Union[NoneType, bool, int, float, str, List[Data], Dict[str, Data]]

Alias for field number 1

quality: Quality

Alias for field number 2

timestamp: float

Alias for field number 3

type: DataType

Alias for field number 4

source: Source

Alias for field number 5

class Change(typing.NamedTuple):
52class Change(typing.NamedTuple):
53    key: str
54    value: json.Data | None
55    quality: Quality | None
56    timestamp: float | None
57    source: Source | None

Change(key, value, quality, timestamp, source)

Change( key: str, value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], quality: Quality | None, timestamp: float | None, source: Source | None)

Create new instance of Change(key, value, quality, timestamp, source)

key: str

Alias for field number 0

value: Union[NoneType, bool, int, float, str, List[Data], Dict[str, Data]]

Alias for field number 1

quality: Quality | None

Alias for field number 2

timestamp: float | None

Alias for field number 3

source: Source | None

Alias for field number 4

class Command(typing.NamedTuple):
60class Command(typing.NamedTuple):
61    key: str
62    value: json.Data

Command(key, value)

Command( key: str, value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]])

Create new instance of Command(key, value)

key: str

Alias for field number 0

value: Union[NoneType, bool, int, float, str, List[Data], Dict[str, Data]]

Alias for field number 1

StatusCb = typing.Callable[[Status], None | collections.abc.Awaitable[None]]
DataCb = typing.Callable[[list[Data]], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, username: str, password: str, status_cb: Callable[[Status], None | Awaitable[None]], data_cb: Callable[[list[Data]], None | Awaitable[None]], subscriptions: list[str] | None = None, **kwargs) -> Connection:
25async def connect(addr: tcp.Address,
26                  username: str,
27                  password: str,
28                  status_cb: StatusCb,
29                  data_cb: DataCb,
30                  subscriptions: list[str] | None = None,
31                  **kwargs
32                  ) -> 'Connection':
33    """Connect to PNET Gateway server
34
35    Additional arguments are passed directly to `hat.drivers.tcp.connect`.
36
37    Args:
38        address: PNET Gateway server address
39        username: user name
40        password: password
41        status_cb: status change callback
42        data_cb: data change callback
43        subscriptions: list of data keys for subscriptions
44
45    If subscription is ``None``, client is subscribed to all data changes
46    from the server.
47
48    """
49    conn = Connection()
50    conn._pnet_status = common.Status.DISCONNECTED
51    conn._data = {}
52    conn._status_cb = status_cb
53    conn._data_cb = data_cb
54    conn._next_ids = itertools.count(0)
55    conn._id_futures = {}
56
57    conn._conn = transport.Transport(await tcp.connect(addr, **kwargs))
58
59    try:
60        await conn._conn.send({'type': 'authentication_request',
61                               'body': {'username': username,
62                                        'password': password,
63                                        'subscriptions': subscriptions}})
64
65        msg = None
66        while msg is None or msg['type'] != 'authentication_response':
67            msg = await conn._conn.receive()
68
69        if not msg['body']['success']:
70            raise Exception('authentication failed')
71
72        conn._pnet_status = common.Status(msg['body']['status'])
73        for i in msg['body']['data']:
74            data = encoder.data_from_json(i)
75            conn._data[data.key] = data
76
77    except BaseException:
78        await aio.uncancellable(conn.async_close())
79        raise
80
81    conn.async_group.spawn(conn._read_loop)
82
83    return conn

Connect to PNET Gateway server

Additional arguments are passed directly to hat.drivers.tcp.connect.

Arguments:
  • address: PNET Gateway server address
  • username: user name
  • password: password
  • status_cb: status change callback
  • data_cb: data change callback
  • subscriptions: list of data keys for subscriptions

If subscription is None, client is subscribed to all data changes from the server.

class Connection(hat.aio.group.Resource):
 86class Connection(aio.Resource):
 87    """PNET Gateway connection
 88
 89    For creating new connection see :func:`connect`
 90
 91    """
 92
 93    @property
 94    def async_group(self) -> aio.Group:
 95        """Async group"""
 96        return self._conn.async_group
 97
 98    @property
 99    def pnet_status(self) -> common.Status:
100        """PNET Gateway server's status"""
101        return self._pnet_status
102
103    @property
104    def data(self) -> dict[str, common.Data]:
105        """Subscribed data"""
106        return self._data
107
108    async def change_data(self,
109                          changes: typing.Iterable[common.Change]
110                          ) -> list[bool]:
111        """Send change data request to PNET Gateway server"""
112        return await self._send_with_response(
113            'change_data_request',
114            [encoder.change_to_json(i) for i in changes])
115
116    async def send_commands(self,
117                            commands: typing.Iterable[common.Command]
118                            ) -> list[bool]:
119        """Send commands to PNET Gateway"""
120        return await self._send_with_response(
121            'command_request',
122            [encoder.command_to_json(i) for i in commands])
123
124    async def _send_with_response(self, msg_type, msg_data):
125        if not self.is_open:
126            raise ConnectionError()
127
128        msg_id = next(self._next_ids)
129        msg = {'type': msg_type,
130               'body': {'id': msg_id,
131                        'data': msg_data}}
132
133        future = asyncio.Future()
134        self._id_futures[msg_id] = future
135
136        try:
137            await self._conn.send(msg)
138            return await future
139
140        finally:
141            self._id_futures.pop(msg_id)
142
143    async def _read_loop(self):
144        try:
145            while True:
146                msg = await self._conn.receive()
147
148                if msg['type'] == 'change_data_response':
149                    self._on_change_data_response(msg['body'])
150
151                elif msg['type'] == 'command_response':
152                    self._on_command_response(msg['body'])
153
154                elif msg['type'] == 'data_changed_unsolicited':
155                    await self._on_data_changed_unsolicited(msg['body'])
156
157                elif msg['type'] == 'status_changed_unsolicited':
158                    await self._on_status_changed_unsolicited(msg['body'])
159
160        except ConnectionError:
161            pass
162
163        except Exception as e:
164            mlog.warning('read loop error: %s', e, exc_info=e)
165
166        finally:
167            self.close()
168            for future in self._id_futures.values():
169                if not future.done():
170                    future.set_exception(ConnectionError())
171
172    def _on_change_data_response(self, body):
173        future = self._id_futures.get(body.get('id'))
174        if not future or future.done():
175            return
176
177        mlog.debug('received change data response for %s', body.get('id'))
178        future.set_result(body['success'])
179
180    def _on_command_response(self, body):
181        future = self._id_futures.get(body.get('id'))
182        if not future or future.done():
183            return
184
185        mlog.debug('received command response for %s', body.get('id'))
186        future.set_result(body['success'])
187
188    async def _on_data_changed_unsolicited(self, body):
189        if not body['data']:
190            return
191
192        mlog.debug('received data change unsolicited')
193        data = [encoder.data_from_json(i) for i in body['data']]
194        for i in data:
195            self._data[i.key] = i
196        await aio.call(self._data_cb, data)
197
198    async def _on_status_changed_unsolicited(self, body):
199        mlog.debug('received status changed unsolicited')
200        self._pnet_status = common.Status(body['status'])
201
202        self._data = {}
203        for i in body['data']:
204            data = encoder.data_from_json(i)
205            self._data[data.key] = data
206
207        await aio.call(self._status_cb, self._pnet_status)

PNET Gateway connection

For creating new connection see connect()

async_group: hat.aio.group.Group
93    @property
94    def async_group(self) -> aio.Group:
95        """Async group"""
96        return self._conn.async_group

Async group

pnet_status: Status
 98    @property
 99    def pnet_status(self) -> common.Status:
100        """PNET Gateway server's status"""
101        return self._pnet_status

PNET Gateway server's status

data: dict[str, Data]
103    @property
104    def data(self) -> dict[str, common.Data]:
105        """Subscribed data"""
106        return self._data

Subscribed data

async def change_data( self, changes: Iterable[Change]) -> list[bool]:
108    async def change_data(self,
109                          changes: typing.Iterable[common.Change]
110                          ) -> list[bool]:
111        """Send change data request to PNET Gateway server"""
112        return await self._send_with_response(
113            'change_data_request',
114            [encoder.change_to_json(i) for i in changes])

Send change data request to PNET Gateway server

async def send_commands( self, commands: Iterable[Command]) -> list[bool]:
116    async def send_commands(self,
117                            commands: typing.Iterable[common.Command]
118                            ) -> list[bool]:
119        """Send commands to PNET Gateway"""
120        return await self._send_with_response(
121            'command_request',
122            [encoder.command_to_json(i) for i in commands])

Send commands to PNET Gateway