hat.drivers.iec60870.encodings.common

  1import datetime
  2import enum
  3import time
  4import typing
  5
  6
  7class CauseSize(enum.Enum):
  8    ONE = 1
  9    TWO = 2
 10
 11
 12class AsduAddressSize(enum.Enum):
 13    ONE = 1
 14    TWO = 2
 15
 16
 17class IoAddressSize(enum.Enum):
 18    ONE = 1
 19    TWO = 2
 20    THREE = 3
 21
 22
 23class TimeSize(enum.Enum):
 24    TWO = 2
 25    THREE = 3
 26    FOUR = 4
 27    SEVEN = 7
 28
 29
 30class Time(typing.NamedTuple):
 31    size: TimeSize
 32    milliseconds: int
 33    """milliseconds in range [0, 59999]"""
 34    invalid: bool | None
 35    """available for size THREE, FOUR, SEVEN"""
 36    minutes: int | None
 37    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
 38    summer_time: bool | None
 39    """available for size FOUR, SEVEN"""
 40    hours: int | None
 41    """available for size FOUR, SEVEN (hours in range [0, 23])"""
 42    day_of_week: int | None
 43    """available for size SEVEN (day_of_week in range [1, 7])"""
 44    day_of_month: int | None
 45    """available for size SEVEN (day_of_month in range [1, 31])"""
 46    months: int | None
 47    """available for size SEVEN (months in range [1, 12])"""
 48    years: int | None
 49    """available for size SEVEN (years in range [0, 99])"""
 50
 51
 52class IO(typing.NamedTuple):
 53    address: int
 54    elements: list
 55    time: Time | None
 56
 57
 58class ASDU(typing.NamedTuple):
 59    type: int
 60    cause: int
 61    address: int
 62    ios: list[IO]
 63
 64
 65def time_from_datetime(dt: datetime.datetime,
 66                       invalid: bool = False
 67                       ) -> Time:
 68    """Create Time from datetime.datetime"""
 69    # TODO document edge cases (local time, os implementation, ...)
 70    #  rounding microseconds to the nearest millisecond
 71    dt_rounded = (
 72        dt.replace(microsecond=0) +
 73        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
 74    local_time = time.localtime(dt_rounded.timestamp())
 75
 76    return Time(
 77        size=TimeSize.SEVEN,
 78        milliseconds=(local_time.tm_sec * 1000 +
 79                      dt_rounded.microsecond // 1000),
 80        invalid=invalid,
 81        minutes=local_time.tm_min,
 82        summer_time=bool(local_time.tm_isdst),
 83        hours=local_time.tm_hour,
 84        day_of_week=local_time.tm_wday + 1,
 85        day_of_month=local_time.tm_mday,
 86        months=local_time.tm_mon,
 87        years=local_time.tm_year % 100)
 88
 89
 90def time_to_datetime(t: Time
 91                     ) -> datetime.datetime:
 92    """Convert Time to datetime.datetime"""
 93    # TODO document edge cases (local time, os implementation, ...)
 94    # TODO support TimeSize.FOUR
 95    if t.size == TimeSize.TWO:
 96        local_now = datetime.datetime.now()
 97        local_dt = local_now.replace(
 98            second=int(t.milliseconds / 1000),
 99            microsecond=(t.milliseconds % 1000) * 1000)
100
101        local_seconds = local_now.second + local_now.microsecond / 1_000_000
102        t_seconds = t.milliseconds / 1_000
103
104        if abs(local_seconds - t_seconds) > 30:
105            if local_seconds < t_seconds:
106                local_dt = local_dt - datetime.timedelta(minutes=1)
107
108            else:
109                local_dt = local_dt + datetime.timedelta(minutes=1)
110
111    elif t.size == TimeSize.THREE:
112        local_now = datetime.datetime.now()
113        local_dt = local_now.replace(
114            minute=t.minutes,
115            second=int(t.milliseconds / 1000),
116            microsecond=(t.milliseconds % 1000) * 1000)
117
118        local_minutes = (local_now.minute +
119                         local_now.second / 60 +
120                         local_now.microsecond / 60_000_000)
121        t_minutes = t.minutes + t.milliseconds / 60_000
122
123        if abs(local_minutes - t_minutes) > 30:
124            if local_minutes < t_minutes:
125                local_dt = local_dt - datetime.timedelta(hours=1)
126
127            else:
128                local_dt = local_dt + datetime.timedelta(hours=1)
129
130    elif t.size == TimeSize.SEVEN:
131        local_dt = datetime.datetime(
132            year=2000 + t.years if t.years < 70 else 1900 + t.years,
133            month=t.months,
134            day=t.day_of_month,
135            hour=t.hours,
136            minute=t.minutes,
137            second=int(t.milliseconds / 1000),
138            microsecond=(t.milliseconds % 1000) * 1000,
139            fold=not t.summer_time)
140
141    else:
142        raise ValueError('unsupported time size')
143
144    return local_dt.astimezone(tz=datetime.timezone.utc)
class CauseSize(enum.Enum):
 8class CauseSize(enum.Enum):
 9    ONE = 1
10    TWO = 2
ONE = <CauseSize.ONE: 1>
TWO = <CauseSize.TWO: 2>
class AsduAddressSize(enum.Enum):
13class AsduAddressSize(enum.Enum):
14    ONE = 1
15    TWO = 2
ONE = <AsduAddressSize.ONE: 1>
TWO = <AsduAddressSize.TWO: 2>
class IoAddressSize(enum.Enum):
18class IoAddressSize(enum.Enum):
19    ONE = 1
20    TWO = 2
21    THREE = 3
ONE = <IoAddressSize.ONE: 1>
TWO = <IoAddressSize.TWO: 2>
THREE = <IoAddressSize.THREE: 3>
class TimeSize(enum.Enum):
24class TimeSize(enum.Enum):
25    TWO = 2
26    THREE = 3
27    FOUR = 4
28    SEVEN = 7
TWO = <TimeSize.TWO: 2>
THREE = <TimeSize.THREE: 3>
FOUR = <TimeSize.FOUR: 4>
SEVEN = <TimeSize.SEVEN: 7>
class Time(typing.NamedTuple):
31class Time(typing.NamedTuple):
32    size: TimeSize
33    milliseconds: int
34    """milliseconds in range [0, 59999]"""
35    invalid: bool | None
36    """available for size THREE, FOUR, SEVEN"""
37    minutes: int | None
38    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
39    summer_time: bool | None
40    """available for size FOUR, SEVEN"""
41    hours: int | None
42    """available for size FOUR, SEVEN (hours in range [0, 23])"""
43    day_of_week: int | None
44    """available for size SEVEN (day_of_week in range [1, 7])"""
45    day_of_month: int | None
46    """available for size SEVEN (day_of_month in range [1, 31])"""
47    months: int | None
48    """available for size SEVEN (months in range [1, 12])"""
49    years: int | None
50    """available for size SEVEN (years in range [0, 99])"""

Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

Time( size: TimeSize, milliseconds: int, invalid: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)

Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

size: TimeSize

Alias for field number 0

milliseconds: int

milliseconds in range [0, 59999]

invalid: bool | None

available for size THREE, FOUR, SEVEN

minutes: int | None

available for size THREE, FOUR, SEVEN (minutes in range [0, 59])

summer_time: bool | None

available for size FOUR, SEVEN

hours: int | None

available for size FOUR, SEVEN (hours in range [0, 23])

day_of_week: int | None

available for size SEVEN (day_of_week in range [1, 7])

day_of_month: int | None

available for size SEVEN (day_of_month in range [1, 31])

months: int | None

available for size SEVEN (months in range [1, 12])

years: int | None

available for size SEVEN (years in range [0, 99])

class IO(typing.NamedTuple):
53class IO(typing.NamedTuple):
54    address: int
55    elements: list
56    time: Time | None

IO(address, elements, time)

IO( address: int, elements: list, time: Time | None)

Create new instance of IO(address, elements, time)

address: int

Alias for field number 0

elements: list

Alias for field number 1

time: Time | None

Alias for field number 2

class ASDU(typing.NamedTuple):
59class ASDU(typing.NamedTuple):
60    type: int
61    cause: int
62    address: int
63    ios: list[IO]

ASDU(type, cause, address, ios)

ASDU( type: int, cause: int, address: int, ios: list[IO])

Create new instance of ASDU(type, cause, address, ios)

type: int

Alias for field number 0

cause: int

Alias for field number 1

address: int

Alias for field number 2

ios: list[IO]

Alias for field number 3

def time_from_datetime( dt: datetime.datetime, invalid: bool = False) -> Time:
66def time_from_datetime(dt: datetime.datetime,
67                       invalid: bool = False
68                       ) -> Time:
69    """Create Time from datetime.datetime"""
70    # TODO document edge cases (local time, os implementation, ...)
71    #  rounding microseconds to the nearest millisecond
72    dt_rounded = (
73        dt.replace(microsecond=0) +
74        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
75    local_time = time.localtime(dt_rounded.timestamp())
76
77    return Time(
78        size=TimeSize.SEVEN,
79        milliseconds=(local_time.tm_sec * 1000 +
80                      dt_rounded.microsecond // 1000),
81        invalid=invalid,
82        minutes=local_time.tm_min,
83        summer_time=bool(local_time.tm_isdst),
84        hours=local_time.tm_hour,
85        day_of_week=local_time.tm_wday + 1,
86        day_of_month=local_time.tm_mday,
87        months=local_time.tm_mon,
88        years=local_time.tm_year % 100)

Create Time from datetime.datetime

def time_to_datetime(t: Time) -> datetime.datetime:
 91def time_to_datetime(t: Time
 92                     ) -> datetime.datetime:
 93    """Convert Time to datetime.datetime"""
 94    # TODO document edge cases (local time, os implementation, ...)
 95    # TODO support TimeSize.FOUR
 96    if t.size == TimeSize.TWO:
 97        local_now = datetime.datetime.now()
 98        local_dt = local_now.replace(
 99            second=int(t.milliseconds / 1000),
100            microsecond=(t.milliseconds % 1000) * 1000)
101
102        local_seconds = local_now.second + local_now.microsecond / 1_000_000
103        t_seconds = t.milliseconds / 1_000
104
105        if abs(local_seconds - t_seconds) > 30:
106            if local_seconds < t_seconds:
107                local_dt = local_dt - datetime.timedelta(minutes=1)
108
109            else:
110                local_dt = local_dt + datetime.timedelta(minutes=1)
111
112    elif t.size == TimeSize.THREE:
113        local_now = datetime.datetime.now()
114        local_dt = local_now.replace(
115            minute=t.minutes,
116            second=int(t.milliseconds / 1000),
117            microsecond=(t.milliseconds % 1000) * 1000)
118
119        local_minutes = (local_now.minute +
120                         local_now.second / 60 +
121                         local_now.microsecond / 60_000_000)
122        t_minutes = t.minutes + t.milliseconds / 60_000
123
124        if abs(local_minutes - t_minutes) > 30:
125            if local_minutes < t_minutes:
126                local_dt = local_dt - datetime.timedelta(hours=1)
127
128            else:
129                local_dt = local_dt + datetime.timedelta(hours=1)
130
131    elif t.size == TimeSize.SEVEN:
132        local_dt = datetime.datetime(
133            year=2000 + t.years if t.years < 70 else 1900 + t.years,
134            month=t.months,
135            day=t.day_of_month,
136            hour=t.hours,
137            minute=t.minutes,
138            second=int(t.milliseconds / 1000),
139            microsecond=(t.milliseconds % 1000) * 1000,
140            fold=not t.summer_time)
141
142    else:
143        raise ValueError('unsupported time size')
144
145    return local_dt.astimezone(tz=datetime.timezone.utc)

Convert Time to datetime.datetime