hat.drivers.snmp

  1from hat.drivers.snmp.common import (Version,
  2                                     ErrorType,
  3                                     CauseType,
  4                                     AuthType,
  5                                     PrivType,
  6                                     Error,
  7                                     Cause,
  8                                     IntegerData,
  9                                     UnsignedData,
 10                                     CounterData,
 11                                     BigCounterData,
 12                                     StringData,
 13                                     ObjectIdData,
 14                                     IpAddressData,
 15                                     TimeTicksData,
 16                                     ArbitraryData,
 17                                     EmptyData,
 18                                     UnspecifiedData,
 19                                     NoSuchObjectData,
 20                                     NoSuchInstanceData,
 21                                     EndOfMibViewData,
 22                                     Data,
 23                                     CommunityName,
 24                                     UserName,
 25                                     Password,
 26                                     EngineId,
 27                                     User,
 28                                     Context,
 29                                     Trap,
 30                                     Inform,
 31                                     GetDataReq,
 32                                     GetNextDataReq,
 33                                     GetBulkDataReq,
 34                                     SetDataReq,
 35                                     Request,
 36                                     Response)
 37from hat.drivers.snmp.agent import (V1RequestCb,
 38                                    V2CRequestCb,
 39                                    V3RequestCb,
 40                                    create_agent,
 41                                    Agent)
 42from hat.drivers.snmp.manager import (Manager,
 43                                      create_v1_manager,
 44                                      create_v2c_manager,
 45                                      create_v3_manager)
 46from hat.drivers.snmp.trap import (V1TrapCb,
 47                                   V2CTrapCb,
 48                                   V2CInformCb,
 49                                   V3TrapCb,
 50                                   V3InformCb,
 51                                   create_trap_listener,
 52                                   TrapListener,
 53                                   TrapSender,
 54                                   create_v1_trap_sender,
 55                                   create_v2c_trap_sender,
 56                                   create_v3_trap_sender)
 57
 58
 59__all__ = ['Version',
 60           'ErrorType',
 61           'CauseType',
 62           'AuthType',
 63           'PrivType',
 64           'Error',
 65           'Cause',
 66           'IntegerData',
 67           'UnsignedData',
 68           'CounterData',
 69           'BigCounterData',
 70           'StringData',
 71           'ObjectIdData',
 72           'IpAddressData',
 73           'TimeTicksData',
 74           'ArbitraryData',
 75           'EmptyData',
 76           'UnspecifiedData',
 77           'NoSuchObjectData',
 78           'NoSuchInstanceData',
 79           'EndOfMibViewData',
 80           'Data',
 81           'CommunityName',
 82           'UserName',
 83           'Password',
 84           'EngineId',
 85           'User',
 86           'Context',
 87           'Trap',
 88           'Inform',
 89           'GetDataReq',
 90           'GetNextDataReq',
 91           'GetBulkDataReq',
 92           'SetDataReq',
 93           'Request',
 94           'Response',
 95           'V1RequestCb',
 96           'V2CRequestCb',
 97           'V3RequestCb',
 98           'create_agent',
 99           'Agent',
100           'Manager',
101           'create_v1_manager',
102           'create_v2c_manager',
103           'create_v3_manager',
104           'V1TrapCb',
105           'V2CTrapCb',
106           'V2CInformCb',
107           'V3TrapCb',
108           'V3InformCb',
109           'create_trap_listener',
110           'TrapListener',
111           'TrapSender',
112           'create_v1_trap_sender',
113           'create_v2c_trap_sender',
114           'create_v3_trap_sender']
class Version(enum.Enum):
12class Version(enum.Enum):
13    V1 = 0
14    V2C = 1
15    V3 = 3
V1 = <Version.V1: 0>
V2C = <Version.V2C: 1>
V3 = <Version.V3: 3>
class ErrorType(enum.Enum):
18class ErrorType(enum.Enum):
19    NO_ERROR = 0                 # v1, v2c, v3
20    TOO_BIG = 1                  # v1, v2c, v3
21    NO_SUCH_NAME = 2             # v1, v2c, v3
22    BAD_VALUE = 3                # v1, v2c, v3
23    READ_ONLY = 4                # v1, v2c, v3
24    GEN_ERR = 5                  # v1, v2c, v3
25    NO_ACCESS = 6                # v2c, v3
26    WRONG_TYPE = 7               # v2c, v3
27    WRONG_LENGTH = 8             # v2c, v3
28    WRONG_ENCODING = 9           # v2c, v3
29    WRONG_VALUE = 10             # v2c, v3
30    NO_CREATION = 11             # v2c, v3
31    INCONSISTENT_VALUE = 12      # v2c, v3
32    RESOURCE_UNAVAILABLE = 13    # v2c, v3
33    COMMIT_FAILED = 14           # v2c, v3
34    UNDO_FAILED = 15             # v2c, v3
35    AUTHORIZATION_ERROR = 16     # v2c, v3
36    NOT_WRITABLE = 17            # v2c, v3
37    INCONSISTENT_NAME = 18       # v2c, v3
NO_ERROR = <ErrorType.NO_ERROR: 0>
TOO_BIG = <ErrorType.TOO_BIG: 1>
NO_SUCH_NAME = <ErrorType.NO_SUCH_NAME: 2>
BAD_VALUE = <ErrorType.BAD_VALUE: 3>
READ_ONLY = <ErrorType.READ_ONLY: 4>
GEN_ERR = <ErrorType.GEN_ERR: 5>
NO_ACCESS = <ErrorType.NO_ACCESS: 6>
WRONG_TYPE = <ErrorType.WRONG_TYPE: 7>
WRONG_LENGTH = <ErrorType.WRONG_LENGTH: 8>
WRONG_ENCODING = <ErrorType.WRONG_ENCODING: 9>
WRONG_VALUE = <ErrorType.WRONG_VALUE: 10>
NO_CREATION = <ErrorType.NO_CREATION: 11>
INCONSISTENT_VALUE = <ErrorType.INCONSISTENT_VALUE: 12>
RESOURCE_UNAVAILABLE = <ErrorType.RESOURCE_UNAVAILABLE: 13>
COMMIT_FAILED = <ErrorType.COMMIT_FAILED: 14>
UNDO_FAILED = <ErrorType.UNDO_FAILED: 15>
AUTHORIZATION_ERROR = <ErrorType.AUTHORIZATION_ERROR: 16>
NOT_WRITABLE = <ErrorType.NOT_WRITABLE: 17>
INCONSISTENT_NAME = <ErrorType.INCONSISTENT_NAME: 18>
class CauseType(enum.Enum):
40class CauseType(enum.Enum):
41    COLD_START = 0
42    WARM_START = 1
43    LINK_DOWN = 2
44    LINK_UP = 3
45    AUTHENICATION_FAILURE = 4
46    EGP_NEIGHBOR_LOSS = 5
47    ENTERPRISE_SPECIFIC = 6
COLD_START = <CauseType.COLD_START: 0>
WARM_START = <CauseType.WARM_START: 1>
AUTHENICATION_FAILURE = <CauseType.AUTHENICATION_FAILURE: 4>
EGP_NEIGHBOR_LOSS = <CauseType.EGP_NEIGHBOR_LOSS: 5>
ENTERPRISE_SPECIFIC = <CauseType.ENTERPRISE_SPECIFIC: 6>
class AuthType(enum.Enum):
50class AuthType(enum.Enum):
51    MD5 = 1
52    SHA = 2
MD5 = <AuthType.MD5: 1>
SHA = <AuthType.SHA: 2>
class PrivType(enum.Enum):
55class PrivType(enum.Enum):
56    DES = 1
DES = <PrivType.DES: 1>
class Error(typing.NamedTuple):
59class Error(typing.NamedTuple):
60    type: ErrorType
61    index: int

