# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Ian Ottoway <ian@ottoway.dev>
# Copyright (c) 2014 Agostino Ruscito <ruscito@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
__all__ = [
"SLCDriver",
]
import logging
import re
from typing import List, Tuple, Optional, Union
from .cip_driver import CIPDriver, with_forward_open
from .cip import (
PCCC_CT,
PCCC_DATA_TYPE,
PCCC_DATA_SIZE,
PCCC_ERROR_CODE,
USINT,
UINT,
PCCCDataTypes,
)
from .const import (
SUCCESS,
SLC_CMD_CODE,
SLC_FNC_READ,
SLC_FNC_WRITE,
SLC_REPLY_START,
PCCC_PATH,
)
from .exceptions import ResponseError, RequestError
from .tag import Tag
from .packets import SendUnitDataRequestPacket
AtomicValueType = Union[int, float, bool]
TagValueType = Union[AtomicValueType, List[Union[AtomicValueType, str]]]
ReadWriteReturnType = Union[Tag, List[Tag]]
IO_RE = re.compile(
r"(?P<file_type>[IO])(?P<file_number>\d{1,3})?"
r"(:)(?P<element_number>\d{1,3})"
r"((\.)(?P<position_number>\d{1,3}))?"
r"(/(?P<sub_element>\d{1,2}))?"
r"(?P<_elem_cnt_token>{(?P<element_count>\d+)})?",
flags=re.IGNORECASE,
)
CT_RE = re.compile(
r"(?P<file_type>[CT])(?P<file_number>\d{1,3})"
r"(:)(?P<element_number>\d{1,3})"
r"(.)(?P<sub_element>ACC|PRE|EN|DN|TT|CU|CD|DN|OV|UN|UA)",
flags=re.IGNORECASE,
)
LFBN_RE = re.compile(
r"(?P<file_type>[LFBN])(?P<file_number>\d{1,3})"
r"(:)(?P<element_number>\d{1,3})"
r"(/(?P<sub_element>\d{1,2}))?"
r"(?P<_elem_cnt_token>{(?P<element_count>\d+)})?",
flags=re.IGNORECASE,
)
S_RE = re.compile(
r"(?P<file_type>S)"
r"(:)(?P<element_number>\d{1,3})"
r"(/(?P<sub_element>\d{1,2}))?"
r"(?P<_elem_cnt_token>{(?P<element_count>\d+)})?",
flags=re.IGNORECASE,
)
A_RE = re.compile(
r"(?P<file_type>A)(?P<file_number>\d{1,3})"
r"(:)(?P<element_number>\d{1,4})"
r"(?P<_elem_cnt_token>{(?P<element_count>\d+)})?",
flags=re.IGNORECASE,
)
B_RE = re.compile(
r"(?P<file_type>B)(?P<file_number>\d{1,3})"
r"(/)(?P<element_number>\d{1,4})"
r"(?P<_elem_cnt_token>{(?P<element_count>\d+)})?",
flags=re.IGNORECASE,
)
ST_RE = re.compile(
r"(?P<file_type>ST)(?P<file_number>\d{1,3})"
r"(:)(?P<element_number>\d{1,4})"
r"(?P<_elem_cnt_token>{(?P<element_count>[12])})?",
flags=re.IGNORECASE,
)
[docs]class SLCDriver(CIPDriver):
"""
An Ethernet/IP Client driver for reading and writing of data files in SLC or MicroLogix PLCs.
"""
__log = logging.getLogger(f"{__module__}.{__qualname__}")
_auto_slot_cip_path = True
def __init__(self, path, *args, **kwargs):
super().__init__(path, *args, large_packets=False, **kwargs)
def _msg_start(self):
"""
No idea what this part is, but it starts all messages
"""
return b"".join(
(
b"\x4b",
b"\x02",
b"\x20", # 8-bit class
PCCC_PATH, # b"\x67\x24\x01"
b"\x07",
self._cfg["vid"], #"vid": b"\x09\x10",
self._cfg["vsn"], #"vsn": b"\x09\x10\x19\x71",
)
)
[docs] @with_forward_open
def read(self, *addresses: str) -> ReadWriteReturnType:
"""
Reads data file addresses. To read multiple words add the word count to the address using curly braces,
e.g. ``N120:10{10}``.
Does not track request/response size like the CLXDriver.
:param addresses: one or many data file addresses to read
:return: a single or list of ``Tag`` objects
"""
results = [self._read_tag(tag) for tag in addresses]
if len(results) == 1:
return results[0]
return results
def _read_tag(self, tag) -> Tag:
_tag = parse_tag(tag)
if _tag is None:
raise RequestError(f"Error parsing the tag passed to read() - {tag}")
message_request = [
self._msg_start(),
# page 83 of eip manual
SLC_CMD_CODE, # request command code
b"\x00", # status code
UINT.encode(next(self._sequence)), # transaction identifier
SLC_FNC_READ, # function code
USINT.encode(PCCC_DATA_SIZE[_tag["file_type"]] * _tag["element_count"]), # byte size
USINT.encode(int(_tag["file_number"])),
PCCC_DATA_TYPE[_tag["file_type"]],
USINT.encode(int(_tag["element_number"])),
USINT.encode(int(_tag.get("pos_number", 0))), # sub-element number
]
request = SendUnitDataRequestPacket(self._sequence)
request.add(b"".join(message_request))
response = self.send(request)
self.__log.debug(f"SLC read_tag({tag})")
status = request_status(response.raw)
if status is not None:
return Tag(_tag["tag"], None, _tag["file_type"], status)
try:
return _parse_read_reply(_tag, response.raw[SLC_REPLY_START:])
except ResponseError as err:
self.__log.exception(f'Failed to parse read reply for {_tag["tag"]}')
return Tag(_tag["tag"], None, _tag["file_type"], str(err))
[docs] @with_forward_open
def write(self, *address_values: Tuple[str, TagValueType]) -> ReadWriteReturnType:
"""
Write values to data file addresses. To write to multiple words in a file use curly braces in the address
to indicate the number of words, then set the value to a list of values to write e.g. ``('N120:10{10}', [1, 2, ...])``.
Does not track request/response size like the CLXDriver.
:param address_values: one or many 2-element tuples of (address, value)
:return: a single or list of ``Tag`` objects
"""
results = [self._write_tag(tag, value) for tag, value in address_values]
if len(results) == 1:
return results[0]
return results
def _write_tag(self, tag: str, value: TagValueType) -> Tag:
"""write tag from a connected plc
Possible combination can be passed to this method:
c.write_tag('N7:0', [-30, 32767, -32767])
c.write_tag('N7:0', 21)
c.read_tag('N7:0', 10)
It is not possible to write status bit
:return: None is returned in case of error
"""
_tag = parse_tag(tag)
if _tag is None:
raise RequestError(f"Error parsing the tag passed to write() - {tag}")
_tag["data_size"] = PCCC_DATA_SIZE[_tag["file_type"]]
message_request = [
self._msg_start(),
SLC_CMD_CODE,
b"\x00",
UINT.encode(next(self._sequence)),
SLC_FNC_WRITE,
USINT.encode(_tag["data_size"] * _tag["element_count"]),
USINT.encode(int(_tag["file_number"])),
PCCC_DATA_TYPE[_tag["file_type"]],
USINT.encode(int(_tag["element_number"])),
USINT.encode(int(_tag.get("pos_number", 0))),
writeable_value(_tag, value),
]
request = SendUnitDataRequestPacket(self._sequence)
request.add(b"".join(message_request))
response = self.send(request)
status = request_status(response.raw)
if status is not None:
return Tag(_tag["tag"], None, _tag["file_type"], status)
return Tag(_tag["tag"], value, _tag["file_type"], None)
@with_forward_open
def get_processor_type(self):
msg_request = (
self._msg_start(),
# diagnostic status - CMD 06, FNC 03 - pg93 DF1 manual (1770-rm516)
b"\x06", # CMD
b"\x00", # status code
UINT.encode(next(self._sequence)), # transaction identifier
b"\x03", # FNC
)
request = SendUnitDataRequestPacket(self._sequence)
request.add(b"".join(msg_request))
response = self.send(request)
if response:
try:
typ = response.raw[SLC_REPLY_START:][5:16].decode("utf-8").strip()
except Exception as err:
self.__log.exception(f"failed getting processor type: {err}")
typ = None
finally:
return typ
else:
self.__log.error(
f"failed to get processor type: {request_status(response.raw)}",
)
return None
@with_forward_open
def get_datalog_queue(self, num_data_logs, queue_num):
data = []
for i in range(num_data_logs):
data.append(self._get_datalog(queue_num))
#extra read to clear the queue
#will thow error in _get_datalog due to Status == None
trash = self._get_datalog(queue_num)
if data is not None:
return data
else:
raise ResponseError("No Data in Queue")
raise ResponseError("Failed to read processor type")
def _get_datalog(self, queue_num):
msg_request = [
b"\x4b", # Ethernet/IP Service Code
b"\x02", # Request Path Size, 2 words
b"\x20", # Request Path, Path Segment (8-bit Class)
b"\x67", # Request Path, Path Segment, Class (PCCC Class)
b"\x24", # Request Path, Path Segment (8 Bit Instance)
b"\x01", # Request Path, Path Segment, Instance
b"\x07", # Requestor ID, Length
b"\x4d\x00", # Requestor ID, CIP Vendor ID
b"\xa1\x4e\xc3\x30",# Requestor ID, CIP Serial Number
b"\x0f", # PCCC Command Data, CMD code
b"\x00", # PCCC Command Data, Status Code
b"\x30\x00", # PCCC Command Data, Transaction Code
b"\xa2", # PCCC Command Data, Function Code
b"\x6d", # Function Specific Data, Byte Size
b"\x00", # Function Specific Data, File Number
b"\xa5", # Function Specific Data, File Type
USINT.encode(queue_num), # Function Specific Data, Element Number (queue to be read)
b"\x00", # Function Specific Data, Sub-Element Number
]
request = SendUnitDataRequestPacket(self._sequence)
request.add(b"".join(msg_request))
response = self.send(request)
status = request_status(response.raw)
if status is None:
try:
datalog_entry = response.raw[SLC_REPLY_START:]
datalog_entry = datalog_entry.decode("UTF-8")
except Exception as err:
self.__log.exception("Failed to retreive data log")
finally:
return datalog_entry
else:
self.__log.error(
f"Failed to retreive data log",
)
return None
@with_forward_open
def get_file_directory(self):
plc_type = self.get_processor_type()
if plc_type is not None:
sys0_info = _get_sys0_info(plc_type)
# file_type, element = _get_file_and_element_for_plc_type(plc_type)
sys0_info["size"] = self._get_file_directory_size(sys0_info)
if sys0_info["size"] is not None:
data = self._read_whole_file_directory(sys0_info)
return _parse_file0(sys0_info, data)
else:
raise ResponseError("Failed to read file directory size")
else:
raise ResponseError("Failed to read processor type")
def _get_file_directory_size(self, sys0_info):
msg_request = [
self._msg_start(),
SLC_CMD_CODE, # request command code
b"\x00", # status code
UINT.encode(next(self._sequence)), # transaction identifier
b"\xa1", # function code, from RSLinx capture
sys0_info["size_len"], # size
b"\x00", # file number
sys0_info["file_type"],
sys0_info["size_element"],
]
request = SendUnitDataRequestPacket(self._sequence)
request.add(b"".join(msg_request))
response = self.send(request)
status = request_status(response.raw)
if status is None:
try:
size = UINT.decode(response.raw[SLC_REPLY_START:]) - sys0_info.get("size_const", 0)
self.__log.debug(f"SYS 0 file size: {size}")
except Exception as err:
self.__log.exception("failed to parse size of File 0")
size = None
finally:
return size
else:
self.__log.error(
f"failed to read size of File 0: {status}",
)
return None
def _read_whole_file_directory(self, sys0_info):
file0_data = b""
offset = 0
file0_size = sys0_info["size"]
file_type = sys0_info["file_type"]
while len(file0_data) < file0_size:
bytes_remaining = file0_size - len(file0_data)
size = 0x50 if bytes_remaining > 0x50 else bytes_remaining
msg_request = [
self._msg_start(),
# page 83 of eip manual
SLC_CMD_CODE, # request command code
b"\x00", # status code
UINT.encode(next(self._sequence)), # transaction identifier
b"\xa1",
# SLC_FNC_READ, # function code
USINT.encode(size), # size
b"\x00", # file number
file_type,
]
msg_request += (
[USINT.encode(offset)] if offset < 256 else [b"\xFF", UINT.encode(offset)]
)
request = SendUnitDataRequestPacket(self._sequence)
request.add(b"".join(msg_request))
response = self.send(request)
status = request_status(response.raw)
if status is None:
data = response.raw[SLC_REPLY_START:]
offset += len(data) // 2
file0_data += data
else:
msg = f"Error reading File 0 contents: {status}"
self.__log.error(msg)
raise ResponseError(msg)
return file0_data
def _parse_file0(sys0_info, data):
num_data_files = data[52]
num_lad_files = data[46]
print(f"data files: {num_data_files}, logic files: {num_lad_files}")
file_pos = sys0_info["file_position"]
row_size = sys0_info["row_size"]
data_files = {}
file_num = 0
while file_pos < len(data):
file_code = data[file_pos : file_pos + 1]
file_type = PCCC_DATA_TYPE.get(file_code, None)
if file_type:
file_name = f"{file_type}{file_num}"
element_size = PCCC_DATA_SIZE.get(file_type, 2)
file_size = UINT.decode(data[file_pos + 1 :])
data_files[file_name] = {
"elements": file_size // element_size,
"length": file_size,
}
if file_type or file_code == b"\x81": # 0x81 reserved type, for skipped file numbers?
file_num += 1
file_pos += row_size
return data_files
def _get_sys0_info(plc_type):
prefix = plc_type[:4]
if prefix in {
"1761",
}: # MLX1000, SLC 5/02
# FIXME: Not sure if these are correct, never tested
return {
"file_position": 93,
"row_size": 8,
"file_type": b"\x00",
"size_element": b"\x23",
"size_len": b"\x04",
}
elif prefix in {"1763", "1762", "1764"}: # MLX 1100, 1200, 1500
# FIXME: values from 1100 and 1400, not tested on 1200/1500
return {
"file_position": 233,
"row_size": 10,
"file_type": b"\x02",
"size_element": b"\x28",
"size_len": b"\x08",
"size_const": 19968, # no idea why, but this seems like a const added to the size? wtf?
}
elif prefix in {
"1766",
}: # MLX 1400
return {
"file_position": 233,
"row_size": 10,
"file_type": b"\x03",
"size_element": b"\x2b",
"size_len": b"\x08",
"size_const": 19968, # no idea why, but this seems like a const added to the size? wtf?
"file_type_queue": b"\xA5",
}
else: # SLC 5/05
return {
"file_position": 79,
"row_size": 10,
"file_type": b"\x01",
"size_element": b"\x23",
"size_len": b"\x04",
}
def _parse_read_reply(tag, data) -> Tag:
try:
bit_read = tag.get("address_field", 0) == 3
bit_position = int(tag.get("sub_element") or 0)
data_size = PCCC_DATA_SIZE[tag["file_type"]]
unpack_func = PCCCDataTypes[tag["file_type"]].decode
if bit_read:
new_value = 0
if tag["file_type"] in {"T", "C"}:
if bit_position == PCCC_CT["PRE"]:
return Tag(
tag["tag"],
unpack_func(data[new_value + 2 : new_value + 2 + data_size]),
tag["file_type"],
None,
)
elif bit_position == PCCC_CT["ACC"]:
return Tag(
tag["tag"],
unpack_func(data[new_value + 4 : new_value + 4 + data_size]),
tag["file_type"],
None,
)
tag_value = unpack_func(data[new_value : new_value + data_size])
return Tag(tag["tag"], get_bit(tag_value, bit_position), tag["file_type"], None)
else:
values_list = [
unpack_func(data[i : i + data_size]) for i in range(0, len(data), data_size)
]
if len(values_list) > 1:
return Tag(tag["tag"], values_list, tag["file_type"], None)
else:
return Tag(tag["tag"], values_list[0], tag["file_type"], None)
except Exception as err:
raise ResponseError("Failed parsing tag read reply") from err
def parse_tag(tag: str) -> Optional[dict]:
t = CT_RE.search(tag)
if (
t
and (1 <= int(t.group("file_number")) <= 255)
and (0 <= int(t.group("element_number")) <= 255)
):
return {
"file_type": t.group("file_type").upper(),
"file_number": t.group("file_number"),
"element_number": t.group("element_number"),
"sub_element": PCCC_CT[t.group("sub_element").upper()],
"address_field": 3,
"element_count": 1,
"tag": t.group(0),
}
t = LFBN_RE.search(tag)
if t:
_cnt = t.group("_elem_cnt_token")
tag_name = t.group(0).replace(_cnt, "") if _cnt else t.group(0)
if t.group("sub_element") is not None:
if (
(1 <= int(t.group("file_number")) <= 255)
and (0 <= int(t.group("element_number")) <= 255)
and (0 <= int(t.group("sub_element")) <= 15)
):
element_count = t.group("element_count")
return {
"file_type": t.group("file_type").upper(),
"file_number": t.group("file_number"),
"element_number": t.group("element_number"),
"sub_element": t.group("sub_element"),
"address_field": 3,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
else:
if (1 <= int(t.group("file_number")) <= 255) and (
0 <= int(t.group("element_number")) <= 255
):
element_count = t.group("element_count")
return {
"file_type": t.group("file_type").upper(),
"file_number": t.group("file_number"),
"element_number": t.group("element_number"),
"sub_element": t.group("sub_element"),
"address_field": 2,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
t = IO_RE.search(tag)
if t:
_cnt = t.group("_elem_cnt_token")
tag_name = t.group(0).replace(_cnt, "") if _cnt else t.group(0)
file_number = "0" if t.group("file_type").upper() == "O" else "1"
position_number = "0" if t.group("position_number") == None else t.group("position_number")
if t.group("sub_element") is not None:
if (
(0 <= int(file_number) <= 255)
and (0 <= int(t.group("element_number")) <= 255)
and (0 <= int(t.group("sub_element")) <= 15)
):
element_count = t.group("element_count")
return {
"file_type": t.group("file_type").upper(),
"file_number": file_number,
"element_number": t.group("element_number"),
"pos_number": position_number,
"sub_element": t.group("sub_element"),
"address_field": 3,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
else:
if 0 <= int(t.group("element_number")) <= 255:
element_count = t.group("element_count")
return {
"file_type": t.group("file_type").upper(),
"file_number": file_number,
"element_number": t.group("element_number"),
"pos_number": position_number,
"sub_element": 0,
"address_field": 2,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
t = ST_RE.search(tag)
if (
t
and (1 <= int(t.group("file_number")) <= 255)
and (0 <= int(t.group("element_number")) <= 255)
):
_cnt = t.group("_elem_cnt_token")
tag_name = t.group(0).replace(_cnt, "") if _cnt else t.group(0)
element_number = int(t.group("element_number"))
element_count = t.group("element_count")
return {
"file_type": t.group("file_type").upper(),
"file_number": t.group("file_number"),
"element_number": element_number,
"address_field": 2,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
t = A_RE.search(tag)
if (
t
and (1 <= int(t.group("file_number")) <= 255)
and (0 <= int(t.group("element_number")) <= 255)
):
_cnt = t.group("_elem_cnt_token")
tag_name = t.group(0).replace(_cnt, "") if _cnt else t.group(0)
element_number = int(t.group("element_number"))
element_count = t.group("element_count")
return {
"file_type": t.group("file_type").upper(),
"file_number": t.group("file_number"),
"element_number": element_number,
"address_field": 2,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
t = S_RE.search(tag)
if t:
_cnt = t.group("_elem_cnt_token")
tag_name = t.group(0).replace(_cnt, "") if _cnt else t.group(0)
element_count = t.group("element_count")
if t.group("sub_element") is not None:
if (0 <= int(t.group("element_number")) <= 255) and (
0 <= int(t.group("sub_element")) <= 15
):
return {
"file_type": t.group("file_type").upper(),
"file_number": "2",
"element_number": t.group("element_number"),
"sub_element": t.group("sub_element"),
"address_field": 3,
"element_count": int(element_count) if element_count is not None else 1,
"tag": t.group(0),
}
else:
if 0 <= int(t.group("element_number")) <= 255:
return {
"file_type": t.group("file_type").upper(),
"file_number": "2",
"element_number": t.group("element_number"),
"address_field": 2,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
t = B_RE.search(tag)
if (
t
and (1 <= int(t.group("file_number")) <= 255)
and (0 <= int(t.group("element_number")) <= 4095)
):
_cnt = t.group("_elem_cnt_token")
tag_name = t.group(0).replace(_cnt, "") if _cnt else t.group(0)
bit_position = int(t.group("element_number"))
element_number = bit_position / 16
sub_element = bit_position - (element_number * 16)
element_count = t.group("element_count")
return {
"file_type": t.group("file_type").upper(),
"file_number": t.group("file_number"),
"element_number": element_number,
"sub_element": sub_element,
"address_field": 3,
"element_count": int(element_count) if element_count is not None else 1,
"tag": tag_name,
}
return None
def get_bit(value: int, idx: int) -> bool:
""":returns value of bit at position idx"""
return (value & (1 << idx)) != 0
def writeable_value(tag: dict, value: Union[bytes, TagValueType]) -> bytes:
if isinstance(value, bytes):
return value
bit_field = tag.get("address_field", 0) == 3
bit_position = int(tag.get("sub_element") or 0) if bit_field else 0
bit_mask = UINT.encode(2 ** bit_position) if bit_field else b"\xFF\xFF"
element_count = tag.get("element_count") or 1
if element_count > 1:
if len(value) < element_count:
raise RequestError(
f"Insufficient data for requested elements, expected {element_count} and got {len(value)}"
)
if len(value) > element_count:
value = value[:element_count]
try:
pack_func = PCCCDataTypes[tag["file_type"]].encode
if element_count > 1:
_value = b"".join(pack_func(val) for val in value)
else:
if bit_field:
tag["data_size"] = 2
if tag["file_type"] in ["T", "C"] and bit_position in {
PCCC_CT["PRE"],
PCCC_CT["ACC"],
}:
bit_mask = b"\xff\xff"
_value = pack_func(value)
else:
_value = bit_mask if value else b"\x00\x00"
else:
_value = pack_func(value)
except Exception as err:
raise RequestError(
f'Failed to create a writeable value for {tag["tag"]} from {value}'
) from err
else:
return bit_mask + _value
def request_status(data) -> Optional[str]:
try:
_status_code = int(data[58])
if _status_code == SUCCESS:
return None
return PCCC_ERROR_CODE.get(_status_code, "Unknown Status")
except Exception:
return "Unknown Status"