Generic Messaging#
The LogixDriver.generic_message() method
works in a similar way to the MSG instruction in Logix. It allows the user
to perform messaging services not directly implemented in the library. It is also used internally to implement some of the
CIP services used by the library (Forward Open, get/set PLC time, etc.).
Accessing Drive Parameters#
While a drive may not be a PLC, we can still use generic messaging to read parameters from it and write parameters to it. The target drive in this example is a PowerFlex 525; this Rockwell KB Article provides the values needed to read/write parameters from the drive.
def read_pf525_parameter():
    """Read parameter 41 (Accel Time) from the PowerFlex 525 and print the response."""
    pf525_path = '10.10.10.100/bp/1/enet/192.168.1.55'

    with CIPDriver(pf525_path) as pf525:
        accel_time = pf525.generic_message(
            name='pf525_param',
            service=Services.get_attribute_single,
            class_code=b'\x93',
            instance=41,  # Parameter 41 = Accel Time
            attribute=b'\x09',
            data_type=INT,
            route_path=True,
            unconnected_send=True,
            connected=False,
        )
        print(accel_time)


# Example session:
# >>> read_pf525_parameter()
# pf525_param, 500, None, None


def write_pf525_parameter():
    """Write the value 500 to parameter 41 (Accel Time) of the PowerFlex 525."""
    pf525_path = '10.10.10.100/bp/1/enet/192.168.1.55'
    accel_time = INT.encode(500)  # = 5 seconds * 100

    with CIPDriver(pf525_path) as pf525:
        pf525.generic_message(
            name='pf525_param',
            service=Services.set_attribute_single,
            class_code=b'\x93',
            instance=41,  # Parameter 41 = Accel Time
            attribute=b'\x09',
            request_data=accel_time,
            route_path=True,
            unconnected_send=True,
            connected=False,
        )
Reading Device Statuses#
ENBT/EN2T OK LED Status#
This message will get the current status of the OK LED from an ENBT or EN2T module.
def enbt_ok_led_status():
    """Read the OK LED status of an ENBT/EN2T module and print it.

    Prints 'solid red', 'flashing red' or 'solid green'; any value that is
    not one of the known codes is printed as 'unknown'.
    """
    message_path = '10.10.10.100/bp/2'

    with CIPDriver(message_path) as device:
        data = device.generic_message(
            service=Services.get_attribute_single,
            class_code=b'\x01',  # Values from RA Knowledgebase
            instance=1,  # Values from RA Knowledgebase
            attribute=5,  # Values from RA Knowledgebase
            data_type=INT,
            connected=False,
            unconnected_send=True,
            route_path=True,
            name='OK LED Status'
        )

        # The LED Status is returned as a binary representation on bits 4, 5, 6, and 7. The decimal equivalents are:
        # 0 = Solid Red, 64 = Flashing Red, and 96 = Solid Green. The ENBT/EN2T do not display link lost through the OK LED.
        statuses = {
            0: 'solid red',
            64: 'flashing red',
            96: 'solid green'
        }

        # 'unknown' is the fallback for .get(), so it is only printed when
        # the returned value has no entry in the table.
        print(statuses.get(data.value, 'unknown'))
Link Status#
This message will read the current link status for any ethernet module.
def link_status():
    """Read the link status of an ethernet module and print it as a binary string."""
    module_path = '10.10.10.100/bp/2'

    with CIPDriver(module_path) as module:
        response = module.generic_message(
            name='LinkStatus',
            service=Services.get_attribute_single,
            class_code=b'\xf6',  # Values from RA Knowledgebase
            # For multiport devices, change to "2" for second port, "3" for third port.
            # For CompactLogix, front port is "1" and back port is "2".
            instance=1,
            attribute=2,  # Values from RA Knowledgebase
            data_type=INT,
            route_path=True,
            unconnected_send=True,
            connected=False,
        )

        # The printed value is the binary representation of the link status.
        # Bit definitions:
        #   Bit 0      - Link Status - 0 means inactive link (Link Lost), 1 means active link.
        #   Bit 1      - Half/Full Duplex - 0 means half duplex, 1 means full duplex
        #   Bit 2 to 4 - Binary representation of auto-negotiation and speed detection status:
        #                  0 = Auto-negotiation in progress
        #                  1 = Auto-negotiation and speed detection failed
        #                  2 = Auto-negotiation failed, speed detected
        #                  3 = Auto-negotiation successful and speed detected
        #                  4 = Manually forced speed and duplex
        #   Bit 5      - Setting Requires Reset - if 1, a manual setting requires resetting of the module
        #   Bit 6      - Local Hardware Fault - 0 indicates no hardware faults, 1 indicates a fault detected.
        link_bits = bin(response.value)
        print(link_bits)
