hat.drivers.snmp
"""Public interface of the SNMP driver package

Re-exports the names listed in ``__all__`` from the ``common``, ``agent``,
``manager`` and ``trap`` submodules.

"""

# shared enumerations, data types and request/response definitions
from hat.drivers.snmp.common import (Version,
                                     ErrorType,
                                     CauseType,
                                     AuthType,
                                     PrivType,
                                     Error,
                                     Cause,
                                     IntegerData,
                                     UnsignedData,
                                     CounterData,
                                     BigCounterData,
                                     StringData,
                                     ObjectIdData,
                                     IpAddressData,
                                     TimeTicksData,
                                     ArbitraryData,
                                     EmptyData,
                                     UnspecifiedData,
                                     NoSuchObjectData,
                                     NoSuchInstanceData,
                                     EndOfMibViewData,
                                     Data,
                                     CommunityName,
                                     UserName,
                                     Password,
                                     EngineId,
                                     User,
                                     Context,
                                     Trap,
                                     Inform,
                                     GetDataReq,
                                     GetNextDataReq,
                                     GetBulkDataReq,
                                     SetDataReq,
                                     Request,
                                     Response)
# agent and its request callback types
from hat.drivers.snmp.agent import (V1RequestCb,
                                    V2CRequestCb,
                                    V3RequestCb,
                                    create_agent,
                                    Agent)
# manager interface and its version-specific factories
from hat.drivers.snmp.manager import (Manager,
                                      create_v1_manager,
                                      create_v2c_manager,
                                      create_v3_manager)
# trap listener, trap sender and their callback types / factories
from hat.drivers.snmp.trap import (V1TrapCb,
                                   V2CTrapCb,
                                   V2CInformCb,
                                   V3TrapCb,
                                   V3InformCb,
                                   create_trap_listener,
                                   TrapListener,
                                   TrapSender,
                                   create_v1_trap_sender,
                                   create_v2c_trap_sender,
                                   create_v3_trap_sender)


# names exported by ``from hat.drivers.snmp import *`` - kept in the same
# order as the imports above
__all__ = ['Version',
           'ErrorType',
           'CauseType',
           'AuthType',
           'PrivType',
           'Error',
           'Cause',
           'IntegerData',
           'UnsignedData',
           'CounterData',
           'BigCounterData',
           'StringData',
           'ObjectIdData',
           'IpAddressData',
           'TimeTicksData',
           'ArbitraryData',
           'EmptyData',
           'UnspecifiedData',
           'NoSuchObjectData',
           'NoSuchInstanceData',
           'EndOfMibViewData',
           'Data',
           'CommunityName',
           'UserName',
           'Password',
           'EngineId',
           'User',
           'Context',
           'Trap',
           'Inform',
           'GetDataReq',
           'GetNextDataReq',
           'GetBulkDataReq',
           'SetDataReq',
           'Request',
           'Response',
           'V1RequestCb',
           'V2CRequestCb',
           'V3RequestCb',
           'create_agent',
           'Agent',
           'Manager',
           'create_v1_manager',
           'create_v2c_manager',
           'create_v3_manager',
           'V1TrapCb',
           'V2CTrapCb',
           'V2CInformCb',
           'V3TrapCb',
           'V3InformCb',
           'create_trap_listener',
           'TrapListener',
           'TrapSender',
           'create_v1_trap_sender',
           'create_v2c_trap_sender',
           'create_v3_trap_sender']
An enumeration.
class ErrorType(enum.Enum):
    """Error types

    Values 0-5 are marked as applicable to v1, v2c and v3; values 6-18 are
    marked as applicable to v2c and v3 only.

    """

    # v1, v2c, v3
    NO_ERROR = 0
    TOO_BIG = 1
    NO_SUCH_NAME = 2
    BAD_VALUE = 3
    READ_ONLY = 4
    GEN_ERR = 5

    # v2c, v3
    NO_ACCESS = 6
    WRONG_TYPE = 7
    WRONG_LENGTH = 8
    WRONG_ENCODING = 9
    WRONG_VALUE = 10
    NO_CREATION = 11
    INCONSISTENT_VALUE = 12
    RESOURCE_UNAVAILABLE = 13
    COMMIT_FAILED = 14
    UNDO_FAILED = 15
    AUTHORIZATION_ERROR = 16
    NOT_WRITABLE = 17
    INCONSISTENT_NAME = 18
