hat.drivers.snmp

  1from hat.drivers.snmp.common import (Version,
  2                                     ErrorType,
  3                                     CauseType,
  4                                     AuthType,
  5                                     PrivType,
  6                                     Error,
  7                                     Cause,
  8                                     IntegerData,
  9                                     UnsignedData,
 10                                     CounterData,
 11                                     BigCounterData,
 12                                     StringData,
 13                                     ObjectIdData,
 14                                     IpAddressData,
 15                                     TimeTicksData,
 16                                     ArbitraryData,
 17                                     EmptyData,
 18                                     UnspecifiedData,
 19                                     NoSuchObjectData,
 20                                     NoSuchInstanceData,
 21                                     EndOfMibViewData,
 22                                     Data,
 23                                     CommunityName,
 24                                     UserName,
 25                                     Password,
 26                                     EngineId,
 27                                     User,
 28                                     Context,
 29                                     Trap,
 30                                     Inform,
 31                                     GetDataReq,
 32                                     GetNextDataReq,
 33                                     GetBulkDataReq,
 34                                     SetDataReq,
 35                                     Request,
 36                                     Response)
 37from hat.drivers.snmp.agent import (V1RequestCb,
 38                                    V2CRequestCb,
 39                                    V3RequestCb,
 40                                    create_agent,
 41                                    Agent)
 42from hat.drivers.snmp.manager import (Manager,
 43                                      create_v1_manager,
 44                                      create_v2c_manager,
 45                                      create_v3_manager)
 46from hat.drivers.snmp.trap import (V1TrapCb,
 47                                   V2CTrapCb,
 48                                   V2CInformCb,
 49                                   V3TrapCb,
 50                                   V3InformCb,
 51                                   create_trap_listener,
 52                                   TrapListener,
 53                                   TrapSender,
 54                                   create_v1_trap_sender,
 55                                   create_v2c_trap_sender,
 56                                   create_v3_trap_sender)
 57
 58
 59__all__ = ['Version',
 60           'ErrorType',
 61           'CauseType',
 62           'AuthType',
 63           'PrivType',
 64           'Error',
 65           'Cause',
 66           'IntegerData',
 67           'UnsignedData',
 68           'CounterData',
 69           'BigCounterData',
 70           'StringData',
 71           'ObjectIdData',
 72           'IpAddressData',
 73           'TimeTicksData',
 74           'ArbitraryData',
 75           'EmptyData',
 76           'UnspecifiedData',
 77           'NoSuchObjectData',
 78           'NoSuchInstanceData',
 79           'EndOfMibViewData',
 80           'Data',
 81           'CommunityName',
 82           'UserName',
 83           'Password',
 84           'EngineId',
 85           'User',
 86           'Context',
 87           'Trap',
 88           'Inform',
 89           'GetDataReq',
 90           'GetNextDataReq',
 91           'GetBulkDataReq',
 92           'SetDataReq',
 93           'Request',
 94           'Response',
 95           'V1RequestCb',
 96           'V2CRequestCb',
 97           'V3RequestCb',
 98           'create_agent',
 99           'Agent',
100           'Manager',
101           'create_v1_manager',
102           'create_v2c_manager',
103           'create_v3_manager',
104           'V1TrapCb',
105           'V2CTrapCb',
106           'V2CInformCb',
107           'V3TrapCb',
108           'V3InformCb',
109           'create_trap_listener',
110           'TrapListener',
111           'TrapSender',
112           'create_v1_trap_sender',
113           'create_v2c_trap_sender',
114           'create_v3_trap_sender']
class Version(enum.Enum):
class Version(enum.Enum):
    # SNMP protocol versions distinguished by this package.
    # NOTE(review): V3 is 3 rather than 2 - presumably the values mirror the
    # version number carried inside SNMP messages; confirm against the encoder.
    V1 = 0
    V2C = 1
    V3 = 3

An enumeration.

V1 = <Version.V1: 0>
V2C = <Version.V2C: 1>
V3 = <Version.V3: 3>
class ErrorType(enum.Enum):
class ErrorType(enum.Enum):
    # Error codes used as the `type` of an `Error`.
    # The trailing comment on each member lists the protocol versions in
    # which that code is available: codes 0-5 exist in every version,
    # codes 6-18 only in v2c and v3.
    NO_ERROR = 0                 # v1, v2c, v3
    TOO_BIG = 1                  # v1, v2c, v3
    NO_SUCH_NAME = 2             # v1, v2c, v3
    BAD_VALUE = 3                # v1, v2c, v3
    READ_ONLY = 4                # v1, v2c, v3
    GEN_ERR = 5                  # v1, v2c, v3
    NO_ACCESS = 6                # v2c, v3
    WRONG_TYPE = 7               # v2c, v3
    WRONG_LENGTH = 8             # v2c, v3
    WRONG_ENCODING = 9           # v2c, v3
    WRONG_VALUE = 10             # v2c, v3
    NO_CREATION = 11             # v2c, v3
    INCONSISTENT_VALUE = 12      # v2c, v3
    RESOURCE_UNAVAILABLE = 13    # v2c, v3
    COMMIT_FAILED = 14           # v2c, v3
    UNDO_FAILED = 15             # v2c, v3
    AUTHORIZATION_ERROR = 16     # v2c, v3
    NOT_WRITABLE = 17            # v2c, v3
    INCONSISTENT_NAME = 18       # v2c, v3

An enumeration.