Error(type, index)

Error(type: ErrorType, index: int)

Create new instance of Error(type, index)

type: ErrorType

Alias for field number 0

index: int

Alias for field number 1

class Cause(typing.NamedTuple):
64class Cause(typing.NamedTuple):
65    type: CauseType
66    value: int

Cause(type, value)

Cause(type: CauseType, value: int)

Create new instance of Cause(type, value)

type: CauseType

Alias for field number 0

value: int

Alias for field number 1

class IntegerData(typing.NamedTuple):
70class IntegerData(typing.NamedTuple):
71    name: asn1.ObjectIdentifier
72    value: int

IntegerData(name, value)

IntegerData(name: tuple[int, ...], value: int)

Create new instance of IntegerData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class UnsignedData(typing.NamedTuple):
76class UnsignedData(typing.NamedTuple):
77    name: asn1.ObjectIdentifier
78    value: int

UnsignedData(name, value)

UnsignedData(name: tuple[int, ...], value: int)

Create new instance of UnsignedData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class CounterData(typing.NamedTuple):
82class CounterData(typing.NamedTuple):
83    name: asn1.ObjectIdentifier
84    value: int

CounterData(name, value)

CounterData(name: tuple[int, ...], value: int)

Create new instance of CounterData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class BigCounterData(typing.NamedTuple):
88class BigCounterData(typing.NamedTuple):
89    name: asn1.ObjectIdentifier
90    value: int

BigCounterData(name, value)

BigCounterData(name: tuple[int, ...], value: int)

Create new instance of BigCounterData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class StringData(typing.NamedTuple):
94class StringData(typing.NamedTuple):
95    name: asn1.ObjectIdentifier
96    value: util.Bytes

StringData(name, value)

StringData(name: tuple[int, ...], value: bytes | bytearray | memoryview)

Create new instance of StringData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: bytes | bytearray | memoryview

Alias for field number 1

class ObjectIdData(typing.NamedTuple):
100class ObjectIdData(typing.NamedTuple):
101    name: asn1.ObjectIdentifier
102    value: asn1.ObjectIdentifier

ObjectIdData(name, value)

ObjectIdData(name: tuple[int, ...], value: tuple[int, ...])

Create new instance of ObjectIdData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: tuple[int, ...]

Alias for field number 1

class IpAddressData(typing.NamedTuple):
106class IpAddressData(typing.NamedTuple):
107    name: asn1.ObjectIdentifier
108    value: tuple[int, int, int, int]

IpAddressData(name, value)

IpAddressData(name: tuple[int, ...], value: tuple[int, int, int, int])

Create new instance of IpAddressData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: tuple[int, int, int, int]

Alias for field number 1

class TimeTicksData(typing.NamedTuple):
112class TimeTicksData(typing.NamedTuple):
113    name: asn1.ObjectIdentifier
114    value: int

TimeTicksData(name, value)

TimeTicksData(name: tuple[int, ...], value: int)

Create new instance of TimeTicksData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: int

Alias for field number 1

class ArbitraryData(typing.NamedTuple):
118class ArbitraryData(typing.NamedTuple):
119    name: asn1.ObjectIdentifier
120    value: util.Bytes

ArbitraryData(name, value)

ArbitraryData(name: tuple[int, ...], value: bytes | bytearray | memoryview)

Create new instance of ArbitraryData(name, value)

name: tuple[int, ...]

Alias for field number 0

value: bytes | bytearray | memoryview

Alias for field number 1

class EmptyData(typing.NamedTuple):
124class EmptyData(typing.NamedTuple):
125    name: asn1.ObjectIdentifier

EmptyData(name,)

EmptyData(name: tuple[int, ...])

Create new instance of EmptyData(name,)

name: tuple[int, ...]

Alias for field number 0

class UnspecifiedData(typing.NamedTuple):
129class UnspecifiedData(typing.NamedTuple):
130    name: asn1.ObjectIdentifier

UnspecifiedData(name,)

UnspecifiedData(name: tuple[int, ...])

Create new instance of UnspecifiedData(name,)

name: tuple[int, ...]

Alias for field number 0

class NoSuchObjectData(typing.NamedTuple):
134class NoSuchObjectData(typing.NamedTuple):
135    name: asn1.ObjectIdentifier

NoSuchObjectData(name,)

