Source code for pycomm3.cip.data_types

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Ian Ottoway <ian@ottoway.dev>
# Copyright (c) 2014 Agostino Ruscito <ruscito@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import reprlib
import ipaddress
from io import BytesIO
from itertools import chain
from struct import pack, unpack
from typing import Any, Sequence, Optional, Tuple, Dict, Union, List, Type

from ..exceptions import DataError, BufferEmptyError
from ..map import EnumMap

_BufferType = Union[BytesIO, bytes]


__all__ = [
    "DataType",
    "ElementaryDataType",
    "BOOL",
    "SINT",
    "INT",
    "DINT",
    "LINT",
    "USINT",
    "UINT",
    "UDINT",
    "ULINT",
    "REAL",
    "LREAL",
    "STIME",
    "DATE",
    "TIME_OF_DAY",
    "DATE_AND_TIME",
    "StringDataType",
    "LOGIX_STRING",
    "STRING",
    "BytesDataType",
    "n_bytes",
    "BitArrayType",
    "BYTE",
    "WORD",
    "DWORD",
    "LWORD",
    "STRING2",
    "FTIME",
    "LTIME",
    "ITIME",
    "STRINGN",
    "SHORT_STRING",
    "TIME",
    "EPATH",
    "PACKED_EPATH",
    "PADDED_EPATH",
    "ENGUNIT",
    "STRINGI",
    "DerivedDataType",
    "ArrayType",
    "Array",
    "StructType",
    "Struct",
    "CIPSegment",
    "PortSegment",
    "LogicalSegment",
    "NetworkSegment",
    "SymbolicSegment",
    "DataSegment",
    "ConstructedDataTypeSegment",
    "ElementaryDataTypeSegment",
    "DataTypes",
]


def _repr(buffer: _BufferType) -> str:
    """
    Return the ``repr`` of the raw bytes behind ``buffer``.

    A ``BytesIO`` is rendered through ``getvalue()``; any other object is
    handed to ``repr`` unchanged.
    """
    raw = buffer.getvalue() if isinstance(buffer, BytesIO) else buffer
    return repr(raw)


def _get_bytes(buffer: _BufferType, length: int) -> bytes:
    """
    Return up to ``length`` bytes taken from the front of ``buffer``.

    ``bytes`` objects are sliced; anything else is read with ``buffer.read``.
    """
    if not isinstance(buffer, bytes):
        return buffer.read(length)
    return buffer[:length]


def _as_stream(buffer: _BufferType):
    """
    Return ``buffer`` as a stream: ``bytes`` are wrapped in a new ``BytesIO``,
    every other object is returned as-is.
    """
    return BytesIO(buffer) if isinstance(buffer, bytes) else buffer


class _DataTypeMeta(type):
    """
    Metaclass for data types: classes print as their bare name and can be
    subscripted (``TYPE[length]``) to build an array type via ``Array``.
    """

    def __repr__(cls):
        # the class itself (not an instance) is represented by its name only
        return cls.__name__

    def __getitem__(cls, item):
        # ``item`` becomes the array length, the subscripted class the element type
        return Array(length_=item, element_type_=cls)


class DataType(metaclass=_DataTypeMeta):
    """
    Base class to represent a CIP data type.

    Instances of a type are only needed when defining the members of a
    structure; encoding and decoding are done with the ``encode`` / ``decode``
    class methods.

    Subclasses that override the public ``encode`` / ``decode`` must catch any
    unhandled exception and raise a ``DataError`` from it. In ``decode``, a
    ``BufferEmptyError`` must be re-raised unmodified because decoding arrays
    of unknown length depends on it.

    For custom types it is usually enough to override the private
    ``_encode`` / ``_decode`` methods, which need no exception handling of
    their own when the base public methods are used. Inside ``_decode`` use
    ``_stream_read`` instead of ``stream.read`` so that ``BufferEmptyError``
    is raised appropriately.
    """

    name = None  # class attr so class can be used in a struct w/o making an instance

    def __init__(self, name: Optional[str] = None):
        self.name = name

    @classmethod
    def encode(cls, value: Any) -> bytes:
        """
        Serializes a Python object ``value`` to ``bytes``.

        .. note::
            Any subclass overriding this method must catch any exception and
            re-raise a ``DataError``
        """
        try:
            encoded = cls._encode(value)
        except Exception as err:
            raise DataError(f"Error packing {value!r} as {cls.__name__}") from err
        return encoded

    @classmethod
    def _encode(cls, value: Any) -> bytes:
        ...

    @classmethod
    def decode(cls, buffer: _BufferType) -> Any:
        """
        Deserializes a Python object from the ``buffer`` of ``bytes``

        .. note::
            Any subclass overriding this method must catch any exception and
            re-raise as a ``DataError``. Except ``BufferEmptyErrors`` they must
            be re-raised as such, array decoding relies on this.
        """
        try:
            return cls._decode(_as_stream(buffer))
        except BufferEmptyError:
            # must reach the caller untouched
            raise
        except Exception as err:
            raise DataError(
                f"Error unpacking {_repr(buffer)} as {cls.__name__}"
            ) from err

    @classmethod
    def _decode(cls, stream: BytesIO) -> Any:
        ...

    @classmethod
    def _stream_read(cls, stream: BytesIO, size: int):
        """
        Reads `size` bytes from `stream`.
        Raises `BufferEmptyError` if stream returns no data.
        """
        chunk = stream.read(size)
        if chunk:
            return chunk
        raise BufferEmptyError()

    def __repr__(self) -> str:
        return f"{type(self).__name__}(name={self.name!r})"

    __str__ = __repr__