NO_ERROR = <ErrorType.NO_ERROR: 0>
TOO_BIG = <ErrorType.TOO_BIG: 1>
NO_SUCH_NAME = <ErrorType.NO_SUCH_NAME: 2>
BAD_VALUE = <ErrorType.BAD_VALUE: 3>
READ_ONLY = <ErrorType.READ_ONLY: 4>
GEN_ERR = <ErrorType.GEN_ERR: 5>
NO_ACCESS = <ErrorType.NO_ACCESS: 6>
WRONG_TYPE = <ErrorType.WRONG_TYPE: 7>
WRONG_LENGTH = <ErrorType.WRONG_LENGTH: 8>
WRONG_ENCODING = <ErrorType.WRONG_ENCODING: 9>
WRONG_VALUE = <ErrorType.WRONG_VALUE: 10>
NO_CREATION = <ErrorType.NO_CREATION: 11>
INCONSISTENT_VALUE = <ErrorType.INCONSISTENT_VALUE: 12>
RESOURCE_UNAVAILABLE = <ErrorType.RESOURCE_UNAVAILABLE: 13>
COMMIT_FAILED = <ErrorType.COMMIT_FAILED: 14>
UNDO_FAILED = <ErrorType.UNDO_FAILED: 15>
AUTHORIZATION_ERROR = <ErrorType.AUTHORIZATION_ERROR: 16>
NOT_WRITABLE = <ErrorType.NOT_WRITABLE: 17>
INCONSISTENT_NAME = <ErrorType.INCONSISTENT_NAME: 18>
class CauseType(enum.Enum):
class CauseType(enum.Enum):
    # Cause categories used as the `type` of a `Cause` (which `Trap.cause`
    # carries in case of v1).
    COLD_START = 0
    WARM_START = 1
    LINK_DOWN = 2
    LINK_UP = 3
    # NOTE: spelled "AUTHENICATION" (sic). The member name is part of the
    # public API, so the spelling is kept as-is.
    AUTHENICATION_FAILURE = 4
    EGP_NEIGHBOR_LOSS = 5
    ENTERPRISE_SPECIFIC = 6

An enumeration.

COLD_START = <CauseType.COLD_START: 0>
WARM_START = <CauseType.WARM_START: 1>
LINK_DOWN = <CauseType.LINK_DOWN: 2>
LINK_UP = <CauseType.LINK_UP: 3>
AUTHENICATION_FAILURE = <CauseType.AUTHENICATION_FAILURE: 4>
EGP_NEIGHBOR_LOSS = <CauseType.EGP_NEIGHBOR_LOSS: 5>
ENTERPRISE_SPECIFIC = <CauseType.ENTERPRISE_SPECIFIC: 6>
class AuthType(enum.Enum):
class AuthType(enum.Enum):
    # Authentication algorithm selected by `User.auth_type`.
    MD5 = 1
    SHA = 2

An enumeration.

MD5 = <AuthType.MD5: 1>
SHA = <AuthType.SHA: 2>
class PrivType(enum.Enum):
class PrivType(enum.Enum):
    # Algorithm selected by `User.priv_type`; DES is the only member.
    # NOTE(review): "priv" presumably stands for privacy (encryption).
    DES = 1

An enumeration.

DES = <PrivType.DES: 1>
class Error(typing.NamedTuple):
class Error(typing.NamedTuple):
    # Error result: an error code together with an integer index.
    type: ErrorType
    # NOTE(review): presumably the position of the data item that caused
    # the error - confirm against the encoder.
    index: int

Error(type, index)

Error(type: ErrorType, index: int)

Create new instance of Error(type, index)

type: ErrorType

Alias for field number 0

index: int

Alias for field number 1

class Cause(typing.NamedTuple):
class Cause(typing.NamedTuple):
    # Cause attached to a `Trap` (see `Trap.cause`).
    type: CauseType
    # NOTE(review): meaning of `value` is not visible here; presumably a
    # cause-specific code - confirm against the encoder.
    value: int

Cause(type, value)

Cause(type: CauseType, value: int)

Create new instance of Cause(type, value)

type: CauseType

Alias for field number 0

value: int

Alias for field number 1

class IntegerData(typing.NamedTuple):
class IntegerData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with an integer value.
    # No range check is performed by this class.
    name: asn1.ObjectIdentifier
    value: int

IntegerData(name, value)

IntegerData(name: tuple[int, ...], value: int)

Create new instance of IntegerData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class UnsignedData(typing.NamedTuple):
class UnsignedData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with an integer value.
    # Same shape as `IntegerData`; the separate class tags the value kind.
    # NOTE(review): non-negativity is not enforced by this class.
    name: asn1.ObjectIdentifier
    value: int

UnsignedData(name, value)

UnsignedData(name: tuple[int, ...], value: int)

Create new instance of UnsignedData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class CounterData(typing.NamedTuple):
class CounterData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with a counter value.
    # Same shape as `IntegerData`; the separate class tags the value kind.
    # NOTE(review): the permitted counter range is not enforced here.
    name: asn1.ObjectIdentifier
    value: int

CounterData(name, value)

CounterData(name: tuple[int, ...], value: int)

Create new instance of CounterData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class BigCounterData(typing.NamedTuple):
class BigCounterData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with a counter value.
    # Same shape as `CounterData`.
    # NOTE(review): presumably the wider (64-bit) counter kind - confirm
    # against the encoder; no range is enforced here.
    name: asn1.ObjectIdentifier
    value: int

BigCounterData(name, value)

BigCounterData(name: tuple[int, ...], value: int)

Create new instance of BigCounterData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class StringData(typing.NamedTuple):
class StringData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with a bytes-like
    # value (`util.Bytes`); despite the class name the value is not `str`.
    name: asn1.ObjectIdentifier
    value: util.Bytes

StringData(name, value)

StringData(name: tuple[int, ...], value: bytes | bytearray | memoryview)

Create new instance of StringData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: bytes | bytearray | memoryview

Alias for field number 1

class ObjectIdData(typing.NamedTuple):
class ObjectIdData(typing.NamedTuple):
    # Data item whose value is itself an object identifier.
    name: asn1.ObjectIdentifier
    value: asn1.ObjectIdentifier

ObjectIdData(name, value)

ObjectIdData(name: tuple[int, ...], value: tuple[int, ...])

Create new instance of ObjectIdData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: tuple[int, ...]

Alias for field number 1

class IpAddressData(typing.NamedTuple):
class IpAddressData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with an address given
    # as a tuple of four integers.
    # NOTE(review): presumably the four IPv4 octets, each 0-255; the range
    # is not enforced by this class.
    name: asn1.ObjectIdentifier
    value: tuple[int, int, int, int]

IpAddressData(name, value)

IpAddressData(name: tuple[int, ...], value: tuple[int, int, int, int])

Create new instance of IpAddressData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: tuple[int, int, int, int]

Alias for field number 1

class TimeTicksData(typing.NamedTuple):
class TimeTicksData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with an integer tick
    # count.
    # NOTE(review): the tick unit is not stated here - confirm before
    # converting to seconds.
    name: asn1.ObjectIdentifier
    value: int

TimeTicksData(name, value)

TimeTicksData(name: tuple[int, ...], value: int)

Create new instance of TimeTicksData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class ArbitraryData(typing.NamedTuple):
class ArbitraryData(typing.NamedTuple):
    # Data item pairing an object identifier (`name`) with a bytes-like
    # value (`util.Bytes`). Same shape as `StringData`.
    # NOTE(review): presumably an opaque, uninterpreted payload - confirm.
    name: asn1.ObjectIdentifier
    value: util.Bytes

ArbitraryData(name, value)

ArbitraryData(name: tuple[int, ...], value: bytes | bytearray | memoryview)

Create new instance of ArbitraryData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: bytes | bytearray | memoryview

Alias for field number 1

class EmptyData(typing.NamedTuple):
class EmptyData(typing.NamedTuple):
    # Data item that carries only an object identifier and no value field.
    name: asn1.ObjectIdentifier

EmptyData(name,)

EmptyData(name: tuple[int, ...])

Create new instance of EmptyData(name,)

name: tuple[int, ...]

Alias for field number 0

class UnspecifiedData(typing.NamedTuple):
class UnspecifiedData(typing.NamedTuple):
    # Data item that carries only an object identifier and no value field.
    # Same shape as `EmptyData`; the separate class tags the item kind.
    name: asn1.ObjectIdentifier

UnspecifiedData(name,)

UnspecifiedData(name: tuple[int, ...])

Create new instance of UnspecifiedData(name,)

name: tuple[int, ...]

Alias for field number 0

class NoSuchObjectData(typing.NamedTuple):
class NoSuchObjectData(typing.NamedTuple):
    # Value-less data item identified only by `name`.
    # NOTE(review): presumably reports that the named object does not
    # exist - confirm against the encoder.
    name: asn1.ObjectIdentifier

NoSuchObjectData(name,)

NoSuchObjectData(name: tuple[int, ...])

Create new instance of NoSuchObjectData(name,)

name: tuple[int, ...]

Alias for field number 0

class NoSuchInstanceData(typing.NamedTuple):
class NoSuchInstanceData(typing.NamedTuple):
    # Value-less data item identified only by `name`.
    # NOTE(review): presumably reports that the named instance does not
    # exist - confirm against the encoder.
    name: asn1.ObjectIdentifier

NoSuchInstanceData(name,)

NoSuchInstanceData(name: tuple[int, ...])

Create new instance of NoSuchInstanceData(name,)

name: tuple[int, ...]

Alias for field number 0

class EndOfMibViewData(typing.NamedTuple):
class EndOfMibViewData(typing.NamedTuple):
    # Value-less data item identified only by `name`.
    # NOTE(review): presumably marks the end of the accessible MIB view -
    # confirm against the encoder.
    name: asn1.ObjectIdentifier

EndOfMibViewData(name,)

EndOfMibViewData(name: tuple[int, ...])

Create new instance of EndOfMibViewData(name,)

name: tuple[int, ...]

Alias for field number 0

CommunityName = <class 'str'>
UserName = <class 'str'>
Password = <class 'str'>
EngineId = bytes | bytearray | memoryview
class User(typing.NamedTuple):
class User(typing.NamedTuple):
    # User description: a name plus optional authentication and
    # `priv_*` settings. `Agent` creates an authentication key only when
    # `auth_type` is set and a second key only when `priv_type` is set;
    # otherwise it stores None for that user.
    name: UserName
    auth_type: AuthType | None
    # used as the password when the authentication key is created
    auth_password: Password | None
    priv_type: PrivType | None
    # used as the password when the `priv` key is created
    priv_password: Password | None

User(name, auth_type, auth_password, priv_type, priv_password)

User( name: str, auth_type: AuthType | None, auth_password: str | None, priv_type: PrivType | None, priv_password: str | None)

Create new instance of User(name, auth_type, auth_password, priv_type, priv_password)

name: str

Alias for field number 0

auth_type: AuthType | None

Alias for field number 1

auth_password: str | None

Alias for field number 2

priv_type: PrivType | None

Alias for field number 3

priv_password: str | None

Alias for field number 4

class Context(typing.NamedTuple):
class Context(typing.NamedTuple):
    # Context made of a bytes-like engine id and a name; it appears as an
    # extra argument of the v3 callback types and as the optional `context`
    # argument of `create_v3_manager`.
    engine_id: EngineId
    name: str

Context(engine_id, name)

Context(engine_id: bytes | bytearray | memoryview, name: str)

Create new instance of Context(engine_id, name)

engine_id: bytes | bytearray | memoryview

Alias for field number 0

name: str

Alias for field number 1

class Trap(typing.NamedTuple):
class Trap(typing.NamedTuple):
    # Trap notification content, as passed to the trap callbacks.
    cause: Cause | None
    """cause is available in case of v1"""
    oid: asn1.ObjectIdentifier
    # NOTE(review): unit of `timestamp` is not stated here - confirm.
    timestamp: int
    # data items carried by the trap
    data: Collection[Data]

Trap(cause, oid, timestamp, data)

Create new instance of Trap(cause, oid, timestamp, data)

cause: Cause | None

cause is available in case of v1

oid: tuple[int, ...]

Alias for field number 1

timestamp: int

Alias for field number 2

class Inform(typing.NamedTuple):
class Inform(typing.NamedTuple):
    # Inform notification content, as passed to the inform callbacks:
    # only a collection of data items.
    data: Collection[Data]

Inform(data,)

class GetDataReq(typing.NamedTuple):
class GetDataReq(typing.NamedTuple):
    # Request identified by a collection of object identifiers.
    # NOTE(review): presumably maps to the SNMP "get" operation - confirm.
    names: Collection[asn1.ObjectIdentifier]

GetDataReq(names,)

GetDataReq(names: Collection[tuple[int, ...]])

Create new instance of GetDataReq(names,)

names: Collection[tuple[int, ...]]

Alias for field number 0

class GetNextDataReq(typing.NamedTuple):
class GetNextDataReq(typing.NamedTuple):
    # Request identified by a collection of object identifiers.
    # NOTE(review): presumably maps to the SNMP "get-next" operation -
    # confirm.
    names: Collection[asn1.ObjectIdentifier]

GetNextDataReq(names,)

GetNextDataReq(names: Collection[tuple[int, ...]])

Create new instance of GetNextDataReq(names,)

names: Collection[tuple[int, ...]]

Alias for field number 0

class GetBulkDataReq(typing.NamedTuple):
class GetBulkDataReq(typing.NamedTuple):
    # Request identified by a collection of object identifiers; it has no
    # other fields (no repetition or limit parameters).
    # NOTE(review): presumably maps to the SNMP "get-bulk" operation -
    # confirm.
    names: Collection[asn1.ObjectIdentifier]

GetBulkDataReq(names,)

GetBulkDataReq(names: Collection[tuple[int, ...]])

Create new instance of GetBulkDataReq(names,)

names: Collection[tuple[int, ...]]

Alias for field number 0

class SetDataReq(typing.NamedTuple):
class SetDataReq(typing.NamedTuple):
    # Request carrying a collection of data items (name/value pairs).
    # NOTE(review): presumably maps to the SNMP "set" operation - confirm.
    data: Collection[Data]

SetDataReq(data,)

async def create_agent( local_addr: hat.drivers.udp.Address = Address(host='0.0.0.0', port=161), v1_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Union[Error, Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData], Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]]] = None, v2c_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Union[Error, Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData], Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]]] = None, v3_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Union[Error, Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData], Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | 
EndOfMibViewData]]]]] = None, authoritative_engine_id: bytes | bytearray | memoryview | None = None, users: Collection[User] = []) -> Agent:
async def create_agent(local_addr: udp.Address = udp.Address('0.0.0.0', 161),
                       v1_request_cb: V1RequestCb | None = None,
                       v2c_request_cb: V2CRequestCb | None = None,
                       v3_request_cb: V3RequestCb | None = None,
                       authoritative_engine_id: common.EngineId | None = None,
                       users: Collection[common.User] = []
                       ) -> 'Agent':
    """Create agent

    A UDP endpoint is created on `local_addr` (by default ``0.0.0.0:161``)
    without a remote address and handed to a new `Agent` together with the
    remaining arguments.

    If constructing the `Agent` raises, the endpoint is closed before the
    exception is re-raised, so the endpoint is not leaked.

    """
    udp_endpoint = await udp.create(local_addr=local_addr, remote_addr=None)

    try:
        agent = Agent(endpoint=udp_endpoint,
                      v1_request_cb=v1_request_cb,
                      v2c_request_cb=v2c_request_cb,
                      v3_request_cb=v3_request_cb,
                      authoritative_engine_id=authoritative_engine_id,
                      users=users)

    except BaseException:
        # BaseException (not just Exception) so that the endpoint is also
        # closed when construction is interrupted, e.g. by cancellation
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    return agent

Create agent

