hat.drivers.ssl

  1from ssl import *  # NOQA
  2
  3import enum
  4import pathlib
  5import ssl
  6import typing
  7
  8try:
  9    from hat.drivers.ssl import _ssl
 10
 11except ImportError:
 12    _ssl = None
 13
 14
 15class SslProtocol(enum.Enum):
 16    TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT
 17    TLS_SERVER = ssl.PROTOCOL_TLS_SERVER
 18
 19
 20class KeyUpdateType(enum.Enum):
 21    UPDATE_NOT_REQUESTED = 0
 22    UPDATE_REQUESTED = 1
 23
 24
 25def create_ssl_ctx(protocol: SslProtocol,
 26                   *,
 27                   check_hostname: bool = False,
 28                   verify_cert: bool = False,
 29                   load_default_certs: bool = True,
 30                   cert_path: pathlib.PurePath | None = None,
 31                   key_path: pathlib.PurePath | None = None,
 32                   ca_path: pathlib.PurePath | None = None,
 33                   password: str | None = None
 34                   ) -> ssl.SSLContext:
 35    ctx = ssl.SSLContext(protocol.value)
 36    ctx.check_hostname = check_hostname
 37
 38    if verify_cert:
 39        ctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED
 40
 41        if load_default_certs:
 42            ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH
 43                                   if protocol == SslProtocol.TLS_SERVER
 44                                   else ssl.Purpose.SERVER_AUTH)
 45
 46        if ca_path:
 47            ctx.load_verify_locations(cafile=str(ca_path))
 48
 49    else:
 50        ctx.verify_mode = ssl.VerifyMode.CERT_NONE
 51
 52    if cert_path:
 53        ctx.load_cert_chain(certfile=str(cert_path),
 54                            keyfile=str(key_path) if key_path else None,
 55                            password=password)
 56
 57    return ctx
 58
 59
 60def key_update(ssl_object: ssl.SSLObject,
 61               update_type: KeyUpdateType):
 62    if not _ssl:
 63        raise Exception('not supported')
 64
 65    if not isinstance(ssl_object, ssl.SSLObject):
 66        raise TypeError('invalid ssl object')
 67
 68    result = _ssl.key_update(ssl_object._sslobj, update_type.value)
 69    if result != 1:
 70        raise Exception('key update error')
 71
 72
 73def renegotiate(ssl_object: ssl.SSLObject):
 74    if not _ssl:
 75        raise Exception('not supported')
 76
 77    if not isinstance(ssl_object, ssl.SSLObject):
 78        raise TypeError('invalid ssl object')
 79
 80    result = _ssl.renegotiate(ssl_object._sslobj)
 81    if result != 1:
 82        raise Exception('renegotiate error')
 83
 84
 85def get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']:
 86    if not _ssl:
 87        raise Exception('not supported')
 88
 89    if not isinstance(ssl_object, ssl.SSLObject):
 90        raise TypeError('invalid ssl object')
 91
 92    handle = _ssl.get_peer_cert(ssl_object._sslobj)
 93    if not handle:
 94        return
 95
 96    return Cert(handle)
 97
 98
 99def load_crl(path: pathlib.PurePath) -> 'Crl':
100    if not _ssl:
101        raise Exception('not supported')
102
103    handle = _ssl.load_crl(str(path))
104    return Crl(handle)
105
106
107class Cert:
108
109    def __init__(self, handle):
110        self._handle = handle
111
112    def get_pub_key(self) -> 'PubKey':
113        handle = ssl.get_cert_pub_key(self._handle)
114        return PubKey(handle)
115
116    def get_bytes(self) -> bytes:
117        return ssl.get_cert_bytes(self._handle)
118
119
120class PubKey:
121
122    def __init__(self, handle):
123        self._handle = handle
124
125    def is_rsa(self) -> bool:
126        return ssl.is_pub_key_rsa(self._handle)
127
128    def get_size(self) -> int:
129        return ssl.get_pub_key_size(self._handle)
130
131
132class Crl:
133
134    def __init__(self, handle):
135        self._handle = handle
136
137    def contains_cert(self, cert: Cert) -> bool:
138        if not isinstance(cert, Cert):
139            raise TypeError('invalid cert')
140
141        return ssl.crl_contains_cert(self._handle, cert._handle)
class SslProtocol(enum.Enum):
16class SslProtocol(enum.Enum):
17    TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT
18    TLS_SERVER = ssl.PROTOCOL_TLS_SERVER

An enumeration.

TLS_CLIENT = <SslProtocol.TLS_CLIENT: <_SSLMethod.PROTOCOL_TLS_CLIENT: 16>>
TLS_SERVER = <SslProtocol.TLS_SERVER: <_SSLMethod.PROTOCOL_TLS_SERVER: 17>>
class KeyUpdateType(enum.Enum):
21class KeyUpdateType(enum.Enum):
22    UPDATE_NOT_REQUESTED = 0
23    UPDATE_REQUESTED = 1

An enumeration.

