hat.drivers.ssl
1from ssl import * # NOQA 2 3import enum 4import pathlib 5import ssl 6import typing 7 8try: 9 from hat.drivers.ssl import _ssl 10 11except ImportError: 12 _ssl = None 13 14 15class SslProtocol(enum.Enum): 16 TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT 17 TLS_SERVER = ssl.PROTOCOL_TLS_SERVER 18 19 20class KeyUpdateType(enum.Enum): 21 UPDATE_NOT_REQUESTED = 0 22 UPDATE_REQUESTED = 1 23 24 25def create_ssl_ctx(protocol: SslProtocol, 26 verify_cert: bool = False, 27 cert_path: pathlib.PurePath | None = None, 28 key_path: pathlib.PurePath | None = None, 29 ca_path: pathlib.PurePath | None = None, 30 password: str | None = None 31 ) -> ssl.SSLContext: 32 ctx = ssl.SSLContext(protocol.value) 33 ctx.check_hostname = False 34 35 if verify_cert: 36 ctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED 37 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH 38 if protocol == SslProtocol.TLS_SERVER 39 else ssl.Purpose.SERVER_AUTH) 40 if ca_path: 41 ctx.load_verify_locations(cafile=str(ca_path)) 42 43 else: 44 ctx.verify_mode = ssl.VerifyMode.CERT_NONE 45 46 if cert_path: 47 ctx.load_cert_chain(certfile=str(cert_path), 48 keyfile=str(key_path) if key_path else None, 49 password=password) 50 51 return ctx 52 53 54def key_update(ssl_object: ssl.SSLObject, 55 update_type: KeyUpdateType): 56 if not _ssl: 57 raise Exception('not supported') 58 59 if not isinstance(ssl_object, ssl.SSLObject): 60 raise TypeError('invalid ssl object') 61 62 result = _ssl.key_update(ssl_object._sslobj, update_type.value) 63 if result != 1: 64 raise Exception('key update error') 65 66 67def renegotiate(ssl_object: ssl.SSLObject): 68 if not _ssl: 69 raise Exception('not supported') 70 71 if not isinstance(ssl_object, ssl.SSLObject): 72 raise TypeError('invalid ssl object') 73 74 result = _ssl.renegotiate(ssl_object._sslobj) 75 if result != 1: 76 raise Exception('renegotiate error') 77 78 79def get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']: 80 if not _ssl: 81 raise Exception('not supported') 82 83 if not 
isinstance(ssl_object, ssl.SSLObject): 84 raise TypeError('invalid ssl object') 85 86 handle = _ssl.get_peer_cert(ssl_object._sslobj) 87 if not handle: 88 return 89 90 return Cert(handle) 91 92 93def load_crl(path: pathlib.PurePath) -> 'Crl': 94 if not _ssl: 95 raise Exception('not supported') 96 97 handle = _ssl.load_crl(str(path)) 98 return Crl(handle) 99 100 101class Cert: 102 103 def __init__(self, handle): 104 self._handle = handle 105 106 def get_pub_key(self) -> 'PubKey': 107 handle = ssl.get_cert_pub_key(self._handle) 108 return PubKey(handle) 109 110 def get_bytes(self) -> bytes: 111 return ssl.get_cert_bytes(self._handle) 112 113 114class PubKey: 115 116 def __init__(self, handle): 117 self._handle = handle 118 119 def is_rsa(self) -> bool: 120 return ssl.is_pub_key_rsa(self._handle) 121 122 def get_size(self) -> int: 123 return ssl.get_pub_key_size(self._handle) 124 125 126class Crl: 127 128 def __init__(self, handle): 129 self._handle = handle 130 131 def contains_cert(self, cert: Cert) -> bool: 132 if not isinstance(cert, Cert): 133 raise TypeError('invalid cert') 134 135 return ssl.crl_contains_cert(self._handle, cert._handle)
class
SslProtocol(enum.Enum):
class SslProtocol(enum.Enum):
    """TLS role a context is created for (values are stdlib ``ssl`` protocol constants)."""
    TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT
    TLS_SERVER = ssl.PROTOCOL_TLS_SERVER
An enumeration.
TLS_CLIENT =
<SslProtocol.TLS_CLIENT: <_SSLMethod.PROTOCOL_TLS_CLIENT: 16>>
TLS_SERVER =
<SslProtocol.TLS_SERVER: <_SSLMethod.PROTOCOL_TLS_SERVER: 17>>
class
KeyUpdateType(enum.Enum):
An enumeration.
UPDATE_NOT_REQUESTED =
<KeyUpdateType.UPDATE_NOT_REQUESTED: 0>
UPDATE_REQUESTED =
<KeyUpdateType.UPDATE_REQUESTED: 1>
def
create_ssl_ctx( protocol: SslProtocol, verify_cert: bool = False, cert_path: pathlib.PurePath | None = None, key_path: pathlib.PurePath | None = None, ca_path: pathlib.PurePath | None = None, password: str | None = None) -> ssl.SSLContext:
26def create_ssl_ctx(protocol: SslProtocol, 27 verify_cert: bool = False, 28 cert_path: pathlib.PurePath | None = None, 29 key_path: pathlib.PurePath | None = None, 30 ca_path: pathlib.PurePath | None = None, 31 password: str | None = None 32 ) -> ssl.SSLContext: 33 ctx = ssl.SSLContext(protocol.value) 34 ctx.check_hostname = False 35 36 if verify_cert: 37 ctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED 38 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH 39 if protocol == SslProtocol.TLS_SERVER 40 else ssl.Purpose.SERVER_AUTH) 41 if ca_path: 42 ctx.load_verify_locations(cafile=str(ca_path)) 43 44 else: 45 ctx.verify_mode = ssl.VerifyMode.CERT_NONE 46 47 if cert_path: 48 ctx.load_cert_chain(certfile=str(cert_path), 49 keyfile=str(key_path) if key_path else None, 50 password=password) 51 52 return ctx
def key_update(ssl_object: ssl.SSLObject,
               update_type: KeyUpdateType):
    """Trigger a key update on ``ssl_object`` via the native ``_ssl`` extension.

    Raises ``Exception`` when the extension is unavailable or the native call
    does not return 1, and ``TypeError`` for a non-``SSLObject`` argument.
    """
    if not _ssl:
        raise Exception('not supported')
    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    # `_sslobj` is the stdlib's private wrapper around the OpenSSL handle
    rc = _ssl.key_update(ssl_object._sslobj, update_type.value)
    if rc != 1:
        raise Exception('key update error')
def
renegotiate(ssl_object: ssl.SSLObject):
def renegotiate(ssl_object: ssl.SSLObject):
    """Request a renegotiation on ``ssl_object`` via the native ``_ssl`` extension.

    Raises ``Exception`` when the extension is unavailable or the native call
    does not return 1, and ``TypeError`` for a non-``SSLObject`` argument.
    """
    if not _ssl:
        raise Exception('not supported')
    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    rc = _ssl.renegotiate(ssl_object._sslobj)
    if rc != 1:
        raise Exception('renegotiate error')
def get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']:
    """Return the peer certificate of ``ssl_object`` as ``Cert``, or ``None``.

    Raises ``Exception`` when the native extension is unavailable and
    ``TypeError`` for a non-``SSLObject`` argument.
    """
    if not _ssl:
        raise Exception('not supported')
    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    handle = _ssl.get_peer_cert(ssl_object._sslobj)
    return Cert(handle) if handle else None
class
Cert:
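class Cert:
    """Wrapper around a native certificate handle (see ``get_peer_cert``)."""

    def __init__(self, handle):
        self._handle = handle

    def get_pub_key(self) -> 'PubKey':
        """Return the certificate's public key wrapped in ``PubKey``."""
        # the stdlib `ssl` module has no get_cert_pub_key; the native `_ssl`
        # extension is what the rest of this module calls for such helpers
        # (NOTE(review): confirm the extension exports this name)
        handle = _ssl.get_cert_pub_key(self._handle)
        return PubKey(handle)

    def get_bytes(self) -> bytes:
        """Return the certificate's encoded bytes from the native handle."""
        return _ssl.get_cert_bytes(self._handle)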
class
PubKey:
class
Crl: