hat.drivers.iec60870.encodings.common

  1import datetime
  2import enum
  3import time
  4import typing
  5
  6
  7class AsduTypeError(Exception):
  8    pass
  9
 10
 11class CauseSize(enum.Enum):
 12    ONE = 1
 13    TWO = 2
 14
 15
 16class AsduAddressSize(enum.Enum):
 17    ONE = 1
 18    TWO = 2
 19
 20
 21class IoAddressSize(enum.Enum):
 22    ONE = 1
 23    TWO = 2
 24    THREE = 3
 25
 26
 27class TimeSize(enum.Enum):
 28    TWO = 2
 29    THREE = 3
 30    FOUR = 4
 31    SEVEN = 7
 32
 33
 34class Time(typing.NamedTuple):
 35    size: TimeSize
 36    milliseconds: int
 37    """milliseconds in range [0, 59999]"""
 38    invalid: bool | None
 39    """available for size THREE, FOUR, SEVEN"""
 40    minutes: int | None
 41    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
 42    summer_time: bool | None
 43    """available for size FOUR, SEVEN"""
 44    hours: int | None
 45    """available for size FOUR, SEVEN (hours in range [0, 23])"""
 46    day_of_week: int | None
 47    """available for size SEVEN (day_of_week in range [1, 7])"""
 48    day_of_month: int | None
 49    """available for size SEVEN (day_of_month in range [1, 31])"""
 50    months: int | None
 51    """available for size SEVEN (months in range [1, 12])"""
 52    years: int | None
 53    """available for size SEVEN (years in range [0, 99])"""
 54
 55
 56class IO(typing.NamedTuple):
 57    address: int
 58    elements: list
 59    time: Time | None
 60
 61
 62class ASDU(typing.NamedTuple):
 63    type: int
 64    cause: int
 65    address: int
 66    ios: list[IO]
 67
 68
 69def time_from_datetime(dt: datetime.datetime,
 70                       invalid: bool = False
 71                       ) -> Time:
 72    """Create Time from datetime.datetime"""
 73    # TODO document edge cases (local time, os implementation, ...)
 74    #  rounding microseconds to the nearest millisecond
 75    dt_rounded = (
 76        dt.replace(microsecond=0) +
 77        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
 78    local_time = time.localtime(dt_rounded.timestamp())
 79
 80    return Time(
 81        size=TimeSize.SEVEN,
 82        milliseconds=(local_time.tm_sec * 1000 +
 83                      dt_rounded.microsecond // 1000),
 84        invalid=invalid,
 85        minutes=local_time.tm_min,
 86        summer_time=bool(local_time.tm_isdst),
 87        hours=local_time.tm_hour,
 88        day_of_week=local_time.tm_wday + 1,
 89        day_of_month=local_time.tm_mday,
 90        months=local_time.tm_mon,
 91        years=local_time.tm_year % 100)
 92
 93
 94def time_to_datetime(t: Time
 95                     ) -> datetime.datetime:
 96    """Convert Time to datetime.datetime"""
 97    # TODO document edge cases (local time, os implementation, ...)
 98    # TODO support TimeSize.FOUR
 99    if t.size == TimeSize.TWO:
100        local_now = datetime.datetime.now()
101        local_dt = local_now.replace(
102            second=int(t.milliseconds / 1000),
103            microsecond=(t.milliseconds % 1000) * 1000)
104
105        local_seconds = local_now.second + local_now.microsecond / 1_000_000
106        t_seconds = t.milliseconds / 1_000
107
108        if abs(local_seconds - t_seconds) > 30:
109            if local_seconds < t_seconds:
110                local_dt = local_dt - datetime.timedelta(minutes=1)
111
112            else:
113                local_dt = local_dt + datetime.timedelta(minutes=1)
114
115    elif t.size == TimeSize.THREE:
116        local_now = datetime.datetime.now()
117        local_dt = local_now.replace(
118            minute=t.minutes,
119            second=int(t.milliseconds / 1000),
120            microsecond=(t.milliseconds % 1000) * 1000)
121
122        local_minutes = (local_now.minute +
123                         local_now.second / 60 +
124                         local_now.microsecond / 60_000_000)
125        t_minutes = t.minutes + t.milliseconds / 60_000
126
127        if abs(local_minutes - t_minutes) > 30:
128            if local_minutes < t_minutes:
129                local_dt = local_dt - datetime.timedelta(hours=1)
130
131            else:
132                local_dt = local_dt + datetime.timedelta(hours=1)
133
134    elif t.size == TimeSize.SEVEN:
135        local_dt = datetime.datetime(
136            year=2000 + t.years if t.years < 70 else 1900 + t.years,
137            month=t.months,
138            day=t.day_of_month,
139            hour=t.hours,
140            minute=t.minutes,
141            second=int(t.milliseconds / 1000),
142            microsecond=(t.milliseconds % 1000) * 1000,
143            fold=not t.summer_time)
144
145    else:
146        raise ValueError('unsupported time size')
147
148    return local_dt.astimezone(tz=datetime.timezone.utc)
class AsduTypeError(builtins.Exception):
8class AsduTypeError(Exception):
9    pass

Common base class for all non-exit exceptions.

class CauseSize(enum.Enum):
12class CauseSize(enum.Enum):
13    ONE = 1
14    TWO = 2
ONE = <CauseSize.ONE: 1>
TWO = <CauseSize.TWO: 2>
class AsduAddressSize(enum.Enum):
17class AsduAddressSize(enum.Enum):
18    ONE = 1
19    TWO = 2
ONE = <AsduAddressSize.ONE: 1>
TWO = <AsduAddressSize.TWO: 2>
class IoAddressSize(enum.Enum):
22class IoAddressSize(enum.Enum):
23    ONE = 1
24    TWO = 2
25    THREE = 3
ONE = <IoAddressSize.ONE: 1>
TWO = <IoAddressSize.TWO: 2>
THREE = <IoAddressSize.THREE: 3>
class TimeSize(enum.Enum):
28class TimeSize(enum.Enum):
29    TWO = 2
30    THREE = 3
31    FOUR = 4
32    SEVEN = 7
TWO = <TimeSize.TWO: 2>
THREE = <TimeSize.THREE: 3>
FOUR = <TimeSize.FOUR: 4>
SEVEN = <TimeSize.SEVEN: 7>
class Time(typing.NamedTuple):
35class Time(typing.NamedTuple):
36    size: TimeSize
37    milliseconds: int
38    """milliseconds in range [0, 59999]"""
39    invalid: bool | None
40    """available for size THREE, FOUR, SEVEN"""
41    minutes: int | None
42    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
43    summer_time: bool | None
44    """available for size FOUR, SEVEN"""
45    hours: int | None
46    """available for size FOUR, SEVEN (hours in range [0, 23])"""
47    day_of_week: int | None
48    """available for size SEVEN (day_of_week in range [1, 7])"""
49    day_of_month: int | None
50    """available for size SEVEN (day_of_month in range [1, 31])"""
51    months: int | None
52    """available for size SEVEN (months in range [1, 12])"""
53    years: int | None
54    """available for size SEVEN (years in range [0, 99])"""

Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

Time( size: TimeSize, milliseconds: int, invalid: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)

Create new instance of Time(size, milliseconds, invalid, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

size: TimeSize

Alias for field number 0

milliseconds: int

milliseconds in range [0, 59999]

invalid: bool | None

available for size THREE, FOUR, SEVEN

minutes: int | None

available for size THREE, FOUR, SEVEN (minutes in range [0, 59])

summer_time: bool | None

available for size FOUR, SEVEN

hours: int | None

available for size FOUR, SEVEN (hours in range [0, 23])

day_of_week: int | None

available for size SEVEN (day_of_week in range [1, 7])

day_of_month: int | None

available for size SEVEN (day_of_month in range [1, 31])

months: int | None

available for size SEVEN (months in range [1, 12])

years: int | None

available for size SEVEN (years in range [0, 99])

class IO(typing.NamedTuple):
57class IO(typing.NamedTuple):
58    address: int
59    elements: list
60    time: Time | None

IO(address, elements, time)

IO( address: int, elements: list, time: Time | None)

Create new instance of IO(address, elements, time)

address: int

Alias for field number 0

elements: list

Alias for field number 1

time: Time | None

Alias for field number 2

class ASDU(typing.NamedTuple):
63class ASDU(typing.NamedTuple):
64    type: int
65    cause: int
66    address: int
67    ios: list[IO]

ASDU(type, cause, address, ios)

ASDU( type: int, cause: int, address: int, ios: list[IO])

Create new instance of ASDU(type, cause, address, ios)

type: int

Alias for field number 0

cause: int

Alias for field number 1

address: int

Alias for field number 2

ios: list[IO]

Alias for field number 3

def time_from_datetime( dt: datetime.datetime, invalid: bool = False) -> Time:
70def time_from_datetime(dt: datetime.datetime,
71                       invalid: bool = False
72                       ) -> Time:
73    """Create Time from datetime.datetime"""
74    # TODO document edge cases (local time, os implementation, ...)
75    #  rounding microseconds to the nearest millisecond
76    dt_rounded = (
77        dt.replace(microsecond=0) +
78        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
79    local_time = time.localtime(dt_rounded.timestamp())
80
81    return Time(
82        size=TimeSize.SEVEN,
83        milliseconds=(local_time.tm_sec * 1000 +
84                      dt_rounded.microsecond // 1000),
85        invalid=invalid,
86        minutes=local_time.tm_min,
87        summer_time=bool(local_time.tm_isdst),
88        hours=local_time.tm_hour,
89        day_of_week=local_time.tm_wday + 1,
90        day_of_month=local_time.tm_mday,
91        months=local_time.tm_mon,
92        years=local_time.tm_year % 100)

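Create a Time of size TimeSize.SEVEN from a datetime.datetime. The fields hold the system's local wall-clock time, with microseconds rounded to the nearest millisecond; a naive datetime is interpreted as local time.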

def time_to_datetime(t: Time) -> datetime.datetime:
 95def time_to_datetime(t: Time
 96                     ) -> datetime.datetime:
 97    """Convert Time to datetime.datetime"""
 98    # TODO document edge cases (local time, os implementation, ...)
 99    # TODO support TimeSize.FOUR
100    if t.size == TimeSize.TWO:
101        local_now = datetime.datetime.now()
102        local_dt = local_now.replace(
103            second=int(t.milliseconds / 1000),
104            microsecond=(t.milliseconds % 1000) * 1000)
105
106        local_seconds = local_now.second + local_now.microsecond / 1_000_000
107        t_seconds = t.milliseconds / 1_000
108
109        if abs(local_seconds - t_seconds) > 30:
110            if local_seconds < t_seconds:
111                local_dt = local_dt - datetime.timedelta(minutes=1)
112
113            else:
114                local_dt = local_dt + datetime.timedelta(minutes=1)
115
116    elif t.size == TimeSize.THREE:
117        local_now = datetime.datetime.now()
118        local_dt = local_now.replace(
119            minute=t.minutes,
120            second=int(t.milliseconds / 1000),
121            microsecond=(t.milliseconds % 1000) * 1000)
122
123        local_minutes = (local_now.minute +
124                         local_now.second / 60 +
125                         local_now.microsecond / 60_000_000)
126        t_minutes = t.minutes + t.milliseconds / 60_000
127
128        if abs(local_minutes - t_minutes) > 30:
129            if local_minutes < t_minutes:
130                local_dt = local_dt - datetime.timedelta(hours=1)
131
132            else:
133                local_dt = local_dt + datetime.timedelta(hours=1)
134
135    elif t.size == TimeSize.SEVEN:
136        local_dt = datetime.datetime(
137            year=2000 + t.years if t.years < 70 else 1900 + t.years,
138            month=t.months,
139            day=t.day_of_month,
140            hour=t.hours,
141            minute=t.minutes,
142            second=int(t.milliseconds / 1000),
143            microsecond=(t.milliseconds % 1000) * 1000,
144            fold=not t.summer_time)
145
146    else:
147        raise ValueError('unsupported time size')
148
149    return local_dt.astimezone(tz=datetime.timezone.utc)

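Convert a Time to a timezone-aware datetime.datetime in UTC. The Time fields are interpreted as the system's local time; for sizes TWO and THREE the missing fields are taken from the current local time, and any other size except SEVEN (including FOUR) raises ValueError.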