hat.drivers.iec60870.encodings.common
1import datetime 2import enum 3import time 4import typing 5 6 7class AsduTypeError(Exception): 8 pass 9 10 11class CauseSize(enum.Enum): 12 ONE = 1 13 TWO = 2 14 15 16class AsduAddressSize(enum.Enum): 17 ONE = 1 18 TWO = 2 19 20 21class IoAddressSize(enum.Enum): 22 ONE = 1 23 TWO = 2 24 THREE = 3 25 26 27class TimeSize(enum.Enum): 28 TWO = 2 29 THREE = 3 30 FOUR = 4 31 SEVEN = 7 32 33 34class Time(typing.NamedTuple): 35 size: TimeSize 36 milliseconds: int 37 """milliseconds in range [0, 59999]""" 38 invalid: bool | None 39 """available for size THREE, FOUR, SEVEN""" 40 substituted: bool | None 41 """available for size THREE, FOUR, SEVEN""" 42 minutes: int | None 43 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 44 summer_time: bool | None 45 """available for size FOUR, SEVEN""" 46 hours: int | None 47 """available for size FOUR, SEVEN (hours in range [0, 23])""" 48 day_of_week: int | None 49 """available for size SEVEN (day_of_week in range [1, 7])""" 50 day_of_month: int | None 51 """available for size SEVEN (day_of_month in range [1, 31])""" 52 months: int | None 53 """available for size SEVEN (months in range [1, 12])""" 54 years: int | None 55 """available for size SEVEN (years in range [0, 99])""" 56 57 58class IO(typing.NamedTuple): 59 address: int 60 elements: list 61 time: Time | None 62 63 64class ASDU(typing.NamedTuple): 65 type: int 66 cause: int 67 address: int 68 ios: list[IO] 69 70 71def time_from_datetime(dt: datetime.datetime, 72 invalid: bool = False, 73 substituted: bool = False 74 ) -> Time: 75 """Create Time from datetime.datetime""" 76 # TODO document edge cases (local time, os implementation, ...) 
77 # rounding microseconds to the nearest millisecond 78 dt_rounded = ( 79 dt.replace(microsecond=0) + 80 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 81 local_time = time.localtime(dt_rounded.timestamp()) 82 83 return Time( 84 size=TimeSize.SEVEN, 85 milliseconds=(local_time.tm_sec * 1000 + 86 dt_rounded.microsecond // 1000), 87 invalid=invalid, 88 substituted=substituted, 89 minutes=local_time.tm_min, 90 summer_time=bool(local_time.tm_isdst), 91 hours=local_time.tm_hour, 92 day_of_week=local_time.tm_wday + 1, 93 day_of_month=local_time.tm_mday, 94 months=local_time.tm_mon, 95 years=local_time.tm_year % 100) 96 97 98def time_to_datetime(t: Time 99 ) -> datetime.datetime: 100 """Convert Time to datetime.datetime""" 101 # TODO document edge cases (local time, os implementation, ...) 102 # TODO support TimeSize.FOUR 103 if t.size == TimeSize.TWO: 104 local_now = datetime.datetime.now() 105 local_dt = local_now.replace( 106 second=int(t.milliseconds / 1000), 107 microsecond=(t.milliseconds % 1000) * 1000) 108 109 local_seconds = local_now.second + local_now.microsecond / 1_000_000 110 t_seconds = t.milliseconds / 1_000 111 112 if abs(local_seconds - t_seconds) > 30: 113 if local_seconds < t_seconds: 114 local_dt = local_dt - datetime.timedelta(minutes=1) 115 116 else: 117 local_dt = local_dt + datetime.timedelta(minutes=1) 118 119 elif t.size == TimeSize.THREE: 120 local_now = datetime.datetime.now() 121 local_dt = local_now.replace( 122 minute=t.minutes, 123 second=int(t.milliseconds / 1000), 124 microsecond=(t.milliseconds % 1000) * 1000) 125 126 local_minutes = (local_now.minute + 127 local_now.second / 60 + 128 local_now.microsecond / 60_000_000) 129 t_minutes = t.minutes + t.milliseconds / 60_000 130 131 if abs(local_minutes - t_minutes) > 30: 132 if local_minutes < t_minutes: 133 local_dt = local_dt - datetime.timedelta(hours=1) 134 135 else: 136 local_dt = local_dt + datetime.timedelta(hours=1) 137 138 elif t.size == TimeSize.SEVEN: 139 
local_dt = datetime.datetime( 140 year=2000 + t.years if t.years < 70 else 1900 + t.years, 141 month=t.months, 142 day=t.day_of_month, 143 hour=t.hours, 144 minute=t.minutes, 145 second=int(t.milliseconds / 1000), 146 microsecond=(t.milliseconds % 1000) * 1000, 147 fold=not t.summer_time) 148 149 else: 150 raise ValueError('unsupported time size') 151 152 return local_dt.astimezone(tz=datetime.timezone.utc)
class
AsduTypeError(builtins.Exception):
Common base class for all non-exit exceptions.
class
CauseSize(enum.Enum):
ONE =
<CauseSize.ONE: 1>
TWO =
<CauseSize.TWO: 2>
class
AsduAddressSize(enum.Enum):
ONE =
<AsduAddressSize.ONE: 1>
TWO =
<AsduAddressSize.TWO: 2>
class
IoAddressSize(enum.Enum):
ONE =
<IoAddressSize.ONE: 1>
TWO =
<IoAddressSize.TWO: 2>
THREE =
<IoAddressSize.THREE: 3>
class
TimeSize(enum.Enum):
TWO =
<TimeSize.TWO: 2>
THREE =
<TimeSize.THREE: 3>
FOUR =
<TimeSize.FOUR: 4>
SEVEN =
<TimeSize.SEVEN: 7>
class
Time(typing.NamedTuple):
class Time(typing.NamedTuple):
    """Time stamp whose available fields depend on `size`

    Fields that are not available for the given `size` are ``None``.

    """
    size: TimeSize
    milliseconds: int
    """milliseconds in range [0, 59999]"""
    invalid: bool | None
    """available for size THREE, FOUR, SEVEN"""
    substituted: bool | None
    """available for size THREE, FOUR, SEVEN"""
    minutes: int | None
    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
    summer_time: bool | None
    """available for size FOUR, SEVEN"""
    hours: int | None
    """available for size FOUR, SEVEN (hours in range [0, 23])"""
    day_of_week: int | None
    """available for size SEVEN (day_of_week in range [1, 7])"""
    day_of_month: int | None
    """available for size SEVEN (day_of_month in range [1, 31])"""
    months: int | None
    """available for size SEVEN (months in range [1, 12])"""
    years: int | None
    """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, substituted, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Time( size: TimeSize, milliseconds: int, invalid: bool | None, substituted: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)
Create new instance of Time(size, milliseconds, invalid, substituted, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
class
IO(typing.NamedTuple):
IO(address, elements, time)
IO( address: int, elements: list, time: Time | None)
Create new instance of IO(address, elements, time)
class
ASDU(typing.NamedTuple):
ASDU(type, cause, address, ios)
ASDU( type: int, cause: int, address: int, ios: list[IO])
Create new instance of ASDU(type, cause, address, ios)
def
time_from_datetime( dt: datetime.datetime, invalid: bool = False, substituted: bool = False) -> Time:
def time_from_datetime(dt: datetime.datetime,
                       invalid: bool = False,
                       substituted: bool = False
                       ) -> Time:
    """Create Time from datetime.datetime

    The resulting `Time` has size ``TimeSize.SEVEN`` and contains `dt`
    expressed in the system's local time (as reported by `time.localtime`),
    rounded to the nearest millisecond. A naive `dt` is interpreted by
    `datetime.datetime.timestamp` as local time.

    Args:
        dt: point in time to convert
        invalid: value copied to `Time.invalid`
        substituted: value copied to `Time.substituted`

    """
    # TODO document edge cases (local time, os implementation, ...)
    # rounding microseconds to the nearest millisecond - adding a timedelta
    # lets a value that rounds up to 1000 ms carry over into the seconds
    dt_rounded = (
        dt.replace(microsecond=0) +
        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
    local_time = time.localtime(dt_rounded.timestamp())

    return Time(
        size=TimeSize.SEVEN,
        milliseconds=(local_time.tm_sec * 1000 +
                      dt_rounded.microsecond // 1000),
        invalid=invalid,
        substituted=substituted,
        minutes=local_time.tm_min,
        # tm_isdst is -1 when DST information is unavailable; only a
        # positive value means that summer time is in effect
        summer_time=local_time.tm_isdst > 0,
        hours=local_time.tm_hour,
        # tm_wday is 0 (Monday) to 6, Time uses range [1, 7]
        day_of_week=local_time.tm_wday + 1,
        day_of_month=local_time.tm_mday,
        months=local_time.tm_mon,
        years=local_time.tm_year % 100)
Create Time (size SEVEN) from datetime.datetime, expressed in the system's local time and rounded to the nearest millisecond
def time_to_datetime(t: Time
                     ) -> datetime.datetime:
    """Convert Time to datetime.datetime

    `t` is interpreted as system local time and the result is a timezone
    aware `datetime.datetime` in UTC.

    For size TWO (THREE), fields missing in `t` are taken from the current
    local time. If seconds (minutes) of `t` differ from the current ones by
    more than 30, result is shifted by one minute (hour) towards the current
    time.

    For size SEVEN, years lower than 70 are mapped to 20xx and all other
    years to 19xx. `fold` is set when `summer_time` is not set, so that an
    ambiguous local time resolves to its second occurrence.

    Raises:
        ValueError: size of `t` is not TWO, THREE or SEVEN

    """
    # TODO document edge cases (local time, os implementation, ...)
    # TODO support TimeSize.FOUR
    if t.size not in (TimeSize.TWO, TimeSize.THREE, TimeSize.SEVEN):
        raise ValueError('unsupported time size')

    # integer arithmetic - no float round trip
    seconds, milliseconds = divmod(t.milliseconds, 1000)
    microseconds = milliseconds * 1000

    if t.size == TimeSize.SEVEN:
        local_dt = datetime.datetime(
            year=2000 + t.years if t.years < 70 else 1900 + t.years,
            month=t.months,
            day=t.day_of_month,
            hour=t.hours,
            minute=t.minutes,
            second=seconds,
            microsecond=microseconds,
            fold=0 if t.summer_time else 1)

    else:
        local_now = datetime.datetime.now()

        if t.size == TimeSize.TWO:
            # position inside of a minute, expressed in seconds
            local_dt = local_now.replace(second=seconds,
                                         microsecond=microseconds)
            now_position = (local_now.second +
                            local_now.microsecond / 1_000_000)
            t_position = t.milliseconds / 1_000
            period = datetime.timedelta(minutes=1)

        else:
            # position inside of an hour, expressed in minutes
            local_dt = local_now.replace(minute=t.minutes,
                                         second=seconds,
                                         microsecond=microseconds)
            now_position = (local_now.minute +
                            local_now.second / 60 +
                            local_now.microsecond / 60_000_000)
            t_position = t.minutes + t.milliseconds / 60_000
            period = datetime.timedelta(hours=1)

        # more than half of a period apart - t belongs to neighbouring period
        if abs(now_position - t_position) > 30:
            if now_position < t_position:
                local_dt = local_dt - period

            else:
                local_dt = local_dt + period

    return local_dt.astimezone(tz=datetime.timezone.utc)
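Convert Time to a timezone-aware datetime.datetime in UTC. Sizes TWO and THREE are completed from the current local time; size FOUR is not supported and raises ValueError.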