hat.drivers.ssl

  1from ssl import *  # NOQA
  2
  3import enum
  4import pathlib
  5import ssl
  6import typing
  7
  8try:
  9    from hat.drivers.ssl import _ssl
 10
 11except ImportError:
 12    _ssl = None
 13
 14
 15class SslProtocol(enum.Enum):
 16    TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT
 17    TLS_SERVER = ssl.PROTOCOL_TLS_SERVER
 18
 19
 20class KeyUpdateType(enum.Enum):
 21    UPDATE_NOT_REQUESTED = 0
 22    UPDATE_REQUESTED = 1
 23
 24
 25def create_ssl_ctx(protocol: SslProtocol,
 26                   verify_cert: bool = False,
 27                   cert_path: pathlib.PurePath | None = None,
 28                   key_path: pathlib.PurePath | None = None,
 29                   ca_path: pathlib.PurePath | None = None,
 30                   password: str | None = None
 31                   ) -> ssl.SSLContext:
 32    ctx = ssl.SSLContext(protocol.value)
 33    ctx.check_hostname = False
 34
 35    if verify_cert:
 36        ctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED
 37        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH
 38                               if protocol == SslProtocol.TLS_SERVER
 39                               else ssl.Purpose.SERVER_AUTH)
 40        if ca_path:
 41            ctx.load_verify_locations(cafile=str(ca_path))
 42
 43    else:
 44        ctx.verify_mode = ssl.VerifyMode.CERT_NONE
 45
 46    if cert_path:
 47        ctx.load_cert_chain(certfile=str(cert_path),
 48                            keyfile=str(key_path) if key_path else None,
 49                            password=password)
 50
 51    return ctx
 52
 53
 54def key_update(ssl_object: ssl.SSLObject,
 55               update_type: KeyUpdateType):
 56    if not _ssl:
 57        raise Exception('not supported')
 58
 59    if not isinstance(ssl_object, ssl.SSLObject):
 60        raise TypeError('invalid ssl object')
 61
 62    result = _ssl.key_update(ssl_object._sslobj, update_type.value)
 63    if result != 1:
 64        raise Exception('key update error')
 65
 66
 67def renegotiate(ssl_object: ssl.SSLObject):
 68    if not _ssl:
 69        raise Exception('not supported')
 70
 71    if not isinstance(ssl_object, ssl.SSLObject):
 72        raise TypeError('invalid ssl object')
 73
 74    result = _ssl.renegotiate(ssl_object._sslobj)
 75    if result != 1:
 76        raise Exception('renegotiate error')
 77
 78
 79def get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']:
 80    if not _ssl:
 81        raise Exception('not supported')
 82
 83    if not isinstance(ssl_object, ssl.SSLObject):
 84        raise TypeError('invalid ssl object')
 85
 86    handle = _ssl.get_peer_cert(ssl_object._sslobj)
 87    if not handle:
 88        return
 89
 90    return Cert(handle)
 91
 92
 93def load_crl(path: pathlib.PurePath) -> 'Crl':
 94    if not _ssl:
 95        raise Exception('not supported')
 96
 97    handle = _ssl.load_crl(str(path))
 98    return Crl(handle)
 99
100
class Cert:
    """Certificate backed by an opaque handle of the native `_ssl` extension.

    Instances are produced by `get_peer_cert`.
    """

    def __init__(self, handle):
        # opaque native handle; only ever passed back to `_ssl` functions
        self._handle = handle

    def get_pub_key(self) -> 'PubKey':
        """Return the certificate's public key.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        # handle helpers are provided by the native extension - the
        # standard library `ssl` module has no such function
        handle = _ssl.get_cert_pub_key(self._handle)
        return PubKey(handle)

    def get_bytes(self) -> bytes:
        """Return the certificate bytes as provided by the native extension.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        return _ssl.get_cert_bytes(self._handle)
112
113
class PubKey:
    """Public key backed by an opaque handle of the native `_ssl` extension.

    Instances are produced by `Cert.get_pub_key`.
    """

    def __init__(self, handle):
        # opaque native handle; only ever passed back to `_ssl` functions
        self._handle = handle

    def is_rsa(self) -> bool:
        """Return whether the native extension reports this key as RSA.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        # handle helpers are provided by the native extension - the
        # standard library `ssl` module has no such function
        return _ssl.is_pub_key_rsa(self._handle)

    def get_size(self) -> int:
        """Return the key size as reported by the native extension.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        return _ssl.get_pub_key_size(self._handle)
124
125
class Crl:
    """CRL backed by an opaque handle of the native `_ssl` extension.

    Instances are produced by `load_crl`.
    """

    def __init__(self, handle):
        # opaque native handle; only ever passed back to `_ssl` functions
        self._handle = handle

    def contains_cert(self, cert: 'Cert') -> bool:
        """Return whether the native extension finds `cert` in this CRL.

        Raises:
            Exception: native extension is not available ('not supported')
            TypeError: `cert` is not a `Cert` instance
        """
        if not _ssl:
            raise Exception('not supported')

        if not isinstance(cert, Cert):
            raise TypeError('invalid cert')

        # handle helpers are provided by the native extension - the
        # standard library `ssl` module has no such function
        return _ssl.crl_contains_cert(self._handle, cert._handle)
class SslProtocol(enum.Enum):
class SslProtocol(enum.Enum):
    """TLS role selector accepted by `create_ssl_ctx`.

    Each member's value is the corresponding `ssl.PROTOCOL_TLS_*` constant,
    which is passed directly to `ssl.SSLContext`.
    """

    # client side of a TLS connection
    TLS_CLIENT = ssl.PROTOCOL_TLS_CLIENT
    # server side of a TLS connection
    TLS_SERVER = ssl.PROTOCOL_TLS_SERVER

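Enumeration of TLS protocol roles used when creating an SSL context: `TLS_CLIENT` for the client side and `TLS_SERVER` for the server side.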

TLS_CLIENT = <SslProtocol.TLS_CLIENT: <_SSLMethod.PROTOCOL_TLS_CLIENT: 16>>
TLS_SERVER = <SslProtocol.TLS_SERVER: <_SSLMethod.PROTOCOL_TLS_SERVER: 17>>
Inherited Members
enum.Enum
name
value
class KeyUpdateType(enum.Enum):
class KeyUpdateType(enum.Enum):
    """Update type accepted by `key_update`.

    The member's integer value is forwarded unchanged to the native
    `_ssl.key_update` call.
    """

    UPDATE_NOT_REQUESTED = 0
    UPDATE_REQUESTED = 1

Enumeration of key update types passed to `key_update`: `UPDATE_NOT_REQUESTED` (0) and `UPDATE_REQUESTED` (1).

UPDATE_NOT_REQUESTED = <KeyUpdateType.UPDATE_NOT_REQUESTED: 0>
UPDATE_REQUESTED = <KeyUpdateType.UPDATE_REQUESTED: 1>
Inherited Members
enum.Enum
name
value
def create_ssl_ctx( protocol: SslProtocol, verify_cert: bool = False, cert_path: pathlib.PurePath | None = None, key_path: pathlib.PurePath | None = None, ca_path: pathlib.PurePath | None = None, password: str | None = None) -> ssl.SSLContext:
def create_ssl_ctx(protocol: SslProtocol,
                   verify_cert: bool = False,
                   cert_path: pathlib.PurePath | None = None,
                   key_path: pathlib.PurePath | None = None,
                   ca_path: pathlib.PurePath | None = None,
                   password: str | None = None
                   ) -> ssl.SSLContext:
    """Create an SSL context for the given protocol role.

    Hostname checking is always disabled.

    Args:
        protocol: role whose value selects the `ssl.SSLContext` protocol
        verify_cert: when true, a peer certificate is required and the
            default certificates (plus `ca_path`, if given) are loaded;
            when false, no verification is done and `ca_path` is unused
        cert_path: optional certificate chain file; nothing is loaded
            (and `key_path`/`password` are unused) when omitted
        key_path: optional private key file used with `cert_path`
        ca_path: optional CA file added to the verify locations
        password: optional password passed to `load_cert_chain`

    Returns:
        configured `ssl.SSLContext`
    """
    context = ssl.SSLContext(protocol.value)
    context.check_hostname = False

    if not verify_cert:
        context.verify_mode = ssl.VerifyMode.CERT_NONE

    else:
        context.verify_mode = ssl.VerifyMode.CERT_REQUIRED

        # a server verifies client certificates, a client verifies
        # server certificates
        if protocol == SslProtocol.TLS_SERVER:
            purpose = ssl.Purpose.CLIENT_AUTH
        else:
            purpose = ssl.Purpose.SERVER_AUTH
        context.load_default_certs(purpose)

        if ca_path:
            context.load_verify_locations(cafile=str(ca_path))

    if cert_path:
        keyfile = str(key_path) if key_path else None
        context.load_cert_chain(certfile=str(cert_path),
                                keyfile=keyfile,
                                password=password)

    return context
def key_update( ssl_object: ssl.SSLObject, update_type: KeyUpdateType):
def key_update(ssl_object: ssl.SSLObject,
               update_type: KeyUpdateType):
    """Invoke the native `_ssl.key_update` for `ssl_object`.

    Args:
        ssl_object: SSL object whose underlying `_sslobj` is passed to the
            native call
        update_type: update type whose value is passed to the native call

    Raises:
        Exception: native extension is not available ('not supported') or
            the native call returned a value other than 1
        TypeError: `ssl_object` is not an `ssl.SSLObject`
    """
    if not _ssl:
        raise Exception('not supported')

    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    # the native call signals success by returning 1
    status = _ssl.key_update(ssl_object._sslobj, update_type.value)
    if status == 1:
        return

    raise Exception('key update error')
def renegotiate(ssl_object: ssl.SSLObject):
def renegotiate(ssl_object: ssl.SSLObject):
    """Invoke the native `_ssl.renegotiate` for `ssl_object`.

    Args:
        ssl_object: SSL object whose underlying `_sslobj` is passed to the
            native call

    Raises:
        Exception: native extension is not available ('not supported') or
            the native call returned a value other than 1
        TypeError: `ssl_object` is not an `ssl.SSLObject`
    """
    if not _ssl:
        raise Exception('not supported')

    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    # the native call signals success by returning 1
    status = _ssl.renegotiate(ssl_object._sslobj)
    if status == 1:
        return

    raise Exception('renegotiate error')
def get_peer_cert(ssl_object: ssl.SSLObject) -> Optional[Cert]:
def get_peer_cert(ssl_object: ssl.SSLObject) -> typing.Optional['Cert']:
    """Return the peer certificate associated with `ssl_object`.

    Args:
        ssl_object: SSL object whose underlying `_sslobj` is passed to the
            native `_ssl.get_peer_cert` call

    Returns:
        `Cert` wrapping the native handle, or ``None`` when the native call
        yields a falsy handle

    Raises:
        Exception: native extension is not available ('not supported')
        TypeError: `ssl_object` is not an `ssl.SSLObject`
    """
    if not _ssl:
        raise Exception('not supported')

    if not isinstance(ssl_object, ssl.SSLObject):
        raise TypeError('invalid ssl object')

    peer_handle = _ssl.get_peer_cert(ssl_object._sslobj)
    return Cert(peer_handle) if peer_handle else None
def load_crl(path: pathlib.PurePath) -> Crl:
def load_crl(path: pathlib.PurePath) -> 'Crl':
    """Load a CRL from `path` using the native extension.

    Args:
        path: file path, converted to `str` before the native call

    Returns:
        `Crl` wrapping the handle returned by `_ssl.load_crl`

    Raises:
        Exception: native extension is not available ('not supported')
    """
    if not _ssl:
        raise Exception('not supported')

    return Crl(_ssl.load_crl(str(path)))
class Cert:
class Cert:
    """Certificate backed by an opaque handle of the native `_ssl` extension.

    Instances are produced by `get_peer_cert`.
    """

    def __init__(self, handle):
        # opaque native handle; only ever passed back to `_ssl` functions
        self._handle = handle

    def get_pub_key(self) -> 'PubKey':
        """Return the certificate's public key.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        # handle helpers are provided by the native extension - the
        # standard library `ssl` module has no such function
        handle = _ssl.get_cert_pub_key(self._handle)
        return PubKey(handle)

    def get_bytes(self) -> bytes:
        """Return the certificate bytes as provided by the native extension.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        return _ssl.get_cert_bytes(self._handle)
Cert(handle)
104    def __init__(self, handle):
105        self._handle = handle
def get_pub_key(self) -> PubKey:
107    def get_pub_key(self) -> 'PubKey':
108        handle = ssl.get_cert_pub_key(self._handle)
109        return PubKey(handle)
def get_bytes(self) -> bytes:
111    def get_bytes(self) -> bytes:
112        return ssl.get_cert_bytes(self._handle)
class PubKey:
class PubKey:
    """Public key backed by an opaque handle of the native `_ssl` extension.

    Instances are produced by `Cert.get_pub_key`.
    """

    def __init__(self, handle):
        # opaque native handle; only ever passed back to `_ssl` functions
        self._handle = handle

    def is_rsa(self) -> bool:
        """Return whether the native extension reports this key as RSA.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        # handle helpers are provided by the native extension - the
        # standard library `ssl` module has no such function
        return _ssl.is_pub_key_rsa(self._handle)

    def get_size(self) -> int:
        """Return the key size as reported by the native extension.

        Raises:
            Exception: native extension is not available ('not supported')
        """
        if not _ssl:
            raise Exception('not supported')

        return _ssl.get_pub_key_size(self._handle)
PubKey(handle)
117    def __init__(self, handle):
118        self._handle = handle
def is_rsa(self) -> bool:
120    def is_rsa(self) -> bool:
121        return ssl.is_pub_key_rsa(self._handle)
def get_size(self) -> int:
123    def get_size(self) -> int:
124        return ssl.get_pub_key_size(self._handle)
class Crl:
class Crl:
    """CRL backed by an opaque handle of the native `_ssl` extension.

    Instances are produced by `load_crl`.
    """

    def __init__(self, handle):
        # opaque native handle; only ever passed back to `_ssl` functions
        self._handle = handle

    def contains_cert(self, cert: 'Cert') -> bool:
        """Return whether the native extension finds `cert` in this CRL.

        Raises:
            Exception: native extension is not available ('not supported')
            TypeError: `cert` is not a `Cert` instance
        """
        if not _ssl:
            raise Exception('not supported')

        if not isinstance(cert, Cert):
            raise TypeError('invalid cert')

        # handle helpers are provided by the native extension - the
        # standard library `ssl` module has no such function
        return _ssl.crl_contains_cert(self._handle, cert._handle)
Crl(handle)
129    def __init__(self, handle):
130        self._handle = handle
def contains_cert(self, cert: Cert) -> bool:
132    def contains_cert(self, cert: Cert) -> bool:
133        if not isinstance(cert, Cert):
134            raise TypeError('invalid cert')
135
136        return ssl.crl_contains_cert(self._handle, cert._handle)