hat.drivers.snmp
1from hat.drivers.snmp.common import (Version, 2 ErrorType, 3 CauseType, 4 AuthType, 5 PrivType, 6 Error, 7 Cause, 8 IntegerData, 9 UnsignedData, 10 CounterData, 11 BigCounterData, 12 StringData, 13 ObjectIdData, 14 IpAddressData, 15 TimeTicksData, 16 ArbitraryData, 17 EmptyData, 18 UnspecifiedData, 19 NoSuchObjectData, 20 NoSuchInstanceData, 21 EndOfMibViewData, 22 Data, 23 CommunityName, 24 UserName, 25 Password, 26 EngineId, 27 User, 28 Context, 29 Trap, 30 Inform, 31 GetDataReq, 32 GetNextDataReq, 33 GetBulkDataReq, 34 SetDataReq, 35 Request, 36 Response) 37from hat.drivers.snmp.agent import (V1RequestCb, 38 V2CRequestCb, 39 V3RequestCb, 40 create_agent, 41 Agent) 42from hat.drivers.snmp.manager import (Manager, 43 create_v1_manager, 44 create_v2c_manager, 45 create_v3_manager) 46from hat.drivers.snmp.trap import (V1TrapCb, 47 V2CTrapCb, 48 V2CInformCb, 49 V3TrapCb, 50 V3InformCb, 51 create_trap_listener, 52 TrapListener, 53 TrapSender, 54 create_v1_trap_sender, 55 create_v2c_trap_sender, 56 create_v3_trap_sender) 57 58 59__all__ = ['Version', 60 'ErrorType', 61 'CauseType', 62 'AuthType', 63 'PrivType', 64 'Error', 65 'Cause', 66 'IntegerData', 67 'UnsignedData', 68 'CounterData', 69 'BigCounterData', 70 'StringData', 71 'ObjectIdData', 72 'IpAddressData', 73 'TimeTicksData', 74 'ArbitraryData', 75 'EmptyData', 76 'UnspecifiedData', 77 'NoSuchObjectData', 78 'NoSuchInstanceData', 79 'EndOfMibViewData', 80 'Data', 81 'CommunityName', 82 'UserName', 83 'Password', 84 'EngineId', 85 'User', 86 'Context', 87 'Trap', 88 'Inform', 89 'GetDataReq', 90 'GetNextDataReq', 91 'GetBulkDataReq', 92 'SetDataReq', 93 'Request', 94 'Response', 95 'V1RequestCb', 96 'V2CRequestCb', 97 'V3RequestCb', 98 'create_agent', 99 'Agent', 100 'Manager', 101 'create_v1_manager', 102 'create_v2c_manager', 103 'create_v3_manager', 104 'V1TrapCb', 105 'V2CTrapCb', 106 'V2CInformCb', 107 'V3TrapCb', 108 'V3InformCb', 109 'create_trap_listener', 110 'TrapListener', 111 'TrapSender', 112 'create_v1_trap_sender', 113 'create_v2c_trap_sender', 114 'create_v3_trap_sender']
18class ErrorType(enum.Enum): 19 NO_ERROR = 0 # v1, v2c, v3 20 TOO_BIG = 1 # v1, v2c, v3 21 NO_SUCH_NAME = 2 # v1, v2c, v3 22 BAD_VALUE = 3 # v1, v2c, v3 23 READ_ONLY = 4 # v1, v2c, v3 24 GEN_ERR = 5 # v1, v2c, v3 25 NO_ACCESS = 6 # v2c, v3 26 WRONG_TYPE = 7 # v2c, v3 27 WRONG_LENGTH = 8 # v2c, v3 28 WRONG_ENCODING = 9 # v2c, v3 29 WRONG_VALUE = 10 # v2c, v3 30 NO_CREATION = 11 # v2c, v3 31 INCONSISTENT_VALUE = 12 # v2c, v3 32 RESOURCE_UNAVAILABLE = 13 # v2c, v3 33 COMMIT_FAILED = 14 # v2c, v3 34 UNDO_FAILED = 15 # v2c, v3 35 AUTHORIZATION_ERROR = 16 # v2c, v3 36 NOT_WRITABLE = 17 # v2c, v3 37 INCONSISTENT_NAME = 18 # v2c, v3
40class CauseType(enum.Enum): 41 COLD_START = 0 42 WARM_START = 1 43 LINK_DOWN = 2 44 LINK_UP = 3 45 AUTHENICATION_FAILURE = 4 46 EGP_NEIGHBOR_LOSS = 5 47 ENTERPRISE_SPECIFIC = 6
Error(type, index)
Cause(type, value)
IntegerData(name, value)
UnsignedData(name, value)
CounterData(name, value)
BigCounterData(name, value)
StringData(name, value)
100class ObjectIdData(typing.NamedTuple): 101 name: asn1.ObjectIdentifier 102 value: asn1.ObjectIdentifier
ObjectIdData(name, value)
106class IpAddressData(typing.NamedTuple): 107 name: asn1.ObjectIdentifier 108 value: tuple[int, int, int, int]
IpAddressData(name, value)
TimeTicksData(name, value)
ArbitraryData(name, value)
EmptyData(name,)
UnspecifiedData(name,)
NoSuchObjectData(name,)
NoSuchInstanceData(name,)
EndOfMibViewData(name,)
173class User(typing.NamedTuple): 174 name: UserName 175 auth_type: AuthType | None 176 auth_password: Password | None 177 priv_type: PrivType | None 178 priv_password: Password | None
User(name, auth_type, auth_password, priv_type, priv_password)
Context(engine_id, name)
186class Trap(typing.NamedTuple): 187 cause: Cause | None 188 """cause is available in case of v1""" 189 oid: asn1.ObjectIdentifier 190 timestamp: int 191 data: Collection[Data]
Trap(cause, oid, timestamp, data)
Create new instance of Trap(cause, oid, timestamp, data)
Alias for field number 3
Inform(data,)
Create new instance of Inform(data,)
Alias for field number 0
GetDataReq(names,)
GetNextDataReq(names,)
GetBulkDataReq(names,)
SetDataReq(data,)
Create new instance of SetDataReq(data,)
Alias for field number 0
33async def create_agent(local_addr: udp.Address = udp.Address('0.0.0.0', 161), 34 *, 35 v1_request_cb: V1RequestCb | None = None, 36 v2c_request_cb: V2CRequestCb | None = None, 37 v3_request_cb: V3RequestCb | None = None, 38 authoritative_engine_id: common.EngineId | None = None, 39 users: Collection[common.User] = [], 40 **kwargs 41 ) -> 'Agent': 42 """Create agent""" 43 endpoint = await udp.create(local_addr=local_addr, 44 remote_addr=None, 45 **kwargs) 46 47 try: 48 return Agent(endpoint=endpoint, 49 v1_request_cb=v1_request_cb, 50 v2c_request_cb=v2c_request_cb, 51 v3_request_cb=v3_request_cb, 52 authoritative_engine_id=authoritative_engine_id, 53 users=users) 54 55 except BaseException: 56 await aio.uncancellable(endpoint.async_close()) 57 raise
Create agent
60class Agent(aio.Resource): 61 62 def __init__(self, 63 endpoint: udp.Endpoint, 64 v1_request_cb: V1RequestCb | None, 65 v2c_request_cb: V2CRequestCb | None, 66 v3_request_cb: V3RequestCb | None, 67 authoritative_engine_id: common.EngineId | None, 68 users: Collection[common.User]): 69 self._endpoint = endpoint 70 self._v1_request_cb = v1_request_cb 71 self._v2c_request_cb = v2c_request_cb 72 self._v3_request_cb = v3_request_cb 73 self._auth_engine_id = authoritative_engine_id 74 self._auth_keys = {} 75 self._priv_keys = {} 76 self._log = logger.create_logger(mlog, 'SnmpAgent', endpoint.info) 77 self._comm_log = logger.CommunicationLogger(mlog, 'SnmpAgent', 78 endpoint.info) 79 80 for user in users: 81 common.validate_user(user) 82 83 if user.auth_type: 84 key_type = key.auth_type_to_key_type(user.auth_type) 85 self._auth_keys[user.name] = key.create_key( 86 key_type=key_type, 87 password=user.auth_password, 88 engine_id=authoritative_engine_id) 89 90 else: 91 self._auth_keys[user.name] = None 92 93 if user.priv_type: 94 key_type = key.priv_type_to_key_type(user.priv_type) 95 self._priv_keys[user.name] = key.create_key( 96 key_type=key_type, 97 password=user.priv_password, 98 engine_id=authoritative_engine_id) 99 100 else: 101 self._priv_keys[user.name] = None 102 103 self.async_group.spawn(self._receive_loop) 104 105 self.async_group.spawn(aio.call_on_cancel, self._comm_log.log, 106 common.CommLogAction.CLOSE) 107 self._comm_log.log(common.CommLogAction.OPEN) 108 109 @property 110 def async_group(self) -> aio.Group: 111 return self._endpoint.async_group 112 113 def _on_auth_key(self, engine_id, username): 114 if engine_id != self._auth_engine_id: 115 raise Exception('invalid authoritative engine id') 116 117 if username not in self._auth_keys: 118 raise Exception('invalid user') 119 return self._auth_keys[username] 120 121 def _on_priv_key(self, engine_id, username): 122 if engine_id != self._auth_engine_id: 123 raise Exception('invalid authoritative engine id') 124 125 if username not in self._priv_keys: 126 raise Exception('invalid user') 127 return self._priv_keys[username] 128 129 async def _receive_loop(self): 130 try: 131 while True: 132 req_msg_bytes, addr = await self._endpoint.receive() 133 134 try: 135 req_msg = encoder.decode(msg_bytes=req_msg_bytes, 136 auth_key_cb=self._on_auth_key, 137 priv_key_cb=self._on_priv_key) 138 139 except Exception as e: 140 self._log.warning("error decoding message from %s: %s", 141 addr, e, exc_info=e) 142 continue 143 144 self._comm_log.log(common.CommLogAction.RECEIVE, req_msg) 145 146 try: 147 if isinstance(req_msg, encoder.v1.Msg): 148 res_msg = await self._process_v1_req_msg( 149 req_msg=req_msg, 150 addr=addr) 151 152 elif isinstance(req_msg, encoder.v2c.Msg): 153 res_msg = await self._process_v2c_req_msg( 154 req_msg=req_msg, 155 addr=addr) 156 157 elif isinstance(req_msg, encoder.v3.Msg): 158 res_msg = await self._process_v3_req_msg( 159 req_msg=req_msg, 160 addr=addr) 161 162 else: 163 raise ValueError('unsupported message type') 164 165 except Exception as e: 166 self._log.warning("error processing message from %s: %s", 167 addr, e, exc_info=e) 168 continue 169 170 if not res_msg: 171 continue 172 173 try: 174 if isinstance(res_msg, encoder.v3.Msg): 175 auth_key = ( 176 self._on_auth_key(res_msg.authorative_engine.id, 177 res_msg.user) 178 if res_msg.auth else None) 179 priv_key = ( 180 self._on_priv_key(res_msg.authorative_engine.id, 181 res_msg.user) 182 if res_msg.priv else None) 183 184 else: 185 auth_key = None 186 priv_key = None 187 188 res_msg_bytes = encoder.encode(msg=res_msg, 189 auth_key=auth_key, 190 priv_key=priv_key) 191 192 except Exception as e: 193 self._log.warning("error encoding message: %s", 194 e, exc_info=e) 195 continue 196 197 self._comm_log.log(common.CommLogAction.SEND, res_msg) 198 199 self._endpoint.send(res_msg_bytes, addr) 200 201 except ConnectionError: 202 pass 203 204 except Exception as e: 205 self._log.error("receive loop error: %s", e, exc_info=e) 206 207 finally: 208 self.close() 209 210 async def _process_v1_req_msg(self, req_msg, addr): 211 if not self._v1_request_cb: 212 raise Exception('not accepting V1') 213 214 if req_msg.type == encoder.v1.MsgType.GET_REQUEST: 215 req = common.GetDataReq(names=[i.name for i in req_msg.pdu.data]) 216 217 elif req_msg.type == encoder.v1.MsgType.GET_NEXT_REQUEST: 218 req = common.GetNextDataReq( 219 names=[i.name for i in req_msg.pdu.data]) 220 221 elif req_msg.type == encoder.v1.MsgType.SET_REQUEST: 222 req = common.SetDataReq(data=req_msg.pdu.data) 223 224 else: 225 raise Exception('invalid request message type') 226 227 try: 228 res = await aio.call(self._v1_request_cb, addr, req_msg.community, 229 req) 230 231 if isinstance(res, common.Error): 232 if res.type.value > common.ErrorType.GEN_ERR.value: 233 raise Exception('invalid error type') 234 235 res_error = res 236 res_data = [] 237 238 else: 239 res_error = common.Error(common.ErrorType.NO_ERROR, 0) 240 res_data = res 241 242 except Exception as e: 243 self._log.warning("error processing request: %s", e, exc_info=e) 244 245 res_error = common.Error(common.ErrorType.GEN_ERR, 0) 246 res_data = [] 247 248 res_pdu = encoder.v1.BasicPdu( 249 request_id=req_msg.pdu.request_id, 250 error=res_error, 251 data=res_data) 252 253 res_msg = encoder.v1.Msg( 254 type=encoder.v1.MsgType.GET_RESPONSE, 255 community=req_msg.community, 256 pdu=res_pdu) 257 258 return res_msg 259 260 async def _process_v2c_req_msg(self, req_msg, addr): 261 if not self._v2c_request_cb: 262 raise Exception('not accepting V2C') 263 264 if req_msg.type == encoder.v2c.MsgType.GET_REQUEST: 265 req = common.GetDataReq(names=[i.name for i in req_msg.pdu.data]) 266 267 elif req_msg.type == encoder.v2c.MsgType.GET_NEXT_REQUEST: 268 req = common.GetNextDataReq( 269 names=[i.name for i in req_msg.pdu.data]) 270 271 elif req_msg.type == encoder.v2c.MsgType.GET_BULK_REQUEST: 272 req = common.GetBulkDataReq( 273 names=[i.name for i in req_msg.pdu.data]) 274 275 elif req_msg.type == encoder.v2c.MsgType.SET_REQUEST: 276 req = common.SetDataReq(data=req_msg.pdu.data) 277 278 else: 279 raise Exception('invalid request message type') 280 281 try: 282 res = await aio.call(self._v2c_request_cb, addr, req_msg.community, 283 req) 284 285 if isinstance(res, common.Error): 286 res_error = res 287 res_data = [] 288 289 else: 290 res_error = common.Error(common.ErrorType.NO_ERROR, 0) 291 res_data = res 292 293 except Exception as e: 294 self._log.warning("error processing request: %s", e, exc_info=e) 295 296 res_error = common.Error(common.ErrorType.GEN_ERR, 0) 297 res_data = [] 298 299 res_pdu = encoder.v2c.BasicPdu( 300 request_id=req_msg.pdu.request_id, 301 error=res_error, 302 data=res_data) 303 304 res_msg = encoder.v2c.Msg( 305 type=encoder.v2c.MsgType.RESPONSE, 306 community=req_msg.community, 307 pdu=res_pdu) 308 309 return res_msg 310 311 async def _process_v3_req_msg(self, req_msg, addr): 312 if not self._v3_request_cb or self._auth_engine_id is None: 313 raise Exception('not accepting V3') 314 315 if req_msg.type == encoder.v3.MsgType.GET_REQUEST: 316 req = common.GetDataReq(names=[i.name for i in req_msg.pdu.data]) 317 318 elif req_msg.type == encoder.v3.MsgType.GET_NEXT_REQUEST: 319 req = common.GetNextDataReq( 320 names=[i.name for i in req_msg.pdu.data]) 321 322 elif req_msg.type == encoder.v3.MsgType.GET_BULK_REQUEST: 323 req = common.GetBulkDataReq( 324 names=[i.name for i in req_msg.pdu.data]) 325 326 elif req_msg.type == encoder.v3.MsgType.SET_REQUEST: 327 req = common.SetDataReq(data=req_msg.pdu.data) 328 329 else: 330 raise Exception('invalid request message type') 331 332 if req_msg.authorative_engine.id != self._auth_engine_id: 333 if req_msg.reportable: 334 335 # TODO report data and conditions for sending reports 336 337 authorative_engine = encoder.v3.AuthorativeEngine( 338 id=self._auth_engine_id, 339 boots=0, 340 time=round(time.monotonic())) 341 342 res_pdu = encoder.v3.BasicPdu( 343 request_id=req_msg.pdu.request_id, 344 error=common.Error(common.ErrorType.NO_ERROR, 0), 345 data=[]) 346 347 res_msg = encoder.v3.Msg( 348 type=encoder.v3.MsgType.REPORT, 349 id=req_msg.id, 350 reportable=False, 351 auth=False, 352 priv=False, 353 authorative_engine=authorative_engine, 354 user='', 355 context=req_msg.context, 356 pdu=res_pdu) 357 358 return res_msg 359 360 raise Exception('invalid authoritative engine id') 361 362 # TODO check authoritative engine boot and time 363 364 if (req_msg.user not in self._auth_keys or 365 req_msg.user not in self._priv_keys): 366 raise Exception('invalid user') 367 368 if self._auth_keys[req_msg.user] is not None and not req_msg.auth: 369 raise Exception('invalid auth flag') 370 371 if self._priv_keys[req_msg.user] is not None and not req_msg.priv: 372 raise Exception('invalid priv flag') 373 374 try: 375 res = await aio.call(self._v3_request_cb, addr, req_msg.user, 376 req_msg.context, req) 377 378 if isinstance(res, common.Error): 379 res_error = res 380 res_data = [] 381 382 else: 383 res_error = common.Error(common.ErrorType.NO_ERROR, 0) 384 res_data = res 385 386 except Exception as e: 387 self._log.warning("error processing request: %s", e, exc_info=e) 388 389 res_error = common.Error(common.ErrorType.GEN_ERR, 0) 390 res_data = [] 391 392 authorative_engine = encoder.v3.AuthorativeEngine( 393 id=req_msg.authorative_engine.id, 394 boots=0, 395 time=round(time.monotonic())) 396 397 res_pdu = encoder.v3.BasicPdu( 398 request_id=req_msg.pdu.request_id, 399 error=res_error, 400 data=res_data) 401 402 # TODO can we reuse request id for res msg id 403 404 res_msg = encoder.v3.Msg( 405 type=encoder.v3.MsgType.RESPONSE, 406 id=req_msg.id, 407 reportable=False, 408 auth=req_msg.auth, 409 priv=req_msg.priv, 410 authorative_engine=authorative_engine, 411 user=req_msg.user, 412 context=req_msg.context, 413 pdu=res_pdu) 414 415 return res_msg
Resource with lifetime control based on Group.
62 def __init__(self, 63 endpoint: udp.Endpoint, 64 v1_request_cb: V1RequestCb | None, 65 v2c_request_cb: V2CRequestCb | None, 66 v3_request_cb: V3RequestCb | None, 67 authoritative_engine_id: common.EngineId | None, 68 users: Collection[common.User]): 69 self._endpoint = endpoint 70 self._v1_request_cb = v1_request_cb 71 self._v2c_request_cb = v2c_request_cb 72 self._v3_request_cb = v3_request_cb 73 self._auth_engine_id = authoritative_engine_id 74 self._auth_keys = {} 75 self._priv_keys = {} 76 self._log = logger.create_logger(mlog, 'SnmpAgent', endpoint.info) 77 self._comm_log = logger.CommunicationLogger(mlog, 'SnmpAgent', 78 endpoint.info) 79 80 for user in users: 81 common.validate_user(user) 82 83 if user.auth_type: 84 key_type = key.auth_type_to_key_type(user.auth_type) 85 self._auth_keys[user.name] = key.create_key( 86 key_type=key_type, 87 password=user.auth_password, 88 engine_id=authoritative_engine_id) 89 90 else: 91 self._auth_keys[user.name] = None 92 93 if user.priv_type: 94 key_type = key.priv_type_to_key_type(user.priv_type) 95 self._priv_keys[user.name] = key.create_key( 96 key_type=key_type, 97 password=user.priv_password, 98 engine_id=authoritative_engine_id) 99 100 else: 101 self._priv_keys[user.name] = None 102 103 self.async_group.spawn(self._receive_loop) 104 105 self.async_group.spawn(aio.call_on_cancel, self._comm_log.log, 106 common.CommLogAction.CLOSE) 107 self._comm_log.log(common.CommLogAction.OPEN)
11class Manager(aio.Resource): 12 13 @abc.abstractmethod 14 async def send(self, req: Request) -> Response: 15 """Send request and wait for response"""
Resource with lifetime control based on Group.
13 @abc.abstractmethod 14 async def send(self, req: Request) -> Response: 15 """Send request and wait for response"""
Send request and wait for response
18async def create_v1_manager(remote_addr: udp.Address, 19 community: common.CommunityName = 'public', 20 **kwargs 21 ) -> common.Manager: 22 """Create v1 manager""" 23 endpoint = await udp.create(local_addr=None, 24 remote_addr=remote_addr, 25 **kwargs) 26 27 try: 28 return V1Manager(endpoint=endpoint, 29 community=community) 30 31 except BaseException: 32 await aio.uncancellable(endpoint.async_close()) 33 raise
Create v1 manager
18async def create_v2c_manager(remote_addr: udp.Address, 19 community: common.CommunityName = 'public', 20 **kwargs 21 ) -> common.Manager: 22 """Create v2c manager""" 23 endpoint = await udp.create(local_addr=None, 24 remote_addr=remote_addr, 25 **kwargs) 26 27 try: 28 return V2CManager(endpoint=endpoint, 29 community=community) 30 31 except BaseException: 32 await aio.uncancellable(endpoint.async_close()) 33 raise
Create v2c manager
26async def create_v3_manager(remote_addr: udp.Address, 27 context: common.Context | None = None, 28 user: common.User = _default_user, 29 **kwargs 30 ) -> common.Manager: 31 """Create v3 manager""" 32 endpoint = await udp.create(local_addr=None, 33 remote_addr=remote_addr, 34 **kwargs) 35 36 try: 37 manager = V3Manager(endpoint=endpoint, 38 context=context, 39 user=user) 40 41 except BaseException: 42 await aio.uncancellable(endpoint.async_close()) 43 raise 44 45 try: 46 await manager.sync() 47 48 except BaseException: 49 await aio.uncancellable(manager.async_close()) 50 raise 51 52 return manager
Create v3 manager
44async def create_trap_listener(local_addr: udp.Address = udp.Address('0.0.0.0', 162), # NOQA 45 *, 46 v1_trap_cb: V1TrapCb | None = None, 47 v2c_trap_cb: V2CTrapCb | None = None, 48 v2c_inform_cb: V2CInformCb | None = None, 49 v3_trap_cb: V3TrapCb | None = None, 50 v3_inform_cb: V3InformCb | None = None, 51 users: Collection[common.User] = [], 52 **kwargs 53 ) -> 'TrapListener': 54 """Create trap listener""" 55 endpoint = await udp.create(local_addr=local_addr, 56 remote_addr=None, 57 **kwargs) 58 59 try: 60 return TrapListener(endpoint=endpoint, 61 v1_trap_cb=v1_trap_cb, 62 v2c_trap_cb=v2c_trap_cb, 63 v2c_inform_cb=v2c_inform_cb, 64 v3_trap_cb=v3_trap_cb, 65 v3_inform_cb=v3_inform_cb, 66 users=users) 67 68 except BaseException: 69 await aio.uncancellable(endpoint.async_close()) 70 raise
Create trap listener
73class TrapListener(aio.Resource): 74 75 def __init__(self, 76 endpoint: udp.Endpoint, 77 v1_trap_cb: V1TrapCb | None, 78 v2c_trap_cb: V2CTrapCb | None, 79 v2c_inform_cb: V2CInformCb | None, 80 v3_trap_cb: V3TrapCb | None, 81 v3_inform_cb: V3InformCb | None, 82 users: Collection[common.User]): 83 self._endpoint = endpoint 84 self._v1_trap_cb = v1_trap_cb 85 self._v2c_trap_cb = v2c_trap_cb 86 self._v2c_inform_cb = v2c_inform_cb 87 self._v3_trap_cb = v3_trap_cb 88 self._v3_inform_cb = v3_inform_cb 89 self._users = {} 90 self._auth_keys = {} 91 self._priv_keys = {} 92 self._log = logger.create_logger(mlog, 'SnmpTrapListener', 93 endpoint.info) 94 self._comm_log = logger.CommunicationLogger(mlog, 'SnmpTrapListener', 95 endpoint.info) 96 97 for user in users: 98 common.validate_user(user) 99 self._users[user.name] = user 100 101 self.async_group.spawn(self._receive_loop) 102 103 self.async_group.spawn(aio.call_on_cancel, self._comm_log.log, 104 common.CommLogAction.CLOSE) 105 self._comm_log.log(common.CommLogAction.OPEN) 106 107 @property 108 def async_group(self) -> aio.Group: 109 """Async group""" 110 return self._endpoint.async_group 111 112 def _on_auth_key(self, engine_id, username): 113 user = self._users.get(username) 114 if not user or not user.auth_type: 115 return 116 117 auth_key = self._auth_keys.get((engine_id, username)) 118 if auth_key: 119 return auth_key 120 121 key_type = key.auth_type_to_key_type(user.auth_type) 122 auth_key = key.create_key(key_type=key_type, 123 password=user.auth_password, 124 engine_id=engine_id) 125 126 self._auth_keys[(engine_id, username)] = auth_key 127 return auth_key 128 129 def _on_priv_key(self, engine_id, username): 130 user = self._users.get(username) 131 if not user or not user.priv_type: 132 return 133 134 priv_key = self._priv_keys.get((engine_id, username)) 135 if priv_key: 136 return priv_key 137 138 key_type = key.priv_type_to_key_type(user.priv_type) 139 priv_key = key.create_key(key_type=key_type, 140 password=user.priv_password, 141 engine_id=engine_id) 142 143 self._priv_keys[(engine_id, username)] = priv_key 144 return priv_key 145 146 async def _receive_loop(self): 147 try: 148 while True: 149 req_msg_bytes, addr = await self._endpoint.receive() 150 151 try: 152 req_msg = encoder.decode(msg_bytes=req_msg_bytes, 153 auth_key_cb=self._on_auth_key, 154 priv_key_cb=self._on_priv_key) 155 156 except Exception as e: 157 self._log.warning("error decoding message from %s: %s", 158 addr, e, exc_info=e) 159 continue 160 161 self._comm_log.log(common.CommLogAction.RECEIVE, req_msg) 162 163 try: 164 if isinstance(req_msg, encoder.v1.Msg): 165 res_msg = await _process_v1_req_msg( 166 req_msg=req_msg, 167 addr=addr, 168 trap_cb=self._v1_trap_cb) 169 170 elif isinstance(req_msg, encoder.v2c.Msg): 171 res_msg = await _process_v2c_req_msg( 172 req_msg=req_msg, 173 addr=addr, 174 trap_cb=self._v2c_trap_cb, 175 inform_cb=self._v2c_inform_cb) 176 177 elif isinstance(req_msg, encoder.v3.Msg): 178 res_msg = await _process_v3_req_msg( 179 req_msg=req_msg, 180 addr=addr, 181 trap_cb=self._v3_trap_cb, 182 inform_cb=self._v3_inform_cb) 183 184 else: 185 raise ValueError('unsupported message type') 186 187 except Exception as e: 188 self._log.warning("error processing message from %s: %s", 189 addr, e, exc_info=e) 190 continue 191 192 if not res_msg: 193 continue 194 195 try: 196 if isinstance(res_msg, encoder.v3.Msg): 197 auth_key = ( 198 self._on_auth_key(res_msg.authorative_engine.id, 199 res_msg.user) 200 if res_msg.auth else None) 201 priv_key = ( 202 self._on_priv_key(res_msg.authorative_engine.id, 203 res_msg.user) 204 if res_msg.priv else None) 205 206 else: 207 auth_key = None 208 priv_key = None 209 210 res_msg_bytes = encoder.encode(msg=res_msg, 211 auth_key=auth_key, 212 priv_key=priv_key) 213 214 except Exception as e: 215 self._log.warning("error encoding message: %s", 216 e, exc_info=e) 217 continue 218 219 self._comm_log.log(common.CommLogAction.SEND, res_msg) 220 221 self._endpoint.send(res_msg_bytes, addr) 222 223 except ConnectionError: 224 pass 225 226 except Exception as e: 227 self._log.error("receive loop error: %s", e, exc_info=e) 228 229 finally: 230 self.close()
Resource with lifetime control based on Group.
75 def __init__(self, 76 endpoint: udp.Endpoint, 77 v1_trap_cb: V1TrapCb | None, 78 v2c_trap_cb: V2CTrapCb | None, 79 v2c_inform_cb: V2CInformCb | None, 80 v3_trap_cb: V3TrapCb | None, 81 v3_inform_cb: V3InformCb | None, 82 users: Collection[common.User]): 83 self._endpoint = endpoint 84 self._v1_trap_cb = v1_trap_cb 85 self._v2c_trap_cb = v2c_trap_cb 86 self._v2c_inform_cb = v2c_inform_cb 87 self._v3_trap_cb = v3_trap_cb 88 self._v3_inform_cb = v3_inform_cb 89 self._users = {} 90 self._auth_keys = {} 91 self._priv_keys = {} 92 self._log = logger.create_logger(mlog, 'SnmpTrapListener', 93 endpoint.info) 94 self._comm_log = logger.CommunicationLogger(mlog, 'SnmpTrapListener', 95 endpoint.info) 96 97 for user in users: 98 common.validate_user(user) 99 self._users[user.name] = user 100 101 self.async_group.spawn(self._receive_loop) 102 103 self.async_group.spawn(aio.call_on_cancel, self._comm_log.log, 104 common.CommLogAction.CLOSE) 105 self._comm_log.log(common.CommLogAction.OPEN)
11class TrapSender(aio.Resource): 12 13 @abc.abstractmethod 14 def send_trap(self, trap: Trap): 15 """Send trap""" 16 17 @abc.abstractmethod 18 async def send_inform(self, 19 inform: Inform 20 ) -> Error | None: 21 """Send inform"""
Resource with lifetime control based on Group.
16async def create_v1_trap_sender(remote_addr: udp.Address, 17 community: common.CommunityName = 'public', 18 **kwargs 19 ) -> common.TrapSender: 20 """Create v1 trap sender""" 21 endpoint = await udp.create(local_addr=None, 22 remote_addr=remote_addr, 23 **kwargs) 24 25 try: 26 return V1TrapSender(endpoint=endpoint, 27 community=community) 28 29 except BaseException: 30 await aio.uncancellable(endpoint.async_close()) 31 raise
Create v1 trap sender
18async def create_v2c_trap_sender(remote_addr: udp.Address, 19 community: common.CommunityName = 'public', 20 **kwargs 21 ) -> common.TrapSender: 22 """Create v2c trap sender""" 23 endpoint = await udp.create(local_addr=None, 24 remote_addr=remote_addr, 25 **kwargs) 26 27 try: 28 return V2CTrapSender(endpoint=endpoint, 29 community=community) 30 31 except BaseException: 32 await aio.uncancellable(endpoint.async_close()) 33 raise
Create v2c trap sender
26async def create_v3_trap_sender(remote_addr: udp.Address, 27 authoritative_engine_id: common.EngineId, 28 context: common.Context | None = None, 29 user: common.User = _default_user, 30 **kwargs 31 ) -> common.TrapSender: 32 """Create v3 trap sender""" 33 endpoint = await udp.create(local_addr=None, 34 remote_addr=remote_addr, 35 **kwargs) 36 37 try: 38 return V3TrapSender(endpoint=endpoint, 39 authoritative_engine_id=authoritative_engine_id, 40 context=context, 41 user=user) 42 43 except BaseException: 44 await aio.uncancellable(endpoint.async_close()) 45 raise
Create v3 trap sender