NoSuchObjectData(name: tuple[int, ...])

Create new instance of NoSuchObjectData(name,)

name: tuple[int, ...]

Alias for field number 0

class NoSuchInstanceData(typing.NamedTuple):
139class NoSuchInstanceData(typing.NamedTuple):
140    name: asn1.ObjectIdentifier

NoSuchInstanceData(name,)

NoSuchInstanceData(name: tuple[int, ...])

Create new instance of NoSuchInstanceData(name,)

name: tuple[int, ...]

Alias for field number 0

class EndOfMibViewData(typing.NamedTuple):
144class EndOfMibViewData(typing.NamedTuple):
145    name: asn1.ObjectIdentifier

EndOfMibViewData(name,)

EndOfMibViewData(name: tuple[int, ...])

Create new instance of EndOfMibViewData(name,)

name: tuple[int, ...]

Alias for field number 0

CommunityName = <class 'str'>
UserName = <class 'str'>
Password = <class 'str'>
EngineId = bytes | bytearray | memoryview
class User(typing.NamedTuple):
173class User(typing.NamedTuple):
174    name: UserName
175    auth_type: AuthType | None
176    auth_password: Password | None
177    priv_type: PrivType | None
178    priv_password: Password | None

User(name, auth_type, auth_password, priv_type, priv_password)

User( name: str, auth_type: AuthType | None, auth_password: str | None, priv_type: PrivType | None, priv_password: str | None)

Create new instance of User(name, auth_type, auth_password, priv_type, priv_password)

name: str

Alias for field number 0

auth_type: AuthType | None

Alias for field number 1

auth_password: str | None

Alias for field number 2

priv_type: PrivType | None

Alias for field number 3

priv_password: str | None

Alias for field number 4

class Context(typing.NamedTuple):
181class Context(typing.NamedTuple):
182    engine_id: EngineId
183    name: str

Context(engine_id, name)

Context(engine_id: bytes | bytearray | memoryview, name: str)

Create new instance of Context(engine_id, name)

engine_id: bytes | bytearray | memoryview

Alias for field number 0

name: str

Alias for field number 1

class Trap(typing.NamedTuple):
186class Trap(typing.NamedTuple):
187    cause: Cause | None
188    """cause is available in case of v1"""
189    oid: asn1.ObjectIdentifier
190    timestamp: int
191    data: Collection[Data]

Trap(cause, oid, timestamp, data)

Create new instance of Trap(cause, oid, timestamp, data)

cause: Cause | None

cause is available in case of v1

oid: tuple[int, ...]

Alias for field number 1

timestamp: int

Alias for field number 2

class Inform(typing.NamedTuple):
194class Inform(typing.NamedTuple):
195    data: Collection[Data]

Inform(data,)

class GetDataReq(typing.NamedTuple):
198class GetDataReq(typing.NamedTuple):
199    names: Collection[asn1.ObjectIdentifier]

GetDataReq(names,)

GetDataReq(names: Collection[tuple[int, ...]])

Create new instance of GetDataReq(names,)

names: Collection[tuple[int, ...]]

Alias for field number 0

class GetNextDataReq(typing.NamedTuple):
202class GetNextDataReq(typing.NamedTuple):
203    names: Collection[asn1.ObjectIdentifier]

GetNextDataReq(names,)

GetNextDataReq(names: Collection[tuple[int, ...]])

Create new instance of GetNextDataReq(names,)

names: Collection[tuple[int, ...]]

Alias for field number 0

class GetBulkDataReq(typing.NamedTuple):
206class GetBulkDataReq(typing.NamedTuple):
207    names: Collection[asn1.ObjectIdentifier]

GetBulkDataReq(names,)

GetBulkDataReq(names: Collection[tuple[int, ...]])

Create new instance of GetBulkDataReq(names,)

names: Collection[tuple[int, ...]]

Alias for field number 0

class SetDataReq(typing.NamedTuple):
210class SetDataReq(typing.NamedTuple):
211    data: Collection[Data]

SetDataReq(data,)

async def create_agent( local_addr: hat.drivers.udp.Address = Address(host='0.0.0.0', port=161), *, v1_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData] | Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]] = None, v2c_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData] | Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]] = None, v3_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData] | Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]] = None, authoritative_engine_id: bytes | bytearray | memoryview | None = None, users: Collection[User] = [], **kwargs) -> Agent:
33async def create_agent(local_addr: udp.Address = udp.Address('0.0.0.0', 161),
34                       *,
35                       v1_request_cb: V1RequestCb | None = None,
36                       v2c_request_cb: V2CRequestCb | None = None,
37                       v3_request_cb: V3RequestCb | None = None,
38                       authoritative_engine_id: common.EngineId | None = None,
39                       users: Collection[common.User] = [],
40                       **kwargs
41                       ) -> 'Agent':
42    """Create agent"""
43    endpoint = await udp.create(local_addr=local_addr,
44                                remote_addr=None,
45                                **kwargs)
46
47    try:
48        return Agent(endpoint=endpoint,
49                     v1_request_cb=v1_request_cb,
50                     v2c_request_cb=v2c_request_cb,
51                     v3_request_cb=v3_request_cb,
52                     authoritative_engine_id=authoritative_engine_id,
53                     users=users)
54
55    except BaseException:
56        await aio.uncancellable(endpoint.async_close())
57        raise

Create agent

