hat.drivers.iec60870.encodings.common
1import datetime 2import enum 3import time 4import typing 5 6 7class CauseSize(enum.Enum): 8 ONE = 1 9 TWO = 2 10 11 12class AsduAddressSize(enum.Enum): 13 ONE = 1 14 TWO = 2 15 16 17class IoAddressSize(enum.Enum): 18 ONE = 1 19 TWO = 2 20 THREE = 3 21 22 23class TimeSize(enum.Enum): 24 TWO = 2 25 THREE = 3 26 FOUR = 4 27 SEVEN = 7 28 29 30class Time(typing.NamedTuple): 31 size: TimeSize 32 milliseconds: int 33 """milliseconds in range [0, 59999]""" 34 invalid: bool | None 35 """available for size THREE, FOUR, SEVEN""" 36 minutes: int | None 37 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 38 summer_time: bool | None 39 """available for size FOUR, SEVEN""" 40 hours: int | None 41 """available for size FOUR, SEVEN (hours in range [0, 23])""" 42 day_of_week: int | None 43 """available for size SEVEN (day_of_week in range [1, 7])""" 44 day_of_month: int | None 45 """available for size SEVEN (day_of_month in range [1, 31])""" 46 months: int | None 47 """available for size SEVEN (months in range [1, 12])""" 48 years: int | None 49 """available for size SEVEN (years in range [0, 99])""" 50 51 52class IO(typing.NamedTuple): 53 address: int 54 elements: list 55 time: Time | None 56 57 58class ASDU(typing.NamedTuple): 59 type: int 60 cause: int 61 address: int 62 ios: list[IO] 63 64 65def time_from_datetime(dt: datetime.datetime, 66 invalid: bool = False 67 ) -> Time: 68 """Create Time from datetime.datetime""" 69 # TODO document edge cases (local time, os implementation, ...) 70 # rounding microseconds to the nearest millisecond 71 dt_rounded = ( 72 dt.replace(microsecond=0) + 73 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 74 local_time = time.localtime(dt_rounded.timestamp()) 75 76 return Time( 77 size=TimeSize.SEVEN, 78 milliseconds=(local_time.tm_sec * 1000 + 79 dt_rounded.microsecond // 1000), 80 invalid=invalid, 81 minutes=local_time.tm_min, 82 summer_time=bool(local_time.tm_isdst), 83 hours=local_time.tm_hour, 84 day_of_week=local_time.tm_wday + 1, 85 day_of_month=local_time.tm_mday, 86 months=local_time.tm_mon, 87 years=local_time.tm_year % 100) 88 89 90def time_to_datetime(t: Time 91 ) -> datetime.datetime: 92 """Convert Time to datetime.datetime""" 93 # TODO document edge cases (local time, os implementation, ...) 94 # TODO support TimeSize.FOUR 95 if t.size == TimeSize.TWO: 96 local_now = datetime.datetime.now() 97 local_dt = local_now.replace( 98 second=int(t.milliseconds / 1000), 99 microsecond=(t.milliseconds % 1000) * 1000) 100 101 local_seconds = local_now.second + local_now.microsecond / 1_000_000 102 t_seconds = t.milliseconds / 1_000 103 104 if abs(local_seconds - t_seconds) > 30: 105 if local_seconds < t_seconds: 106 local_dt = local_dt - datetime.timedelta(minutes=1) 107 108 else: 109 local_dt = local_dt + datetime.timedelta(minutes=1) 110 111 elif t.size == TimeSize.THREE: 112 local_now = datetime.datetime.now() 113 local_dt = local_now.replace( 114 minute=t.minutes, 115 second=int(t.milliseconds / 1000), 116 microsecond=(t.milliseconds % 1000) * 1000) 117 118 local_minutes = (local_now.minute + 119 local_now.second / 60 + 120 local_now.microsecond / 60_000_000) 121 t_minutes = t.minutes + t.milliseconds / 60_000 122 123 if abs(local_minutes - t_minutes) > 30: 124 if local_minutes < t_minutes: 125 local_dt = local_dt - datetime.timedelta(hours=1) 126 127 else: 128 local_dt = local_dt + datetime.timedelta(hours=1) 129 130 elif t.size == TimeSize.SEVEN: 131 local_dt = datetime.datetime( 132 year=2000 + t.years if t.years < 70 else 1900 + t.years, 133 month=t.months, 134 day=t.day_of_month, 135 hour=t.hours, 136 minute=t.minutes, 137 second=int(t.milliseconds / 1000), 138 microsecond=(t.milliseconds % 1000) * 1000, 139 fold=not t.summer_time) 140 141 else: 142 raise ValueError('unsupported time size') 143 144 return local_dt.astimezone(tz=datetime.timezone.utc)
class
CauseSize(enum.Enum):
ONE =
<CauseSize.ONE: 1>
TWO =
<CauseSize.TWO: 2>
class
AsduAddressSize(enum.Enum):
ONE =
<AsduAddressSize.ONE: 1>
TWO =
<AsduAddressSize.TWO: 2>
class
IoAddressSize(enum.Enum):
ONE =
<IoAddressSize.ONE: 1>
TWO =
<IoAddressSize.TWO: 2>
THREE =
<IoAddressSize.THREE: 3>
class
TimeSize(enum.Enum):
TWO =
<TimeSize.TWO: 2>
THREE =
<TimeSize.THREE: 3>
FOUR =
<TimeSize.FOUR: 4>
SEVEN =
<TimeSize.SEVEN: 7>
class
Time(typing.NamedTuple):
31class Time(typing.NamedTuple): 32 size: TimeSize 33 milliseconds: int 34 """milliseconds in range [0, 59999]""" 35 invalid: bool | None 36 """available for size THREE, FOUR, SEVEN""" 37 minutes: int | None 38 """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])""" 39 summer_time: bool | None 40 """available for size FOUR, SEVEN""" 41 hours: int | None 42 """available for size FOUR, SEVEN (hours in range [0, 23])""" 43 day_of_week: int | None 44 """available for size SEVEN (day_of_week in range [1, 7])""" 45 day_of_month: int | None 46 """available for size SEVEN (day_of_month in range [1, 31])""" 47 months: int | None 48 """available for size SEVEN (months in range [1, 12])""" 49 years: int | None 50 """available for size SEVEN (years in range [0, 99])"""
Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
Time( size: TimeSize, milliseconds: int, invalid: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)
Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)
class
IO(typing.NamedTuple):
IO(address, elements, time)
IO( address: int, elements: list, time: Time | None)
Create new instance of IO(address, elements, time)
class
ASDU(typing.NamedTuple):
ASDU(type, cause, address, ios)
ASDU( type: int, cause: int, address: int, ios: list[IO])
Create new instance of ASDU(type, cause, address, ios)
66def time_from_datetime(dt: datetime.datetime, 67 invalid: bool = False 68 ) -> Time: 69 """Create Time from datetime.datetime""" 70 # TODO document edge cases (local time, os implementation, ...) 71 # rounding microseconds to the nearest millisecond 72 dt_rounded = ( 73 dt.replace(microsecond=0) + 74 datetime.timedelta(milliseconds=round(dt.microsecond / 1000))) 75 local_time = time.localtime(dt_rounded.timestamp()) 76 77 return Time( 78 size=TimeSize.SEVEN, 79 milliseconds=(local_time.tm_sec * 1000 + 80 dt_rounded.microsecond // 1000), 81 invalid=invalid, 82 minutes=local_time.tm_min, 83 summer_time=bool(local_time.tm_isdst), 84 hours=local_time.tm_hour, 85 day_of_week=local_time.tm_wday + 1, 86 day_of_month=local_time.tm_mday, 87 months=local_time.tm_mon, 88 years=local_time.tm_year % 100)
Create Time from datetime.datetime
91def time_to_datetime(t: Time 92 ) -> datetime.datetime: 93 """Convert Time to datetime.datetime""" 94 # TODO document edge cases (local time, os implementation, ...) 95 # TODO support TimeSize.FOUR 96 if t.size == TimeSize.TWO: 97 local_now = datetime.datetime.now() 98 local_dt = local_now.replace( 99 second=int(t.milliseconds / 1000), 100 microsecond=(t.milliseconds % 1000) * 1000) 101 102 local_seconds = local_now.second + local_now.microsecond / 1_000_000 103 t_seconds = t.milliseconds / 1_000 104 105 if abs(local_seconds - t_seconds) > 30: 106 if local_seconds < t_seconds: 107 local_dt = local_dt - datetime.timedelta(minutes=1) 108 109 else: 110 local_dt = local_dt + datetime.timedelta(minutes=1) 111 112 elif t.size == TimeSize.THREE: 113 local_now = datetime.datetime.now() 114 local_dt = local_now.replace( 115 minute=t.minutes, 116 second=int(t.milliseconds / 1000), 117 microsecond=(t.milliseconds % 1000) * 1000) 118 119 local_minutes = (local_now.minute + 120 local_now.second / 60 + 121 local_now.microsecond / 60_000_000) 122 t_minutes = t.minutes + t.milliseconds / 60_000 123 124 if abs(local_minutes - t_minutes) > 30: 125 if local_minutes < t_minutes: 126 local_dt = local_dt - datetime.timedelta(hours=1) 127 128 else: 129 local_dt = local_dt + datetime.timedelta(hours=1) 130 131 elif t.size == TimeSize.SEVEN: 132 local_dt = datetime.datetime( 133 year=2000 + t.years if t.years < 70 else 1900 + t.years, 134 month=t.months, 135 day=t.day_of_month, 136 hour=t.hours, 137 minute=t.minutes, 138 second=int(t.milliseconds / 1000), 139 microsecond=(t.milliseconds % 1000) * 1000, 140 fold=not t.summer_time) 141 142 else: 143 raise ValueError('unsupported time size') 144 145 return local_dt.astimezone(tz=datetime.timezone.utc)
Convert Time to datetime.datetime