class Agent(hat.aio.group.Resource):
class Agent(aio.Resource):
    # Agent serving requests received on a single UDP endpoint.
    #
    # A background receive loop decodes every incoming datagram, passes the
    # decoded message to the processing function matching its protocol
    # version (v1, v2c or v3), encodes the resulting response and sends it
    # back to the sender's address. The agent shares the endpoint's async
    # group, so closing either one closes the other.

    def __init__(self,
                 endpoint: udp.Endpoint,
                 v1_request_cb: V1RequestCb | None,
                 v2c_request_cb: V2CRequestCb | None,
                 v3_request_cb: V3RequestCb | None,
                 authoritative_engine_id: common.EngineId | None,
                 users: Collection[common.User] = []):
        # `users` is only iterated, never mutated, so the shared default
        # list is harmless here
        self._endpoint = endpoint
        self._v1_request_cb = v1_request_cb
        self._v2c_request_cb = v2c_request_cb
        self._v3_request_cb = v3_request_cb
        self._auth_engine_id = authoritative_engine_id
        # user name -> key (None when the user has no auth/priv type)
        self._auth_keys = {}
        self._priv_keys = {}

        for user in users:
            common.validate_user(user)

            if user.auth_type:
                key_type = key.auth_type_to_key_type(user.auth_type)
                self._auth_keys[user.name] = key.create_key(
                    key_type=key_type,
                    password=user.auth_password,
                    engine_id=authoritative_engine_id)

            else:
                self._auth_keys[user.name] = None

            if user.priv_type:
                key_type = key.priv_type_to_key_type(user.priv_type)
                self._priv_keys[user.name] = key.create_key(
                    key_type=key_type,
                    password=user.priv_password,
                    engine_id=authoritative_engine_id)

            else:
                self._priv_keys[user.name] = None

        # start serving immediately; the loop lives in the endpoint's group
        self.async_group.spawn(self._receive_loop)

    @property
    def async_group(self) -> aio.Group:
        # the agent has no group of its own - it reuses the endpoint's
        return self._endpoint.async_group

    def _on_auth_key(self, engine_id, username):
        # Look up the authentication key of `username`.
        # Raises Exception when `engine_id` differs from the configured
        # authoritative engine id or when the user is unknown. The returned
        # key is None for a known user without `auth_type`.
        if engine_id != self._auth_engine_id:
            raise Exception('invalid authoritative engine id')

        if username not in self._auth_keys:
            raise Exception('invalid user')
        return self._auth_keys[username]

    def _on_priv_key(self, engine_id, username):
        # Look up the `priv` key of `username`; same checks and failure
        # modes as `_on_auth_key`, but against `self._priv_keys`.
        if engine_id != self._auth_engine_id:
            raise Exception('invalid authoritative engine id')

        if username not in self._priv_keys:
            raise Exception('invalid user')
        return self._priv_keys[username]

    async def _receive_loop(self):
        # Serve requests until the endpoint fails or the group is closed.
        # Failures that concern a single message (decoding, processing,
        # encoding) are logged as warnings and the loop moves on to the
        # next datagram; only errors outside those steps end the loop.
        try:
            while True:
                req_msg_bytes, addr = await self._endpoint.receive()

                try:
                    req_msg = encoder.decode(msg_bytes=req_msg_bytes,
                                             auth_key_cb=self._on_auth_key,
                                             priv_key_cb=self._on_priv_key)

                except Exception as e:
                    mlog.warning("error decoding message from %s: %s",
                                 addr, e, exc_info=e)
                    continue

                try:
                    # dispatch on the decoded message's protocol version
                    if isinstance(req_msg, encoder.v1.Msg):
                        res_msg = await _process_v1_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            request_cb=self._v1_request_cb)

                    elif isinstance(req_msg, encoder.v2c.Msg):
                        res_msg = await _process_v2c_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            request_cb=self._v2c_request_cb)

                    elif isinstance(req_msg, encoder.v3.Msg):
                        res_msg = await _process_v3_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            request_cb=self._v3_request_cb,
                            authoritative_engine_id=self._auth_engine_id,
                            auth_keys=self._auth_keys,
                            priv_keys=self._priv_keys)

                    else:
                        raise ValueError('unsupported message type')

                except Exception as e:
                    mlog.warning("error processing message from %s: %s",
                                 addr, e, exc_info=e)
                    continue

                # a falsy result means nothing is sent back for this request
                if not res_msg:
                    continue

                try:
                    # keys are looked up only for v3 responses, and only
                    # when the response has its auth / priv flag set
                    # NOTE(review): the attribute is spelled
                    # `authorative_engine` - presumably matching the v3
                    # message type defined elsewhere; confirm
                    if isinstance(res_msg, encoder.v3.Msg):
                        auth_key = (
                            self._on_auth_key(res_msg.authorative_engine.id,
                                              res_msg.user)
                            if res_msg.auth else None)
                        priv_key = (
                            self._on_priv_key(res_msg.authorative_engine.id,
                                              res_msg.user)
                            if res_msg.priv else None)

                    else:
                        auth_key = None
                        priv_key = None

                    res_msg_bytes = encoder.encode(msg=res_msg,
                                                   auth_key=auth_key,
                                                   priv_key=priv_key)

                except Exception as e:
                    mlog.warning("error encoding message: %s", e, exc_info=e)
                    continue

                # reply to the address the request came from
                self._endpoint.send(res_msg_bytes, addr)

        except ConnectionError:
            # ends the loop without logging
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            # whatever ended the loop, the agent is closed with it
            self.close()

Resource with lifetime control based on Group.

Agent( endpoint: hat.drivers.udp.Endpoint, v1_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Union[Error, Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData], Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]]], v2c_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Union[Error, Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData], Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]]], v3_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Union[Error, Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData], Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]]], authoritative_engine_id: bytes | bytearray | 
memoryview | None, users: Collection[User] = [])
58    def __init__(self,
59                 endpoint: udp.Endpoint,
60                 v1_request_cb: V1RequestCb | None,
61                 v2c_request_cb: V2CRequestCb | None,
62                 v3_request_cb: V3RequestCb | None,
63                 authoritative_engine_id: common.EngineId | None,
64                 users: Collection[common.User] = []):
65        self._endpoint = endpoint
66        self._v1_request_cb = v1_request_cb
67        self._v2c_request_cb = v2c_request_cb
68        self._v3_request_cb = v3_request_cb
69        self._auth_engine_id = authoritative_engine_id
70        self._auth_keys = {}
71        self._priv_keys = {}
72
73        for user in users:
74            common.validate_user(user)
75
76            if user.auth_type:
77                key_type = key.auth_type_to_key_type(user.auth_type)
78                self._auth_keys[user.name] = key.create_key(
79                    key_type=key_type,
80                    password=user.auth_password,
81                    engine_id=authoritative_engine_id)
82
83            else:
84                self._auth_keys[user.name] = None
85
86            if user.priv_type:
87                key_type = key.priv_type_to_key_type(user.priv_type)
88                self._priv_keys[user.name] = key.create_key(
89                    key_type=key_type,
90                    password=user.priv_password,
91                    engine_id=authoritative_engine_id)
92
93            else:
94                self._priv_keys[user.name] = None
95
96        self.async_group.spawn(self._receive_loop)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        # the agent has no group of its own - it exposes the group of the
        # endpoint it was constructed with
        return self._endpoint.async_group