class Agent(hat.aio.group.Resource):
 60class Agent(aio.Resource):
 61
 62    def __init__(self,
 63                 endpoint: udp.Endpoint,
 64                 v1_request_cb: V1RequestCb | None,
 65                 v2c_request_cb: V2CRequestCb | None,
 66                 v3_request_cb: V3RequestCb | None,
 67                 authoritative_engine_id: common.EngineId | None,
 68                 users: Collection[common.User]):
 69        self._endpoint = endpoint
 70        self._v1_request_cb = v1_request_cb
 71        self._v2c_request_cb = v2c_request_cb
 72        self._v3_request_cb = v3_request_cb
 73        self._auth_engine_id = authoritative_engine_id
 74        self._auth_keys = {}
 75        self._priv_keys = {}
 76        self._log = logger.create_logger(mlog, 'SnmpAgent', endpoint.info)
 77        self._comm_log = logger.CommunicationLogger(mlog, 'SnmpAgent',
 78                                                    endpoint.info)
 79
 80        for user in users:
 81            common.validate_user(user)
 82
 83            if user.auth_type:
 84                key_type = key.auth_type_to_key_type(user.auth_type)
 85                self._auth_keys[user.name] = key.create_key(
 86                    key_type=key_type,
 87                    password=user.auth_password,
 88                    engine_id=authoritative_engine_id)
 89
 90            else:
 91                self._auth_keys[user.name] = None
 92
 93            if user.priv_type:
 94                key_type = key.priv_type_to_key_type(user.priv_type)
 95                self._priv_keys[user.name] = key.create_key(
 96                    key_type=key_type,
 97                    password=user.priv_password,
 98                    engine_id=authoritative_engine_id)
 99