An enumeration.
class CauseType(enum.Enum):
    """Cause types

    ``AUTHENICATION_FAILURE`` is a historical misspelling. It stays the
    canonical member (so ``CauseType(4).name`` and iteration are unchanged
    for existing callers), while ``AUTHENTICATION_FAILURE`` is provided as a
    correctly spelled alias referring to the same member.

    """
    COLD_START = 0
    WARM_START = 1
    LINK_DOWN = 2
    LINK_UP = 3
    # misspelled name kept first so that it remains the canonical member
    AUTHENICATION_FAILURE = 4
    # same value as above - enum treats this as an alias, not a new member
    AUTHENTICATION_FAILURE = 4
    EGP_NEIGHBOR_LOSS = 5
    ENTERPRISE_SPECIFIC = 6
An enumeration.
An enumeration.
An enumeration.
Error(type, index)
Cause(type, value)
IntegerData(name, value)
UnsignedData(name, value)
CounterData(name, value)
BigCounterData(name, value)
StringData(name, value)
class ObjectIdData(typing.NamedTuple):
    """Data entry whose value is an object identifier"""
    # object identifier naming this entry
    name: asn1.ObjectIdentifier
    # object identifier carried as the entry's value
    value: asn1.ObjectIdentifier
ObjectIdData(name, value)
class IpAddressData(typing.NamedTuple):
    """Data entry whose value is an IP address"""
    # object identifier naming this entry
    name: asn1.ObjectIdentifier
    # address as four integers - presumably IPv4 octets (TODO confirm)
    value: tuple[int, int, int, int]
IpAddressData(name, value)
TimeTicksData(name, value)
ArbitraryData(name, value)
EmptyData(name,)
UnspecifiedData(name,)
NoSuchObjectData(name,)
NoSuchInstanceData(name,)
EndOfMibViewData(name,)
class User(typing.NamedTuple):
    """User with optional authentication and privacy settings"""
    name: UserName
    # authentication type (``None`` if not set)
    auth_type: AuthType | None
    # password paired with `auth_type` (``None`` if not set)
    auth_password: Password | None
    # privacy type (``None`` if not set)
    priv_type: PrivType | None
    # password paired with `priv_type` (``None`` if not set)
    priv_password: Password | None
User(name, auth_type, auth_password, priv_type, priv_password)
Context(engine_id, name)
class Trap(typing.NamedTuple):
    """Trap content"""
    cause: Cause | None
    """cause is available in case of v1"""
    oid: asn1.ObjectIdentifier
    # NOTE(review): unit/epoch of timestamp is not visible here - confirm
    timestamp: int
    # data entries carried by the trap
    data: Collection[Data]
Trap(cause, oid, timestamp, data)
Create new instance of Trap(cause, oid, timestamp, data)
Alias for field number 3
Inform(data,)
Create new instance of Inform(data,)
Alias for field number 0
GetDataReq(names,)
GetNextDataReq(names,)
GetBulkDataReq(names,)
SetDataReq(data,)
Create new instance of SetDataReq(data,)
Alias for field number 0
async def create_agent(local_addr: udp.Address = udp.Address('0.0.0.0', 161),
                       v1_request_cb: V1RequestCb | None = None,
                       v2c_request_cb: V2CRequestCb | None = None,
                       v3_request_cb: V3RequestCb | None = None,
                       authoritative_engine_id: common.EngineId | None = None,
                       users: Collection[common.User] = []
                       ) -> 'Agent':
    """Create agent

    A UDP endpoint is created with `local_addr` as its local address and is
    handed, together with the remaining arguments, to `Agent`. Should agent
    construction raise, the endpoint is closed before the exception
    propagates.

    """
    endpoint = await udp.create(local_addr=local_addr, remote_addr=None)

    try:
        agent = Agent(endpoint=endpoint,
                      v1_request_cb=v1_request_cb,
                      v2c_request_cb=v2c_request_cb,
                      v3_request_cb=v3_request_cb,
                      authoritative_engine_id=authoritative_engine_id,
                      users=users)

    except BaseException:
        # do not leak the endpoint - closing must not be interrupted
        await aio.uncancellable(endpoint.async_close())
        raise

    return agent