Group controlling resource's lifetime.

class Manager(hat.aio.group.Resource):
class Manager(aio.Resource):
    # Abstract manager interface: a resource with a single abstract
    # request/response coroutine. `create_v1_manager`, `create_v2c_manager`
    # and `create_v3_manager` are annotated as returning it.

    @abc.abstractmethod
    async def send(self, req: Request) -> Response:
        """Send request and wait for response"""

Resource with lifetime control based on Group.

    # Abstract: this declaration has no behaviour of its own and has to be
    # overridden by concrete manager classes. It takes a `Request` and is
    # annotated as returning a `Response`.
    @abc.abstractmethod
    async def send(self, req: Request) -> Response:
        """Send request and wait for response"""

Send request and wait for response

async def create_v1_manager( remote_addr: hat.drivers.udp.Address, community: str = 'public') -> Manager:
async def create_v1_manager(remote_addr: udp.Address,
                            community: common.CommunityName = 'public'
                            ) -> common.Manager:
    """Create v1 manager

    A UDP endpoint targeting `remote_addr` is created (no local address is
    requested) and wrapped in a `V1Manager` using `community`.

    If constructing the manager raises, the endpoint is closed before the
    exception is re-raised.

    """
    udp_endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        manager = V1Manager(endpoint=udp_endpoint, community=community)

    except BaseException:
        # no manager exists to own the endpoint, so it is closed here
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    return manager

Create v1 manager

async def create_v2c_manager( remote_addr: hat.drivers.udp.Address, community: str = 'public') -> Manager:
async def create_v2c_manager(remote_addr: udp.Address,
                             community: common.CommunityName = 'public'
                             ) -> common.Manager:
    """Create v2c manager

    A UDP endpoint targeting `remote_addr` is created (no local address is
    requested) and wrapped in a `V2CManager` using `community`.

    If constructing the manager raises, the endpoint is closed before the
    exception is re-raised.

    """
    udp_endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        manager = V2CManager(endpoint=udp_endpoint, community=community)

    except BaseException:
        # no manager exists to own the endpoint, so it is closed here
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    return manager

Create v2c manager

async def create_v3_manager( remote_addr: hat.drivers.udp.Address, context: Context | None = None, user: User = User(name='public', auth_type=None, auth_password=None, priv_type=None, priv_password=None)) -> Manager:
async def create_v3_manager(remote_addr: udp.Address,
                            context: common.Context | None = None,
                            user: common.User = _default_user
                            ) -> common.Manager:
    """Create v3 manager

    A UDP endpoint targeting `remote_addr` is created (no local address is
    requested), wrapped in a `V3Manager` configured with `context` and
    `user`, and the manager's `sync()` is awaited before it is returned.

    Cleanup on failure happens in two stages: if constructing the manager
    raises, the bare endpoint is closed; if `sync()` raises, the manager is
    closed instead. In both cases the exception is re-raised.

    """
    udp_endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        v3_manager = V3Manager(endpoint=udp_endpoint,
                               context=context,
                               user=user)

    except BaseException:
        # no manager exists yet, so the endpoint itself is closed
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    try:
        await v3_manager.sync()

    except BaseException:
        # from here on cleanup goes through the manager
        await aio.uncancellable(v3_manager.async_close())
        raise

    else:
        return v3_manager

Create v3 manager

V1TrapCb = typing.Callable[[hat.drivers.udp.Address, str, Trap], typing.Optional[typing.Awaitable[NoneType]]]
V2CTrapCb = typing.Callable[[hat.drivers.udp.Address, str, Trap], typing.Optional[typing.Awaitable[NoneType]]]
V2CInformCb = typing.Callable[[hat.drivers.udp.Address, str, Inform], typing.Union[Error, NoneType, typing.Awaitable[Error | None]]]
V3TrapCb = typing.Callable[[hat.drivers.udp.Address, str, Context, Trap], typing.Optional[typing.Awaitable[NoneType]]]
V3InformCb = typing.Callable[[hat.drivers.udp.Address, str, Context, Inform], typing.Union[Error, NoneType, typing.Awaitable[Error | None]]]
async def create_trap_listener( local_addr: hat.drivers.udp.Address = Address(host='0.0.0.0', port=162), v1_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], Optional[Awaitable[NoneType]]]] = None, v2c_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], Optional[Awaitable[NoneType]]]] = None, v2c_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Inform], Union[Error, NoneType, Awaitable[Error | None]]]] = None, v3_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Trap], Optional[Awaitable[NoneType]]]] = None, v3_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Inform], Union[Error, NoneType, Awaitable[Error | None]]]] = None, users: Collection[User] = []) -> TrapListener:
async def create_trap_listener(local_addr: udp.Address = udp.Address('0.0.0.0', 162),  # NOQA
                               v1_trap_cb: V1TrapCb | None = None,
                               v2c_trap_cb: V2CTrapCb | None = None,
                               v2c_inform_cb: V2CInformCb | None = None,
                               v3_trap_cb: V3TrapCb | None = None,
                               v3_inform_cb: V3InformCb | None = None,
                               users: Collection[common.User] = []
                               ) -> 'TrapListener':
    """Create trap listener

    A UDP endpoint is created on `local_addr` (without a remote address) and
    handed, together with the callbacks and `users`, to ``TrapListener``.

    If constructing the listener raises, the endpoint is closed (awaited
    through ``aio.uncancellable``) and the exception is re-raised.

    """
    udp_endpoint = await udp.create(remote_addr=None,
                                    local_addr=local_addr)

    try:
        listener = TrapListener(users=users,
                                v3_inform_cb=v3_inform_cb,
                                v3_trap_cb=v3_trap_cb,
                                v2c_inform_cb=v2c_inform_cb,
                                v2c_trap_cb=v2c_trap_cb,
                                v1_trap_cb=v1_trap_cb,
                                endpoint=udp_endpoint)

    except BaseException:
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    return listener

