# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Ian Ottoway <ian@ottoway.dev>
# Copyright (c) 2014 Agostino Ruscito <ruscito@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import ipaddress
from io import BytesIO
from typing import Any, Type, Dict, Tuple, Union, Set
from .cip import (
DataType,
DerivedDataType,
Struct,
UINT,
USINT,
DWORD,
UDINT,
SHORT_STRING,
n_bytes,
StructType,
StringDataType,
PRODUCT_TYPES,
VENDORS,
INT,
ULINT,
)
from .cip.data_types import _StructReprMeta
__all__ = [
"IPAddress",
"ModuleIdentityObject",
"ListIdentityObject",
"StructTemplateAttributes",
"FixedSizeString",
"Revision",
"StructTag",
]
[docs]def FixedSizeString(size_: int, len_type_: Union[DataType, Type[DataType]] = UDINT):
"""
Creates a custom string tag type
"""
class FixedSizeString(StringDataType):
size = size_
len_type = len_type_
@classmethod
def _encode(cls, value: str, *args, **kwargs) -> bytes:
return (
cls.len_type.encode(len(value))
+ value.encode(cls.encoding)
+ b"\x00" * (cls.size - len(value))
)
@classmethod
def _decode(cls, stream: BytesIO) -> str:
_len = cls.len_type.decode(stream)
_data = cls._stream_read(stream, cls.size)[:_len]
return _data.decode(cls.encoding)
return FixedSizeString
[docs]class IPAddress(DerivedDataType):
@classmethod
def _encode(cls, value: str) -> bytes:
return ipaddress.IPv4Address(value).packed
@classmethod
def _decode(cls, stream: BytesIO) -> Any:
return ipaddress.IPv4Address(cls._stream_read(stream, 4)).exploded
[docs]class Revision(Struct(USINT("major"), USINT("minor"))):
...
[docs]class ModuleIdentityObject(
Struct(
UINT("vendor"),
UINT("product_type"),
UINT("product_code"),
Revision("revision"),
n_bytes(2, "status"),
UDINT("serial"),
SHORT_STRING("product_name"),
)
):
@classmethod
def _decode(cls, stream: BytesIO):
values = super(ModuleIdentityObject, cls)._decode(stream)
values["product_type"] = PRODUCT_TYPES.get(values["product_type"], "UNKNOWN")
values["vendor"] = VENDORS.get(values["vendor"], "UNKNOWN")
values["serial"] = f"{values['serial']:08x}"
return values
@classmethod
def _encode(cls, values: Dict[str, Any]):
values = values.copy()
values["product_type"] = PRODUCT_TYPES[values["product_type"]]
values["vendor"] = VENDORS[values["vendor"]]
values["serial"] = int.from_bytes(bytes.fromhex(values["serial"]), "big")
return super(ModuleIdentityObject, cls)._encode(values)
[docs]class ListIdentityObject(
Struct(
UINT,
UINT,
UINT("encap_protocol_version"),
INT,
UINT,
IPAddress("ip_address"),
ULINT,
UINT("vendor"),
UINT("product_type"),
UINT("product_code"),
Revision("revision"),
n_bytes(2, "status"),
UDINT("serial"),
SHORT_STRING("product_name"),
USINT("state"),
)
):
@classmethod
def _decode(cls, stream: BytesIO):
values = super(ListIdentityObject, cls)._decode(stream)
values["product_type"] = PRODUCT_TYPES.get(values["product_type"], "UNKNOWN")
values["vendor"] = VENDORS.get(values["vendor"], "UNKNOWN")
values["serial"] = f"{values['serial']:08x}"
return values
StructTemplateAttributes = Struct(
UINT("count"),
Struct(UINT("attr_num"), UINT("status"), UDINT("size"))(name="object_definition_size"),
Struct(UINT("attr_num"), UINT("status"), UDINT("size"))(name="structure_size"),
Struct(UINT("attr_num"), UINT("status"), UINT("count"))(name="member_count"),
Struct(UINT("attr_num"), UINT("status"), UINT("handle"))(name="structure_handle"),
)
class _StructTagReprMeta(_StructReprMeta):
def __repr__(cls):
members = ", ".join(repr(m) for m in cls.members)
return f"{cls.__name__}({members}, bool_members={cls.bits!r}, struct_size={cls.size!r})" # TODO
def StructTag(
# (datatype, offset) of each member of the struct, does not include bit members aliased to other members
*members: Tuple[DataType, int],
bit_members: Dict[str, Tuple[int, int]], # {member name, (offset, bit #) }
private_members: Set[str], # private members that should not be in the final value
struct_size: int,
) -> Type[StructType]:
_members = [x[0] for x in members]
_offsets_ = {member: offset for (member, offset) in members}
_struct = Struct(*_members)
class StructTag(_struct, metaclass=_StructTagReprMeta):
bits = bit_members
private = private_members
size = struct_size
_offsets = _offsets_
@classmethod
def _decode(cls, stream: BytesIO):
stream = BytesIO(stream.read(cls.size))
raw = stream.getvalue()
values = {}
for member in cls.members:
offset = cls._offsets[member]
if stream.tell() < offset:
stream.read(offset - stream.tell())
values[member.name] = member.decode(stream)
for bit_member, (offset, bit) in cls.bits.items():
bit_value = bool(raw[offset] & (1 << bit))
values[bit_member] = bit_value
return {k: v for k, v in values.items() if k not in cls.private}
@classmethod
def _encode(cls, values: Dict[str, Any]):
# make a copy so that private host members aren't added to the original
values = {k: v for k, v in values.items()}
value = bytearray(cls.size)
for member in cls.members:
if member.name in cls.private:
continue
offset = cls._offsets[member]
encoded = member.encode(values[member.name])
value[offset : offset + len(encoded)] = encoded
for bit_member, (offset, bit) in cls.bits.items():
val = values[bit_member]
if val:
value[offset] |= 1 << bit
else:
value[offset] &= ~(1 << bit)
return value
return StructTag