hat.drivers.iec60870.encodings.common
1import datetime 2import enum 3import time 4import typing 5 6 7class AsduTypeError(Exception): 8 pass 9 10 11class CauseSize(enum.Enum): 12 ONE = 1 13 TWO = 2 14 15 16class AsduAddressSize(enum.Enum): 17 ONE = 1 18 TWO = 2 19 20 21class IoAddressSize(enum.Enum): 22 ONE = 1 23 TWO = 2 24 THREE = 3 25 26 27class TimeSize(enum.Enum): 28 TWO = 2 29 THREE = 3 30 FOUR = 4 31 SEVEN = 7 32 33 34class Time(typing.NamedTuple): 35 size: TimeSize 36 milliseconds: int 37 """milliseconds in range [0, 59999]""" 38 invalid: bool | None 39 """available for size THREE, FOUR, SEVEN""" 40 minutes: int | None 41 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 42 summer_time: bool | None 43 """available for size FOUR, SEVEN""" 44 hours: int | None 45 """available for size FOUR, SEVEN (hours in range [0, 23])""" 46 day_of_week: int | None 47 """available for size SEVEN (day_of_week in range [1, 7])""" 48 day_of_month: int | None 49 """available for size SEVEN (day_of_month in range [1, 31])""" 50 months: int | None 51 """available for size SEVEN (months in range [1, 12])""" 52 years: int | None 53 """available for size SEVEN (years in range [0, 99])""" 54 55 56class IO(typing.NamedTuple): 57 address: int 58 elements: list 59 time: Time | None 60 61 62class ASDU(typing.NamedTuple): 63 type: int 64 cause: int 65 address: int 66 ios: list[IO] 67 68 69def time_from_datetime(dt: datetime.datetime, 70 invalid: bool = False 71 ) -> Time: 72 """Create Time from datetime.datetime""" 73 # TODO document edge cases (local time, os implementation, ...) 74 # rounding microseconds to the nearest millisecond 75 dt_rounded = ( 76 dt.replace(microsecond=0) + 77 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 78 local_time = time.localtime(dt_rounded.timestamp()) 79 80 return Time( 81 size=TimeSize.SEVEN, 82 milliseconds=(local_time.tm_sec * 1000 + 83 dt_rounded.microsecond // 1000), 84 invalid=invalid, 85 minutes=local_time.tm_min, 86 summer_time=bool(local_time.tm_isdst), 87 hours=local_time.tm_hour, 88 day_of_week=local_time.tm_wday + 1, 89 day_of_month=local_time.tm_mday, 90 months=local_time.tm_mon, 91 years=local_time.tm_year % 100) 92 93 94def time_to_datetime(t: Time 95 ) -> datetime.datetime: 96 """Convert Time to datetime.datetime""" 97 # TODO document edge cases (local time, os implementation, ...) 98 # TODO support TimeSize.FOUR 99 if t.size == TimeSize.TWO: 100 local_now = datetime.datetime.now() 101 local_dt = local_now.replace( 102 second=int(t.milliseconds / 1000), 103 microsecond=(t.milliseconds % 1000) * 1000) 104 105 local_seconds = local_now.second + local_now.microsecond / 1_000_000 106 t_seconds = t.milliseconds / 1_000 107 108 if abs(local_seconds - t_seconds) > 30: 109 if local_seconds < t_seconds: 110 local_dt = local_dt - datetime.timedelta(minutes=1) 111 112 else: 113 local_dt = local_dt + datetime.timedelta(minutes=1) 114 115 elif t.size == TimeSize.THREE: 116 local_now = datetime.datetime.now() 117 local_dt = local_now.replace( 118 minute=t.minutes, 119 second=int(t.milliseconds / 1000), 120 microsecond=(t.milliseconds % 1000) * 1000) 121 122 local_minutes = (local_now.minute + 123 local_now.second / 60 + 124 local_now.microsecond / 60_000_000) 125 t_minutes = t.minutes + t.milliseconds / 60_000 126 127 if abs(local_minutes - t_minutes) > 30: 128 if local_minutes < t_minutes: 129 local_dt = local_dt - datetime.timedelta(hours=1) 130 131 else: 132 local_dt = local_dt + datetime.timedelta(hours=1) 133 134 elif t.size == TimeSize.SEVEN: 135 local_dt = datetime.datetime( 136 year=2000 + t.years if t.years < 70 else 1900 + t.years, 137 month=t.months, 138 day=t.day_of_month, 139 hour=t.hours, 140 minute=t.minutes, 141 second=int(t.milliseconds / 1000), 142 microsecond=(t.milliseconds % 1000) * 1000, 143 fold=not t.summer_time) 144 145 else: 146 raise ValueError('unsupported time size') 147 148 return local_dt.astimezone(tz=datetime.timezone.utc)
class
AsduTypeError(builtins.Exception):
Common base class for all non-exit exceptions.
class
CauseSize(enum.Enum):
ONE =
<CauseSize.ONE: 1>
TWO =
<CauseSize.TWO: 2>
class
AsduAddressSize(enum.Enum):
ONE =
<AsduAddressSize.ONE: 1>
TWO =
<AsduAddressSize.TWO: 2>
class
IoAddressSize(enum.Enum):
ONE =
<IoAddressSize.ONE: 1>
TWO =
<IoAddressSize.TWO: 2>
THREE =
<IoAddressSize.THREE: 3>
class
TimeSize(enum.Enum):
TWO =
<TimeSize.TWO: 2>
THREE =
<TimeSize.THREE: 3>
FOUR =
<TimeSize.FOUR: 4>
SEVEN =
<TimeSize.SEVEN: 7>
class
Time(typing.NamedTuple):
35class Time(typing.NamedTuple): 36 size: TimeSize 37 milliseconds: int 38 """milliseconds in range [0, 59999]""" 39 invalid: bool | None 40 """available for size THREE, FOUR, SEVEN""" 41 minutes: int | None 42 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 43 summer_time: bool | None 44 """available for size FOUR, SEVEN""" 45 hours: int | None 46 """available for size FOUR, SEVEN (hours in range [0, 23])""" 47 day_of_week: int | None 48 """available for size SEVEN (day_of_week in range [1, 7])""" 49 day_of_month: int | None 50 """available for size SEVEN (day_of_month in range [1, 31])""" 51 months: int | None 52 """available for size SEVEN (months in range [1, 12])""" 53 years: int | None 54 """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Time( size: TimeSize, milliseconds: int, invalid: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
class
IO(typing.NamedTuple):
IO(address, elements, time)
IO( address: int, elements: list, time: Time | None)
Create new instance of IO(address, elements, time)
class
ASDU(typing.NamedTuple):
ASDU(type, cause, address, ios)
ASDU( type: int, cause: int, address: int, ios: list[IO])
Create new instance of ASDU(type, cause, address, ios)
70def time_from_datetime(dt: datetime.datetime, 71 invalid: bool = False 72 ) -> Time: 73 """Create Time from datetime.datetime""" 74 # TODO document edge cases (local time, os implementation, ...) 75 # rounding microseconds to the nearest millisecond 76 dt_rounded = ( 77 dt.replace(microsecond=0) + 78 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 79 local_time = time.localtime(dt_rounded.timestamp()) 80 81 return Time( 82 size=TimeSize.SEVEN, 83 milliseconds=(local_time.tm_sec * 1000 + 84 dt_rounded.microsecond // 1000), 85 invalid=invalid, 86 minutes=local_time.tm_min, 87 summer_time=bool(local_time.tm_isdst), 88 hours=local_time.tm_hour, 89 day_of_week=local_time.tm_wday + 1, 90 day_of_month=local_time.tm_mday, 91 months=local_time.tm_mon, 92 years=local_time.tm_year % 100)
Create Time from datetime.datetime
95def time_to_datetime(t: Time 96 ) -> datetime.datetime: 97 """Convert Time to datetime.datetime""" 98 # TODO document edge cases (local time, os implementation, ...) 99 # TODO support TimeSize.FOUR 100 if t.size == TimeSize.TWO: 101 local_now = datetime.datetime.now() 102 local_dt = local_now.replace( 103 second=int(t.milliseconds / 1000), 104 microsecond=(t.milliseconds % 1000) * 1000) 105 106 local_seconds = local_now.second + local_now.microsecond / 1_000_000 107 t_seconds = t.milliseconds / 1_000 108 109 if abs(local_seconds - t_seconds) > 30: 110 if local_seconds < t_seconds: 111 local_dt = local_dt - datetime.timedelta(minutes=1) 112 113 else: 114 local_dt = local_dt + datetime.timedelta(minutes=1) 115 116 elif t.size == TimeSize.THREE: 117 local_now = datetime.datetime.now() 118 local_dt = local_now.replace( 119 minute=t.minutes, 120 second=int(t.milliseconds / 1000), 121 microsecond=(t.milliseconds % 1000) * 1000) 122 123 local_minutes = (local_now.minute + 124 local_now.second / 60 + 125 local_now.microsecond / 60_000_000) 126 t_minutes = t.minutes + t.milliseconds / 60_000 127 128 if abs(local_minutes - t_minutes) > 30: 129 if local_minutes < t_minutes: 130 local_dt = local_dt - datetime.timedelta(hours=1) 131 132 else: 133 local_dt = local_dt + datetime.timedelta(hours=1) 134 135 elif t.size == TimeSize.SEVEN: 136 local_dt = datetime.datetime( 137 year=2000 + t.years if t.years < 70 else 1900 + t.years, 138 month=t.months, 139 day=t.day_of_month, 140 hour=t.hours, 141 minute=t.minutes, 142 second=int(t.milliseconds / 1000), 143 microsecond=(t.milliseconds % 1000) * 1000, 144 fold=not t.summer_time) 145 146 else: 147 raise ValueError('unsupported time size') 148 149 return local_dt.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime