hat.drivers.iec60870.encodings.common

  1import datetime
  2import enum
  3import time
  4import typing
  5
  6
  7class AsduTypeError(Exception):
  8    pass
  9
 10
 11class CauseSize(enum.Enum):
 12    ONE = 1
 13    TWO = 2
 14
 15
 16class AsduAddressSize(enum.Enum):
 17    ONE = 1
 18    TWO = 2
 19
 20
 21class IoAddressSize(enum.Enum):
 22    ONE = 1
 23    TWO = 2
 24    THREE = 3
 25
 26
 27class TimeSize(enum.Enum):
 28    TWO = 2
 29    THREE = 3
 30    FOUR = 4
 31    SEVEN = 7
 32
 33
 34class Time(typing.NamedTuple):
 35    size: TimeSize
 36    milliseconds: int
 37    """milliseconds in range [0, 59999]"""
 38    invalid: bool | None
 39    """available for size THREE, FOUR, SEVEN"""
 40    substituted: bool | None
 41    """available for size THREE, FOUR, SEVEN"""
 42    minutes: int | None
 43    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
 44    summer_time: bool | None
 45    """available for size FOUR, SEVEN"""
 46    hours: int | None
 47    """available for size FOUR, SEVEN (hours in range [0, 23])"""
 48    day_of_week: int | None
 49    """available for size SEVEN (day_of_week in range [1, 7])"""
 50    day_of_month: int | None
 51    """available for size SEVEN (day_of_month in range [1, 31])"""
 52    months: int | None
 53    """available for size SEVEN (months in range [1, 12])"""
 54    years: int | None
 55    """available for size SEVEN (years in range [0, 99])"""
 56
 57
 58class IO(typing.NamedTuple):
 59    address: int
 60    elements: list
 61    time: Time | None
 62
 63
 64class ASDU(typing.NamedTuple):
 65    type: int
 66    cause: int
 67    address: int
 68    ios: list[IO]
 69
 70
 71def time_from_datetime(dt: datetime.datetime,
 72                       invalid: bool = False,
 73                       substituted: bool = False
 74                       ) -> Time:
 75    """Create Time from datetime.datetime"""
 76    # TODO document edge cases (local time, os implementation, ...)
 77    #  rounding microseconds to the nearest millisecond
 78    dt_rounded = (
 79        dt.replace(microsecond=0) +
 80        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
 81    local_time = time.localtime(dt_rounded.timestamp())
 82
 83    return Time(
 84        size=TimeSize.SEVEN,
 85        milliseconds=(local_time.tm_sec * 1000 +
 86                      dt_rounded.microsecond // 1000),
 87        invalid=invalid,
 88        substituted=substituted,
 89        minutes=local_time.tm_min,
 90        summer_time=bool(local_time.tm_isdst),
 91        hours=local_time.tm_hour,
 92        day_of_week=local_time.tm_wday + 1,
 93        day_of_month=local_time.tm_mday,
 94        months=local_time.tm_mon,
 95        years=local_time.tm_year % 100)
 96
 97
 98def time_to_datetime(t: Time
 99                     ) -> datetime.datetime:
100    """Convert Time to datetime.datetime"""
101    # TODO document edge cases (local time, os implementation, ...)
102    # TODO support TimeSize.FOUR
103    if t.size == TimeSize.TWO:
104        local_now = datetime.datetime.now()
105        local_dt = local_now.replace(
106            second=int(t.milliseconds / 1000),
107            microsecond=(t.milliseconds % 1000) * 1000)
108
109        local_seconds = local_now.second + local_now.microsecond / 1_000_000
110        t_seconds = t.milliseconds / 1_000
111
112        if abs(local_seconds - t_seconds) > 30:
113            if local_seconds < t_seconds:
114                local_dt = local_dt - datetime.timedelta(minutes=1)
115
116            else:
117                local_dt = local_dt + datetime.timedelta(minutes=1)
118
119    elif t.size == TimeSize.THREE:
120        local_now = datetime.datetime.now()
121        local_dt = local_now.replace(
122            minute=t.minutes,
123            second=int(t.milliseconds / 1000),
124            microsecond=(t.milliseconds % 1000) * 1000)
125
126        local_minutes = (local_now.minute +
127                         local_now.second / 60 +
128                         local_now.microsecond / 60_000_000)
129        t_minutes = t.minutes + t.milliseconds / 60_000
130
131        if abs(local_minutes - t_minutes) > 30:
132            if local_minutes < t_minutes:
133                local_dt = local_dt - datetime.timedelta(hours=1)
134
135            else:
136                local_dt = local_dt + datetime.timedelta(hours=1)
137
138    elif t.size == TimeSize.SEVEN:
139        local_dt = datetime.datetime(
140            year=2000 + t.years if t.years < 70 else 1900 + t.years,
141            month=t.months,
142            day=t.day_of_month,
143            hour=t.hours,
144            minute=t.minutes,
145            second=int(t.milliseconds / 1000),
146            microsecond=(t.milliseconds % 1000) * 1000,
147            fold=not t.summer_time)
148
149    else:
150        raise ValueError('unsupported time size')
151
152    return local_dt.astimezone(tz=datetime.timezone.utc)
class AsduTypeError(builtins.Exception):
8class AsduTypeError(Exception):
9    pass

Common base class for all non-exit exceptions.

class CauseSize(enum.Enum):
12class CauseSize(enum.Enum):
13    ONE = 1
14    TWO = 2
ONE = <CauseSize.ONE: 1>
TWO = <CauseSize.TWO: 2>
class AsduAddressSize(enum.Enum):
17class AsduAddressSize(enum.Enum):
18    ONE = 1
19    TWO = 2
ONE = <AsduAddressSize.ONE: 1>
TWO = <AsduAddressSize.TWO: 2>
class IoAddressSize(enum.Enum):
22class IoAddressSize(enum.Enum):
23    ONE = 1
24    TWO = 2
25    THREE = 3
ONE = <IoAddressSize.ONE: 1>
TWO = <IoAddressSize.TWO: 2>
THREE = <IoAddressSize.THREE: 3>
class TimeSize(enum.Enum):
28class TimeSize(enum.Enum):
29    TWO = 2
30    THREE = 3
31    FOUR = 4
32    SEVEN = 7
TWO = <TimeSize.TWO: 2>
THREE = <TimeSize.THREE: 3>
FOUR = <TimeSize.FOUR: 4>
SEVEN = <TimeSize.SEVEN: 7>
class Time(typing.NamedTuple):
35class Time(typing.NamedTuple):
36    size: TimeSize
37    milliseconds: int
38    """milliseconds in range [0, 59999]"""
39    invalid: bool | None
40    """available for size THREE, FOUR, SEVEN"""
41    substituted: bool | None
42    """available for size THREE, FOUR, SEVEN"""
43    minutes: int | None
44    """available for size THREE, FOUR, SEVEN (minutes in range [0, 59])"""
45    summer_time: bool | None
46    """available for size FOUR, SEVEN"""
47    hours: int | None
48    """available for size FOUR, SEVEN (hours in range [0, 23])"""
49    day_of_week: int | None
50    """available for size SEVEN (day_of_week in range [1, 7])"""
51    day_of_month: int | None
52    """available for size SEVEN (day_of_month in range [1, 31])"""
53    months: int | None
54    """available for size SEVEN (months in range [1, 12])"""
55    years: int | None
56    """available for size SEVEN (years in range [0, 99])"""

Time(size, milliseconds, invalid, substituted, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

Time( size: TimeSize, milliseconds: int, invalid: bool | None, substituted: bool | None, minutes: int | None, summer_time: bool | None, hours: int | None, day_of_week: int | None, day_of_month: int | None, months: int | None, years: int | None)

Create new instance of Time(size, milliseconds, invalid, substituted, minutes, summer_time, hours, day_of_week, day_of_month, months, years)

size: TimeSize

Alias for field number 0

milliseconds: int

milliseconds in range [0, 59999]

invalid: bool | None

available for size THREE, FOUR, SEVEN

substituted: bool | None

available for size THREE, FOUR, SEVEN

minutes: int | None

available for size THREE, FOUR, SEVEN (minutes in range [0, 59])

summer_time: bool | None

available for size FOUR, SEVEN

hours: int | None

available for size FOUR, SEVEN (hours in range [0, 23])

day_of_week: int | None

available for size SEVEN (day_of_week in range [1, 7])

day_of_month: int | None

available for size SEVEN (day_of_month in range [1, 31])

months: int | None

available for size SEVEN (months in range [1, 12])

years: int | None

available for size SEVEN (years in range [0, 99])

class IO(typing.NamedTuple):
59class IO(typing.NamedTuple):
60    address: int
61    elements: list
62    time: Time | None

IO(address, elements, time)

IO( address: int, elements: list, time: Time | None)

Create new instance of IO(address, elements, time)

address: int

Alias for field number 0

elements: list

Alias for field number 1

time: Time | None

Alias for field number 2

class ASDU(typing.NamedTuple):
65class ASDU(typing.NamedTuple):
66    type: int
67    cause: int
68    address: int
69    ios: list[IO]

ASDU(type, cause, address, ios)

ASDU( type: int, cause: int, address: int, ios: list[IO])

Create new instance of ASDU(type, cause, address, ios)

type: int

Alias for field number 0

cause: int

Alias for field number 1

address: int

Alias for field number 2

ios: list[IO]

Alias for field number 3

def time_from_datetime( dt: datetime.datetime, invalid: bool = False, substituted: bool = False) -> Time:
72def time_from_datetime(dt: datetime.datetime,
73                       invalid: bool = False,
74                       substituted: bool = False
75                       ) -> Time:
76    """Create Time from datetime.datetime"""
77    # TODO document edge cases (local time, os implementation, ...)
78    #  rounding microseconds to the nearest millisecond
79    dt_rounded = (
80        dt.replace(microsecond=0) +
81        datetime.timedelta(milliseconds=round(dt.microsecond / 1000)))
82    local_time = time.localtime(dt_rounded.timestamp())
83
84    return Time(
85        size=TimeSize.SEVEN,
86        milliseconds=(local_time.tm_sec * 1000 +
87                      dt_rounded.microsecond // 1000),
88        invalid=invalid,
89        substituted=substituted,
90        minutes=local_time.tm_min,
91        summer_time=bool(local_time.tm_isdst),
92        hours=local_time.tm_hour,
93        day_of_week=local_time.tm_wday + 1,
94        day_of_month=local_time.tm_mday,
95        months=local_time.tm_mon,
96        years=local_time.tm_year % 100)

Create Time from datetime.datetime

def time_to_datetime(t: Time) -> datetime.datetime:
 99def time_to_datetime(t: Time
100                     ) -> datetime.datetime:
101    """Convert Time to datetime.datetime"""
102    # TODO document edge cases (local time, os implementation, ...)
103    # TODO support TimeSize.FOUR
104    if t.size == TimeSize.TWO:
105        local_now = datetime.datetime.now()
106        local_dt = local_now.replace(
107            second=int(t.milliseconds / 1000),
108            microsecond=(t.milliseconds % 1000) * 1000)
109
110        local_seconds = local_now.second + local_now.microsecond / 1_000_000
111        t_seconds = t.milliseconds / 1_000
112
113        if abs(local_seconds - t_seconds) > 30:
114            if local_seconds < t_seconds:
115                local_dt = local_dt - datetime.timedelta(minutes=1)
116
117            else:
118                local_dt = local_dt + datetime.timedelta(minutes=1)
119
120    elif t.size == TimeSize.THREE:
121        local_now = datetime.datetime.now()
122        local_dt = local_now.replace(
123            minute=t.minutes,
124            second=int(t.milliseconds / 1000),
125            microsecond=(t.milliseconds % 1000) * 1000)
126
127        local_minutes = (local_now.minute +
128                         local_now.second / 60 +
129                         local_now.microsecond / 60_000_000)
130        t_minutes = t.minutes + t.milliseconds / 60_000
131
132        if abs(local_minutes - t_minutes) > 30:
133            if local_minutes < t_minutes:
134                local_dt = local_dt - datetime.timedelta(hours=1)
135
136            else:
137                local_dt = local_dt + datetime.timedelta(hours=1)
138
139    elif t.size == TimeSize.SEVEN:
140        local_dt = datetime.datetime(
141            year=2000 + t.years if t.years < 70 else 1900 + t.years,
142            month=t.months,
143            day=t.day_of_month,
144            hour=t.hours,
145            minute=t.minutes,
146            second=int(t.milliseconds / 1000),
147            microsecond=(t.milliseconds % 1000) * 1000,
148            fold=not t.summer_time)
149
150    else:
151        raise ValueError('unsupported time size')
152
153    return local_dt.astimezone(tz=datetime.timezone.utc)

Convert Time to datetime.datetime