hat.drivers.iec60870.encodings.security

IEC 60870-5-7 security extension messages

 1"""IEC 60870-5-7 security extension messages"""
 2
 3from hat.drivers.iec60870.encodings.security.common import (
 4    CauseSize,
 5    AsduAddressSize,
 6    IoAddressSize,
 7    TimeSize,
 8    Time,
 9    time_from_datetime,
10    time_to_datetime,
11    OriginatorAddress,
12    AsduAddress,
13    IoAddress,
14    OtherCauseType,
15    BinaryCounterValue,
16    AssociationId,
17    SequenceNumber,
18    UserNumber,
19    AsduType,
20    MacAlgorithm,
21    KeyWrapAlgorithm,
22    KeyStatus,
23    ErrorCode,
24    KeyChangeMethod,
25    Operation,
26    UserRole,
27    CauseType,
28    Cause,
29    IoElement_S_IT_TC,
30    IoElement_S_CH_NA,
31    IoElement_S_RP_NA,
32    IoElement_S_AR_NA,
33    IoElement_S_KR_NA,
34    IoElement_S_KS_NA,
35    IoElement_S_KC_NA,
36    IoElement_S_ER_NA,
37    IoElement_S_UC_NA_X,
38    IoElement_S_US_NA,
39    IoElement_S_UQ_NA,
40    IoElement_S_UR_NA,
41    IoElement_S_UK_NA,
42    IoElement_S_UA_NA,
43    IoElement_S_UC_NA,
44    IoElement,
45    IO,
46    ASDU)
47from hat.drivers.iec60870.encodings.security.encoder import Encoder
48
49
50__all__ = ['CauseSize',
51           'AsduAddressSize',
52           'IoAddressSize',
53           'TimeSize',
54           'Time',
55           'time_from_datetime',
56           'time_to_datetime',
57           'OriginatorAddress',
58           'AsduAddress',
59           'IoAddress',
60           'OtherCauseType',
61           'BinaryCounterValue',
62           'AssociationId',
63           'SequenceNumber',
64           'UserNumber',
65           'AsduType',
66           'MacAlgorithm',
67           'KeyWrapAlgorithm',
68           'KeyStatus',
69           'ErrorCode',
70           'KeyChangeMethod',
71           'Operation',
72           'UserRole',
73           'CauseType',
74           'Cause',
75           'IoElement_S_IT_TC',
76           'IoElement_S_CH_NA',
77           'IoElement_S_RP_NA',
78           'IoElement_S_AR_NA',
79           'IoElement_S_KR_NA',
80           'IoElement_S_KS_NA',
81           'IoElement_S_KC_NA',
82           'IoElement_S_ER_NA',
83           'IoElement_S_UC_NA_X',
84           'IoElement_S_US_NA',
85           'IoElement_S_UQ_NA',
86           'IoElement_S_UR_NA',
87           'IoElement_S_UK_NA',
88           'IoElement_S_UA_NA',
89           'IoElement_S_UC_NA',
90           'IoElement',
91           'IO',
92           'ASDU',
93           'Encoder']
class CauseSize(enum.Enum):
 8class CauseSize(enum.Enum):
 9    ONE = 1
10    TWO = 2

An enumeration.

ONE = <CauseSize.ONE: 1>
TWO = <CauseSize.TWO: 2>
class AsduAddressSize(enum.Enum):
13class AsduAddressSize(enum.Enum):
14    ONE = 1
15    TWO = 2

An enumeration.

ONE = <AsduAddressSize.ONE: 1>
TWO = <AsduAddressSize.TWO: 2>
class IoAddressSize(enum.Enum):
18class IoAddressSize(enum.Enum):
19    ONE = 1
20    TWO = 2
21    THREE = 3

An enumeration.

ONE = <IoAddressSize.ONE: 1>
TWO = <IoAddressSize.TWO: 2>
THREE = <IoAddressSize.THREE: 3>
class TimeSize(enum.Enum):
24class TimeSize(enum.Enum):
25    TWO = 2
26    THREE = 3
27    FOUR = 4
28    SEVEN = 7

An enumeration.

TWO = <TimeSize.TWO: 2>
THREE = <TimeSize.THREE: 3>
FOUR = <TimeSize.FOUR: 4>
SEVEN = <TimeSize.SEVEN: 7>
class Time(typing.NamedTuple):
31class Time(typing.NamedTuple):
32    size: TimeSize
33    milliseconds: int
34    """milliseconds in range [0, 59999]"""
35    invalid: bool | None
36    """available for size THREE, FOUR, SEVEN"""
37    minutes: int | None
38    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
39    summer_time: bool | None
40    """available for size FOUR, SEVEN"""
41    hours: int | None
42    """available for size FOUR, SEVEN (hours in range [0, 23])"""
43    day_of_week: int | None
44    """available for size SEVEN (day_of_week in range [1, 7])"""
45    day_of_month: int | None
46    """available for size SEVEN (day_of_month in range [1, 31])"""
47    months: int | None
48    """available for size SEVEN (months in range [1, 12])"""
49    years: int | None
50    """available for size SEVEN (years in range [0, 99])"""

Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

Time( size: TimeSize, milliseconds: int, invalid: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)

Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

size: TimeSize

Alias for field number 0

milliseconds: int

milliseconds in range [0, 59999]

invalid: bool | None

available for size THREE, FOUR, SEVEN

minutes: int | None

available for size THREE, FOUR, SEVEN (minutes in range [0, 59])