Create trap listener

class TrapListener(hat.aio.group.Resource):
 69class TrapListener(aio.Resource):
 70
 71    def __init__(self,
 72                 endpoint: udp.Endpoint,
 73                 v1_trap_cb: V1TrapCb | None,
 74                 v2c_trap_cb: V2CTrapCb | None,
 75                 v2c_inform_cb: V2CInformCb | None,
 76                 v3_trap_cb: V3TrapCb | None,
 77                 v3_inform_cb: V3InformCb | None,
 78                 users: Collection[common.User]):
 79        self._endpoint = endpoint
 80        self._v1_trap_cb = v1_trap_cb
 81        self._v2c_trap_cb = v2c_trap_cb
 82        self._v2c_inform_cb = v2c_inform_cb
 83        self._v3_trap_cb = v3_trap_cb
 84        self._v3_inform_cb = v3_inform_cb
 85        self._users = {}
 86        self._auth_keys = {}
 87        self._priv_keys = {}
 88
 89        for user in users:
 90            common.validate_user(user)
 91            self._users[user.name] = user
 92
 93        self.async_group.spawn(self._receive_loop)
 94
 95    @property
 96    def async_group(self) -> aio.Group:
 97        """Async group"""
 98        return self._endpoint.async_group
 99
100    def _on_auth_key(self, engine_id, username):
101        user = self._users.get(username)
102        if not user or not user.auth_type:
103            return
104
105        auth_key = self._auth_keys.get((engine_id, username))
106        if auth_key:
107            return auth_key
108
109        key_type = key.auth_type_to_key_type(user.auth_type)
110        auth_key = key.create_key(key_type=key_type,
111                                  password=user.auth_password,
112                                  engine_id=engine_id)
113
114        self._auth_keys[(engine_id, username)] = auth_key
115        return auth_key
116
117    def _on_priv_key(self, engine_id, username):
118        user = self._users.get(username)
119        if not user or not user.priv_type:
120            return
121
122        priv_key = self._priv_keys.get((engine_id, username))
123        if priv_key:
124            return priv_key
125
126        key_type = key.priv_type_to_key_type(user.priv_type)
127        priv_key = key.create_key(key_type=key_type,
128                                  password=user.priv_password,
129                                  engine_id=engine_id)
130
131        self._priv_keys[(engine_id, username)] = priv_key
132        return priv_key
133
134    async def _receive_loop(self):
135        try:
136            while True:
137                req_msg_bytes, addr = await self._endpoint.receive()
138
139                try:
140                    req_msg = encoder.decode(msg_bytes=req_msg_bytes,
141                                             auth_key_cb=self._on_auth_key,
142                                             priv_key_cb=self._on_priv_key)
143
144                except Exception as e:
145                    mlog.warning("error decoding message from %s: %s",
146                                 addr, e, exc_info=e)
147                    continue
148
149                try:
150                    if isinstance(req_msg, encoder.v1.Msg):
151                        res_msg = await _process_v1_req_msg(
152                            req_msg=req_msg,
153                            addr=addr,
154                            trap_cb=self._v1_trap_cb)
155
156                    elif isinstance(req_msg, encoder.v2c.Msg):
157                        res_msg = await _process_v2c_req_msg(
158                            req_msg=req_msg,
159                            addr=addr,
160                            trap_cb=self._v2c_trap_cb,
161                            inform_cb=self._v2c_inform_cb)
162
163                    elif isinstance(req_msg, encoder.v3.Msg):
164                        res_msg = await _process_v3_req_msg(
165                            req_msg=req_msg,
166                            addr=addr,
167                            trap_cb=self._v3_trap_cb,
168                            inform_cb=self._v3_inform_cb)
169
170                    else:
171                        raise ValueError('unsupported message type')
172
173                except Exception as e:
174                    mlog.warning("error processing message from %s: %s",
175                                 addr, e, exc_info=e)
176                    continue
177
178                if not res_msg:
179                    continue
180
181                try:
182                    if isinstance(res_msg, encoder.v3.Msg):
183                        auth_key = (
184                            self._on_auth_key(res_msg.authorative_engine.id,
185                                              res_msg.user)
186                            if res_msg.auth else None)
187                        priv_key = (
188                            self._on_priv_key(res_msg.authorative_engine.id,
189                                              res_msg.user)
190                            if res_msg.priv else None)
191
192                    else:
193                        auth_key = None
194                        priv_key = None
195
196                    res_msg_bytes = encoder.encode(msg=res_msg,
197                                                   auth_key=auth_key,
198                                                   priv_key=priv_key)
199
200                except Exception as e:
201                    mlog.warning("error encoding message: %s", e, exc_info=e)
202                    continue
203
204                self._endpoint.send(res_msg_bytes, addr)
205
206        except ConnectionError:
207            pass
208
209        except Exception as e:
210            mlog.error("receive loop error: %s", e, exc_info=e)
211
212        finally:
213            self.close()

Resource with lifetime control based on Group.