Create agent
class Agent(aio.Resource):
    """Agent

    Receives messages from the provided endpoint, processes each one with
    the handler matching its version (v1, v2c or v3) and sends the resulting
    response, if any, back to the sender's address.

    """

    def __init__(self,
                 endpoint: udp.Endpoint,
                 v1_request_cb: V1RequestCb | None,
                 v2c_request_cb: V2CRequestCb | None,
                 v3_request_cb: V3RequestCb | None,
                 authoritative_engine_id: common.EngineId | None,
                 users: Collection[common.User] = []):
        """Store configuration, create user keys and spawn receive loop"""
        self._endpoint = endpoint
        self._v1_request_cb = v1_request_cb
        self._v2c_request_cb = v2c_request_cb
        self._v3_request_cb = v3_request_cb
        self._auth_engine_id = authoritative_engine_id
        # user name -> key (``None`` when the user has no auth/priv type)
        self._auth_keys = {}
        self._priv_keys = {}

        for user in users:
            common.validate_user(user)

            if user.auth_type:
                key_type = key.auth_type_to_key_type(user.auth_type)
                self._auth_keys[user.name] = key.create_key(
                    key_type=key_type,
                    password=user.auth_password,
                    engine_id=authoritative_engine_id)

            else:
                self._auth_keys[user.name] = None

            if user.priv_type:
                key_type = key.priv_type_to_key_type(user.priv_type)
                self._priv_keys[user.name] = key.create_key(
                    key_type=key_type,
                    password=user.priv_password,
                    engine_id=authoritative_engine_id)

            else:
                self._priv_keys[user.name] = None

        self.async_group.spawn(self._receive_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group (the endpoint's async group)"""
        return self._endpoint.async_group

    def _on_auth_key(self, engine_id, username):
        """Return authentication key stored for `username`

        Raises `Exception` if `engine_id` differs from the configured
        authoritative engine id or if `username` is not a known user.

        """
        if engine_id != self._auth_engine_id:
            raise Exception('invalid authoritative engine id')

        if username not in self._auth_keys:
            raise Exception('invalid user')
        return self._auth_keys[username]

    def _on_priv_key(self, engine_id, username):
        """Return privacy key stored for `username`

        Raises `Exception` if `engine_id` differs from the configured
        authoritative engine id or if `username` is not a known user.

        """
        if engine_id != self._auth_engine_id:
            raise Exception('invalid authoritative engine id')

        if username not in self._priv_keys:
            raise Exception('invalid user')
        return self._priv_keys[username]

    async def _receive_loop(self):
        """Receive, process and answer messages until the endpoint fails

        Decoding, processing and encoding errors are logged and only the
        affected message is skipped. The agent is closed when the loop ends.

        """
        try:
            while True:
                req_msg_bytes, addr = await self._endpoint.receive()

                try:
                    req_msg = encoder.decode(msg_bytes=req_msg_bytes,
                                             auth_key_cb=self._on_auth_key,
                                             priv_key_cb=self._on_priv_key)

                except Exception as e:
                    mlog.warning("error decoding message from %s: %s",
                                 addr, e, exc_info=e)
                    continue

                try:
                    # dispatch on decoded message version
                    if isinstance(req_msg, encoder.v1.Msg):
                        res_msg = await _process_v1_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            request_cb=self._v1_request_cb)

                    elif isinstance(req_msg, encoder.v2c.Msg):
                        res_msg = await _process_v2c_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            request_cb=self._v2c_request_cb)

                    elif isinstance(req_msg, encoder.v3.Msg):
                        res_msg = await _process_v3_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            request_cb=self._v3_request_cb,
                            authoritative_engine_id=self._auth_engine_id,
                            auth_keys=self._auth_keys,
                            priv_keys=self._priv_keys)

                    else:
                        raise ValueError('unsupported message type')

                except Exception as e:
                    mlog.warning("error processing message from %s: %s",
                                 addr, e, exc_info=e)
                    continue

                # falsy result means there is nothing to send back
                if not res_msg:
                    continue

                try:
                    # keys are looked up only for v3 responses and only when
                    # the response has auth / priv set
                    if isinstance(res_msg, encoder.v3.Msg):
                        auth_key = (
                            self._on_auth_key(res_msg.authorative_engine.id,
                                              res_msg.user)
                            if res_msg.auth else None)
                        priv_key = (
                            self._on_priv_key(res_msg.authorative_engine.id,
                                              res_msg.user)
                            if res_msg.priv else None)

                    else:
                        auth_key = None
                        priv_key = None

                    res_msg_bytes = encoder.encode(msg=res_msg,
                                                   auth_key=auth_key,
                                                   priv_key=priv_key)

                except Exception as e:
                    mlog.warning("error encoding message: %s", e, exc_info=e)
                    continue

                self._endpoint.send(res_msg_bytes, addr)

        except ConnectionError:
            # connection errors end the loop without being logged
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
Resource with lifetime control based on Group.
def __init__(self,
             endpoint: udp.Endpoint,
             v1_request_cb: V1RequestCb | None,
             v2c_request_cb: V2CRequestCb | None,
             v3_request_cb: V3RequestCb | None,
             authoritative_engine_id: common.EngineId | None,
             users: Collection[common.User] = []):
    """Store configuration, create user keys and spawn receive loop"""
    self._endpoint = endpoint
    self._auth_engine_id = authoritative_engine_id

    # per-version request callbacks (each may be ``None``)
    self._v1_request_cb = v1_request_cb
    self._v2c_request_cb = v2c_request_cb
    self._v3_request_cb = v3_request_cb

    # user name -> key (``None`` when the user has no auth/priv type)
    self._auth_keys = {}
    self._priv_keys = {}

    for usr in users:
        common.validate_user(usr)

        auth_key = None
        if usr.auth_type:
            auth_key = key.create_key(
                key_type=key.auth_type_to_key_type(usr.auth_type),
                password=usr.auth_password,
                engine_id=authoritative_engine_id)
        self._auth_keys[usr.name] = auth_key

        priv_key = None
        if usr.priv_type:
            priv_key = key.create_key(
                key_type=key.priv_type_to_key_type(usr.priv_type),
                password=usr.priv_password,
                engine_id=authoritative_engine_id)
        self._priv_keys[usr.name] = priv_key

    self.async_group.spawn(self._receive_loop)
class Manager(aio.Resource):
    """Manager interface

    Declares the `send` coroutine that concrete managers are expected to
    implement.

    """

    @abc.abstractmethod
    async def send(self, req: Request) -> Response:
        """Send request and wait for response"""
Resource with lifetime control based on Group.
@abc.abstractmethod
async def send(self, req: Request) -> Response:
    """Send request and wait for response

    Abstract declaration - no implementation is provided here.

    """
Send request and wait for response
async def create_v1_manager(remote_addr: udp.Address,
                            community: common.CommunityName = 'public'
                            ) -> common.Manager:
    """Create v1 manager

    A UDP endpoint is created with `remote_addr` as its remote address and
    is handed, together with `community`, to `V1Manager`. Should manager
    construction raise, the endpoint is closed before the exception
    propagates.

    """
    endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        manager = V1Manager(endpoint=endpoint, community=community)

    except BaseException:
        # do not leak the endpoint - closing must not be interrupted
        await aio.uncancellable(endpoint.async_close())
        raise

    return manager
Create v1 manager
async def create_v2c_manager(remote_addr: udp.Address,
                             community: common.CommunityName = 'public'
                             ) -> common.Manager:
    """Create v2c manager

    A UDP endpoint is created with `remote_addr` as its remote address and
    is handed, together with `community`, to `V2CManager`. Should manager
    construction raise, the endpoint is closed before the exception
    propagates.

    """
    endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        manager = V2CManager(endpoint=endpoint, community=community)

    except BaseException:
        # do not leak the endpoint - closing must not be interrupted
        await aio.uncancellable(endpoint.async_close())
        raise

    return manager