Stratix Switch Power Status#
This message will read the current power status for both power inputs on a Stratix switch.
def stratix_power_status():
    """Read the power status of a Stratix switch and print the state of both power inputs."""
    switch_path = '10.10.10.100/bp/2/enet/192.168.1.1'

    with CIPDriver(switch_path) as switch:
        response = switch.generic_message(
            name='Power Status',
            service=b'\x0e',
            class_code=863,  # use decimal representation of hex class code
            instance=1,
            attribute=8,
            data_type=INT,
            route_path=True,
            unconnected_send=True,
            connected=False,
        )

        # The value is a bit field: Bit 0 is PWR A, Bit 1 is PWR B.
        # If 1, power is applied. If 0, power is off.
        power_bits = response.value
        pwr_a = 'on' if power_bits & 0b01 else 'off'
        pwr_b = 'on' if power_bits & 0b10 else 'off'
        print(f'PWR A: {pwr_a}, PWR B: {pwr_b}')
IP Configuration#
Static/DHCP/BOOTP Status#
This message will read the IP setting configuration type from an ethernet module.
def ip_config():
    """Read the IP configuration type of an ethernet module and print it.

    Prints 'static', 'BOOTP' or 'DHCP', or 'unknown' for any other value.
    """
    config_types = {
        0b_0000: 'static',
        0b_0001: 'BOOTP',
        0b_0010: 'DHCP'
    }
    module_path = '10.10.10.100/bp/2'

    with CIPDriver(module_path) as module:
        response = module.generic_message(
            name='IP_config',
            service=b'\x0e',
            class_code=b'\xf5',
            instance=1,
            attribute=3,
            data_type=INT,
            route_path=True,
            unconnected_send=True,
            connected=False,
        )

        config_bits = response.value & 0b_1111  # only need the first 4 bits
        print(config_types.get(config_bits, 'unknown'))
Communication Module MAC Address#
This message will read the MAC address of the ethernet module through which the current connection is opened.
def get_mac_address():
    """Read the MAC address of the module at 10.10.10.100.

    Returns the address as colon-separated, two-digit lowercase hex values.
    If the request fails, an error message is printed and None is returned.
    """
    with CIPDriver('10.10.10.100') as plc:
        response = plc.generic_message(
            service=Services.get_attribute_single,
            class_code=ClassCode.ethernet_link,
            instance=1,
            attribute=3,
            data_type=USINT[6],  # six unsigned bytes
            connected=False
        )

        if not response:
            print(f'error getting MAC address - {response.error}')
            return None

        octets = [f'{x:0>2x}' for x in response.value]
        return ':'.join(octets)
Upload EDS File#
This example shows how to use generic messaging to upload and save an EDS file from a device.
import gzip
import itertools
import zlib
from pathlib import Path

from pycomm3 import (CIPDriver, Services, ClassCode, FileObjectServices, FileObjectInstances,
                     FileObjectInstanceAttributes, Struct, UDINT, USINT, n_bytes)

SAVE_PATH = Path.home()


def upload_eds():
    """
    Uploads the EDS and ICO files from the device and saves the files.

    Files are written to SAVE_PATH. A message is printed (and nothing is
    written) when the transfer cannot be initiated, the encoding is not
    supported, or the file name cannot be read.
    """
    with CIPDriver('192.168.1.236') as driver:
        if initiate_transfer(driver):
            file_data = upload_file(driver)
            encoding = get_file_encoding(driver)

            if encoding == 'zlib':
                # in this case the file has both the eds and ico files in it
                files = decompress_eds(file_data)

                for filename, contents in files.items():
                    file_path = SAVE_PATH / filename
                    file_path.write_bytes(contents)

            elif encoding == 'binary':
                file_name = get_file_name(driver)
                if file_name is None:
                    # get_file_name() returns None when the request fails;
                    # SAVE_PATH / None would raise a TypeError
                    print('Failed to get file name')
                else:
                    file_path = SAVE_PATH / file_name
                    file_path.write_bytes(file_data)
            else:
                print('Unsupported Encoding')
        else:
            print('Failed to initiate transfer')