TrapListener( endpoint: hat.drivers.udp.Endpoint, v1_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], Optional[Awaitable[NoneType]]]], v2c_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], Optional[Awaitable[NoneType]]]], v2c_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Inform], Union[Error, NoneType, Awaitable[Error | None]]]], v3_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Trap], Optional[Awaitable[NoneType]]]], v3_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Inform], Union[Error, NoneType, Awaitable[Error | None]]]], users: Collection[User])
71    def __init__(self,
72                 endpoint: udp.Endpoint,
73                 v1_trap_cb: V1TrapCb | None,
74                 v2c_trap_cb: V2CTrapCb | None,
75                 v2c_inform_cb: V2CInformCb | None,
76                 v3_trap_cb: V3TrapCb | None,
77                 v3_inform_cb: V3InformCb | None,
78                 users: Collection[common.User]):
79        self._endpoint = endpoint
80        self._v1_trap_cb = v1_trap_cb
81        self._v2c_trap_cb = v2c_trap_cb
82        self._v2c_inform_cb = v2c_inform_cb
83        self._v3_trap_cb = v3_trap_cb
84        self._v3_inform_cb = v3_inform_cb
85        self._users = {}
86        self._auth_keys = {}
87        self._priv_keys = {}
88
89        for user in users:
90            common.validate_user(user)
91            self._users[user.name] = user
92
93        self.async_group.spawn(self._receive_loop)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group

        This is the group of the underlying endpoint
        (``self._endpoint.async_group``), not a separate group.

        """
        return self._endpoint.async_group

Async group

class TrapSender(hat.aio.group.Resource):
class TrapSender(aio.Resource):
    """Abstract interface for sending SNMP traps and informs.

    Both methods are declared abstract and have no implementation here.

    """

    @abc.abstractmethod
    def send_trap(self, trap: Trap):
        """Send trap

        Declared as a regular (non-coroutine) method without a return
        annotation.

        """

    @abc.abstractmethod
    async def send_inform(self,
                          inform: Inform
                          ) -> Error | None:
        """Send inform

        Coroutine annotated as returning ``Error | None``.

        NOTE(review): presumably the returned ``Error`` is the one reported
        for the inform and ``None`` means success - confirm against the
        concrete senders.

        """

Resource with lifetime control based on Group.

@abc.abstractmethod
def send_trap(self, trap: Trap):
    @abc.abstractmethod
    def send_trap(self, trap: Trap):
        """Send trap

        Abstract; declared as a regular (non-coroutine) method without a
        return annotation.

        """

Send trap

@abc.abstractmethod
async def send_inform( self, inform: Inform) -> Error | None:
    @abc.abstractmethod
    async def send_inform(self,
                          inform: Inform
                          ) -> Error | None:
        """Send inform

        Abstract coroutine annotated as returning ``Error | None``.

        NOTE(review): presumably ``None`` means the inform succeeded and an
        ``Error`` describes a reported failure - confirm against the concrete
        senders.

        """

Send inform

async def create_v1_trap_sender( remote_addr: hat.drivers.udp.Address, community: str = 'public') -> TrapSender:
async def create_v1_trap_sender(remote_addr: udp.Address,
                                community: common.CommunityName = 'public'
                                ) -> common.TrapSender:
    """Create v1 trap sender

    A UDP endpoint is created for `remote_addr` (without a local address)
    and wrapped, together with `community`, in a ``V1TrapSender``.

    If constructing the sender raises, the endpoint is closed (awaited
    through ``aio.uncancellable``) and the exception is re-raised.

    """
    udp_endpoint = await udp.create(remote_addr=remote_addr,
                                    local_addr=None)

    try:
        sender = V1TrapSender(community=community,
                              endpoint=udp_endpoint)

    except BaseException:
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    return sender

Create v1 trap sender

async def create_v2c_trap_sender( remote_addr: hat.drivers.udp.Address, community: str = 'public') -> TrapSender:
async def create_v2c_trap_sender(remote_addr: udp.Address,
                                 community: common.CommunityName = 'public'
                                 ) -> common.TrapSender:
    """Create v2c trap sender

    A UDP endpoint is created for `remote_addr` (without a local address)
    and wrapped, together with `community`, in a ``V2CTrapSender``.

    If constructing the sender raises, the endpoint is closed (awaited
    through ``aio.uncancellable``) and the exception is re-raised.

    """
    udp_endpoint = await udp.create(remote_addr=remote_addr,
                                    local_addr=None)

    try:
        sender = V2CTrapSender(community=community,
                               endpoint=udp_endpoint)

    except BaseException:
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    return sender

Create v2c trap sender

async def create_v3_trap_sender( remote_addr: hat.drivers.udp.Address, authoritative_engine_id: bytes | bytearray | memoryview, context: Context | None = None, user: User = User(name='public', auth_type=None, auth_password=None, priv_type=None, priv_password=None)) -> TrapSender:
async def create_v3_trap_sender(remote_addr: udp.Address,
                                authoritative_engine_id: common.EngineId,
                                context: common.Context | None = None,
                                user: common.User = _default_user
                                ) -> common.TrapSender:
    """Create v3 trap sender

    A UDP endpoint is created for `remote_addr` (without a local address)
    and wrapped, together with `authoritative_engine_id`, `context` and
    `user`, in a ``V3TrapSender``.

    If constructing the sender raises, the endpoint is closed (awaited
    through ``aio.uncancellable``) and the exception is re-raised.

    """
    udp_endpoint = await udp.create(remote_addr=remote_addr,
                                    local_addr=None)

    try:
        sender = V3TrapSender(user=user,
                              context=context,
                              authoritative_engine_id=authoritative_engine_id,
                              endpoint=udp_endpoint)

    except BaseException:
        await aio.uncancellable(udp_endpoint.async_close())
        raise

    return sender

Create v3 trap sender