Generic Messaging#

The LogixDriver.generic_message() works in a similar way to the MSG instruction in Logix. It allows the user to perform messaging services not directly implemented in the library. It is also used internally to implement some of the CIP services used by the library (Forward Open, get/set PLC time, etc).

Accessing Drive Parameters#

While a drive may not be a PLC, we can use generic messaging to read parameters from it. The target drive is a PowerFlex 525 and using this Rockwell KB Article we can get the appropriate parameters to read/write parameters from the drive.

def read_pf525_parameter():
    drive_path = '10.10.10.100/bp/1/enet/192.168.1.55'

    with CIPDriver(drive_path) as drive:
        param = drive.generic_message(
            service=Services.get_attribute_single,
            class_code=b'\x93',
            instance=41,  # Parameter 41 = Accel Time
            attribute=b'\x09',
            data_type=INT,
            connected=False,
            unconnected_send=True,
            route_path=True,
            name='pf525_param'
        )
        print(param)
>>> read_pf525_parameter()
pf525_param, 500, None, None
def write_pf525_parameter():
    drive_path = '10.10.10.100/bp/1/enet/192.168.1.55'

    with CIPDriver(drive_path) as drive:
        drive.generic_message(
            service=Services.set_attribute_single,
            class_code=b'\x93',
            instance=41,  # Parameter 41 = Accel Time
            attribute=b'\x09',
            request_data=INT.encode(500),  # = 5 seconds * 100
            connected=False,
            unconnected_send=True,
            route_path=True,
            name='pf525_param'
        )

Reading Device Statuses#

ENBT/EN2T OK LED Status#

This message will get the current status of the OK LED from and ENBT or EN2T module.

def enbt_ok_led_status():
    message_path = '10.10.10.100/bp/2'

    with CIPDriver(message_path) as device:
        data = device.generic_message(
            service=Services.get_attribute_single,
            class_code=b'\x01',  # Values from RA Knowledgebase
            instance=1,  # Values from RA Knowledgebase
            attribute=5,  # Values from RA Knowledgebase
            data_type=INT,
            connected=False,
            unconnected_send=True,
            route_path=True,
            name='OK LED Status'
        )
        # The LED Status is returned as a binary representation on bits 4, 5, 6, and 7. The decimal equivalents are:
        # 0 = Solid Red, 64 = Flashing Red, and 96 = Solid Green. The ENBT/EN2T do not display link lost through the OK LED.
        statuses = {
            0: 'solid red',
            64: 'flashing red',
            96: 'solid green'
        }
        print(statuses.get(data.value), 'unknown')

Stratix Switch Power Status#

This message will read the current power status for both power inputs on a Stratix switch.

def stratix_power_status():
    message_path = '10.10.10.100/bp/2/enet/192.168.1.1'

    with CIPDriver(message_path) as device:
        data = device.generic_message(
            service=b'\x0e',
            class_code=863,  # use decimal representation of hex class code
            instance=1,
            attribute=8,
            connected=False,
            unconnected_send=True,
            route_path=True,
            data_type=INT,
            name='Power Status'
        )
        # Returns a binary representation of the power status. Bit 0 is PWR A, Bit 1 is PWR B. If 1, power is applied. If 0, power is off.
        pwr_a = 'on' if data.value & 0b_1 else 'off'
        pwr_b = 'on' if data.value & 0b_10 else 'off'
        print(f'PWR A: {pwr_a}, PWR B: {pwr_b}')

IP Configuration#

Static/DHCP/BOOTP Status#

This message will read the IP setting configuration type from an ethernet module.

def ip_config():
    message_path = '10.10.10.100/bp/2'

    with CIPDriver(message_path) as plc:  # L85
        data = plc.generic_message(
            service=b'\x0e',
            class_code=b'\xf5',
            instance=1,
            attribute=3,
            connected=False,
            unconnected_send=True,
            route_path=True,
            data_type=INT,
            name='IP_config'
        )

        statuses = {
            0b_0000: 'static',
            0b_0001: 'BOOTP',
            0b_0010: 'DHCP'
        }

        ip_status = data.value & 0b_1111  # only need the first 4 bits
        print(statuses.get(ip_status, 'unknown'))

Communication Module MAC Address#

This message will read the MAC address of ethernet module where the current connection is opened.

def get_mac_address():
    with CIPDriver('10.10.10.100') as plc:
        response = plc.generic_message(
            service=Services.get_attribute_single,
            class_code=ClassCode.ethernet_link,
            instance=1,
            attribute=3,
            data_type=USINT[6],
            connected=False
        )

        if response:
            return ':'.join(f'{x:0>2x}' for x in response.value)
        else:
            print(f'error getting MAC address - {response.error}')