Create v2c manager
async def create_v3_manager(remote_addr: udp.Address,
                            context: common.Context | None = None,
                            user: common.User = _default_user
                            ) -> common.Manager:
    """Create v3 manager

    A UDP endpoint is created with `remote_addr` as its remote address and
    is handed, together with `context` and `user`, to `V3Manager`. The
    manager's `sync` coroutine is awaited before the manager is returned.

    If manager construction raises, the endpoint is closed; if `sync`
    raises, the manager is closed. In both cases the exception propagates.

    """
    endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        manager = V3Manager(endpoint=endpoint, context=context, user=user)

    except BaseException:
        # manager does not exist yet - close the bare endpoint
        await aio.uncancellable(endpoint.async_close())
        raise

    try:
        await manager.sync()

    except BaseException:
        # manager exists at this point - close it instead of the endpoint
        await aio.uncancellable(manager.async_close())
        raise

    else:
        return manager
Create v3 manager
async def create_trap_listener(local_addr: udp.Address = udp.Address('0.0.0.0', 162),  # NOQA
                               v1_trap_cb: V1TrapCb | None = None,
                               v2c_trap_cb: V2CTrapCb | None = None,
                               v2c_inform_cb: V2CInformCb | None = None,
                               v3_trap_cb: V3TrapCb | None = None,
                               v3_inform_cb: V3InformCb | None = None,
                               users: Collection[common.User] = []
                               ) -> 'TrapListener':
    """Create trap listener

    A UDP endpoint is created with `local_addr` as its local address and is
    handed, together with the callbacks and `users`, to `TrapListener`.
    Should listener construction raise, the endpoint is closed before the
    exception propagates.

    """
    endpoint = await udp.create(local_addr=local_addr, remote_addr=None)

    try:
        listener = TrapListener(endpoint=endpoint,
                                v1_trap_cb=v1_trap_cb,
                                v2c_trap_cb=v2c_trap_cb,
                                v2c_inform_cb=v2c_inform_cb,
                                v3_trap_cb=v3_trap_cb,
                                v3_inform_cb=v3_inform_cb,
                                users=users)

    except BaseException:
        # do not leak the endpoint - closing must not be interrupted
        await aio.uncancellable(endpoint.async_close())
        raise

    return listener
Create trap listener
class TrapListener(aio.Resource):
    """Trap listener

    Receives messages from the provided endpoint, processes each one with
    the handler matching its version (v1, v2c or v3) and sends the resulting
    response, if any, back to the sender's address.

    """

    def __init__(self,
                 endpoint: udp.Endpoint,
                 v1_trap_cb: V1TrapCb | None,
                 v2c_trap_cb: V2CTrapCb | None,
                 v2c_inform_cb: V2CInformCb | None,
                 v3_trap_cb: V3TrapCb | None,
                 v3_inform_cb: V3InformCb | None,
                 users: Collection[common.User]):
        """Store configuration, register users and spawn receive loop"""
        self._endpoint = endpoint
        self._v1_trap_cb = v1_trap_cb
        self._v2c_trap_cb = v2c_trap_cb
        self._v2c_inform_cb = v2c_inform_cb
        self._v3_trap_cb = v3_trap_cb
        self._v3_inform_cb = v3_inform_cb
        # user name -> user
        self._users = {}
        # (engine id, user name) -> key, filled on first use
        self._auth_keys = {}
        self._priv_keys = {}

        for user in users:
            common.validate_user(user)
            self._users[user.name] = user

        self.async_group.spawn(self._receive_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._endpoint.async_group

    def _on_auth_key(self, engine_id, username):
        """Return authentication key for `engine_id` and `username`

        Returns ``None`` if the user is unknown or has no authentication
        type. Otherwise the key is created on first use and cached.

        """
        user = self._users.get(username)
        if not user or not user.auth_type:
            return

        auth_key = self._auth_keys.get((engine_id, username))
        if auth_key:
            return auth_key

        key_type = key.auth_type_to_key_type(user.auth_type)
        auth_key = key.create_key(key_type=key_type,
                                  password=user.auth_password,
                                  engine_id=engine_id)

        self._auth_keys[(engine_id, username)] = auth_key
        return auth_key

    def _on_priv_key(self, engine_id, username):
        """Return privacy key for `engine_id` and `username`

        Returns ``None`` if the user is unknown or has no privacy type.
        Otherwise the key is created on first use and cached.

        """
        user = self._users.get(username)
        if not user or not user.priv_type:
            return

        priv_key = self._priv_keys.get((engine_id, username))
        if priv_key:
            return priv_key

        key_type = key.priv_type_to_key_type(user.priv_type)
        priv_key = key.create_key(key_type=key_type,
                                  password=user.priv_password,
                                  engine_id=engine_id)

        self._priv_keys[(engine_id, username)] = priv_key
        return priv_key

    async def _receive_loop(self):
        """Receive, process and answer messages until the endpoint fails

        Decoding, processing and encoding errors are logged and only the
        affected message is skipped. The listener is closed when the loop
        ends.

        """
        try:
            while True:
                req_msg_bytes, addr = await self._endpoint.receive()

                try:
                    req_msg = encoder.decode(msg_bytes=req_msg_bytes,
                                             auth_key_cb=self._on_auth_key,
                                             priv_key_cb=self._on_priv_key)

                except Exception as e:
                    mlog.warning("error decoding message from %s: %s",
                                 addr, e, exc_info=e)
                    continue

                try:
                    # dispatch on decoded message version
                    if isinstance(req_msg, encoder.v1.Msg):
                        res_msg = await _process_v1_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            trap_cb=self._v1_trap_cb)

                    elif isinstance(req_msg, encoder.v2c.Msg):
                        res_msg = await _process_v2c_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            trap_cb=self._v2c_trap_cb,
                            inform_cb=self._v2c_inform_cb)

                    elif isinstance(req_msg, encoder.v3.Msg):
                        res_msg = await _process_v3_req_msg(
                            req_msg=req_msg,
                            addr=addr,
                            trap_cb=self._v3_trap_cb,
                            inform_cb=self._v3_inform_cb)

                    else:
                        raise ValueError('unsupported message type')

                except Exception as e:
                    mlog.warning("error processing message from %s: %s",
                                 addr, e, exc_info=e)
                    continue

                # falsy result means there is nothing to send back
                if not res_msg:
                    continue

                try:
                    # keys are looked up only for v3 responses and only when
                    # the response has auth / priv set
                    if isinstance(res_msg, encoder.v3.Msg):
                        auth_key = (
                            self._on_auth_key(res_msg.authorative_engine.id,
                                              res_msg.user)
                            if res_msg.auth else None)
                        priv_key = (
                            self._on_priv_key(res_msg.authorative_engine.id,
                                              res_msg.user)
                            if res_msg.priv else None)

                    else:
                        auth_key = None
                        priv_key = None

                    res_msg_bytes = encoder.encode(msg=res_msg,
                                                   auth_key=auth_key,
                                                   priv_key=priv_key)

                except Exception as e:
                    mlog.warning("error encoding message: %s", e, exc_info=e)
                    continue

                self._endpoint.send(res_msg_bytes, addr)

        except ConnectionError:
            # connection errors end the loop without being logged
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
Resource with lifetime control based on Group.
def __init__(self,
             endpoint: udp.Endpoint,
             v1_trap_cb: V1TrapCb | None,
             v2c_trap_cb: V2CTrapCb | None,
             v2c_inform_cb: V2CInformCb | None,
             v3_trap_cb: V3TrapCb | None,
             v3_inform_cb: V3InformCb | None,
             users: Collection[common.User]):
    """Store configuration, register users and spawn receive loop"""
    self._endpoint = endpoint

    # trap / inform callbacks (each may be ``None``)
    self._v1_trap_cb = v1_trap_cb
    self._v2c_trap_cb = v2c_trap_cb
    self._v2c_inform_cb = v2c_inform_cb
    self._v3_trap_cb = v3_trap_cb
    self._v3_inform_cb = v3_inform_cb

    # key storage - starts empty
    self._auth_keys = {}
    self._priv_keys = {}

    # user name -> validated user
    self._users = {}
    for usr in users:
        common.validate_user(usr)
        self._users[usr.name] = usr

    self.async_group.spawn(self._receive_loop)
