hat.drivers.ssl
1from ssl import * # NOQA 2 3import enum 4import pathlib 5import ssl 6import typing 7 8try: 9 from hat.drivers.ssl import _ssl 10 11except ImportError: 12 _ssl = None 13 14 15class SslProtocol(enum.Enum): 16 TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT 17 TLS_SERVER = ssl.PROTOCOL_TLS_SERVER 18 19 20class KeyUpdateType(enum.Enum): 21 UPDATE_NOT_REQUESTED = 0 22 UPDATE_REQUESTED = 1 23 24 25def create_ssl_ctx(protocol: SslProtocol, 26 *, 27 check_hostname: bool = False, 28 verify_cert: bool = False, 29 load_default_certs: bool = True, 30 cert_path: pathlib.PurePath | None = None, 31 key_path: pathlib.PurePath | None = None, 32 ca_path: pathlib.PurePath | None = None, 33 password: str | None = None 34 ) -> ssl.SSLContext: 35 ctx = ssl.SSLContext(protocol.value) 36 ctx.check_hostname = check_hostname 37 38 if verify_cert: 39 ctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED 40 41 if load_default_certs: 42 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH 43 if protocol == SslProtocol.TLS_SERVER 44 else ssl.Purpose.SERVER_AUTH) 45 46 if ca_path: 47 ctx.load_verify_locations(cafile=str(ca_path)) 48 49 else: 50 ctx.verify_mode = ssl.VerifyMode.CERT_NONE 51 52 if cert_path: 53 ctx.load_cert_chain(certfile=str(cert_path), 54 keyfile=str(key_path) if key_path else None, 55 password=password) 56 57 return ctx 58 59 60def key_update(ssl_object: ssl.SSLObject, 61 update_type: KeyUpdateType): 62 if not _ssl: 63 raise Exception('not supported') 64 65 if not isinstance(ssl_object, ssl.SSLObject): 66 raise TypeError('invalid ssl object') 67 68 result = _ssl.key_update(ssl_object._sslobj, update_type.value) 69 if result != 1: 70 raise Exception('key update error') 71 72 73def renegotiate(ssl_object: ssl.SSLObject): 74 if not _ssl: 75 raise Exception('not supported') 76 77 if not isinstance(ssl_object, ssl.SSLObject): 78 raise TypeError('invalid ssl object') 79 80 result = _ssl.renegotiate(ssl_object._sslobj) 81 if result != 1: 82 raise Exception('renegotiate error') 83 84 85def 
get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']: 86 if not _ssl: 87 raise Exception('not supported') 88 89 if not isinstance(ssl_object, ssl.SSLObject): 90 raise TypeError('invalid ssl object') 91 92 handle = _ssl.get_peer_cert(ssl_object._sslobj) 93 if not handle: 94 return 95 96 return Cert(handle) 97 98 99def load_crl(path: pathlib.PurePath) -> 'Crl': 100 if not _ssl: 101 raise Exception('not supported') 102 103 handle = _ssl.load_crl(str(path)) 104 return Crl(handle) 105 106 107class Cert: 108 109 def __init__(self, handle): 110 self._handle = handle 111 112 def get_pub_key(self) -> 'PubKey': 113 handle = ssl.get_cert_pub_key(self._handle) 114 return PubKey(handle) 115 116 def get_bytes(self) -> bytes: 117 return ssl.get_cert_bytes(self._handle) 118 119 120class PubKey: 121 122 def __init__(self, handle): 123 self._handle = handle 124 125 def is_rsa(self) -> bool: 126 return ssl.is_pub_key_rsa(self._handle) 127 128 def get_size(self) -> int: 129 return ssl.get_pub_key_size(self._handle) 130 131 132class Crl: 133 134 def __init__(self, handle): 135 self._handle = handle 136 137 def contains_cert(self, cert: Cert) -> bool: 138 if not isinstance(cert, Cert): 139 raise TypeError('invalid cert') 140 141 return ssl.crl_contains_cert(self._handle, cert._handle)
class
SslProtocol(enum.Enum):
class SslProtocol(enum.Enum):
    """Protocol constants of the standard ``ssl`` module, as an enum."""

    # context intended for client-side use
    TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT

    # context intended for server-side use
    TLS_SERVER = ssl.PROTOCOL_TLS_SERVER