100            else:
101                self._priv_keys[user.name] = None
102
103        self.async_group.spawn(self._receive_loop)
104
105        self.async_group.spawn(aio.call_on_cancel, self._comm_log.log,
106                               common.CommLogAction.CLOSE)
107        self._comm_log.log(common.CommLogAction.OPEN)
108
109    @property
110    def async_group(self) -> aio.Group:
111        return self._endpoint.async_group
112
113    def _on_auth_key(self, engine_id, username):
114        if engine_id != self._auth_engine_id:
115            raise Exception('invalid authoritative engine id')
116
117        if username not in self._auth_keys:
118            raise Exception('invalid user')
119        return self._auth_keys[username]
120
121    def _on_priv_key(self, engine_id, username):
122        if engine_id != self._auth_engine_id:
123            raise Exception('invalid authoritative engine id')
124
125        if username not in self._priv_keys:
126            raise Exception('invalid user')
127        return self._priv_keys[username]
128
129    async def _receive_loop(self):
130        try:
131            while True:
132                req_msg_bytes, addr = await self._endpoint.receive()
133
134                try:
135                    req_msg = encoder.decode(msg_bytes=req_msg_bytes,
136                                             auth_key_cb=self._on_auth_key,
137                                             priv_key_cb=self._on_priv_key)
138
139                except Exception as e:
140                    self._log.warning("error decoding message from %s: %s",
141                                      addr, e, exc_info=e)
142                    continue
143
144                self._comm_log.log(common.CommLogAction.RECEIVE, req_msg)
145
146                try:
147                    if isinstance(req_msg, encoder.v1.Msg):
148                        res_msg = await self._process_v1_req_msg(
149                            req_msg=req_msg,
150                            addr=addr)
151
152                    elif isinstance(req_msg, encoder.v2c.Msg):
153                        res_msg = await self._process_v2c_req_msg(
154                            req_msg=req_msg,
155                            addr=addr)
156
157                    elif isinstance(req_msg, encoder.v3.Msg):
158                        res_msg = await self._process_v3_req_msg(
159                            req_msg=req_msg,
160                            addr=addr)
161
162                    else:
163                        raise ValueError('unsupported message type')
164
165                except Exception as e:
166                    self._log.warning("error processing message from %s: %s",
167                                      addr, e, exc_info=e)
168                    continue
169
170                if not res_msg:
171                    continue
172
173                try:
174                    if isinstance(res_msg, encoder.v3.Msg):
175                        auth_key = (
176                            self._on_auth_key(res_msg.authorative_engine.id,
177                                              res_msg.user)
178                            if res_msg.auth else None)
179                        priv_key = (
180                            self._on_priv_key(res_msg.authorative_engine.id,
181                                              res_msg.user)
182                            if res_msg.priv else None)
183
184                    else:
185                        auth_key = None
186                        priv_key = None
187
188                    res_msg_bytes = encoder.encode(msg=res_msg,
189                                                   auth_key=auth_key,
190                                                   priv_key=priv_key)
191
192                except Exception as e:
193                    self._log.warning("error encoding message: %s",
194                                      e, exc_info=e)
195                    continue
196
197                self._comm_log.log(common.CommLogAction.SEND, res_msg)
198
199                self._endpoint.send(res_msg_bytes, addr)
200
201        except ConnectionError:
202            pass
203
204        except Exception as e:
205            self._log.error("receive loop error: %s", e, exc_info=e)
206
207        finally:
208            self.close()
209
210    async def _process_v1_req_msg(self, req_msg, addr):
211        if not self._v1_request_cb:
212            raise Exception('not accepting V1')
213
214        if req_msg.type == encoder.v1.MsgType.GET_REQUEST:
215            req = common.GetDataReq(names=[i.name for i in req_msg.pdu.data])
216
217        elif req_msg.type == encoder.v1.MsgType.GET_NEXT_REQUEST:
218            req = common.GetNextDataReq(
219                names=[i.name for i in req_msg.pdu.data])
220
221        elif req_msg.type == encoder.v1.MsgType.SET_REQUEST:
222            req = common.SetDataReq(data=req_msg.pdu.data)
223
224        else:
225            raise Exception('invalid request message type')
226
227        try:
228            res = await aio.call(self._v1_request_cb, addr, req_msg.community,
229                                 req)
230
231            if isinstance(res, common.Error):
232                if res.type.value > common.ErrorType.GEN_ERR.value:
233                    raise Exception('invalid error type')
234
235                res_error = res
236                res_data = []
237
238            else:
239                res_error = common.Error(common.ErrorType.NO_ERROR, 0)
240                res_data = res
241
242        except Exception as e:
243            self._log.warning("error processing request: %s", e, exc_info=e)
244
245            res_error = common.Error(common.ErrorType.GEN_ERR, 0)
246            res_data = []
247
248        res_pdu = encoder.v1.BasicPdu(
249            request_id=req_msg.pdu.request_id,
250            error=res_error,
251            data=res_data)
252
253        res_msg = encoder.v1.Msg(
254            type=encoder.v1.MsgType.GET_RESPONSE,
255            community=req_msg.community,
256            pdu=res_pdu)
257
258        return res_msg
259
260    async def _process_v2c_req_msg(self, req_msg, addr):
261        if not self._v2c_request_cb:
262            raise Exception('not accepting V2C')
263
264        if req_msg.type == encoder.v2c.MsgType.GET_REQUEST:
265            req = common.GetDataReq(names=[i.name for i in req_msg.pdu.data])
266
267        elif req_msg.type == encoder.v2c.MsgType.GET_NEXT_REQUEST:
268            req = common.GetNextDataReq(
269                names=[i.name for i in req_msg.pdu.data])
270
271        elif req_msg.type == encoder.v2c.MsgType.GET_BULK_REQUEST:
272            req = common.GetBulkDataReq(
273                names=[i.name for i in req_msg.pdu.data])
274
275        elif req_msg.type == encoder.v2c.MsgType.SET_REQUEST:
276            req = common.SetDataReq(data=req_msg.pdu.data)
277
278        else:
279            raise Exception('invalid request message type')
280
281        try:
282            res = await aio.call(self._v2c_request_cb, addr, req_msg.community,
283                                 req)
284
285            if isinstance(res, common.Error):
286                res_error = res
287                res_data = []
288
289            else:
290                res_error = common.Error(common.ErrorType.NO_ERROR, 0)
291                res_data = res
292
293        except Exception as e:
294            self._log.warning("error processing request: %s", e, exc_info=e)
295
296            res_error = common.Error(common.ErrorType.GEN_ERR, 0)
297            res_data = []
298
299        res_pdu = encoder.v2c.BasicPdu(
300            request_id=req_msg.pdu.request_id,
301            error=res_error,
302            data=res_data)
303
304        res_msg = encoder.v2c.Msg(
305            type=encoder.v2c.MsgType.RESPONSE,
306            community=req_msg.community,
307            pdu=res_pdu)
308
309        return res_msg
310
311    async def _process_v3_req_msg(self, req_msg, addr):
312        if not self._v3_request_cb or self._auth_engine_id is None:
313            raise Exception('not accepting V3')
314
315        if req_msg.type == encoder.v3.MsgType.GET_REQUEST:
316            req = common.GetDataReq(names=[i.name for i in req_msg.pdu.data])
317
318        elif req_msg.type == encoder.v3.MsgType.GET_NEXT_REQUEST:
319            req = common.GetNextDataReq(
320                names=[i.name for i in req_msg.pdu.data])
321
322        elif req_msg.type == encoder.v3.MsgType.GET_BULK_REQUEST:
323            req = common.GetBulkDataReq(
324                names=[i.name for i in req_msg.pdu.data])
325
326        elif req_msg.type == encoder.v3.MsgType.SET_REQUEST:
327            req = common.SetDataReq(data=req_msg.pdu.data)
328
329        else:
330            raise Exception('invalid request message type')
331
332        if req_msg.authorative_engine.id != self._auth_engine_id:
333            if req_msg.reportable:
334
335                # TODO report data and conditions for sending reports
336
337                authorative_engine = encoder.v3.AuthorativeEngine(
338                    id=self._auth_engine_id,
339                    boots=0,
340                    time=round(time.monotonic()))
341
342                res_pdu = encoder.v3.BasicPdu(
343                    request_id=req_msg.pdu.request_id,
344                    error=common.Error(common.ErrorType.NO_ERROR, 0),
345                    data=[])
346
347                res_msg = encoder.v3.Msg(
348                    type=encoder.v3.MsgType.REPORT,
349                    id=req_msg.id,
350                    reportable=False,
351                    auth=False,
352                    priv=False,
353                    authorative_engine=authorative_engine,
354                    user='',
355                    context=req_msg.context,
356                    pdu=res_pdu)
357
358                return res_msg
359
360            raise Exception('invalid authoritative engine id')
361
362        # TODO check authoritative engine boot and time
363
364        if (req_msg.user not in self._auth_keys or
365                req_msg.user not in self._priv_keys):
366            raise Exception('invalid user')
367
368        if self._auth_keys[req_msg.user] is not None and not req_msg.auth:
369            raise Exception('invalid auth flag')
370
371        if self._priv_keys[req_msg.user] is not None and not req_msg.priv:
372            raise Exception('invalid priv flag')
373
374        try:
375            res = await aio.call(self._v3_request_cb, addr, req_msg.user,
376                                 req_msg.context, req)
377
378            if isinstance(res, common.Error):
379                res_error = res
380                res_data = []
381
382            else:
383                res_error = common.Error(common.ErrorType.NO_ERROR, 0)
384                res_data = res
385
386        except Exception as e:
387            self._log.warning("error processing request: %s", e, exc_info=e)
388
389            res_error = common.Error(common.ErrorType.GEN_ERR, 0)
390            res_data = []
391
392        authorative_engine = encoder.v3.AuthorativeEngine(
393            id=req_msg.authorative_engine.id,
394            boots=0,
395            time=round(time.monotonic()))
396
397        res_pdu = encoder.v3.BasicPdu(
398            request_id=req_msg.pdu.request_id,
399            error=res_error,
400            data=res_data)
401
402        # TODO can we reuse request id for res msg id
403
404        res_msg = encoder.v3.Msg(
405            type=encoder.v3.MsgType.RESPONSE,
406            id=req_msg.id,
407            reportable=False,
408            auth=req_msg.auth,
409            priv=req_msg.priv,
410            authorative_engine=authorative_engine,
411            user=req_msg.user,
412            context=req_msg.context,
413            pdu=res_pdu)
414
415        return res_msg

Resource with lifetime control based on Group.