def initiate_transfer(driver):
    """
    Initiates the transfer with the device

    Returns the response of the initiate-upload request; upload_eds() uses
    its truthiness to decide whether to continue.
    """
    resp = driver.generic_message(
        service=FileObjectServices.initiate_upload,
        class_code=ClassCode.file_object,
        instance=FileObjectInstances.eds_file_and_icon,
        route_path=True,
        unconnected_send=True,
        connected=False,
        request_data=b'\xFF',  # max transfer size
        data_type=Struct(UDINT('FileSize'), USINT('TransferSize'))
    )
    return resp


def upload_file(driver):
    """
    Uploads the file contents packet by packet.

    Returns the concatenated 'FileData' of every packet received, with the
    last two bytes (checksum) removed. Stops at the first packet that is not
    a first/middle packet, or at the first failed response.
    """
    chunks = []

    # the transfer number sent with each request wraps around after 255
    for i in itertools.cycle(range(256)):
        resp = driver.generic_message(
            service=FileObjectServices.upload_transfer,
            class_code=ClassCode.file_object,
            instance=FileObjectInstances.eds_file_and_icon,
            route_path=True,
            unconnected_send=True,
            connected=False,
            request_data=USINT.encode(i),
            data_type=Struct(USINT('TransferNumber'), USINT('PacketType'), n_bytes(-1, 'FileData'))
        )

        if resp:
            packet_type = resp.value['PacketType']
            chunks.append(resp.value['FileData'])

            # CIP Vol 1 Section 5-42.4.5
            # 0 - first packet
            # 1 - middle packet
            # 2 - last packet
            # 3 - Abort transfer
            # 4 - first & last packet
            # 5-255 - Reserved
            if packet_type not in (0, 1):
                break
        else:
            print(f'failed response {resp}')
            break

    # join once instead of growing a bytes object on every packet
    contents = b''.join(chunks)
    contents = contents[:-2]  # strip off checksum
    return contents


def get_file_encoding(driver):
    """
    get the encoding format for the eds file object

    Returns 'binary' or 'zlib', or 'UNSUPPORTED ENCODING' when the request
    fails or the device reports any other code.
    """
    attr = FileObjectInstanceAttributes.file_encoding_format

    resp = driver.generic_message(
        service=Services.get_attribute_single,
        class_code=ClassCode.file_object,
        attribute=attr.attr_id,
        instance=FileObjectInstances.eds_file_and_icon,
        route_path=True,
        unconnected_send=True,
        connected=False,
        data_type=attr.data_type,
    )
    _enc_code = resp.value if resp else None
    EDS_ENCODINGS = {
        0: 'binary',
        1: 'zlib'
    }

    file_encoding = EDS_ENCODINGS.get(_enc_code, 'UNSUPPORTED ENCODING')
    return file_encoding


def decompress_eds(contents):
    """
    extract the eds and ico files from the uploaded file

    returns a dict of {file name: file contents}

    raises ValueError if contents does not hold two gzip members
    """
    GZ_MAGIC_BYTES = b'\x1f\x8b'

    # there is actually 2 files, the eds file and the icon
    # we need to split the file contents since gzip
    # only supports single files

    # Decompress the first member with a gzip-aware decompressor (16 selects
    # the gzip container). It stops at the end of that member and leaves
    # everything after it in `unused_data`, so the split point cannot be
    # confused by magic bytes occurring inside the compressed payload.
    first_member = zlib.decompressobj(zlib.MAX_WBITS | 16)
    eds = first_member.decompress(contents)
    remainder = first_member.unused_data

    start_file2 = remainder.find(GZ_MAGIC_BYTES) if first_member.eof else -1
    if start_file2 < 0:
        raise ValueError('expected two gzip members (eds and ico) in file contents')

    file2 = remainder[start_file2:]
    ico = gzip.decompress(file2)

    # The name is read from offset 10 up to the next NUL byte.
    # NOTE(review): this assumes each gzip header stores the original file
    # name directly after the fixed 10-byte header - confirm for other devices.
    eds_name = contents[10:contents.find(b'\x00', 10)].decode()
    ico_name = file2[10:file2.find(b'\x00', 10)].decode()

    return {eds_name: eds, ico_name: ico}


def get_file_name(driver):
    """
    Get the filename of the eds file object

    Returns None when the request fails.
    """
    attr = FileObjectInstanceAttributes.file_name

    resp = driver.generic_message(
        service=Services.get_attribute_single,
        class_code=ClassCode.file_object,
        attribute=attr.attr_id,
        instance=FileObjectInstances.eds_file_and_icon,
        route_path=True,
        unconnected_send=True,
        connected=False,
        data_type=attr.data_type
    )

    file_name = resp.value['FileName'][0] if resp else None
    return file_name


if __name__ == '__main__':
    upload_eds()