Source code for twinpy.twincat.connection

from pyads import AdsSymbol, Connection
from pyads.constants import MAX_ADS_SUB_COMMANDS
from import _list_slice_generator, _dict_slice_generator
import pyads
from typing import Optional, Union, Type, List, Dict, Any, Tuple

from .symbols import Parameter, Signal
from .pyads_extra import adsSumReadSymbols, adsSumWriteSymbols

[docs]class TwincatConnection(Connection): """Extend default Connection object (typically named `plc`). ADS connection with custom features. """ def __init__( self, ams_net_id: str = "", ams_net_port: int = 350, ip_address: str = None, ): """ Note that this version will connect on object creation, throwing an exception when it fails. `pyads.Connection` waits for `.open()` and will fail quietly. :param ams_net_id: TwinCAT AMS address (default is localhost) :param ams_net_port: ADS Port (default is 350) :param ip_address: Target IP (automatically deduced from AMS address) :raises pyads.ADSError: When connection failed """ super().__init__(ams_net_id, ams_net_port, ip_address) if not self.is_open: raise pyads.ADSError( text="Connection to TwinCAT could not be " "established. Is TwinCAT in run mode?" "Are the port and address correct?" )
[docs] def get_module_info(self, module_name: str) -> dict: """Get information about live module.""" # Read all info as list of bytes (doing it as a list of variable # names fails) try: var_name = module_name + ".ModuleInfo" data = self.read_by_name(var_name, pyads.PLCTYPE_ARR_DINT(37)) except pyads.ADSError as err: # Re-raise error raise pyads.ADSError( text="Could not connect to TwinCAT module. " "Does the target module exist in the " "running TwinCAT instance?" ) from err return { "ClassId": data[0:4], "BuildTimeStamp": data[4], "ModelCheckSum": data[5:9], "ModelVersion": data[9:13], "TwinCatVersion": data[13:17], "TcTargetVersion": data[17:21], "MatlabVersion": data[21:25], "SimulinkVersion": data[25:29], "CoderVersion": data[29:33], "TcTargetLicenseID": data[33:38], }
[docs] def get_signal( self, name: Optional[str] = None, index_group: Optional[int] = None, index_offset: Optional[int] = None, symbol_type: Optional[Union[str, Type]] = None, ) -> Signal: """Get Signal instance. See :class:`Signal`. """ return Signal( plc=self, name=name, index_group=index_group, index_offset=index_offset, symbol_type=symbol_type, )
[docs] def get_parameter( self, name: Optional[str] = None, index_group: Optional[int] = None, index_offset: Optional[int] = None, symbol_type: Optional[Union[str, Type]] = None, ) -> Parameter: """Get Parameter instance. See :class:`Parameter`. """ return Parameter( plc=self, name=name, index_group=index_group, index_offset=index_offset, symbol_type=symbol_type, )
[docs] def read_list_of_symbols( self, symbols: List[AdsSymbol], ads_sub_commands: int = MAX_ADS_SUB_COMMANDS, ) -> Dict[AdsSymbol, Any]: """Read a list of symbols in a single request. Same principe as `read_list_by_name`. See :func:`read_list_by_name` for more info. This version doesn't work for structs. The `_value` property for each symbol will be updated. A dictionary will also be returned of the symbol names and their new values. """ for symbol in symbols: if symbol.is_structure: raise ValueError("Method is not available for structured variables") # Relying on `adsSumRead()` is tricky, because we do not have the `dataType` # (integer) for each symbol, we only have the ctypes-type. # Limit request side, split into multiple if needed if len(symbols) <= ads_sub_commands: symbols_list = [symbols] # Turn into list of a single element else: symbols_list = _list_slice_generator(symbols, ads_sub_commands) return_data: Dict[AdsSymbol, Any] = {} for symbols_slice in symbols_list: result = adsSumReadSymbols( self._port, self._adr, symbols_slice, ) return_data.update(result) for symbol, value in return_data.items(): symbol._value = value return return_data
[docs] def write_list_of_symbols( self, symbols_and_values: Dict[AdsSymbol, Any], ads_sub_commands: int = MAX_ADS_SUB_COMMANDS, ) -> Dict[AdsSymbol, str]: """Write new values to a list of symbols. Same principe as `write_list_by_name`. See :func:`write_list_by_name` for more info. For example: .. code:: python # Using dict new_data = {symbol1: 3.14, symbol2: False} plc.write_list_of_symbols(new_data) :param symbols_and_values: Symbols to write to :param ads_sub_commands: Max. number of symbols per call (see `write_list_by_name`) """ for symbol in symbols_and_values.keys(): if symbol.is_structure: raise ValueError("Method not available for structured variables") for symbol, new_value in symbols_and_values.items(): symbol._value = new_value # Update cache if len(symbols_and_values) <= ads_sub_commands: # Turn into array of single element symbols_and_values_list = [symbols_and_values] else: symbols_and_values_list = _dict_slice_generator( symbols_and_values, ads_sub_commands ) return_data: Dict[AdsSymbol, str] = {} for symbols_and_value_slice in symbols_and_values_list: result = adsSumWriteSymbols( self._port, self._adr, symbols_and_value_slice, ) return_data.update(result) return return_data