Agent( endpoint: hat.drivers.udp.Endpoint, v1_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData] | Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]], v2c_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData] | Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]], v3_request_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, GetDataReq | GetNextDataReq | GetBulkDataReq | SetDataReq], Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData] | Awaitable[Error | Collection[IntegerData | UnsignedData | CounterData | BigCounterData | StringData | ObjectIdData | IpAddressData | TimeTicksData | ArbitraryData | EmptyData | UnspecifiedData | NoSuchObjectData | NoSuchInstanceData | EndOfMibViewData]]]], authoritative_engine_id: bytes | bytearray | memoryview | None, users: Collection[User])
 62    def __init__(self,
 63                 endpoint: udp.Endpoint,
 64                 v1_request_cb: V1RequestCb | None,
 65                 v2c_request_cb: V2CRequestCb | None,
 66                 v3_request_cb: V3RequestCb | None,
 67                 authoritative_engine_id: common.EngineId | None,
 68                 users: Collection[common.User]):
 69        self._endpoint = endpoint
 70        self._v1_request_cb = v1_request_cb
 71        self._v2c_request_cb = v2c_request_cb
 72        self._v3_request_cb = v3_request_cb
 73        self._auth_engine_id = authoritative_engine_id
 74        self._auth_keys = {}
 75        self._priv_keys = {}
 76        self._log = logger.create_logger(mlog, 'SnmpAgent', endpoint.info)
 77        self._comm_log = logger.CommunicationLogger(mlog, 'SnmpAgent',
 78                                                    endpoint.info)
 79
 80        for user in users:
 81            common.validate_user(user)
 82
 83            if user.auth_type:
 84                key_type = key.auth_type_to_key_type(user.auth_type)
 85                self._auth_keys[user.name] = key.create_key(
 86                    key_type=key_type,
 87                    password=user.auth_password,
 88                    engine_id=authoritative_engine_id)
 89
 90            else:
 91                self._auth_keys[user.name] = None
 92
 93            if user.priv_type:
 94                key_type = key.priv_type_to_key_type(user.priv_type)
 95                self._priv_keys[user.name] = key.create_key(
 96                    key_type=key_type,
 97                    password=user.priv_password,
 98                    engine_id=authoritative_engine_id)
 99
100            else:
101                self._priv_keys[user.name] = None
102
103        self.async_group.spawn(self._receive_loop)
104
105        self.async_group.spawn(aio.call_on_cancel, self._comm_log.log,
106                               common.CommLogAction.CLOSE)
107        self._comm_log.log(common.CommLogAction.OPEN)
async_group: hat.aio.group.Group
109    @property
110    def async_group(self) -> aio.Group:
111        return self._endpoint.async_group

Group controlling resource's lifetime.

class Manager(hat.aio.group.Resource):
11class Manager(aio.Resource):
12
13    @abc.abstractmethod
14    async def send(self, req: Request) -> Response:
15        """Send request and wait for response"""

Resource with lifetime control based on Group.

13    @abc.abstractmethod
14    async def send(self, req: Request) -> Response:
15        """Send request and wait for response"""

Send request and wait for response

async def create_v1_manager( remote_addr: hat.drivers.udp.Address, community: str = 'public', **kwargs) -> Manager:
18async def create_v1_manager(remote_addr: udp.Address,
19                            community: common.CommunityName = 'public',
20                            **kwargs
21                            ) -> common.Manager:
22    """Create v1 manager"""
23    endpoint = await udp.create(local_addr=None,
24                                remote_addr=remote_addr,
25                                **kwargs)
26
27    try:
28        return V1Manager(endpoint=endpoint,
29                         community=community)
30
31    except BaseException:
32        await aio.uncancellable(endpoint.async_close())
33        raise

Create v1 manager

async def create_v2c_manager( remote_addr: hat.drivers.udp.Address, community: str = 'public', **kwargs) -> Manager:
18async def create_v2c_manager(remote_addr: udp.Address,
19                             community: common.CommunityName = 'public',
20                             **kwargs
21                             ) -> common.Manager:
22    """Create v2c manager"""
23    endpoint = await udp.create(local_addr=None,
24                                remote_addr=remote_addr,
25                                **kwargs)
26
27    try:
28        return V2CManager(endpoint=endpoint,
29                          community=community)
30
31    except BaseException:
32        await aio.uncancellable(endpoint.async_close())
33        raise

Create v2c manager

async def create_v3_manager( remote_addr: hat.drivers.udp.Address, context: Context | None = None, user: User = User(name='public', auth_type=None, auth_password=None, priv_type=None, priv_password=None), **kwargs) -> Manager:
26async def create_v3_manager(remote_addr: udp.Address,
27                            context: common.Context | None = None,
28                            user: common.User = _default_user,
29                            **kwargs
30                            ) -> common.Manager:
31    """Create v3 manager"""
32    endpoint = await udp.create(local_addr=None,
33                                remote_addr=remote_addr,
34                                **kwargs)
35
36    try:
37        manager = V3Manager(endpoint=endpoint,
38                            context=context,
39                            user=user)
40
41    except BaseException:
42        await aio.uncancellable(endpoint.async_close())
43        raise
44
45    try:
46        await manager.sync()
47
48    except BaseException:
49        await aio.uncancellable(manager.async_close())
50        raise
51
52    return manager

Create v3 manager