summer_time: bool | None

available for size FOUR, SEVEN

hours: int | None

available for size FOUR, SEVEN (hours in range [0, 23])

day_of_week: int | None

available for size SEVEN (day_of_week in range [1, 7])

day_of_month: int | None

available for size SEVEN (day_of_month in range [1, 31])

months: int | None

available for size SEVEN (months in range [1, 12])

years: int | None

available for size SEVEN (years in range [0, 99])

def time_from_datetime( dt: datetime.datetime, invalid: bool = False) -> Time:
66def time_from_datetime(dt: datetime.datetime,
67                       invalid: bool = False
68                       ) -> Time:
69    """Create Time from datetime.datetime"""
70    # TODO document edge cases (local time, os implementation, ...)
71    #  rounding microseconds to the nearest millisecond
72    dt_rounded = (
73        dt.replace(microsecond=0) +
74        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
75    local_time = time.localtime(dt_rounded.timestamp())
76
77    return Time(
78        size=TimeSize.SEVEN,
79        milliseconds=(local_time.tm_sec * 1000 +
80                      dt_rounded.microsecond // 1000),
81        invalid=invalid,
82        minutes=local_time.tm_min,
83        summer_time=bool(local_time.tm_isdst),
84        hours=local_time.tm_hour,
85        day_of_week=local_time.tm_wday + 1,
86        day_of_month=local_time.tm_mday,
87        months=local_time.tm_mon,
88        years=local_time.tm_year % 100)

Create Time from datetime.datetime

def time_to_datetime(t: Time) -> datetime.datetime:
 91def time_to_datetime(t: Time
 92                     ) -> datetime.datetime:
 93    """Convert Time to datetime.datetime"""
 94    # TODO document edge cases (local time, os implementation, ...)
 95    # TODO maybe allow diferent time size (use now for time)
 96    if t.size != TimeSize.SEVEN:
 97        raise ValueError('unsupported time size')
 98
 99    local_dt = datetime.datetime(
100        year=2000 + t.years if t.years < 70 else 1900 + t.years,
101        month=t.months,
102        day=t.day_of_month,
103        hour=t.hours,
104        minute=t.minutes,
105        second=int(t.milliseconds / 1000),
106        microsecond=(t.milliseconds % 1000) * 1000,
107        fold=not t.summer_time)
108
109    return local_dt.astimezone(tz=datetime.timezone.utc)

Convert Time to datetime.datetime

OriginatorAddress = <class 'int'>
AsduAddress = <class 'int'>
IoAddress = <class 'int'>
OtherCauseType = <class 'int'>
class BinaryCounterValue(typing.NamedTuple):
239class BinaryCounterValue(typing.NamedTuple):
240    value: int
241    """value in range [-2^31, 2^31-1]"""

BinaryCounterValue(value,)

BinaryCounterValue(value: int)

Create new instance of BinaryCounterValue(value,)

value: int

value in range [-2^31, 2^31-1]

AssociationId = <class 'int'>
SequenceNumber = <class 'int'>
UserNumber = <class 'int'>
class AsduType(enum.Enum):
37class AsduType(enum.Enum):
38    S_IT_TC = 41
39    S_CH_NA = 81
40    S_RP_NA = 82
41    S_AR_NA = 83
42    S_KR_NA = 84
43    S_KS_NA = 85
44    S_KC_NA = 86
45    S_ER_NA = 87
46    S_UC_NA_X = 88
47    S_US_NA = 90
48    S_UQ_NA = 91
49    S_UR_NA = 92
50    S_UK_NA = 93
51    S_UA_NA = 94
52    S_UC_NA = 95

An enumeration.

S_IT_TC = <AsduType.S_IT_TC: 41>
S_CH_NA = <AsduType.S_CH_NA: 81>
S_RP_NA = <AsduType.S_RP_NA: 82>
S_AR_NA = <AsduType.S_AR_NA: 83>
S_KR_NA = <AsduType.S_KR_NA: 84>
S_KS_NA = <AsduType.S_KS_NA: 85>
S_KC_NA = <AsduType.S_KC_NA: 86>
S_ER_NA = <AsduType.S_ER_NA: 87>
S_UC_NA_X = <AsduType.S_UC_NA_X: 88>
S_US_NA = <AsduType.S_US_NA: 90>
S_UQ_NA = <AsduType.S_UQ_NA: 91>
S_UR_NA = <AsduType.S_UR_NA: 92>
S_UK_NA = <AsduType.S_UK_NA: 93>
S_UA_NA = <AsduType.S_UA_NA: 94>
S_UC_NA = <AsduType.S_UC_NA: 95>
class MacAlgorithm(enum.Enum):
55class MacAlgorithm(enum.Enum):
56    NO_MAC = 0
57    HMAC_SHA_256_8 = 3
58    HMAC_SHA_256_16 = 4
59    AES_GMAC = 6

An enumeration.

NO_MAC = <MacAlgorithm.NO_MAC: 0>
HMAC_SHA_256_8 = <MacAlgorithm.HMAC_SHA_256_8: 3>
HMAC_SHA_256_16 = <MacAlgorithm.HMAC_SHA_256_16: 4>
AES_GMAC = <MacAlgorithm.AES_GMAC: 6>
class KeyWrapAlgorithm(enum.Enum):
62class KeyWrapAlgorithm(enum.Enum):
63    AES_128 = 1
64    AES_256 = 2

An enumeration.

AES_128 = <KeyWrapAlgorithm.AES_128: 1>
AES_256 = <KeyWrapAlgorithm.AES_256: 2>
class KeyStatus(enum.Enum):
67class KeyStatus(enum.Enum):
68    OK = 1
69    NOT_INIT = 2
70    COMM_FAIL = 3
71    AUTH_FAIL = 4

An enumeration.

OK = <KeyStatus.OK: 1>
NOT_INIT = <KeyStatus.NOT_INIT: 2>
COMM_FAIL = <KeyStatus.COMM_FAIL: 3>
AUTH_FAIL = <KeyStatus.AUTH_FAIL: 4>
class ErrorCode(enum.Enum):
74class ErrorCode(enum.Enum):
75    AUTHENTICATION_FAILED = 1
76    AGGRESSIVE_NOT_PERMITTED = 4
77    MAC_ALGORITHM_NOT_PERMITTED = 5
78    KEY_WRAP_ALGORITHM_NOT_PERMITTED = 6
79    AUTHORIZATION_FAILED = 7
80    UPDATE_KEY_NOT_PERMITTED = 8
81    INVALID_SIGNATURE = 9
82    INVALID_CERTIFICATION = 10
83    UNKNOWN_USER = 11

An enumeration.

AUTHENTICATION_FAILED = <ErrorCode.AUTHENTICATION_FAILED: 1>
AGGRESSIVE_NOT_PERMITTED = <ErrorCode.AGGRESSIVE_NOT_PERMITTED: 4>
MAC_ALGORITHM_NOT_PERMITTED = <ErrorCode.MAC_ALGORITHM_NOT_PERMITTED: 5>
KEY_WRAP_ALGORITHM_NOT_PERMITTED = <ErrorCode.KEY_WRAP_ALGORITHM_NOT_PERMITTED: 6>
AUTHORIZATION_FAILED = <ErrorCode.AUTHORIZATION_FAILED: 7>
UPDATE_KEY_NOT_PERMITTED = <ErrorCode.UPDATE_KEY_NOT_PERMITTED: 8>
INVALID_SIGNATURE = <ErrorCode.INVALID_SIGNATURE: 9>
INVALID_CERTIFICATION = <ErrorCode.INVALID_CERTIFICATION: 10>
UNKNOWN_USER = <ErrorCode.UNKNOWN_USER: 11>
class KeyChangeMethod(enum.Enum):
86class KeyChangeMethod(enum.Enum):
87    SYMMETRIC_AES_128_HMAC_SHA_1 = 3
88    SYMMETRIC_AES_256_HMAC_SHA_256 = 4
89    SYMMETRIC_AES_256_AES_GMAC = 5
90    ASYMMETRIC_RSE_2048_DSA_SHA_1_HMAC_SHA_1 = 67
91    ASYMMETRIC_RSE_2048_DSA_SHA_256_HMAC_SHA_256 = 68
92    ASYMMETRIC_RSE_3072_DSA_SHA_256_HMAC_SHA_256 = 69
93    ASYMMETRIC_RSE_2048_DSA_SHA_256_AES_GMAC = 70
94    ASYMMETRIC_RSE_3072_DSA_SHA_256_AES_GMAC = 71

An enumeration.

SYMMETRIC_AES_128_HMAC_SHA_1 = <KeyChangeMethod.SYMMETRIC_AES_128_HMAC_SHA_1: 3>
SYMMETRIC_AES_256_HMAC_SHA_256 = <KeyChangeMethod.SYMMETRIC_AES_256_HMAC_SHA_256: 4>
SYMMETRIC_AES_256_AES_GMAC = <KeyChangeMethod.SYMMETRIC_AES_256_AES_GMAC: 5>
ASYMMETRIC_RSE_2048_DSA_SHA_1_HMAC_SHA_1 = <KeyChangeMethod.ASYMMETRIC_RSE_2048_DSA_SHA_1_HMAC_SHA_1: 67>
ASYMMETRIC_RSE_2048_DSA_SHA_256_HMAC_SHA_256 = <KeyChangeMethod.ASYMMETRIC_RSE_2048_DSA_SHA_256_HMAC_SHA_256: 68>
ASYMMETRIC_RSE_3072_DSA_SHA_256_HMAC_SHA_256 = <KeyChangeMethod.ASYMMETRIC_RSE_3072_DSA_SHA_256_HMAC_SHA_256: 69>
ASYMMETRIC_RSE_2048_DSA_SHA_256_AES_GMAC = <KeyChangeMethod.ASYMMETRIC_RSE_2048_DSA_SHA_256_AES_GMAC: 70>
ASYMMETRIC_RSE_3072_DSA_SHA_256_AES_GMAC = <KeyChangeMethod.ASYMMETRIC_RSE_3072_DSA_SHA_256_AES_GMAC: 71>
class Operation(enum.Enum):
 97class Operation(enum.Enum):
 98    ADD = 1
 99    DELETE = 2
100    CHANGE = 3

An enumeration.

ADD = <Operation.ADD: 1>
DELETE = <Operation.DELETE: 2>
CHANGE = <Operation.CHANGE: 3>
class UserRole(enum.Enum):
103class UserRole(enum.Enum):
104    VIEWER = 0
105    OPERATOR = 1
106    ENGINEER = 2
107    INSTALLER = 3
108    SECADM = 4
109    SECAUD = 5
110    RBACMNT = 6

An enumeration.

VIEWER = <UserRole.VIEWER: 0>
OPERATOR = <UserRole.OPERATOR: 1>
ENGINEER = <UserRole.ENGINEER: 2>
INSTALLER = <UserRole.INSTALLER: 3>
SECADM = <UserRole.SECADM: 4>
SECAUD = <UserRole.SECAUD: 5>
RBACMNT = <UserRole.RBACMNT: 6>
class CauseType(enum.Enum):

An enumeration.

UNDEFINED = <CauseType.UNDEFINED: 0>
PERIODIC = <CauseType.PERIODIC: 1>
BACKGROUND_SCAN = <CauseType.BACKGROUND_SCAN: 2>
SPONTANEOUS = <CauseType.SPONTANEOUS: 3>
INITIALIZED = <CauseType.INITIALIZED: 4>
REQUEST = <CauseType.REQUEST: 5>
ACTIVATION = <CauseType.ACTIVATION: 6>
ACTIVATION_CONFIRMATION = <CauseType.ACTIVATION_CONFIRMATION: 7>
DEACTIVATION = <CauseType.DEACTIVATION: 8>
DEACTIVATION_CONFIRMATION = <CauseType.DEACTIVATION_CONFIRMATION: 9>
ACTIVATION_TERMINATION = <CauseType.ACTIVATION_TERMINATION: 10>
REMOTE_COMMAND = <CauseType.REMOTE_COMMAND: 11>
LOCAL_COMMAND = <CauseType.LOCAL_COMMAND: 12>
FILE_TRANSFER = <CauseType.FILE_TRANSFER: 13>
INTERROGATED_STATION = <CauseType.INTERROGATED_STATION: 20>
INTERROGATED_GROUP01 = <CauseType.INTERROGATED_GROUP01: 21>
INTERROGATED_GROUP02 = <CauseType.INTERROGATED_GROUP02: 22>
INTERROGATED_GROUP03 = <CauseType.INTERROGATED_GROUP03: 23>
INTERROGATED_GROUP04 = <CauseType.INTERROGATED_GROUP04: 24>
INTERROGATED_GROUP05 = <CauseType.INTERROGATED_GROUP05: 25>
INTERROGATED_GROUP06 = <CauseType.INTERROGATED_GROUP06: 26>
INTERROGATED_GROUP07 = <CauseType.INTERROGATED_GROUP07: 27>
INTERROGATED_GROUP08 = <CauseType.INTERROGATED_GROUP08: 28>
INTERROGATED_GROUP09 = <CauseType.INTERROGATED_GROUP09: 29>
INTERROGATED_GROUP10 = <CauseType.INTERROGATED_GROUP10: 30>
INTERROGATED_GROUP11 = <CauseType.INTERROGATED_GROUP11: 31>
INTERROGATED_GROUP12 = <CauseType.INTERROGATED_GROUP12: 32>
INTERROGATED_GROUP13 = <CauseType.INTERROGATED_GROUP13: 33>
INTERROGATED_GROUP14 = <CauseType.INTERROGATED_GROUP14: 34>
INTERROGATED_GROUP15 = <CauseType.INTERROGATED_GROUP15: 35>
INTERROGATED_GROUP16 = <CauseType.INTERROGATED_GROUP16: 36>
INTERROGATED_COUNTER = <CauseType.INTERROGATED_COUNTER: 37>
INTERROGATED_COUNTER01 = <CauseType.INTERROGATED_COUNTER01: 38>
INTERROGATED_COUNTER02 = <CauseType.INTERROGATED_COUNTER02: 39>
INTERROGATED_COUNTER03 = <CauseType.INTERROGATED_COUNTER03: 40>
INTERROGATED_COUNTER04 = <CauseType.INTERROGATED_COUNTER04: 41>
UNKNOWN_TYPE = <CauseType.UNKNOWN_TYPE: 44>
UNKNOWN_CAUSE = <CauseType.UNKNOWN_CAUSE: 45>
UNKNOWN_ASDU_ADDRESS = <CauseType.UNKNOWN_ASDU_ADDRESS: 46>
UNKNOWN_IO_ADDRESS = <CauseType.UNKNOWN_IO_ADDRESS: 47>
AUTHENTICATION = <CauseType.AUTHENTICATION: 14>
SESSION_KEY_MAINTENANCE = <CauseType.SESSION_KEY_MAINTENANCE: 15>
UPDATE_KEY_MAINTENANCE = <CauseType.UPDATE_KEY_MAINTENANCE: 16>
class Cause(typing.NamedTuple):
120class Cause(typing.NamedTuple):
121    type: CauseType | OtherCauseType
122    is_negative_confirm: bool
123    is_test: bool
124    originator_address: OriginatorAddress

Cause(type, is_negative_confirm, is_test, originator_address)

Cause( type: CauseType | int, is_negative_confirm: bool, is_test: bool, originator_address: int)

Create new instance of Cause(type, is_negative_confirm, is_test, originator_address)

type: CauseType | int

Alias for field number 0

is_negative_confirm: bool

Alias for field number 1

is_test: bool

Alias for field number 2

originator_address: int

Alias for field number 3

class IoElement_S_IT_TC(typing.NamedTuple):
127class IoElement_S_IT_TC(typing.NamedTuple):
128    association_id: AssociationId
129    value: BinaryCounterValue

IoElement_S_IT_TC(association_id, value)

IoElement_S_IT_TC( association_id: int, value: BinaryCounterValue)

Create new instance of IoElement_S_IT_TC(association_id, value)

association_id: int

Alias for field number 0

Alias for field number 1

class IoElement_S_CH_NA(typing.NamedTuple):
132class IoElement_S_CH_NA(typing.NamedTuple):
133    sequence: SequenceNumber
134    user: UserNumber
135    mac_algorithm: MacAlgorithm | int
136    """MAC algorithm can be value in range [0, 255]"""
137    reason: int
138    """reason in range [0, 255] - only valid value is 1"""
139    data: util.Bytes
140    """data length in range [4, 65535]"""

IoElement_S_CH_NA(sequence, user, mac_algorithm, reason, data)

IoElement_S_CH_NA( sequence: int, user: int, mac_algorithm: MacAlgorithm | int, reason: int, data: bytes | bytearray | memoryview)

Create new instance of IoElement_S_CH_NA(sequence, user, mac_algorithm, reason, data)

sequence: int

Alias for field number 0

user: int

Alias for field number 1

mac_algorithm: MacAlgorithm | int

MAC algorithm can be value in range [0, 255]

reason: int

reason in range [0, 255] - only valid value is 1

data: bytes | bytearray | memoryview

data length in range [4, 65535]

class IoElement_S_RP_NA(typing.NamedTuple):
143class IoElement_S_RP_NA(typing.NamedTuple):
144    sequence: SequenceNumber
145    user: UserNumber
146    mac: util.Bytes

IoElement_S_RP_NA(sequence, user, mac)

IoElement_S_RP_NA(sequence: int, user: int, mac: bytes | bytearray | memoryview)

Create new instance of IoElement_S_RP_NA(sequence, user, mac)

sequence: int

Alias for field number 0

user: int

Alias for field number 1

mac: bytes | bytearray | memoryview

Alias for field number 2

class IoElement_S_AR_NA(typing.NamedTuple):
149class IoElement_S_AR_NA(typing.NamedTuple):
150    asdu: util.Bytes
151    sequence: SequenceNumber
152    user: UserNumber
153    mac: util.Bytes

IoElement_S_AR_NA(asdu, sequence, user, mac)

IoElement_S_AR_NA( asdu: bytes | bytearray | memoryview, sequence: int, user: int, mac: bytes | bytearray | memoryview)

Create new instance of IoElement_S_AR_NA(asdu, sequence, user, mac)

asdu: bytes | bytearray | memoryview

Alias for field number 0

sequence: int

Alias for field number 1

user: int

Alias for field number 2

mac: bytes | bytearray | memoryview

Alias for field number 3

class IoElement_S_KR_NA(typing.NamedTuple):
156class IoElement_S_KR_NA(typing.NamedTuple):
157    user: UserNumber

IoElement_S_KR_NA(user,)

IoElement_S_KR_NA(user: int)

Create new instance of IoElement_S_KR_NA(user,)

user: int

Alias for field number 0

class IoElement_S_KS_NA(typing.NamedTuple):
160class IoElement_S_KS_NA(typing.NamedTuple):
161    sequence: SequenceNumber
162    user: UserNumber
163    key_wrap_algorithm: KeyWrapAlgorithm | int
164    """Key wrap algorithm can be value in range [0, 255]"""
165    key_status: KeyStatus | int
166    """Key status can be value in range [0, 255]"""
167    mac_algorithm: MacAlgorithm | int
168    """MAC algorithm can be value in range [0, 255]"""
169    data: util.Bytes
170    """data length in range [8, 65535]"""
171    mac: util.Bytes

IoElement_S_KS_NA(sequence, user, key_wrap_algorithm, key_status, mac_algorithm, data, mac)

IoElement_S_KS_NA( sequence: int, user: int, key_wrap_algorithm: KeyWrapAlgorithm | int, key_status: KeyStatus | int, mac_algorithm: MacAlgorithm | int, data: bytes | bytearray | memoryview, mac: bytes | bytearray | memoryview)

Create new instance of IoElement_S_KS_NA(sequence, user, key_wrap_algorithm, key_status, mac_algorithm, data, mac)

sequence: int

Alias for field number 0

user: int

Alias for field number 1

key_wrap_algorithm: KeyWrapAlgorithm | int

Key wrap algorithm can be value in range [0, 255]

key_status: KeyStatus | int

Key status can be value in range [0, 255]

mac_algorithm: MacAlgorithm | int

MAC algorithm can be value in range [0, 255]

data: bytes | bytearray | memoryview

data length in range [8, 65535]

mac: bytes | bytearray | memoryview

Alias for field number 6

class IoElement_S_KC_NA(typing.NamedTuple):
174class IoElement_S_KC_NA(typing.NamedTuple):
175    sequence: SequenceNumber
176    user: UserNumber
177    wrapped_key: util.Bytes
178    """wrapped key length in range [8, 65535]"""

IoElement_S_KC_NA(sequence, user, wrapped_key)

IoElement_S_KC_NA( sequence: int, user: int, wrapped_key: bytes | bytearray | memoryview)

Create new instance of IoElement_S_KC_NA(sequence, user, wrapped_key)

sequence: int

Alias for field number 0

user: int

Alias for field number 1

wrapped_key: bytes | bytearray | memoryview

wrapped key length in range [8, 65535]

class IoElement_S_ER_NA(typing.NamedTuple):
181class IoElement_S_ER_NA(typing.NamedTuple):
182    challenge_sequence: SequenceNumber
183    key_change_sequence: SequenceNumber
184    user: UserNumber
185    association_id: AssociationId
186    code: ErrorCode | int
187    """Code can be value in range [0, 255]"""
188    time: Time
189    """Time size SEVEN"""
190    text: util.Bytes
191    """Text length in range [0, 65535]"""

IoElement_S_ER_NA(challenge_sequence, key_change_sequence, user, association_id, code, time, text)

IoElement_S_ER_NA( challenge_sequence: int, key_change_sequence: int, user: int, association_id: int, code: ErrorCode | int, time: Time, text: bytes | bytearray | memoryview)

Create new instance of IoElement_S_ER_NA(challenge_sequence, key_change_sequence, user, association_id, code, time, text)

challenge_sequence: int

Alias for field number 0

key_change_sequence: int

Alias for field number 1

user: int

Alias for field number 2

association_id: int

Alias for field number 3

code: ErrorCode | int

Code can be value in range [0, 255]

time: Time

Time size SEVEN

text: bytes | bytearray | memoryview

Text length in range [0, 65535]

class IoElement_S_UC_NA_X(typing.NamedTuple):
194class IoElement_S_UC_NA_X(typing.NamedTuple):
195    key_change_method: KeyChangeMethod | int
196    """Key change method can be value in range [0, 255]"""
197    data: util.Bytes
198    """Data length in range [0, 65535]"""

IoElement_S_UC_NA_X(key_change_method, data)

IoElement_S_UC_NA_X( key_change_method: KeyChangeMethod | int, data: bytes | bytearray | memoryview)

Create new instance of IoElement_S_UC_NA_X(key_change_method, data)

key_change_method: KeyChangeMethod | int

Key change method can be value in range [0, 255]

data: bytes | bytearray | memoryview

Data length in range [0, 65535]

class IoElement_S_US_NA(typing.NamedTuple):
201class IoElement_S_US_NA(typing.NamedTuple):
202    key_change_method: KeyChangeMethod | int
203    """Key change method can be value in range [0, 255]"""
204    operation: Operation | int
205    """Operation can be value in range [0, 255]"""
206    sequence: SequenceNumber
207    role: UserRole | int
208    """Role can be value in range [0, 65535]"""
209    role_expiry: int
210    """Role expiry in range [0, 65535]"""
211    name: util.Bytes
212    """Name length in range [0, 65535]"""
213    public_key: util.Bytes
214    """Public key length in range [0, 65535]"""
215    certification: util.Bytes
216    """Certification length in range [0, 65535]"""

IoElement_S_US_NA(key_change_method, operation, sequence, role, role_expiry, name, public_key, certification)

IoElement_S_US_NA( key_change_method: KeyChangeMethod | int, operation: Operation | int, sequence: int, role: UserRole | int, role_expiry: int, name: bytes | bytearray | memoryview, public_key: bytes | bytearray | memoryview, certification: bytes | bytearray | memoryview)

Create new instance of IoElement_S_US_NA(key_change_method, operation, sequence, role, role_expiry, name, public_key, certification)

key_change_method: KeyChangeMethod | int

Key change method can be value in range [0, 255]

operation: Operation | int

Operation can be value in range [0, 255]

sequence: int

Alias for field number 2

role: UserRole | int

Role can be value in range [0, 65535]

role_expiry: int

Role expiry in range [0, 65535]

name: bytes | bytearray | memoryview

Name length in range [0, 65535]

public_key: bytes | bytearray | memoryview

Public key length in range [0, 65535]

certification: bytes | bytearray | memoryview

Certification length in range [0, 65535]

class IoElement_S_UQ_NA(typing.NamedTuple):
219class IoElement_S_UQ_NA(typing.NamedTuple):
220    key_change_method: KeyChangeMethod | int
221    """Key change method can be value in range [0, 255]"""
222    name: util.Bytes
223    """Name length in range [0, 65535]"""
224    data: util.Bytes
225    """Data length in range [4, 65535]"""

IoElement_S_UQ_NA(key_change_method, name, data)

IoElement_S_UQ_NA( key_change_method: KeyChangeMethod | int, name: bytes | bytearray | memoryview, data: bytes | bytearray | memoryview)

Create new instance of IoElement_S_UQ_NA(key_change_method, name, data)

key_change_method: KeyChangeMethod | int

Key change method can be value in range [0, 255]

name: bytes | bytearray | memoryview

Name length in range [0, 65535]

data: bytes | bytearray | memoryview

Data length in range [4, 65535]

class IoElement_S_UR_NA(typing.NamedTuple):
228class IoElement_S_UR_NA(typing.NamedTuple):
229    sequence: SequenceNumber
230    user: UserNumber
231    data: util.Bytes
232    """Data length in range [4, 65535]"""

IoElement_S_UR_NA(sequence, user, data)

IoElement_S_UR_NA(sequence: int, user: int, data: bytes | bytearray | memoryview)

Create new instance of IoElement_S_UR_NA(sequence, user, data)

sequence: int

Alias for field number 0

user: int

Alias for field number 1

data: bytes | bytearray | memoryview

Data length in range [4, 65535]

class IoElement_S_UK_NA(typing.NamedTuple):
235class IoElement_S_UK_NA(typing.NamedTuple):
236    sequence: SequenceNumber
237    user: UserNumber
238    encrypted_update_key: util.Bytes
239    """Encrypted update key length in range [16, 65535]"""
240    mac: util.Bytes

IoElement_S_UK_NA(sequence, user, encrypted_update_key, mac)

IoElement_S_UK_NA( sequence: int, user: int, encrypted_update_key: bytes | bytearray | memoryview, mac: bytes | bytearray | memoryview)

Create new instance of IoElement_S_UK_NA(sequence, user, encrypted_update_key, mac)

sequence: int

Alias for field number 0

user: int

Alias for field number 1

encrypted_update_key: bytes | bytearray | memoryview

Encrypted update key length in range [16, 65535]

mac: bytes | bytearray | memoryview

Alias for field number 3

class IoElement_S_UA_NA(typing.NamedTuple):
243class IoElement_S_UA_NA(typing.NamedTuple):
244    sequence: SequenceNumber
245    user: UserNumber
246    encrypted_update_key: util.Bytes
247    """Encrypted update key length in range [16, 65535]"""
248    signature: util.Bytes

IoElement_S_UA_NA(sequence, user, encrypted_update_key, signature)

IoElement_S_UA_NA( sequence: int, user: int, encrypted_update_key: bytes | bytearray | memoryview, signature: bytes | bytearray | memoryview)

Create new instance of IoElement_S_UA_NA(sequence, user, encrypted_update_key, signature)

sequence: int

Alias for field number 0

user: int

Alias for field number 1

encrypted_update_key: bytes | bytearray | memoryview

Encrypted update key length in range [16, 65535]

signature: bytes | bytearray | memoryview

Alias for field number 3

class IoElement_S_UC_NA(typing.NamedTuple):
251class IoElement_S_UC_NA(typing.NamedTuple):
252    mac: util.Bytes

IoElement_S_UC_NA(mac,)

IoElement_S_UC_NA(mac: bytes | bytearray | memoryview)

Create new instance of IoElement_S_UC_NA(mac,)

mac: bytes | bytearray | memoryview

Alias for field number 0

class IO(typing.NamedTuple):
272class IO(typing.NamedTuple):
273    address: IoAddress | None
274    element: IoElement
275    time: Time | None
276    """Time size SEVEN"""

IO(address, element, time)

address: int | None

Alias for field number 0

time: Time | None

Time size SEVEN

class ASDU(typing.NamedTuple):
279class ASDU(typing.NamedTuple):
280    type: AsduType
281    cause: Cause
282    address: AsduAddress
283    ios: list[IO]

ASDU(type, cause, address, ios)

ASDU( type: AsduType, cause: Cause, address: int, ios: list[IO])

Create new instance of ASDU(type, cause, address, ios)

type: AsduType

Alias for field number 0

cause: Cause

Alias for field number 1

address: int

Alias for field number 2

ios: list[IO]

Alias for field number 3

class Encoder:
 23class Encoder:
 24
 25    def __init__(self, encoder: iec101.Encoder | iec104.Encoder):
 26        self._encoder = encoder
 27        self._buffer = None
 28
 29    def decode_asdu(self,
 30                    asdu_bytes: util.Bytes
 31                    ) -> tuple[ASDU | None, util.Bytes]:
 32        asdu_type = asdu_bytes[0]
 33        with contextlib.suppress(ValueError):
 34            asdu_type = common.AsduType(asdu_type)
 35
 36        if isinstance(asdu_type, common.AsduType):
 37            return self._decode_asdu(asdu_bytes, asdu_type)
 38
 39        if self._buffer:
 40            mlog.warning('unsegmented asdu - discarding buffer')
 41            self._buffer = None
 42
 43        return self._encoder.decode_asdu(asdu_bytes)
 44
 45    def encode_asdu(self, asdu: ASDU) -> list[util.Bytes]:
 46        if isinstance(asdu, common.ASDU):
 47            return list(self._encode_asdu(asdu))
 48
 49        return [self._encoder.encode_asdu(asdu)]
 50
 51    def _decode_asdu(self, asdu_bytes, asdu_type):
 52        io_count, rest = _decode_io_count(asdu_bytes[1:], asdu_type)
 53        cause, rest = _decode_cause(rest, self._encoder.cause_size)
 54        asdu_address, rest = _decode_int(rest,
 55                                         self._encoder.asdu_address_size.value)
 56
 57        ios = collections.deque()
 58        for _ in range(io_count):
 59            if asdu_type in _unsegmented_asdu_types:
 60                io, rest = self._decode_unsegmented_io(rest, asdu_type)
 61
 62            else:
 63                io, rest = self._decode_segmented_io(rest, asdu_type, cause,
 64                                                     asdu_address)
 65
 66            if io:
 67                ios.append(io)
 68
 69        if asdu_type != common.AsduType.S_IT_TC and not ios:
 70            return None, rest
 71
 72        asdu = common.ASDU(type=asdu_type,
 73                           cause=cause,
 74                           address=asdu_address,
 75                           ios=list(ios))
 76        return asdu, rest
 77
 78    def _encode_asdu(self, asdu):
 79        identifier = collections.deque()
 80        identifier.append(asdu.type.value)
 81        identifier.append(len(asdu.ios))
 82        identifier.extend(_encode_cause(asdu.cause, self._encoder.cause_size))
 83        identifier.extend(_encode_int(asdu.address,
 84                                      self._encoder.asdu_address_size.value))
 85
 86        parts = collections.deque()
 87        for io in asdu.ios:
 88            if io.address is not None:
 89                parts.append(
 90                    _encode_int(io.address,
 91                                self._encoder.io_address_size.value))
 92
 93            parts.append(_encode_io_element(io.element, self._encoder))
 94
 95            if io.time is not None:
 96                parts.append(encode_time(io.time, common.TimeSize.SEVEN))
 97
 98        rest = itertools.chain.from_iterable(parts)
 99
100        if asdu.type in _unsegmented_asdu_types:
101            yield bytes(itertools.chain(identifier, rest))
102
103        else:
104            first = True
105            segment = 0
106            rest = memoryview(bytes(rest))
107            max_size = self._encoder.max_asdu_size - len(identifier) - 1
108
109            while rest:
110                data, rest = rest[:max_size], rest[max_size:]
111                last = not rest
112
113                yield bytes(itertools.chain(identifier,
114                                            [(0x40 if first else 0) |
115                                             (0x80 if last else 0) |
116                                             segment],
117                                            data))
118
119                first = False
120                segment = (segment + 1) % 64
121
122    def _decode_unsegmented_io(self, io_bytes, asdu_type):
123        if self._buffer:
124            mlog.warning('unsegmented asdu - discarding buffer')
125            self._buffer = None
126
127        if asdu_type == common.AsduType.S_IT_TC:
128            io_address, rest = _decode_int(
129                io_bytes, self._encoder.io_address_size.value)
130            element, rest = _decode_io_element(rest, self._encoder,
131                                               asdu_type)
132            time, rest = (decode_time(rest, common.TimeSize.SEVEN),
133                          rest[7:])
134
135        else:
136            io_address = None
137            element, rest = _decode_io_element(io_bytes, self._encoder,
138                                               asdu_type)
139            time = None
140
141        io = common.IO(address=io_address,
142                       element=element,
143                       time=time)
144        return io, rest
145
146    def _decode_segmented_io(self, io_bytes, asdu_type, cause, asdu_address):
147        first = bool(io_bytes[0] & 0x40)
148        last = bool(io_bytes[0] & 0x80)
149        segment = io_bytes[0] & 0x3f
150        rest = io_bytes[1:]
151
152        if first:
153            if self._buffer:
154                mlog.warning('new first segment - discarding buffer')
155
156            self._buffer = _Buffer(asdu_type=asdu_type,
157                                   cause=cause,
158                                   asdu_address=asdu_address,
159                                   prev_first=first,
160                                   prev_segment=segment,
161                                   prev_io_bytes=rest,
162                                   all_io_bytes=rest)
163
164        else:
165            if not self._buffer:
166                mlog.warning('empty buffer - discarding segment')
167                return None, b''
168
169            elif self._buffer.asdu_type != asdu_type:
170                mlog.warning('asdu type not matching - '
171                             'discarding segment and buffer')
172                self._buffer = None
173                return None, b''
174
175            elif self._buffer.cause != cause:
176                mlog.warning('cause not matching - '
177                             'discarding segment and buffer')
178                self._buffer = None
179                return None, b''
180
181            elif self._buffer.asdu_address != asdu_address:
182                mlog.warning('asdu address not matching - '
183                             'discarding segment and buffer')
184                self._buffer = None
185                return None, b''
186
187            elif ((self._buffer.prev_segment + 1) % 64) != segment:
188                if (self._buffer.prev_first == first and
189                        self._buffer.prev_segment == segment and
190                        self._buffer.prev_io_bytes == rest):
191                    mlog.warning('duplicated segment - discarding segment')
192
193                else:
194                    mlog.warning('segment number not matching - '
195                                 'discarding segment and buffer')
196                    self._buffer = None
197
198                return None, b''
199
200            self._buffer = self._buffer._replace(
201                prev_first=first,
202                prev_segment=segment,
203                prev_io_bytes=rest,
204                all_io_bytes=itertools.chain(self._buffer.all_io_bytes, rest))
205
206        if not last:
207            return None, b''
208
209        all_io_bytes = (self._buffer.all_io_bytes
210                        if isinstance(self._buffer.all_io_bytes, memoryview)
211                        else memoryview(bytes(self._buffer.all_io_bytes)))
212        self._buffer = None
213
214        element, rest = _decode_io_element(all_io_bytes, self._encoder,
215                                           asdu_type)
216        io = common.IO(address=None,
217                       element=element,
218                       time=None)
219        return io, rest
Encoder( encoder: Encoder | Encoder)
25    def __init__(self, encoder: iec101.Encoder | iec104.Encoder):
26        self._encoder = encoder
27        self._buffer = None
def decode_asdu( self, asdu_bytes: bytes | bytearray | memoryview) -> tuple[ASDU | ASDU | ASDU | None, bytes | bytearray | memoryview]:
29    def decode_asdu(self,
30                    asdu_bytes: util.Bytes
31                    ) -> tuple[ASDU | None, util.Bytes]:
32        asdu_type = asdu_bytes[0]
33        with contextlib.suppress(ValueError):
34            asdu_type = common.AsduType(asdu_type)
35
36        if isinstance(asdu_type, common.AsduType):
37            return self._decode_asdu(asdu_bytes, asdu_type)
38
39        if self._buffer:
40            mlog.warning('unsegmented asdu - discarding buffer')
41            self._buffer = None
42
43        return self._encoder.decode_asdu(asdu_bytes)
def encode_asdu( self, asdu: ASDU | ASDU | ASDU) -> list[bytes | bytearray | memoryview]:
45    def encode_asdu(self, asdu: ASDU) -> list[util.Bytes]:
46        if isinstance(asdu, common.ASDU):
47            return list(self._encode_asdu(asdu))
48
49        return [self._encoder.encode_asdu(asdu)]