class ElementaryDataType(DataType):
    """
    A single primitive CIP value, packed and unpacked with ``struct`` using
    the class-level ``_format`` string.
    """

    code: int = 0x00  #: CIP data type identifier
    size: int = 0  #: size of type in bytes
    _format: str = ""

    @classmethod
    def _encode(cls, value: Any) -> bytes:
        packed = pack(cls._format, value)
        return packed

    @classmethod
    def _decode(cls, stream: BytesIO) -> Any:
        raw = cls._stream_read(stream, cls.size)
        fields = unpack(cls._format, raw)
        return fields[0]
class BOOL(ElementaryDataType):
    """
    A boolean value, decodes ``0x00`` as ``False`` and ``True`` otherwise.
    ``True`` encoded as ``0xFF`` and ``False`` as ``0x00``
    """

    code = 0xC1  #: 0xC1
    size = 1

    @classmethod
    def _encode(cls, value: Any) -> bytes:
        if value:
            return b"\xFF"
        return b"\x00"

    @classmethod
    def _decode(cls, stream: BytesIO) -> bool:
        # any byte other than 0x00 counts as True
        return cls._stream_read(stream, cls.size) != b"\x00"
class SINT(ElementaryDataType):
    """
    8-bit signed integer (struct format ``<b``)
    """

    code = 0xC2  #: 0xC2
    size = 1
    _format = "<b"
class INT(ElementaryDataType):
    """
    16-bit signed integer, little-endian (struct format ``<h``)
    """

    code = 0xC3  #: 0xC3
    size = 2
    _format = "<h"
class DINT(ElementaryDataType):
    """
    32-bit signed integer, little-endian (struct format ``<i``)
    """

    code = 0xC4  #: 0xC4
    size = 4
    _format = "<i"
class LINT(ElementaryDataType):
    """
    64-bit signed integer, little-endian (struct format ``<q``)
    """

    code = 0xC5  #: 0xC5
    size = 8
    _format = "<q"
class USINT(ElementaryDataType):
    """
    8-bit unsigned integer (struct format ``<B``)
    """

    code = 0xC6  #: 0xC6
    size = 1
    _format = "<B"
class UINT(ElementaryDataType):
    """
    16-bit unsigned integer, little-endian (struct format ``<H``)
    """

    code = 0xC7  #: 0xC7
    size = 2
    _format = "<H"
class UDINT(ElementaryDataType):
    """
    32-bit unsigned integer, little-endian (struct format ``<I``)
    """

    code = 0xC8  #: 0xC8
    size = 4
    _format = "<I"
class ULINT(ElementaryDataType):
    """
    64-bit unsigned integer, little-endian (struct format ``<Q``)
    """

    code = 0xC9  #: 0xC9
    size = 8
    _format = "<Q"
class REAL(ElementaryDataType):
    """
    32-bit floating point value, little-endian (struct format ``<f``)
    """

    code = 0xCA  #: 0xCA
    size = 4
    _format = "<f"
class LREAL(ElementaryDataType):
    """
    64-bit floating point value, little-endian (struct format ``<d``)
    """

    code = 0xCB  #: 0xCB
    size = 8
    _format = "<d"
class STIME(DINT):
    """
    Synchronous time information; encoded like :class:`DINT`
    """

    code = 0xCC  #: 0xCC
class DATE(UINT):
    """
    Date information; encoded like :class:`UINT`
    """

    code = 0xCD  #: 0xCD
class TIME_OF_DAY(UDINT):
    """
    Time of day; encoded like :class:`UDINT`
    """

    code = 0xCE  #: 0xCE
class DATE_AND_TIME(ElementaryDataType):
    """
    Date and time of day, encoded as a :class:`UDINT` time followed by a
    :class:`UINT` date.
    """

    code = 0xCF  #: 0xCF
    size = 8

    @classmethod
    def encode(cls, time: int, date: int, *args, **kwargs) -> bytes:
        """
        Encodes ``time`` then ``date`` to bytes; extra arguments are ignored.

        Raises ``DataError`` if either value cannot be encoded.
        """
        try:
            time_part = UDINT.encode(time)
            date_part = UINT.encode(date)
        except Exception as err:
            raise DataError(f"Error packing {time!r} as {cls.__name__}") from err
        return time_part + date_part

    @classmethod
    def _decode(cls, stream: BytesIO) -> Tuple[int, int]:
        # wire order is time first, then date
        time = UDINT.decode(stream)
        date = UINT.decode(stream)
        return time, date
class StringDataType(ElementaryDataType):
    """
    Base class for any string type.

    The wire format is a length prefix (encoded with ``len_type``) counting
    characters, followed by the string data in ``encoding``. The encoding is
    expected to be fixed-width (e.g. ``iso-8859-1``, ``utf-16-le``) so that
    the character count maps to a byte count.
    """

    len_type = None  #: data type of the string length
    encoding = "iso-8859-1"  #: encoding of string data

    @classmethod
    def _char_size(cls) -> int:
        """
        Returns the number of bytes one character occupies in ``cls.encoding``.
        """
        # measuring the growth between two encodings cancels out any
        # byte-order-mark a codec may prepend
        return len("aa".encode(cls.encoding)) - len("a".encode(cls.encoding))

    @classmethod
    def _encode(cls, value: str, *args, **kwargs) -> bytes:
        """
        Encodes ``value`` as length prefix + character data.
        """
        data = value.encode(cls.encoding)
        # length is in characters (code units), matching what ``_decode`` reads
        count = len(data) // cls._char_size()
        return cls.len_type.encode(count) + data

    @classmethod
    def _decode(cls, stream: BytesIO) -> str:
        """
        Reads the length prefix, then that many characters from ``stream``.
        """
        str_len = cls.len_type.decode(stream)
        if str_len == 0:
            # nothing to read; avoids a BufferEmptyError from a 0-byte read
            return ""
        # previously only ``str_len`` bytes were read, truncating strings of
        # multi-byte encodings (e.g. utf-16-le) to half their data
        str_data = cls._stream_read(stream, str_len * cls._char_size())
        return str_data.decode(cls.encoding)
class LOGIX_STRING(StringDataType):
    """
    Character string with 1 byte per character and a 4-byte (UDINT) length
    """

    len_type = UDINT
class STRING(StringDataType):
    """
    Character string with 1 byte per character and a 2-byte (UINT) length
    """

    code = 0xD0  #: 0xD0
    len_type = UINT
class BytesDataType(ElementaryDataType):
    """
    Base type for placeholder bytes; values pass through as raw ``bytes``.
    """

    @classmethod
    def _encode(cls, value: bytes, *args, **kwargs) -> bytes:
        # a size of -1 means "everything", otherwise truncate to ``size`` bytes
        if cls.size == -1:
            return value[:]
        return value[: cls.size]

    @classmethod
    def _decode(cls, stream: BytesIO) -> bytes:
        return cls._stream_read(stream, cls.size)
def n_bytes(count: int, name: str = ""):
    """
    Build a :class:`BytesDataType` subclass whose ``size`` is ``count`` and
    return an instance of it named ``name``.

    A ``count`` of ``-1`` will consume the entire remaining buffer.
    """

    class BYTES(BytesDataType):
        size = count

    instance = BYTES(name)
    return instance
class BitArrayType(ElementaryDataType):
    """
    Array of bits (Python bools) backed by an integer of ``host_type``.
    Index 0 of the list is the least-significant bit.
    """

    host_type = None

    @classmethod
    def _decode(cls, stream: BytesIO) -> Any:
        val = cls.host_type.decode(stream)
        # binary digits, left-padded with zeros to the full bit width
        digits = bin(val)[2:].zfill(cls.size * 8)
        # digits are most-significant first; output is least-significant first
        return [digit == "1" for digit in reversed(digits)]

    @classmethod
    def _encode(cls, value: Any) -> bytes:
        if len(value) != (8 * cls.size):
            raise DataError(f"boolean arrays must be multiple of 8: not {len(value)}")

        # each truthy entry contributes its own bit position
        packed = sum(1 << pos for pos, bit in enumerate(value) if bit)
        return cls.host_type._encode(packed)
class BYTE(BitArrayType):
    """
    Bit string of 8 bits, hosted by a :class:`USINT`
    """

    code = 0xD1  #: 0xD1
    size = 1
    host_type = USINT
class WORD(BitArrayType):
    """
    Bit string of 16 bits, hosted by a :class:`UINT`
    """

    code = 0xD2  #: 0xD2
    size = 2
    host_type = UINT
class DWORD(BitArrayType):
    """
    Bit string of 32 bits, hosted by a :class:`UDINT`
    """

    code = 0xD3  #: 0xD3
    size = 4
    host_type = UDINT
class LWORD(BitArrayType):
    """
    Bit string of 64 bits, hosted by a :class:`ULINT`
    """

    code = 0xD4  #: 0xD4
    size = 8
    host_type = ULINT
class STRING2(StringDataType):
    """
    Character string with 2 bytes per character (``utf-16-le``) and a
    2-byte (UINT) length
    """

    code = 0xD5  #: 0xD5
    len_type = UINT
    encoding = "utf-16-le"
class FTIME(DINT):
    """
    Duration, high resolution; encoded like :class:`DINT`
    """

    code = 0xD6  #: 0xD6
class LTIME(LINT):
    """
    Duration, long; encoded like :class:`LINT`
    """

    code = 0xD7  #: 0xD7
class ITIME(INT):
    """
    Duration, short; encoded like :class:`INT`
    """

    code = 0xD8  #: 0xD8
class STRINGN(StringDataType):
    """
    Character string with n bytes per character.

    Wire format: UINT character size, UINT character count, then the
    character data. Supported character sizes are the keys of ``ENCODINGS``.
    """

    code = 0xD9  #: 0xD9

    #: maps character size in bytes to the codec used for the string data
    ENCODINGS = {1: "utf-8", 2: "utf-16-le", 4: "utf-32-le"}

    @classmethod
    def encode(cls, value: str, char_size: int = 1) -> bytes:
        """
        Encodes ``value`` using ``char_size`` bytes per character.

        Raises ``DataError`` for an unsupported ``char_size`` or if the value
        cannot be encoded.
        """
        try:
            encoding = cls.ENCODINGS[char_size]
            return (
                UINT.encode(char_size)
                + UINT.encode(len(value))
                + value.encode(encoding)
            )
        except Exception as err:
            raise DataError(
                f"Error encoding {value!r} as STRINGN using char. size {char_size}"
            ) from err

    @classmethod
    def _decode(cls, stream: BytesIO) -> Any:
        """
        Reads character size and count, then decodes the string data.

        Raises ``DataError`` for an unsupported character size.
        """
        char_size = UINT.decode(stream)
        char_count = UINT.decode(stream)

        try:
            encoding = cls.ENCODINGS[char_size]
        except KeyError as err:
            raise DataError(f"Unsupported character size: {char_size}") from err

        if char_count == 0:
            # an empty string has no data bytes; a 0-byte read would raise
            # BufferEmptyError and be mistaken for the end of the buffer
            return ""

        data = cls._stream_read(stream, char_count * char_size)
        return data.decode(encoding)
class SHORT_STRING(StringDataType):
    """
    Character string with 1 byte per character and a 1-byte (USINT) length
    """

    code = 0xDA  #: 0xDA
    len_type = USINT
class TIME(DINT):
    """
    Duration in milliseconds; encoded like :class:`DINT`
    """

    code = 0xDB  #: 0xDB
class EPATH(ElementaryDataType):
    """
    CIP path segments
    """

    code = 0xDC  #: 0xDC
    padded = False

    @classmethod
    def encode(
        cls,
        segments: Sequence[Union["CIPSegment", bytes]],
        length: bool = False,
        pad_length: bool = False,
    ) -> bytes:
        """
        Encodes ``segments`` into a single path.

        Raw ``bytes`` segments are used as-is, other segments are encoded with
        their own ``encode`` using this class's ``padded`` setting. With
        ``length`` the path is prefixed by its size in 16-bit words (USINT),
        and with ``pad_length`` a ``0x00`` byte follows that prefix.

        Raises ``DataError`` if any step fails.
        """
        try:
            parts = []
            for segment in segments:
                if isinstance(segment, bytes):
                    parts.append(segment)
                else:
                    parts.append(segment.encode(segment, padded=cls.padded))
            path = b"".join(parts)

            if not length:
                return path

            prefix = USINT.encode(len(path) // 2)
            if pad_length:
                prefix += b"\x00"
            return prefix + path
        except Exception as err:
            raise DataError(
                f"Error packing {reprlib.repr(segments)} as {cls.__name__}"
            ) from err

    @classmethod
    def decode(cls, buffer: _BufferType) -> Sequence["CIPSegment"]:
        """
        .. attention:: Not Implemented
        """
        raise NotImplementedError("Decoding EPATHs not supported")
class PADDED_EPATH(EPATH):
    """
    :class:`EPATH` whose segments are encoded with ``padded=True``
    """

    padded = True
class PACKED_EPATH(EPATH):
    """
    :class:`EPATH` whose segments are encoded with ``padded=False``
    """

    padded = False
class ENGUNIT(WORD):
    """
    Engineering units; encoded like :class:`WORD`
    """

    code = 0xDD  #: 0xDD

    # TODO: create lookup table of defined eng. units
class STRINGI(StringDataType):
    """
    International character string: a USINT count followed by, per string,
    a 3-character language code, a 1-byte string type code, a UINT character
    set and the string itself encoded as that string type.
    """

    code = 0xDE  #: 0xDE

    #: string types allowed inside a STRINGI, keyed by their CIP type code
    STRING_TYPES = {
        STRING.code: STRING,
        STRING2.code: STRING2,
        STRINGN.code: STRINGN,
        SHORT_STRING.code: SHORT_STRING,
    }

    #: language name -> 3-letter language code
    LANGUAGE_CODES = {
        "english": "eng",
        "french": "fra",
        "spanish": "spa",
        "italian": "ita",
        "german": "deu",
        "japanese": "jpn",
        "portuguese": "por",
        "chinese": "zho",
        "russian": "rus",
    }

    #: encoding name -> character set number
    CHARACTER_SETS = {
        "iso-8859-1": 4,
        "iso-8859-2": 5,
        "iso-8859-3": 6,
        "iso-8859-4": 7,
        "iso-8859-5": 8,
        "iso-8859-6": 9,
        "iso-8859-7": 10,
        "iso-8859-8": 11,
        "iso-8859-9": 12,
        "utf-16-le": 1000,
        "utf-32-le": 1001,
    }

    @classmethod
    def encode(cls, *strings: Sequence[Tuple[str, StringDataType, str, int]]) -> bytes:
        """
        Encodes ``strings`` to bytes

        Each item is a ``(string, string type, language code, character set)``
        tuple. Raises ``DataError`` if anything fails to encode.
        """
        try:
            chunks = [USINT.encode(len(strings))]
            for string, str_type, lang, char_set in strings:
                type_code = bytes([str_type.code])
                lang_code = bytes(lang, "ascii")
                charset = UINT.encode(char_set)
                text = str_type.encode(string)
                # wire order: language, string type, character set, string
                chunks.extend((lang_code, type_code, charset, text))
            return b"".join(chunks)
        except Exception as err:
            raise DataError(
                f"Error packing {reprlib.repr(strings)} as {cls.__name__}"
            ) from err

    @classmethod
    def decode(
        cls, buffer: _BufferType
    ) -> Tuple[Sequence[str], Sequence[str], Sequence[int]]:
        """
        Decodes ``buffer`` into three parallel lists:
        ``(strings, language codes, character sets)``.

        ``BufferEmptyError`` is re-raised as-is, any other failure is raised
        as a ``DataError``.
        """
        stream = _as_stream(buffer)
        try:
            count = USINT.decode(stream)
            strings, langs, char_sets = [], [], []
            for _ in range(count):
                # the language code has no length byte on the wire, so one is
                # prepended to reuse the SHORT_STRING decoder
                langs.append(SHORT_STRING.decode(b"\x03" + stream.read(3)))
                str_type = cls.STRING_TYPES[stream.read(1)[0]]
                char_sets.append(UINT.decode(stream))
                strings.append(str_type.decode(stream))
            return strings, langs, char_sets
        except BufferEmptyError:
            raise
        except Exception as err:
            raise DataError(
                f"Error unpacking {_repr(buffer)} as {cls.__name__}"
            ) from err
class DerivedDataType(DataType):
    """
    Common base for types that are composed of :class:`ElementaryDataType`
    """
class _ArrayReprMeta(_DataTypeMeta):
    """
    Metaclass that renders array classes as ``ELEMENT_TYPE[length]``.
    """

    def __repr__(cls: "ArrayType"):
        element, length = cls.element_type, cls.length
        return f"{element}[{length!r}]"

    __str__ = __repr__
class ArrayType(DerivedDataType, metaclass=_ArrayReprMeta):
    """
    Common base for array types
    """
def Array(
    length_: Union[USINT, UINT, UDINT, ULINT, int, None],
    element_type_: Union[DataType, Type[DataType]],
) -> Type[ArrayType]:
    """
    Creates a new array type from ``element_type_`` of ``length_``.

    ``length_`` can be 3 possible types:
        - ``int`` - fixed length of the array
        - ``DataType`` - length read from beginning of buffer as type
        - ``None`` - unbound array, consumes entire buffer on decode
    """

    def _is_kind(obj: Any, base: type) -> bool:
        """True if ``obj`` is a subclass of ``base`` or an instance of it."""
        if isinstance(obj, type):
            return issubclass(obj, base)
        return isinstance(obj, base)

    class Array(ArrayType):
        length: Union[USINT, UINT, UDINT, ULINT, int, None] = length_
        element_type: Union[DataType, Type[DataType]] = element_type_

        @classmethod
        def encode(cls, values: List[Any], length: Optional[int] = None) -> bytes:
            """
            Encodes ``values`` element by element.

            ``length`` overrides the array's own length when given. For a
            fixed (``int``) length only that many values are encoded and
            ``DataError`` is raised if fewer are supplied; otherwise all
            values are encoded. No length prefix is written.
            """
            _length = length or cls.length
            if isinstance(_length, int):
                if len(values) < _length:
                    raise DataError(
                        f"Not enough values to encode array of {cls.element_type}[{_length}]"
                    )
                _len = _length
            else:
                _len = len(values)

            try:
                if _is_kind(cls.element_type, BitArrayType):
                    # bit arrays take a flat list of bools: regroup it into
                    # one chunk of bits per element
                    chunk_size = cls.element_type.size * 8
                    _len = len(values) // chunk_size
                    values = [
                        values[i : i + chunk_size]
                        for i in range(0, len(values), chunk_size)
                    ]

                return b"".join(cls.element_type.encode(values[i]) for i in range(_len))
            except Exception as err:
                raise DataError(
                    f"Error packing {reprlib.repr(values)} into {cls.element_type}[{_length}]"
                ) from err

        @classmethod
        def _decode_all(cls, stream):
            """Decodes elements until the stream reports it is empty."""
            _array = []
            while True:
                try:
                    _array.append(cls.element_type.decode(stream))
                except BufferEmptyError:
                    break
            return _array

        @classmethod
        def decode(cls, buffer: _BufferType, length: Optional[int] = None) -> List[Any]:
            """
            Decodes an array from ``buffer``.

            ``length`` overrides the array's own length when given. A length
            of ``None`` decodes until the buffer is empty, a ``DataType``
            length is first read from the buffer, an ``int`` is used directly.
            Arrays of bit-array elements are flattened into one list of bools.

            ``BufferEmptyError`` is re-raised as-is, any other failure is
            raised as a ``DataError``.
            """
            _length = length or cls.length

            try:
                stream = _as_stream(buffer)
                if _length is None:
                    return cls._decode_all(stream)

                # the length type may be given as a class (``UINT``) or as an
                # instance (``UINT("count")``)
                if _is_kind(_length, DataType):
                    _len = _length.decode(stream)
                else:
                    _len = _length

                # iterate over the resolved count, not the length specifier
                _val = [cls.element_type.decode(stream) for _ in range(_len)]

                if _is_kind(cls.element_type, BitArrayType):
                    return list(chain.from_iterable(_val))

                return _val
            except BufferEmptyError:
                raise
            except Exception as err:
                raise DataError(
                    f"Error unpacking into {cls.element_type}[{_length}] from {_repr(buffer)}"
                ) from err

        def __repr__(self) -> str:
            return f"{repr(self.__class__)}(name={self.name!r})"

    return Array
class _StructReprMeta(_DataTypeMeta):
    """
    Metaclass that renders struct classes as ``Name(member, member, ...)``.
    """

    def __repr__(cls):
        member_reprs = ", ".join(repr(m) for m in cls.members)
        return f"{cls.__name__}({member_reprs})"
class StructType(DerivedDataType, metaclass=_StructReprMeta):
    """
    Common base for structure types
    """
def Struct(*members_: Union[DataType, Type[DataType]]) -> Type[StructType]:
    """
    Creates a new structure type comprised of ``members_``.

    Members can be ``DataType`` instances carrying a ``name``. Decoding yields
    a dictionary of ``{member.name: value}``; members without a name are
    decoded but left out of the result.

    For encoding, pass either a dict of ``{member.name: value}`` or a sequence
    of values in member order (nesting as needed). Avoid multiple no-name
    members if planning on encoding the struct using a dict.
    """

    class Struct(StructType):
        members: Tuple[Union[DataType, Type[DataType]]] = members_

        @classmethod
        def _encode(cls, values: Union[Dict[str, Any], Sequence[Any]]) -> bytes:
            if isinstance(values, dict):
                # look every member up by its name
                pieces = (typ.encode(values[typ.name]) for typ in cls.members)
            else:
                # pair members with values positionally
                pieces = (typ.encode(val) for typ, val in zip(cls.members, values))
            return b"".join(pieces)

        @classmethod
        def _decode(cls, stream: BytesIO) -> Any:
            decoded = {}
            for member in cls.members:
                decoded[member.name] = member.decode(stream)

            # filter any members w/o a name
            for unnamed in ("", None):
                decoded.pop(unnamed, None)

            return decoded

    return Struct
class CIPSegment(DataType):
    """
    Base type for a CIP path segment

    +----+----+----+---+---+---+---+---+
    | Segment Type | Segment Format    |
    +====+====+====+===+===+===+===+===+
    |  7 |  6 |  5 | 4 | 3 | 2 | 1 | 0 |
    +----+----+----+---+---+---+---+---+

    """

    segment_type = 0b_000_00000

    @classmethod
    def encode(cls, segment: "CIPSegment", padded: bool = False) -> bytes:
        """
        Encodes an instance of a ``CIPSegment`` to bytes

        Delegates to ``cls._encode(segment, padded)`` and raises ``DataError``
        if that fails.
        """
        try:
            encoded = cls._encode(segment, padded)
        except Exception as err:
            raise DataError(
                f"Error packing {reprlib.repr(segment)} as {cls.__name__}"
            ) from err
        return encoded

    @classmethod
    def decode(cls, buffer: _BufferType) -> Any:
        """
        .. attention:: Not Implemented
        """
        raise NotImplementedError("Decoding of CIP Segments not supported")
class PortSegment(CIPSegment):
    """
    Port segment of a CIP path.

    +----+----+----+--------------------+----+----+----+----+
    | Segment Type | Extended Link Addr | Port Identifier   |
    +====+====+====+====================+====+====+====+====+
    |  7 |  6 |  5 |         4          |  3 |  2 |  1 |  0 |
    +----+----+----+--------------------+----+----+----+----+

    ``port`` is either a port number or one of the names in
    ``port_segments``. ``link_address`` may be an ``int``, a numeric string,
    an IP address string or raw ``bytes``.
    """

    segment_type = 0b_000_0_0000
    extended_link = 0b_000_1_0000

    #: available port names for use in a CIP path
    port_segments = {
        "backplane": 0b_000_0_0001,
        "bp": 0b_000_0_0001,
        "enet": 0b_000_0_0010,
        "dhrio-a": 0b_000_0_0010,
        "dhrio-b": 0b_000_0_0011,
        "dnet": 0b_000_0_0010,
        "cnet": 0b_000_0_0010,
        "dh485-a": 0b_000_0_0010,
        "dh485-b": 0b_000_0_0011,
    }

    def __init__(
        self,
        port: Union[int, str],
        link_address: Union[int, str, bytes],
        name: str = "",
    ):
        super().__init__(name)
        self.port = port
        self.link_address = link_address

    @classmethod
    def _encode(cls, segment: "PortSegment", padded: bool = False) -> bytes:
        """
        Encodes ``segment`` as port byte, optional link length and link
        address, padded with ``0x00`` to an even number of bytes.
        """
        if isinstance(segment.port, str):
            port = cls.port_segments[segment.port]
        else:
            port = segment.port

        if isinstance(segment.link_address, str):
            if segment.link_address.isnumeric():
                link = USINT.encode(int(segment.link_address))
            else:
                # called only for validation: raises on a malformed address
                ipaddress.ip_address(segment.link_address)
                link = segment.link_address.encode()
        elif isinstance(segment.link_address, int):
            link = USINT.encode(segment.link_address)
        else:
            link = segment.link_address

        if len(link) > 1:
            # multi-byte link addresses set the extended-link bit and carry
            # an explicit length byte
            port |= cls.extended_link
            _len = USINT.encode(len(link))
        else:
            _len = b""

        _segment = USINT.encode(port) + _len + link
        if len(_segment) % 2:
            _segment += b"\x00"

        return _segment

    def __eq__(self, other):
        """
        Two port segments are equal when they encode to the same bytes.
        """
        if not isinstance(other, PortSegment):
            # let Python fall back to its default comparison instead of
            # raising a DataError while trying to encode an unrelated object
            return NotImplemented
        return self.encode(self) == self.encode(other)

    def __repr__(self):
        return f"{self.__class__.__name__}(port={self.port!r}, link_address={self.link_address!r})"
class LogicalSegment(CIPSegment):
    """
    Logical segment of a CIP path

    +----+----+----+----+----+----+-------+--------+
    | Segment Type | Logical Type | Logical Format |
    +====+====+====+====+====+====+=======+========+
    |  7 |  6 |  5 |  4 |  3 |  2 |   1   |    0   |
    +----+----+----+----+----+----+-------+--------+

    """

    segment_type = 0b_001_00000

    #: available logical types
    logical_types = {
        "class_id": 0b_000_000_00,
        "instance_id": 0b_000_001_00,
        "member_id": 0b_000_010_00,
        "connection_point": 0b_000_011_00,
        "attribute_id": 0b_000_100_00,
        "special": 0b_000_101_00,
        "service_id": 0b_000_110_00,
    }

    # keyed by the size of the encoded value in bytes
    logical_format = {
        1: 0b_000_000_00,  # 8-bit
        2: 0b_000_000_01,  # 16-bit
        4: 0b_000_000_11,  # 32-bit
    }

    # 32-bit only valid for Instance ID and Connection Point types

    def __init__(
        self, logical_value: Union[int, bytes], logical_type: str, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.logical_value = logical_value
        self.logical_type = logical_type

    @classmethod
    def _encode(cls, segment: "LogicalSegment", padded: bool = False) -> bytes:
        """
        Encodes ``segment`` as a header byte followed by its value.

        Integer values use the smallest of 8/16/32 bits that holds them;
        ``bytes`` values are used as given. With ``padded`` a ``0x00`` byte is
        inserted after the header when header plus value is odd in length.
        """
        type_bits = cls.logical_types.get(segment.logical_type)
        value = segment.logical_value
        if type_bits is None:
            raise DataError("Invalid logical type")

        if isinstance(value, int):
            # first (smallest) integer type whose range covers the value
            for limit, int_type in ((0xFF, USINT), (0xFFFF, UINT), (0xFFFF_FFFF, UDINT)):
                if value <= limit:
                    value = int_type.encode(value)
                    break
            else:
                raise DataError(f"Invalid segment value: {segment!r}")

        format_bits = cls.logical_format.get(len(value))
        if format_bits is None:
            raise DataError("Segment value not valid for segment type")

        header = bytes([cls.segment_type | type_bits | format_bits])
        if padded and (len(header) + len(value)) % 2:
            header += b"\x00"

        return header + value
class NetworkSegment(CIPSegment):
    """
    Network segment of a CIP path; only the segment type bits are defined here
    """

    segment_type = 0b_010_00000
class SymbolicSegment(CIPSegment):
    """
    Symbolic segment of a CIP path; only the segment type bits are defined here
    """

    segment_type = 0b_011_00000
class DataSegment(CIPSegment):
    """
    Data segment of a CIP path

    +----+----+----+---+---+---+---+---+
    | Segment Type | Segment Sub-Type  |
    +====+====+====+===+===+===+===+===+
    |  7 |  6 |  5 | 4 | 3 | 2 | 1 | 0 |
    +----+----+----+---+---+---+---+---+

    """

    segment_type = 0b_100_00000
    extended_symbol = 0b_000_10001

    def __init__(self, data: Union[str, bytes], name: str = ""):
        super().__init__(name)
        self.data = data

    @classmethod
    def _encode(cls, segment: "DataSegment", padded: bool = False) -> bytes:
        """
        Encodes ``segment`` as header byte, length byte and data.

        ``str`` data is sent as an extended symbol: the length is that of the
        encoded text and an odd-length payload gets a trailing ``0x00``.
        Other data is emitted unchanged with its own length.
        """
        if isinstance(segment.data, str):
            header = cls.segment_type | cls.extended_symbol
            payload = segment.data.encode()
            size = len(payload)  # measured before the pad byte is added
            if size % 2:
                payload += b"\x00"
        else:
            header = cls.segment_type
            payload = segment.data
            size = len(payload)

        return USINT.encode(header) + USINT.encode(size) + payload
class ConstructedDataTypeSegment(CIPSegment):
    """
    Constructed data type segment; only the segment type bits are defined here
    """

    segment_type = 0b_101_00000
class ElementaryDataTypeSegment(CIPSegment):
    """
    Elementary data type segment; only the segment type bits are defined here
    """

    segment_type = 0b_110_00000
def _by_type_code(typ: ElementaryDataType):
    """
    Return the CIP type ``code`` of ``typ``.
    """
    type_code = typ.code
    return type_code
class DataTypes(EnumMap):
    """
    Lookup table/map of elementary data types.

    Reverse lookup is by CIP code for data type.
    """

    _return_caps_only_ = True
    _value_key_ = _by_type_code  # members are keyed in reverse by their ``code``

    # NOTE: member order is kept as-is; presumably EnumMap builds its reverse
    # map in definition order, which matters for types sharing a code - confirm
    bool = BOOL
    sint = SINT
    int = INT
    dint = DINT
    lint = LINT
    usint = USINT
    uint = UINT
    udint = UDINT
    ulint = ULINT
    real = REAL
    lreal = LREAL
    stime = STIME
    date = DATE
    time_of_day = TIME_OF_DAY
    date_and_time = DATE_AND_TIME
    logix_string = LOGIX_STRING
    string = STRING
    byte = BYTE
    word = WORD
    dword = DWORD
    lword = LWORD
    string2 = STRING2
    ftime = FTIME
    ltime = LTIME
    itime = ITIME
    stringn = STRINGN
    short_string = SHORT_STRING
    time = TIME
    padded_epath = PADDED_EPATH
    packed_epath = PACKED_EPATH
    engunit = ENGUNIT
    stringi = STRINGI

    @classmethod
    def get_type(cls, type_code):
        """
        Look up ``type_code``, then look up that result again, and return
        the outcome of the second lookup.
        """
        first_lookup = cls.get(type_code)
        return cls.get(first_lookup)