V1TrapCb = typing.Callable[[hat.drivers.udp.Address, str, Trap], None | collections.abc.Awaitable[None]]
V2CTrapCb = typing.Callable[[hat.drivers.udp.Address, str, Trap], None | collections.abc.Awaitable[None]]
V2CInformCb = typing.Callable[[hat.drivers.udp.Address, str, Inform], Error | None | collections.abc.Awaitable[Error | None]]
V3TrapCb = typing.Callable[[hat.drivers.udp.Address, str, Context, Trap], None | collections.abc.Awaitable[None]]
V3InformCb = typing.Callable[[hat.drivers.udp.Address, str, Context, Inform], Error | None | collections.abc.Awaitable[Error | None]]
async def create_trap_listener( local_addr: hat.drivers.udp.Address = Address(host='0.0.0.0', port=162), *, v1_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], None | Awaitable[None]]] = None, v2c_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], None | Awaitable[None]]] = None, v2c_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Inform], Error | None | Awaitable[Error | None]]] = None, v3_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Trap], None | Awaitable[None]]] = None, v3_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Inform], Error | None | Awaitable[Error | None]]] = None, users: Collection[User] = [], **kwargs) -> TrapListener:
44async def create_trap_listener(local_addr: udp.Address = udp.Address('0.0.0.0', 162),  # NOQA
45                               *,
46                               v1_trap_cb: V1TrapCb | None = None,
47                               v2c_trap_cb: V2CTrapCb | None = None,
48                               v2c_inform_cb: V2CInformCb | None = None,
49                               v3_trap_cb: V3TrapCb | None = None,
50                               v3_inform_cb: V3InformCb | None = None,
51                               users: Collection[common.User] = [],
52                               **kwargs
53                               ) -> 'TrapListener':
54    """Create trap listener"""
55    endpoint = await udp.create(local_addr=local_addr,
56                                remote_addr=None,
57                                **kwargs)
58
59    try:
60        return TrapListener(endpoint=endpoint,
61                            v1_trap_cb=v1_trap_cb,
62                            v2c_trap_cb=v2c_trap_cb,
63                            v2c_inform_cb=v2c_inform_cb,
64                            v3_trap_cb=v3_trap_cb,
65                            v3_inform_cb=v3_inform_cb,
66                            users=users)
67
68    except BaseException:
69        await aio.uncancellable(endpoint.async_close())
70        raise

Create trap listener

class TrapListener(hat.aio.group.Resource):
 73class TrapListener(aio.Resource):
 74
 75    def __init__(self,
 76                 endpoint: udp.Endpoint,
 77                 v1_trap_cb: V1TrapCb | None,
 78                 v2c_trap_cb: V2CTrapCb | None,
 79                 v2c_inform_cb: V2CInformCb | None,
 80                 v3_trap_cb: V3TrapCb | None,
 81                 v3_inform_cb: V3InformCb | None,
 82                 users: Collection[common.User]):
 83        self._endpoint = endpoint
 84        self._v1_trap_cb = v1_trap_cb
 85        self._v2c_trap_cb = v2c_trap_cb
 86        self._v2c_inform_cb = v2c_inform_cb
 87        self._v3_trap_cb = v3_trap_cb
 88        self._v3_inform_cb = v3_inform_cb
 89        self._users = {}
 90        self._auth_keys = {}
 91        self._priv_keys = {}
 92        self._log = logger.create_logger(mlog, 'SnmpTrapListener',
 93                                         endpoint.info)
 94        self._comm_log = logger.CommunicationLogger(mlog, 'SnmpTrapListener',
 95                                                    endpoint.info)
 96
 97        for user in users:
 98            common.validate_user(user)
 99            self._users[user.name] = user
100
101        self.async_group.spawn(self._receive_loop)
102
103        self.async_group.spawn(aio.call_on_cancel, self._comm_log.log,
104                               common.CommLogAction.CLOSE)
105        self._comm_log.log(common.CommLogAction.OPEN)
106
107    @property
108    def async_group(self) -> aio.Group:
109        """Async group"""
110        return self._endpoint.async_group
111
112    def _on_auth_key(self, engine_id, username):
113        user = self._users.get(username)
114        if not user or not user.auth_type:
115            return
116
117        auth_key = self._auth_keys.get((engine_id, username))
118        if auth_key:
119            return auth_key
120
121        key_type = key.auth_type_to_key_type(user.auth_type)
122        auth_key = key.create_key(key_type=key_type,
123                                  password=user.auth_password,
124                                  engine_id=engine_id)
125
126        self._auth_keys[(engine_id, username)] = auth_key
127        return auth_key
128
129    def _on_priv_key(self, engine_id, username):
130        user = self._users.get(username)
131        if not user or not user.priv_type:
132            return
133
134        priv_key = self._priv_keys.get((engine_id, username))
135        if priv_key:
136            return priv_key
137
138        key_type = key.priv_type_to_key_type(user.priv_type)
139        priv_key = key.create_key(key_type=key_type,
140                                  password=user.priv_password,
141                                  engine_id=engine_id)
142
143        self._priv_keys[(engine_id, username)] = priv_key
144        return priv_key
145
146    async def _receive_loop(self):
147        try:
148            while True:
149                req_msg_bytes, addr = await self._endpoint.receive()
150
151                try:
152                    req_msg = encoder.decode(msg_bytes=req_msg_bytes,
153                                             auth_key_cb=self._on_auth_key,
154                                             priv_key_cb=self._on_priv_key)
155
156                except Exception as e:
157                    self._log.warning("error decoding message from %s: %s",
158                                      addr, e, exc_info=e)
159                    continue
160
161                self._comm_log.log(common.CommLogAction.RECEIVE, req_msg)
162
163                try:
164                    if isinstance(req_msg, encoder.v1.Msg):
165                        res_msg = await _process_v1_req_msg(
166                            req_msg=req_msg,
167                            addr=addr,
168                            trap_cb=self._v1_trap_cb)
169
170                    elif isinstance(req_msg, encoder.v2c.Msg):
171                        res_msg = await _process_v2c_req_msg(
172                            req_msg=req_msg,
173                            addr=addr,
174                            trap_cb=self._v2c_trap_cb,
175                            inform_cb=self._v2c_inform_cb)
176
177                    elif isinstance(req_msg, encoder.v3.Msg):
178                        res_msg = await _process_v3_req_msg(
179                            req_msg=req_msg,
180                            addr=addr,
181                            trap_cb=self._v3_trap_cb,
182                            inform_cb=self._v3_inform_cb)
183
184                    else:
185                        raise ValueError('unsupported message type')
186
187                except Exception as e:
188                    self._log.warning("error processing message from %s: %s",
189                                      addr, e, exc_info=e)
190                    continue
191
192                if not res_msg:
193                    continue
194
195                try:
196                    if isinstance(res_msg, encoder.v3.Msg):
197                        auth_key = (
198                            self._on_auth_key(res_msg.authorative_engine.id,
199                                              res_msg.user)
200                            if res_msg.auth else None)
201                        priv_key = (
202                            self._on_priv_key(res_msg.authorative_engine.id,
203                                              res_msg.user)
204                            if res_msg.priv else None)
205
206                    else:
207                        auth_key = None
208                        priv_key = None
209
210                    res_msg_bytes = encoder.encode(msg=res_msg,
211                                                   auth_key=auth_key,
212                                                   priv_key=priv_key)
213
214                except Exception as e:
215                    self._log.warning("error encoding message: %s",
216                                      e, exc_info=e)
217                    continue
218
219                self._comm_log.log(common.CommLogAction.SEND, res_msg)
220
221                self._endpoint.send(res_msg_bytes, addr)
222
223        except ConnectionError:
224            pass
225
226        except Exception as e:
227            self._log.error("receive loop error: %s", e, exc_info=e)
228
229        finally:
230            self.close()

