Source code for twinpy.twincat.simulink

"""Model to wrap around a Simulink model.

An object for a Simulink model is created first before a
TwinCAT connection is made. We cannot get the original
model structure from TwinCAT alone.
"""

from __future__ import annotations  # Allows forward declarations
from typing import Optional, List, Dict, Union

import os
import xml.etree.ElementTree as ElementTree
import re
import warnings

from .connection import TwincatConnection
from .symbols import Symbol, Signal, Parameter

# Constants
FILENAME_TEMPLATE = "{0}_ModuleInfo.xml"

# Store compile regex objects to save some performance
re_characters = re.compile(r"\W+")
re_leading = re.compile(r"^[\d\_]+")


[docs]def sanitize_name(name: str) -> str: """Reduce a string to characters which are allowed in a Python variable name. This is needed because Simulink blocks can contain more characters than this. Python variables can only contain a-z, A-Z, 0-9 and '_'. Additionally, a variable cannot start with a digit, nor can it start with an underscore to prevent conflicts with semi-private properties. """ # Built-in `\W` checks for anything that is *not* a letter, digit # or underscore name = re_characters.sub("", name) # Remove all leading digits and underscores name = re_leading.sub("", name) return name
[docs]class SimulinkBlock: """ A single Simulink Block (anything, e.g. constant, gain, a sub-system) A SimulinkBlock can contain children, which are also SimulinkBlock objects. Using __getattr__ those subblocks (and their symbols) can be addressed directly: model = ... # Subblocks can be addressed smoothly: print(model.MySubsystem.MyConstant.Value) Blocks contain parameters (`Value`) in the example above. When only a single parameter or signal is present, you can directly call it from the block itself: print(model.MySubsystem.MyConstant.get()) # Short print(model.MySubsystem.MyConstant.Value.get()) # Same but longer print(model.MySubsystem.MySineWave.Phase.get()) # Multiple parameters print(model.MySubsystem.MySineWave.Amplitude.get()) """ def __init__(self, xmltree: ElementTree.Element, model: SimulinkModel): """ Create this block based on an XML tree Sub-blocks are created too based on the remaining tree structure. This means the creation of blocks works recursively. :param xmltree: A branch of a model XML tree (or the entire tree) :param model: A reference back to the original model (the root of the structure) """ name_branch = xmltree.find("Name") self.name = name_branch.text if name_branch is not None else None type_branch = xmltree.find("Type") self.type = type_branch.text if type_branch is not None else None identifier_branch = xmltree.find("Identifier") self.identifier = ( identifier_branch.text if identifier_branch is not None else None ) if self.name is None and self.identifier is not None: # Use part of identifier instead self.name = self.identifier.rsplit("/")[-1] # Private property, references back the complete model self._model = model self._subblocks = {} self._parameters = {} self._signals = {} # Dict of sub-blocks (private property) self._subblocks = self.make_subblocks(xmltree) # Dict of parameters (private property) self._parameters = self.make_parameters(xmltree) # Dict of signals (private property) self._signals = self.make_signals(xmltree)
[docs] def make_subblocks(self, xmltree: ElementTree.Element) -> Dict[str, SimulinkBlock]: """Build sub-blocks (this makes the SimulinkBlocks recursive).""" subblocks = {} # List of sub-blocks (private property) for subblock_xml in xmltree.findall("Block"): if ( subblock_xml.find("Type").text in ["Inport", "Outport", "Terminator"] or subblock_xml.find("Identifier") is None ): # Skip blocks that don't take or show any information, and skip # blocks that somehow lack an identifier continue subblock = SimulinkBlock(subblock_xml, self._model) block_name = sanitize_name(subblock.name) subblocks[block_name] = subblock return subblocks
[docs] def make_parameters(self, xmltree: ElementTree.Element) -> dict: """Find and create Parameters in the current block.""" parameters = {} for parameter_xml in xmltree.findall("Parameter"): # Skip first two characters: index_offset_hex = parameter_xml.find("AdsIdxOffs").text[2:] index_offset = int(index_offset_hex, 16) # Convert to decimal parameter = Parameter( block=self, name=parameter_xml.get("Name"), index_offset=index_offset, symbol_type=parameter_xml.find("Type").text, ) parameters[sanitize_name(parameter.name)] = parameter return parameters
[docs] def make_signals(self, xmltree: ElementTree.Element) -> dict: """Find and create Signals in the current block.""" signals = {} # Signals are part of Ports in Simulink for port_xml in xmltree.findall("Port"): signal_xml = port_xml.find("Signal") if signal_xml is None: continue # No accessible signal for this port # Skip first two characters: index_offset_hex = signal_xml.find("AdsIdxOffs").text[2:] index_offset = int(index_offset_hex, 16) # Convert to decimal name = signal_xml.find("GlobalName").text if name.startswith("BlockIO."): name = name[8:] signal = Signal( block=self, name=name, index_offset=index_offset, symbol_type=signal_xml.find("Type").text, ) type_xml = port_xml.find("Type") # Port number, 1-based (but this is merely a human-interacted # property) port_number = type_xml.get("No") type_name = type_xml.text.lower() if type_name.startswith("out"): key = "so" elif type_name.startswith("in"): key = "si" else: continue # Could be a trigger port or something else weird key += port_number signals[sanitize_name(key)] = signal # A signal name might be identical to a parameter, instead we save # it like "in_1" or "out_5" return signals
def _get_first_symbol(self) -> Symbol: """Get reference to first parameter or signal. Throw an error if there are none or more than one (to prevent accidental ambiguity). """ if len(self._parameters) > 1: raise RuntimeError( "Block `{0}` has {1} parameters, you need to " "address them by name directly".format(self.name, len(self._parameters)) ) if len(self._signals) > 1: raise RuntimeError( "Block `{0}` has {1} signals, you need to" "address them by name directly".format(self.name, len(self._signals)) ) if self._parameters: key = next(iter(self._parameters)) return self._parameters[key] if self._signals: key = next(iter(self._signals)) return self._signals[key] raise RuntimeError("Block `{0}` has no parameters or signals".format(self.name))
[docs] def get(self): """Get value of the first symbol.""" symbol = self._get_first_symbol() return symbol.get()
[docs] def set(self, val): """Set value of the first symbol.""" symbol = self._get_first_symbol() return symbol.set(val)
[docs] def get_plc(self) -> Optional[TwincatConnection]: """Return Connection (owned by model).""" return self._model.plc
[docs] def get_index_group(self) -> int: """Return the group index (owned by model).""" return self._model.object_id
[docs] def get_symbols_recursive(self) -> List[Symbol]: """Recursively navigate subblocks and collect all parameters and signals.""" for *_, parameter in self._parameters.items(): yield parameter for *_, signal in self._signals.items(): yield signal for *_, subblock in self._subblocks.items(): for sub_symbol in subblock.get_symbols_recursive(): yield sub_symbol
[docs] def print_structure(self, max_depth: Optional[int] = 3, depth: int = 0): """Recursively print the child signals and parameters of this block. Use this to test your model from the command line. :param max_depth: Max recursion depth (set to None for infinite) :param depth: Current depth (do not use this argument, it's used internally) """ # Get indentation indent = " " * depth * 8 # Block / model itself print(indent, self.name, "(" + type(self).__name__ + ")") # Parameters for name, parameter in self._parameters.items(): print(indent, " + ", name, "(" + parameter.symbol_type + ")") # Signals for name, signal in self._signals.items(): print( indent, " * ", name, "(" + signal.name + ",", signal.symbol_type + ")" ) # Do subblocks if depth allows it if max_depth is None or depth < max_depth: for *_, subblock in self._subblocks.items(): print() # Empty line subblock.print_structure(max_depth, depth=depth + 1) # Print ellipsis if cut off by max_depth elif max_depth is not None and depth >= max_depth: if self._subblocks: print(" " * (depth + 1) * 8, "...")
def __getattr__(self, item: str) -> Union[SimulinkBlock, Symbol]: """Magic method, executed when an addressed property does not exist. When a property does not exist, we think it might be a subblock and it will be retrieved from self._subblocks, or a symbol and it will be retrieved from self._signals or self._parameters. :param item: The requested item :return: """ if item in ["_subblocks", "_parameters", "_signals"]: raise RuntimeError( "Tried to find `{}` with __getattr__, " "this should not happen".format(item) ) if hasattr(self, "_subblocks") and item in self._subblocks: return self._subblocks[item] if hasattr(self, "_parameters") and item in self._parameters: return self._parameters[item] if hasattr(self, "_signals") and item in self._signals: return self._signals[item] raise AttributeError( "The current block `{0}` has no property, " "subblock, parameter or signal named `{1}`".format(self.name, item) ) def __dir__(self): """Method to show internals. Used to help autocompletion with our hidden subblocks and symbols. We override this to show the subblocks and symbols accessed through __getattr__. """ return ( list(super().__dir__()) + list(self._subblocks.keys()) + list(self._parameters.keys()) + list(self._signals.keys()) ) def __len__(self): """Result of len(object).""" return len(self._subblocks) + len(self._parameters) + len(self._signals) def __iter__(self): """Make hidden properties iterable.""" return self._subblocks.__iter__() def __getitem__(self, key): """Allow the [...] notation.""" return self.__getattr__(key) def __repr__(self): """Debug print.""" return "<%s instance at %s>, name: %s (type: %s)" % ( self.__class__.__name__, id(self), self.name, self.type, )
[docs]class SimulinkModel(SimulinkBlock): """Wrapper for a compiled Simulink model in TwinCAT. The model is built using the XML file, created when the model is compiled. Therefore the model can be loaded without TwinCAT running. This model object is actually an extension of a SimulinkBlock. The complete model is basically just the root block. """ def __init__(self, object_id: int, object_name: str, type_name: str = None): """ By default the `TWINCAT3DIR` environment variable is used to locate the TwinCAT installation and look for the installed compiled XML files. To work around this, you can pass either of the following to `type_name`: * A single name (`TWINCAT3DIR` will be used) * A path to a directory (default XML file name will be searched) * A path to the XML file, typically named like `*_ModuleInfo.xml` (no searching will be done) :param object_id: ID of the TcCOM object in TwinCAt (the symbol group index) :param object_name: Object Name (as shown in TwinCAT) :param type_name: Type name (as shown in TwinCAT) (defaults to be the same as object_name). """ if type_name is None: type_name = object_name self.type_name = type_name xml_root = self.get_xml_data(self.type_name) # The 'DefaultValues' branch contains an un-indexed list of standard # properties self.build_timestamp = None self.module_info = self.get_module_info( xml_root.find("ModuleInfo/DefaultValues") ) self.plc: Optional[TwincatConnection] = None # Nothing connected by default # Actual structure starts here block_diagram = xml_root.find("ModuleInfo/BlockDiagram") self.object_id = object_id # Use super class to build the root block and the children (recursive) super().__init__(block_diagram, self) # Pass `self` as the model to follow the SimulinkBlock pattern self.name = object_name # The super constructor will write an incorrect # name, replace it with the passed name
[docs] @staticmethod def get_xml_data(type_name: str) -> ElementTree.Element: """Find and parse model XML file. The block diagram is returned. """ user_path = os.path.realpath(type_name) if os.path.isfile(user_path): filepath = user_path else: if os.path.isdir(user_path): module_dir = user_path else: module_dir = os.path.join( os.getenv("TWINCAT3DIR", "C:/TwinCAT/3.1/"), "CustomConfig/Modules", type_name, ) dir_name = os.path.basename(os.path.normpath(user_path)) # Find the # name of the last directory in the path (this will be the name of # the XML file) filename = FILENAME_TEMPLATE.format(dir_name) filepath = os.path.join(module_dir, "deploy", filename) if not os.path.isfile(filepath): # Older TwinCAT versions have not yet added a `deploy` directory filepath = os.path.join(module_dir, filename) try: xml_root = ElementTree.parse(filepath).getroot() except FileNotFoundError as err: raise FileNotFoundError( "XML file belonging to Simulink model could not be found at " "`{}` (also not in the `deploy` subdirectory)! Is the " "type_name correct? Was the default module location " "used?".format(filepath) ) from err return xml_root
[docs] @staticmethod def get_module_info(xmltree: ElementTree.Element) -> dict: """Get dictionary of module info fields. The `DefaultValues` section is a list of names and values, this method creates a regular dict from it. """ values = {} for value_xml in xmltree: name_xml = value_xml.find("Name") if name_xml is None or not name_xml.text.startswith("ModuleInfo."): continue # Skip this one key = name_xml.text[11:] values[key] = list(value_xml)[1].text # Second element (e.g. Value, GUID, EnumText) is the actual value return values
[docs] def connect_to_twincat(self, connection: TwincatConnection): """Connect model the one running in TwinCAT. This will link all the symbols in the model to actual ADS symbols. And the remote model is compared to the local .xml file through the model checksum. :param connection: Connection object to connect through """ remote_info = connection.get_module_info(module_name=self.name) remote_checksum = remote_info["ModelCheckSum"] # Checksum in TwinCAT is displayed signed for i, val in enumerate(remote_checksum): if val < 0: remote_checksum[i] += 2 ** 32 # Convert signed to unsigned # Checksum in XML is displayed unsigned model_checksum = [ int(self.module_info["ModelCheckSum[%i]" % i]) for i in range(4) ] if model_checksum != remote_checksum: # raise RuntimeError( warnings.warn( "The model `{0}` checksum does not match with the " "online version. Maybe both rebuild and reload " "the model?".format(self.name), RuntimeWarning, ) # Every symbol needs a references to the connection object self.plc = connection # Test connection with the first symbol that was found for symbol in self.get_symbols_recursive(): symbol.set_connection(connection)
[docs] def get_plc(self) -> Optional[TwincatConnection]: return self.plc
[docs] def get_index_group(self) -> int: return self.object_id
def __repr__(self): """Debug print.""" return "<%s instance at %s>, model name: %s" % ( self.__class__.__name__, id(self), self.name, ) def __dir__(self): """List of properties (autocomplete helper), based on super method.""" return ["type_name", "object_id", "module_info"] + super().__dir__()