UPDATE_NOT_REQUESTED = <KeyUpdateType.UPDATE_NOT_REQUESTED: 0>
UPDATE_REQUESTED = <KeyUpdateType.UPDATE_REQUESTED: 1>
def create_ssl_ctx( protocol: SslProtocol, *, check_hostname: bool = False, verify_cert: bool = False, load_default_certs: bool = True, cert_path: pathlib.PurePath | None = None, key_path: pathlib.PurePath | None = None, ca_path: pathlib.PurePath | None = None, password: str | None = None) -> ssl.SSLContext:
26def create_ssl_ctx(protocol: SslProtocol,
27                   *,
28                   check_hostname: bool = False,
29                   verify_cert: bool = False,
30                   load_default_certs: bool = True,
31                   cert_path: pathlib.PurePath | None = None,
32                   key_path: pathlib.PurePath | None = None,
33                   ca_path: pathlib.PurePath | None = None,
34                   password: str | None = None
35                   ) -> ssl.SSLContext:
36    ctx = ssl.SSLContext(protocol.value)
37    ctx.check_hostname = check_hostname
38
39    if verify_cert:
40        ctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED
41
42        if load_default_certs:
43            ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH
44                                   if protocol == SslProtocol.TLS_SERVER
45                                   else ssl.Purpose.SERVER_AUTH)
46
47        if ca_path:
48            ctx.load_verify_locations(cafile=str(ca_path))
49
50    else:
51        ctx.verify_mode = ssl.VerifyMode.CERT_NONE
52
53    if cert_path:
54        ctx.load_cert_chain(certfile=str(cert_path),
55                            keyfile=str(key_path) if key_path else None,
56                            password=password)
57
58    return ctx
def key_update( ssl_object: ssl.SSLObject, update_type: KeyUpdateType):
61def key_update(ssl_object: ssl.SSLObject,
62               update_type: KeyUpdateType):
63    if not _ssl:
64        raise Exception('not supported')
65
66    if not isinstance(ssl_object, ssl.SSLObject):
67        raise TypeError('invalid ssl object')
68
69    result = _ssl.key_update(ssl_object._sslobj, update_type.value)
70    if result != 1:
71        raise Exception('key update error')
def renegotiate(ssl_object: ssl.SSLObject):
74def renegotiate(ssl_object: ssl.SSLObject):
75    if not _ssl:
76        raise Exception('not supported')
77
78    if not isinstance(ssl_object, ssl.SSLObject):
79        raise TypeError('invalid ssl object')
80
81    result = _ssl.renegotiate(ssl_object._sslobj)
82    if result != 1:
83        raise Exception('renegotiate error')
def get_peer_cert(ssl_object: ssl.SSLObject) -> Optional[Cert]:
86def get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']:
87    if not _ssl:
88        raise Exception('not supported')
89
90    if not isinstance(ssl_object, ssl.SSLObject):
91        raise TypeError('invalid ssl object')
92
93    handle = _ssl.get_peer_cert(ssl_object._sslobj)
94    if not handle:
95        return
96
97    return Cert(handle)
def load_crl(path: pathlib.PurePath) -> Crl:
100def load_crl(path: pathlib.PurePath) -> 'Crl':
101    if not _ssl:
102        raise Exception('not supported')
103
104    handle = _ssl.load_crl(str(path))
105    return Crl(handle)
class Cert:
108class Cert:
109
110    def __init__(self, handle):
111        self._handle = handle
112
113    def get_pub_key(self) -> 'PubKey':
114        handle = ssl.get_cert_pub_key(self._handle)
115        return PubKey(handle)
116
117    def get_bytes(self) -> bytes:
118        return ssl.get_cert_bytes(self._handle)
Cert(handle)
110    def __init__(self, handle):
111        self._handle = handle
def get_pub_key(self) -> PubKey:
113    def get_pub_key(self) -> 'PubKey':
114        handle = ssl.get_cert_pub_key(self._handle)
115        return PubKey(handle)
def get_bytes(self) -> bytes:
117    def get_bytes(self) -> bytes:
118        return ssl.get_cert_bytes(self._handle)
class PubKey:
121class PubKey:
122
123    def __init__(self, handle):
124        self._handle = handle
125
126    def is_rsa(self) -> bool:
127        return ssl.is_pub_key_rsa(self._handle)
128
129    def get_size(self) -> int:
130        return ssl.get_pub_key_size(self._handle)
PubKey(handle)
123    def __init__(self, handle):
124        self._handle = handle
def is_rsa(self) -> bool:
126    def is_rsa(self) -> bool:
127        return ssl.is_pub_key_rsa(self._handle)
def get_size(self) -> int:
129    def get_size(self) -> int:
130        return ssl.get_pub_key_size(self._handle)
class Crl:
133class Crl:
134
135    def __init__(self, handle):
136        self._handle = handle
137
138    def contains_cert(self, cert: Cert) -> bool:
139        if not isinstance(cert, Cert):
140            raise TypeError('invalid cert')
141
142        return ssl.crl_contains_cert(self._handle, cert._handle)
Crl(handle)
135    def __init__(self, handle):
136        self._handle = handle
def contains_cert(self, cert: Cert) -> bool:
138    def contains_cert(self, cert: Cert) -> bool:
139        if not isinstance(cert, Cert):
140            raise TypeError('invalid cert')
141
142        return ssl.crl_contains_cert(self._handle, cert._handle)