Resource with lifetime control based on Group.

TrapListener( endpoint: hat.drivers.udp.Endpoint, v1_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], None | Awaitable[None]]], v2c_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Trap], None | Awaitable[None]]], v2c_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Inform], Error | None | Awaitable[Error | None]]], v3_trap_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Trap], None | Awaitable[None]]], v3_inform_cb: Optional[Callable[[hat.drivers.udp.Address, str, Context, Inform], Error | None | Awaitable[Error | None]]], users: Collection[User])
 75    def __init__(self,
 76                 endpoint: udp.Endpoint,
 77                 v1_trap_cb: V1TrapCb | None,
 78                 v2c_trap_cb: V2CTrapCb | None,
 79                 v2c_inform_cb: V2CInformCb | None,
 80                 v3_trap_cb: V3TrapCb | None,
 81                 v3_inform_cb: V3InformCb | None,
 82                 users: Collection[common.User]):
 83        self._endpoint = endpoint
 84        self._v1_trap_cb = v1_trap_cb
 85        self._v2c_trap_cb = v2c_trap_cb
 86        self._v2c_inform_cb = v2c_inform_cb
 87        self._v3_trap_cb = v3_trap_cb
 88        self._v3_inform_cb = v3_inform_cb
 89        self._users = {}
 90        self._auth_keys = {}
 91        self._priv_keys = {}
 92        self._log = logger.create_logger(mlog, 'SnmpTrapListener',
 93                                         endpoint.info)
 94        self._comm_log = logger.CommunicationLogger(mlog, 'SnmpTrapListener',
 95                                                    endpoint.info)
 96
 97        for user in users:
 98            common.validate_user(user)
 99            self._users[user.name] = user
100
101        self.async_group.spawn(self._receive_loop)
102
103        self.async_group.spawn(aio.call_on_cancel, self._comm_log.log,
104                               common.CommLogAction.CLOSE)
105        self._comm_log.log(common.CommLogAction.OPEN)
async_group: hat.aio.group.Group
107    @property
108    def async_group(self) -> aio.Group:
109        """Async group"""
110        return self._endpoint.async_group

Async group

class TrapSender(hat.aio.group.Resource):
11class TrapSender(aio.Resource):
12
13    @abc.abstractmethod
14    def send_trap(self, trap: Trap):
15        """Send trap"""
16
17    @abc.abstractmethod
18    async def send_inform(self,
19                          inform: Inform
20                          ) -> Error | None:
21        """Send inform"""

Resource with lifetime control based on Group.

@abc.abstractmethod
def send_trap(self, trap: Trap):
13    @abc.abstractmethod
14    def send_trap(self, trap: Trap):
15        """Send trap"""

Send trap

@abc.abstractmethod
async def send_inform( self, inform: Inform) -> Error | None:
17    @abc.abstractmethod
18    async def send_inform(self,
19                          inform: Inform
20                          ) -> Error | None:
21        """Send inform"""

Send inform

async def create_v1_trap_sender( remote_addr: hat.drivers.udp.Address, community: str = 'public', **kwargs) -> TrapSender:
16async def create_v1_trap_sender(remote_addr: udp.Address,
17                                community: common.CommunityName = 'public',
18                                **kwargs
19                                ) -> common.TrapSender:
20    """Create v1 trap sender"""
21    endpoint = await udp.create(local_addr=None,
22                                remote_addr=remote_addr,
23                                **kwargs)
24
25    try:
26        return V1TrapSender(endpoint=endpoint,
27                            community=community)
28
29    except BaseException:
30        await aio.uncancellable(endpoint.async_close())
31        raise

Create v1 trap sender

async def create_v2c_trap_sender( remote_addr: hat.drivers.udp.Address, community: str = 'public', **kwargs) -> TrapSender:
18async def create_v2c_trap_sender(remote_addr: udp.Address,
19                                 community: common.CommunityName = 'public',
20                                 **kwargs
21                                 ) -> common.TrapSender:
22    """Create v2c trap sender"""
23    endpoint = await udp.create(local_addr=None,
24                                remote_addr=remote_addr,
25                                **kwargs)
26
27    try:
28        return V2CTrapSender(endpoint=endpoint,
29                             community=community)
30
31    except BaseException:
32        await aio.uncancellable(endpoint.async_close())
33        raise

Create v2c trap sender

async def create_v3_trap_sender( remote_addr: hat.drivers.udp.Address, authoritative_engine_id: bytes | bytearray | memoryview, context: Context | None = None, user: User = User(name='public', auth_type=None, auth_password=None, priv_type=None, priv_password=None), **kwargs) -> TrapSender:
26async def create_v3_trap_sender(remote_addr: udp.Address,
27                                authoritative_engine_id: common.EngineId,
28                                context: common.Context | None = None,
29                                user: common.User = _default_user,
30                                **kwargs
31                                ) -> common.TrapSender:
32    """Create v3 trap sender"""
33    endpoint = await udp.create(local_addr=None,
34                                remote_addr=remote_addr,
35                                **kwargs)
36
37    try:
38        return V3TrapSender(endpoint=endpoint,
39                            authoritative_engine_id=authoritative_engine_id,
40                            context=context,
41                            user=user)
42
43    except BaseException:
44        await aio.uncancellable(endpoint.async_close())
45        raise

Create v3 trap sender