Upload EDS File#

This example shows how to use generic messaging to upload and save an EDS file from a device.

from pycomm3 import (CIPDriver, Services, ClassCode,  FileObjectServices, FileObjectInstances,
                     FileObjectInstanceAttributes, Struct, UDINT, USINT, n_bytes)
import itertools
import gzip
from pathlib import Path

SAVE_PATH = Path.home()


def upload_eds():
    """
    Uploads the EDS and ICO files from the device and saves the files.
    """
    with CIPDriver('192.168.1.236') as driver:
        if initiate_transfer(driver):
            file_data = upload_file(driver)
            encoding = get_file_encoding(driver)

            if encoding == 'zlib':
                # in this case the file has both the eds and ico files in it
                files = decompress_eds(file_data)

                for filename, file_data in files.items():
                    file_path = SAVE_PATH / filename
                    file_path.write_bytes(file_data)

            elif encoding == 'binary':
                file_name = get_file_name(driver)
                file_path = SAVE_PATH / file_name
                file_path.write_bytes(file_data)
            else:
                print('Unsupported Encoding')
        else:
            print('Failed to initiate transfer')


def initiate_transfer(driver):
    """
    Initiates the transfer with the device
    """
    resp = driver.generic_message(
        service=FileObjectServices.initiate_upload,
        class_code=ClassCode.file_object,
        instance=FileObjectInstances.eds_file_and_icon,
        route_path=True,
        unconnected_send=True,
        connected=False,
        request_data=b'\xFF',  # max transfer size
        data_type=Struct(UDINT('FileSize'), USINT('TransferSize'))
    )
    return resp


def upload_file(driver):
    contents = b''

    for i in itertools.cycle(range(256)):
        resp = driver.generic_message(
            service=FileObjectServices.upload_transfer,
            class_code=ClassCode.file_object,
            instance=FileObjectInstances.eds_file_and_icon,
            route_path=True,
            unconnected_send=True,
            connected=False,
            request_data=USINT.encode(i),
            data_type=Struct(USINT('TransferNumber'), USINT('PacketType'), n_bytes(-1, 'FileData'))

        )

        if resp:
            packet_type = resp.value['PacketType']
            data = resp.value['FileData']

            contents += data

            # CIP Vol 1 Section 5-42.4.5
            # 0 - first packet
            # 1 - middle packet
            # 2 - last packet
            # 3 - Abort transfer
            # 4 - first & last packet
            # 5-255 - Reserved
            if packet_type not in (0, 1):
                break
        else:
            print(f'failed response {resp}')
            break

    contents = contents[:-2]  # strip off checksum
    return contents


def get_file_encoding(driver):
    """
    get the encoding format for the eds file object
    """
    attr = FileObjectInstanceAttributes.file_encoding_format

    resp = driver.generic_message(
        service=Services.get_attribute_single,
        class_code=ClassCode.file_object,
        attribute=attr.attr_id,
        instance=FileObjectInstances.eds_file_and_icon,
        route_path=True,
        unconnected_send=True,
        connected=False,
        data_type=attr.data_type,
    )
    _enc_code = resp.value if resp else None
    EDS_ENCODINGS = {
        0: 'binary',
        1: 'zlib'
    }
    file_encoding = EDS_ENCODINGS.get(_enc_code, 'UNSUPPORTED ENCODING')
    return file_encoding


def decompress_eds(contents):
    """
    extract the eds and ico files from the uploaded file

    returns a dict of {file name: file contents}
    """
    GZ_MAGIC_BYTES = b'\x1f\x8b'

    # there is actually 2 files, the eds file and the icon
    # we need to split the file contents since gzip
    # only supports single files

    end_file1 = contents.find(GZ_MAGIC_BYTES, 2)
    file1, file2 = contents[:end_file1], contents[end_file1:]
    eds = gzip.decompress(file1)
    ico = gzip.decompress(file2)
    eds_name = file1[10:file1.find(b'\x00', 10)].decode()
    ico_name = file2[10:file2.find(b'\x00', 10)].decode()

    return {eds_name: eds, ico_name: ico}


def get_file_name(driver):
    """
    Get the filename of the eds file object
    """
    attr = FileObjectInstanceAttributes.file_name
    resp = driver.generic_message(
        service=Services.get_attribute_single,
        class_code=ClassCode.file_object,
        attribute=attr.attr_id,
        instance=FileObjectInstances.eds_file_and_icon,
        route_path=True,
        unconnected_send=True,
        connected=False,
        data_type=attr.data_type
    )

    file_name = resp.value['FileName'][0] if resp else None
    return file_name


if __name__ == '__main__':
    upload_eds()