hat.drivers.iec60870.encodings.security
IEC 60870-5-7 security extension messages
1"""IEC 60870-5-7 security extension messages""" 2 3from hat.drivers.iec60870.encodings.security.common import ( 4 CauseSize, 5 AsduAddressSize, 6 IoAddressSize, 7 TimeSize, 8 Time, 9 time_from_datetime, 10 time_to_datetime, 11 OriginatorAddress, 12 AsduAddress, 13 IoAddress, 14 OtherCauseType, 15 BinaryCounterValue, 16 AssociationId, 17 SequenceNumber, 18 UserNumber, 19 AsduType, 20 MacAlgorithm, 21 KeyWrapAlgorithm, 22 KeyStatus, 23 ErrorCode, 24 KeyChangeMethod, 25 Operation, 26 UserRole, 27 CauseType, 28 Cause, 29 IoElement_S_IT_TC, 30 IoElement_S_CH_NA, 31 IoElement_S_RP_NA, 32 IoElement_S_AR_NA, 33 IoElement_S_KR_NA, 34 IoElement_S_KS_NA, 35 IoElement_S_KC_NA, 36 IoElement_S_ER_NA, 37 IoElement_S_UC_NA_X, 38 IoElement_S_US_NA, 39 IoElement_S_UQ_NA, 40 IoElement_S_UR_NA, 41 IoElement_S_UK_NA, 42 IoElement_S_UA_NA, 43 IoElement_S_UC_NA, 44 IoElement, 45 IO, 46 ASDU) 47from hat.drivers.iec60870.encodings.security.encoder import Encoder 48 49 50__all__ = ['CauseSize', 51 'AsduAddressSize', 52 'IoAddressSize', 53 'TimeSize', 54 'Time', 55 'time_from_datetime', 56 'time_to_datetime', 57 'OriginatorAddress', 58 'AsduAddress', 59 'IoAddress', 60 'OtherCauseType', 61 'BinaryCounterValue', 62 'AssociationId', 63 'SequenceNumber', 64 'UserNumber', 65 'AsduType', 66 'MacAlgorithm', 67 'KeyWrapAlgorithm', 68 'KeyStatus', 69 'ErrorCode', 70 'KeyChangeMethod', 71 'Operation', 72 'UserRole', 73 'CauseType', 74 'Cause', 75 'IoElement_S_IT_TC', 76 'IoElement_S_CH_NA', 77 'IoElement_S_RP_NA', 78 'IoElement_S_AR_NA', 79 'IoElement_S_KR_NA', 80 'IoElement_S_KS_NA', 81 'IoElement_S_KC_NA', 82 'IoElement_S_ER_NA', 83 'IoElement_S_UC_NA_X', 84 'IoElement_S_US_NA', 85 'IoElement_S_UQ_NA', 86 'IoElement_S_UR_NA', 87 'IoElement_S_UK_NA', 88 'IoElement_S_UA_NA', 89 'IoElement_S_UC_NA', 90 'IoElement', 91 'IO', 92 'ASDU', 93 'Encoder']
An enumeration.
An enumeration.
An enumeration.
An enumeration.
31class Time(typing.NamedTuple): 32 size: TimeSize 33 milliseconds: int 34 """milliseconds in range [0, 59999]""" 35 invalid: bool | None 36 """available for size THREE, FOUR, SEVEN""" 37 minutes: int | None 38 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 39 summer_time: bool | None 40 """available for size FOUR, SEVEN""" 41 hours: int | None 42 """available for size FOUR, SEVEN (hours in range [0, 23])""" 43 day_of_week: int | None 44 """available for size SEVEN (day_of_week in range [1, 7])""" 45 day_of_month: int | None 46 """available for size SEVEN (day_of_month in range [1, 31])""" 47 months: int | None 48 """available for size SEVEN (months in range [1, 12])""" 49 years: int | None 50 """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
66def time_from_datetime(dt: datetime.datetime, 67 invalid: bool = False 68 ) -> Time: 69 """Create Time from datetime.datetime""" 70 # TODO document edge cases (local time, os implementation, ...) 71 # rounding microseconds to the nearest millisecond 72 dt_rounded = ( 73 dt.replace(microsecond=0) + 74 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 75 local_time = time.localtime(dt_rounded.timestamp()) 76 77 return Time( 78 size=TimeSize.SEVEN, 79 milliseconds=(local_time.tm_sec * 1000 + 80 dt_rounded.microsecond // 1000), 81 invalid=invalid, 82 minutes=local_time.tm_min, 83 summer_time=bool(local_time.tm_isdst), 84 hours=local_time.tm_hour, 85 day_of_week=local_time.tm_wday + 1, 86 day_of_month=local_time.tm_mday, 87 months=local_time.tm_mon, 88 years=local_time.tm_year % 100)
Create Time from datetime.datetime
91def time_to_datetime(t: Time 92 ) -> datetime.datetime: 93 """Convert Time to datetime.datetime""" 94 # TODO document edge cases (local time, os implementation, ...) 95 # TODO maybe allow diferent time size (use now for time) 96 if t.size != TimeSize.SEVEN: 97 raise ValueError('unsupported time size') 98 99 local_dt = datetime.datetime( 100 year=2000 + t.years if t.years < 70 else 1900 + t.years, 101 month=t.months, 102 day=t.day_of_month, 103 hour=t.hours, 104 minute=t.minutes, 105 second=int(t.milliseconds / 1000), 106 microsecond=(t.milliseconds % 1000) * 1000, 107 fold=not t.summer_time) 108 109 return local_dt.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime
239class BinaryCounterValue(typing.NamedTuple): 240 value: int 241 """value in range [-2^31, 2^31-1]"""
BinaryCounterValue(value,)
37class AsduType(enum.Enum): 38 S_IT_TC = 41 39 S_CH_NA = 81 40 S_RP_NA = 82 41 S_AR_NA = 83 42 S_KR_NA = 84 43 S_KS_NA = 85 44 S_KC_NA = 86 45 S_ER_NA = 87 46 S_UC_NA_X = 88 47 S_US_NA = 90 48 S_UQ_NA = 91 49 S_UR_NA = 92 50 S_UK_NA = 93 51 S_UA_NA = 94 52 S_UC_NA = 95
An enumeration.
55class MacAlgorithm(enum.Enum): 56 NO_MAC = 0 57 HMAC_SHA_256_8 = 3 58 HMAC_SHA_256_16 = 4 59 AES_GMAC = 6
An enumeration.
An enumeration.
An enumeration.
74class ErrorCode(enum.Enum): 75 AUTHENTICATION_FAILED = 1 76 AGGRESSIVE_NOT_PERMITTED = 4 77 MAC_ALGORITHM_NOT_PERMITTED = 5 78 KEY_WRAP_ALGORITHM_NOT_PERMITTED = 6 79 AUTHORIZATION_FAILED = 7 80 UPDATE_KEY_NOT_PERMITTED = 8 81 INVALID_SIGNATURE = 9 82 INVALID_CERTIFICATION = 10 83 UNKNOWN_USER = 11
An enumeration.
86class KeyChangeMethod(enum.Enum): 87 SYMMETRIC_AES_128_HMAC_SHA_1 = 3 88 SYMMETRIC_AES_256_HMAC_SHA_256 = 4 89 SYMMETRIC_AES_256_AES_GMAC = 5 90 ASYMMETRIC_RSE_2048_DSA_SHA_1_HMAC_SHA_1 = 67 91 ASYMMETRIC_RSE_2048_DSA_SHA_256_HMAC_SHA_256 = 68 92 ASYMMETRIC_RSE_3072_DSA_SHA_256_HMAC_SHA_256 = 69 93 ASYMMETRIC_RSE_2048_DSA_SHA_256_AES_GMAC = 70 94 ASYMMETRIC_RSE_3072_DSA_SHA_256_AES_GMAC = 71
An enumeration.
An enumeration.
103class UserRole(enum.Enum): 104 VIEWER = 0 105 OPERATOR = 1 106 ENGINEER = 2 107 INSTALLER = 3 108 SECADM = 4 109 SECAUD = 5 110 RBACMNT = 6
An enumeration.
An enumeration.
120class Cause(typing.NamedTuple): 121 type: CauseType | OtherCauseType 122 is_negative_confirm: bool 123 is_test: bool 124 originator_address: OriginatorAddress
Cause(type, is_negative_confirm, is_test, originator_address)
Create new instance of Cause(type, is_negative_confirm, is_test, originator_address)
127class IoElement_S_IT_TC(typing.NamedTuple): 128 association_id: AssociationId 129 value: BinaryCounterValue
IoElement_S_IT_TC(association_id, value)
Create new instance of IoElement_S_IT_TC(association_id, value)
132class IoElement_S_CH_NA(typing.NamedTuple): 133 sequence: SequenceNumber 134 user: UserNumber 135 mac_algorithm: MacAlgorithm | int 136 """MAC algorithm can be value in range [0, 255]""" 137 reason: int 138 """reason in range [0, 255] - only valid value is 1""" 139 data: util.Bytes 140 """data length in range [4, 65535]"""
IoElement_S_CH_NA(sequence, user, mac_algorithm, reason, data)
Create new instance of IoElement_S_CH_NA(sequence, user, mac_algorithm, reason, data)
143class IoElement_S_RP_NA(typing.NamedTuple): 144 sequence: SequenceNumber 145 user: UserNumber 146 mac: util.Bytes
IoElement_S_RP_NA(sequence, user, mac)
149class IoElement_S_AR_NA(typing.NamedTuple): 150 asdu: util.Bytes 151 sequence: SequenceNumber 152 user: UserNumber 153 mac: util.Bytes
IoElement_S_AR_NA(asdu, sequence, user, mac)
IoElement_S_KR_NA(user,)
160class IoElement_S_KS_NA(typing.NamedTuple): 161 sequence: SequenceNumber 162 user: UserNumber 163 key_wrap_algorithm: KeyWrapAlgorithm | int 164 """Key wrap algorithm can be value in range [0, 255]""" 165 key_status: KeyStatus | int 166 """Key status can be value in range [0, 255]""" 167 mac_algorithm: MacAlgorithm | int 168 """MAC algorithm can be value in range [0, 255]""" 169 data: util.Bytes 170 """data length in range [8, 65535]""" 171 mac: util.Bytes
IoElement_S_KS_NA(sequence, user, key_wrap_algorithm, key_status, mac_algorithm, data, mac)
Create new instance of IoElement_S_KS_NA(sequence, user, key_wrap_algorithm, key_status, mac_algorithm, data, mac)
174class IoElement_S_KC_NA(typing.NamedTuple): 175 sequence: SequenceNumber 176 user: UserNumber 177 wrapped_key: util.Bytes 178 """wrapped key length in range [8, 65535]"""
IoElement_S_KC_NA(sequence, user, wrapped_key)
181class IoElement_S_ER_NA(typing.NamedTuple): 182 challenge_sequence: SequenceNumber 183 key_change_sequence: SequenceNumber 184 user: UserNumber 185 association_id: AssociationId 186 code: ErrorCode | int 187 """Code can be value in range [0, 255]""" 188 time: Time 189 """Time size SEVEN""" 190 text: util.Bytes 191 """Text length in range [0, 65535]"""
IoElement_S_ER_NA(challenge_sequence, key_change_sequence, user, association_id, code, time, text)
194class IoElement_S_UC_NA_X(typing.NamedTuple): 195 key_change_method: KeyChangeMethod | int 196 """Key change method can be value in range [0, 255]""" 197 data: util.Bytes 198 """Data length in range [0, 65535]"""
IoElement_S_UC_NA_X(key_change_method, data)
Create new instance of IoElement_S_UC_NA_X(key_change_method, data)
201class IoElement_S_US_NA(typing.NamedTuple): 202 key_change_method: KeyChangeMethod | int 203 """Key change method can be value in range [0, 255]""" 204 operation: Operation | int 205 """Operation can be value in range [0, 255]""" 206 sequence: SequenceNumber 207 role: UserRole | int 208 """Role can be value in range [0, 65535]""" 209 role_expiry: int 210 """Role expiry in range [0, 65535]""" 211 name: util.Bytes 212 """Name length in range [0, 65535]""" 213 public_key: util.Bytes 214 """Public key length in range [0, 65535]""" 215 certification: util.Bytes 216 """Certification length in range [0, 65535]"""
IoElement_S_US_NA(key_change_method, operation, sequence, role, role_expiry, name, public_key, certification)
Create new instance of IoElement_S_US_NA(key_change_method, operation, sequence, role, role_expiry, name, public_key, certification)
219class IoElement_S_UQ_NA(typing.NamedTuple): 220 key_change_method: KeyChangeMethod | int 221 """Key change method can be value in range [0, 255]""" 222 name: util.Bytes 223 """Name length in range [0, 65535]""" 224 data: util.Bytes 225 """Data length in range [4, 65535]"""
IoElement_S_UQ_NA(key_change_method, name, data)
Create new instance of IoElement_S_UQ_NA(key_change_method, name, data)
228class IoElement_S_UR_NA(typing.NamedTuple): 229 sequence: SequenceNumber 230 user: UserNumber 231 data: util.Bytes 232 """Data length in range [4, 65535]"""
IoElement_S_UR_NA(sequence, user, data)
235class IoElement_S_UK_NA(typing.NamedTuple): 236 sequence: SequenceNumber 237 user: UserNumber 238 encrypted_update_key: util.Bytes 239 """Encrypted update key length in range [16, 65535]""" 240 mac: util.Bytes
IoElement_S_UK_NA(sequence, user, encrypted_update_key, mac)
Create new instance of IoElement_S_UK_NA(sequence, user, encrypted_update_key, mac)
243class IoElement_S_UA_NA(typing.NamedTuple): 244 sequence: SequenceNumber 245 user: UserNumber 246 encrypted_update_key: util.Bytes 247 """Encrypted update key length in range [16, 65535]""" 248 signature: util.Bytes
IoElement_S_UA_NA(sequence, user, encrypted_update_key, signature)
Create new instance of IoElement_S_UA_NA(sequence, user, encrypted_update_key, signature)
IoElement_S_UC_NA(mac,)
272class IO(typing.NamedTuple): 273 address: IoAddress | None 274 element: IoElement 275 time: Time | None 276 """Time size SEVEN"""
IO(address, element, time)
Create new instance of IO(address, element, time)
Alias for field number 1
279class ASDU(typing.NamedTuple): 280 type: AsduType 281 cause: Cause 282 address: AsduAddress 283 ios: list[IO]
ASDU(type, cause, address, ios)
23class Encoder: 24 25 def __init__(self, encoder: iec101.Encoder | iec104.Encoder): 26 self._encoder = encoder 27 self._buffer = None 28 29 def decode_asdu(self, 30 asdu_bytes: util.Bytes 31 ) -> tuple[ASDU | None, util.Bytes]: 32 asdu_type = asdu_bytes[0] 33 with contextlib.suppress(ValueError): 34 asdu_type = common.AsduType(asdu_type) 35 36 if isinstance(asdu_type, common.AsduType): 37 return self._decode_asdu(asdu_bytes, asdu_type) 38 39 if self._buffer: 40 mlog.warning('unsegmented asdu - discarding buffer') 41 self._buffer = None 42 43 return self._encoder.decode_asdu(asdu_bytes) 44 45 def encode_asdu(self, asdu: ASDU) -> list[util.Bytes]: 46 if isinstance(asdu, common.ASDU): 47 return list(self._encode_asdu(asdu)) 48 49 return [self._encoder.encode_asdu(asdu)] 50 51 def _decode_asdu(self, asdu_bytes, asdu_type): 52 io_count, rest = _decode_io_count(asdu_bytes[1:], asdu_type) 53 cause, rest = _decode_cause(rest, self._encoder.cause_size) 54 asdu_address, rest = _decode_int(rest, 55 self._encoder.asdu_address_size.value) 56 57 ios = collections.deque() 58 for _ in range(io_count): 59 if asdu_type in _unsegmented_asdu_types: 60 io, rest = self._decode_unsegmented_io(rest, asdu_type) 61 62 else: 63 io, rest = self._decode_segmented_io(rest, asdu_type, cause, 64 asdu_address) 65 66 if io: 67 ios.append(io) 68 69 if asdu_type != common.AsduType.S_IT_TC and not ios: 70 return None, rest 71 72 asdu = common.ASDU(type=asdu_type, 73 cause=cause, 74 address=asdu_address, 75 ios=list(ios)) 76 return asdu, rest 77 78 def _encode_asdu(self, asdu): 79 identifier = collections.deque() 80 identifier.append(asdu.type.value) 81 identifier.append(len(asdu.ios)) 82 identifier.extend(_encode_cause(asdu.cause, self._encoder.cause_size)) 83 identifier.extend(_encode_int(asdu.address, 84 self._encoder.asdu_address_size.value)) 85 86 parts = collections.deque() 87 for io in asdu.ios: 88 if io.address is not None: 89 parts.append( 90 _encode_int(io.address, 91 self._encoder.io_address_size.value)) 92 93 parts.append(_encode_io_element(io.element, self._encoder)) 94 95 if io.time is not None: 96 parts.append(encode_time(io.time, common.TimeSize.SEVEN)) 97 98 rest = itertools.chain.from_iterable(parts) 99 100 if asdu.type in _unsegmented_asdu_types: 101 yield bytes(itertools.chain(identifier, rest)) 102 103 else: 104 first = True 105 segment = 0 106 rest = memoryview(bytes(rest)) 107 max_size = self._encoder.max_asdu_size - len(identifier) - 1 108 109 while rest: 110 data, rest = rest[:max_size], rest[max_size:] 111 last = not rest 112 113 yield bytes(itertools.chain(identifier, 114 [(0x40 if first else 0) | 115 (0x80 if last else 0) | 116 segment], 117 data)) 118 119 first = False 120 segment = (segment + 1) % 64 121 122 def _decode_unsegmented_io(self, io_bytes, asdu_type): 123 if self._buffer: 124 mlog.warning('unsegmented asdu - discarding buffer') 125 self._buffer = None 126 127 if asdu_type == common.AsduType.S_IT_TC: 128 io_address, rest = _decode_int( 129 io_bytes, self._encoder.io_address_size.value) 130 element, rest = _decode_io_element(rest, self._encoder, 131 asdu_type) 132 time, rest = (decode_time(rest, common.TimeSize.SEVEN), 133 rest[7:]) 134 135 else: 136 io_address = None 137 element, rest = _decode_io_element(io_bytes, self._encoder, 138 asdu_type) 139 time = None 140 141 io = common.IO(address=io_address, 142 element=element, 143 time=time) 144 return io, rest 145 146 def _decode_segmented_io(self, io_bytes, asdu_type, cause, asdu_address): 147 first = bool(io_bytes[0] & 0x40) 148 last = bool(io_bytes[0] & 0x80) 149 segment = io_bytes[0] & 0x3f 150 rest = io_bytes[1:] 151 152 if first: 153 if self._buffer: 154 mlog.warning('new first segment - discarding buffer') 155 156 self._buffer = _Buffer(asdu_type=asdu_type, 157 cause=cause, 158 asdu_address=asdu_address, 159 prev_first=first, 160 prev_segment=segment, 161 prev_io_bytes=rest, 162 all_io_bytes=rest) 163 164 else: 165 if not self._buffer: 166 mlog.warning('empty buffer - discarding segment') 167 return None, b'' 168 169 elif self._buffer.asdu_type != asdu_type: 170 mlog.warning('asdu type not matching - ' 171 'discarding segment and buffer') 172 self._buffer = None 173 return None, b'' 174 175 elif self._buffer.cause != cause: 176 mlog.warning('cause not matching - ' 177 'discarding segment and buffer') 178 self._buffer = None 179 return None, b'' 180 181 elif self._buffer.asdu_address != asdu_address: 182 mlog.warning('asdu address not matching - ' 183 'discarding segment and buffer') 184 self._buffer = None 185 return None, b'' 186 187 elif ((self._buffer.prev_segment + 1) % 64) != segment: 188 if (self._buffer.prev_first == first and 189 self._buffer.prev_segment == segment and 190 self._buffer.prev_io_bytes == rest): 191 mlog.warning('duplicated segment - discarding segment') 192 193 else: 194 mlog.warning('segment number not matching - ' 195 'discarding segment and buffer') 196 self._buffer = None 197 198 return None, b'' 199 200 self._buffer = self._buffer._replace( 201 prev_first=first, 202 prev_segment=segment, 203 prev_io_bytes=rest, 204 all_io_bytes=itertools.chain(self._buffer.all_io_bytes, rest)) 205 206 if not last: 207 return None, b'' 208 209 all_io_bytes = (self._buffer.all_io_bytes 210 if isinstance(self._buffer.all_io_bytes, memoryview) 211 else memoryview(bytes(self._buffer.all_io_bytes))) 212 self._buffer = None 213 214 element, rest = _decode_io_element(all_io_bytes, self._encoder, 215 asdu_type) 216 io = common.IO(address=None, 217 element=element, 218 time=None) 219 return io, rest
29 def decode_asdu(self, 30 asdu_bytes: util.Bytes 31 ) -> tuple[ASDU | None, util.Bytes]: 32 asdu_type = asdu_bytes[0] 33 with contextlib.suppress(ValueError): 34 asdu_type = common.AsduType(asdu_type) 35 36 if isinstance(asdu_type, common.AsduType): 37 return self._decode_asdu(asdu_bytes, asdu_type) 38 39 if self._buffer: 40 mlog.warning('unsegmented asdu - discarding buffer') 41 self._buffer = None 42 43 return self._encoder.decode_asdu(asdu_bytes)