class TrapSender(aio.Resource):
    """Trap sender interface

    Declares the `send_trap` method and the `send_inform` coroutine that
    concrete trap senders are expected to implement.

    """

    @abc.abstractmethod
    def send_trap(self, trap: Trap):
        """Send trap"""

    @abc.abstractmethod
    async def send_inform(self,
                          inform: Inform
                          ) -> Error | None:
        """Send inform

        Returns an `Error` or ``None``.
        NOTE(review): presumably ``None`` means the inform was accepted
        without error - confirm against implementations.

        """
Resource with lifetime control based on Group.
async def create_v1_trap_sender(remote_addr: udp.Address,
                                community: common.CommunityName = 'public'
                                ) -> common.TrapSender:
    """Create v1 trap sender

    A UDP endpoint is created with `remote_addr` as its remote address and
    is handed, together with `community`, to `V1TrapSender`. Should sender
    construction raise, the endpoint is closed before the exception
    propagates.

    """
    endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        sender = V1TrapSender(endpoint=endpoint, community=community)

    except BaseException:
        # do not leak the endpoint - closing must not be interrupted
        await aio.uncancellable(endpoint.async_close())
        raise

    return sender
Create v1 trap sender
async def create_v2c_trap_sender(remote_addr: udp.Address,
                                 community: common.CommunityName = 'public'
                                 ) -> common.TrapSender:
    """Create v2c trap sender

    A UDP endpoint is created with `remote_addr` as its remote address and
    is handed, together with `community`, to `V2CTrapSender`. Should sender
    construction raise, the endpoint is closed before the exception
    propagates.

    """
    endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        sender = V2CTrapSender(endpoint=endpoint, community=community)

    except BaseException:
        # do not leak the endpoint - closing must not be interrupted
        await aio.uncancellable(endpoint.async_close())
        raise

    return sender
Create v2c trap sender
async def create_v3_trap_sender(remote_addr: udp.Address,
                                authoritative_engine_id: common.EngineId,
                                context: common.Context | None = None,
                                user: common.User = _default_user
                                ) -> common.TrapSender:
    """Create v3 trap sender

    A UDP endpoint is created with `remote_addr` as its remote address and
    is handed, together with `authoritative_engine_id`, `context` and
    `user`, to `V3TrapSender`. Should sender construction raise, the
    endpoint is closed before the exception propagates.

    """
    endpoint = await udp.create(local_addr=None, remote_addr=remote_addr)

    try:
        sender = V3TrapSender(endpoint=endpoint,
                              authoritative_engine_id=authoritative_engine_id,
                              context=context,
                              user=user)

    except BaseException:
        # do not leak the endpoint - closing must not be interrupted
        await aio.uncancellable(endpoint.async_close())
        raise

    return sender
Create v3 trap sender