An enumeration.
TLS_CLIENT =
<SslProtocol.TLS_CLIENT: <_SSLMethod.PROTOCOL_TLS_CLIENT: 16>>
TLS_SERVER =
<SslProtocol.TLS_SERVER: <_SSLMethod.PROTOCOL_TLS_SERVER: 17>>
class
KeyUpdateType(enum.Enum):
An enumeration.
UPDATE_NOT_REQUESTED =
<KeyUpdateType.UPDATE_NOT_REQUESTED: 0>
UPDATE_REQUESTED =
<KeyUpdateType.UPDATE_REQUESTED: 1>
def
create_ssl_ctx( protocol: SslProtocol, *, check_hostname: bool = False, verify_cert: bool = False, load_default_certs: bool = True, cert_path: pathlib.PurePath | None = None, key_path: pathlib.PurePath | None = None, ca_path: pathlib.PurePath | None = None, password: str | None = None) -> ssl.SSLContext:
def create_ssl_ctx(protocol: SslProtocol,
                   *,
                   check_hostname: bool = False,
                   verify_cert: bool = False,
                   load_default_certs: bool = True,
                   cert_path: pathlib.PurePath | None = None,
                   key_path: pathlib.PurePath | None = None,
                   ca_path: pathlib.PurePath | None = None,
                   password: str | None = None
                   ) -> ssl.SSLContext:
    """Build an ``ssl.SSLContext`` for `protocol`.

    With `verify_cert` disabled, verification mode is ``CERT_NONE`` and both
    `load_default_certs` and `ca_path` are ignored. With it enabled, the peer
    certificate is required, default CA certificates are optionally loaded
    (client auth purpose for ``TLS_SERVER``, server auth purpose otherwise)
    and `ca_path`, when given, is added as a verification location.

    When `cert_path` is given, the certificate chain is loaded together with
    the optional `key_path` and `password`.

    """
    ctx = ssl.SSLContext(protocol.value)
    ctx.check_hostname = check_hostname

    if not verify_cert:
        ctx.verify_mode = ssl.VerifyMode.CERT_NONE

    else:
        ctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED

        if load_default_certs:
            is_server = protocol == SslProtocol.TLS_SERVER
            purpose = (ssl.Purpose.CLIENT_AUTH if is_server
                       else ssl.Purpose.SERVER_AUTH)
            ctx.load_default_certs(purpose)

        if ca_path:
            ctx.load_verify_locations(cafile=str(ca_path))

    if cert_path:
        keyfile = str(key_path) if key_path else None
        ctx.load_cert_chain(certfile=str(cert_path),
                            keyfile=keyfile,
                            password=password)

    return ctx
def key_update(ssl_object: ssl.SSLObject,
               update_type: KeyUpdateType):
    """Run ``_ssl.key_update`` on the object wrapped by `ssl_object`.

    Raises:
        Exception: ``_ssl`` is not available or the call did not return 1
        TypeError: `ssl_object` is not an ``ssl.SSLObject``

    """
    if not _ssl:
        raise Exception('not supported')

    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    # anything other than 1 is treated as failure
    succeeded = _ssl.key_update(ssl_object._sslobj, update_type.value) == 1
    if not succeeded:
        raise Exception('key update error')
def
renegotiate(ssl_object: ssl.SSLObject):
def renegotiate(ssl_object: ssl.SSLObject):
    """Run ``_ssl.renegotiate`` on the object wrapped by `ssl_object`.

    Raises:
        Exception: ``_ssl`` is not available or the call did not return 1
        TypeError: `ssl_object` is not an ``ssl.SSLObject``

    """
    if not _ssl:
        raise Exception('not supported')

    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    # anything other than 1 is treated as failure
    succeeded = _ssl.renegotiate(ssl_object._sslobj) == 1
    if not succeeded:
        raise Exception('renegotiate error')
def get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']:
    """Return the peer certificate of `ssl_object`.

    ``None`` is returned when ``_ssl.get_peer_cert`` yields no handle.

    Raises:
        Exception: ``_ssl`` is not available
        TypeError: `ssl_object` is not an ``ssl.SSLObject``

    """
    if not _ssl:
        raise Exception('not supported')

    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    cert_handle = _ssl.get_peer_cert(ssl_object._sslobj)
    return Cert(cert_handle) if cert_handle else None
class
Cert:
class Cert:
    """Wrapper around a certificate handle produced by ``_ssl``."""

    def __init__(self, handle):
        self._handle = handle

    def get_pub_key(self) -> 'PubKey':
        """Return the certificate's public key.

        Raises:
            Exception: ``_ssl`` is not available

        """
        if not _ssl:
            raise Exception('not supported')

        # handle functions live in the optional `_ssl` module - the standard
        # library `ssl` module does not provide `get_cert_pub_key`
        handle = _ssl.get_cert_pub_key(self._handle)
        return PubKey(handle)

    def get_bytes(self) -> bytes:
        """Return the certificate as bytes.

        Raises:
            Exception: ``_ssl`` is not available

        """
        if not _ssl:
            raise Exception('not supported')

        # see `get_pub_key` - `get_cert_bytes` is provided by `_ssl`
        return _ssl.get_cert_bytes(self._handle)
class
PubKey:
class
Crl: