Source code for twinpy.element.tc_element

"""
Here we define all the non-implemented TwinCAT stuff, without any platform.

The `TcElement` class should be extended by a class of another platform, ideally
through multiple inheritance.
"""

from typing import Optional, Any, Tuple, Union, Callable, Dict, Set
from threading import Timer
from abc import ABC, abstractmethod
import time
from pyads import ADSError

from twinpy.twincat.connection import TwincatConnection
from twinpy.twincat.symbols import Symbol


class TcTimer:
    pass  # Forward-declare the timer class


[docs]class TcElement(ABC): """Abstract class for TwinCAT element. Must be inherited into a new class for something meaningful. There are different event types, which determine how and when new remote values are retrieved: * `EVENT_NOTIFICATION`: An ADS notification is created, resulting in a callback on a remote value change. Suitable for rarely changing values or when a very quick response is needed. ADS notifications have some overhead. No more than 200 symbol notifications should exist at the same time. * `EVENT_TIMER`: New values are read at a fixed interval. Useful when remote values change often but no instant response is needed. This method has very little overhead. * `EVENT_NONE`: No attempts are made to update according to remote values. You can override the defaults for your entire project by changing the ``TcWidget.DEFAULT_EVENT_TYPE`` and ``TcWidget.DEFAULT_UPDATE_FREQ`` class properties. Make sure to update these before any widgets are created and they will have new standard values. :cvar DEFAULT_EVENT_TYPE: (default: EVENT_NOTIFICATION) :cvar DEFAULT_UPDATE_FREQ: (default: 10.0 Hz) """ EVENT_NOTIFICATION = "notification" EVENT_TIMER = "timer" EVENT_NONE = "none" DEFAULT_EVENT_TYPE = EVENT_NOTIFICATION DEFAULT_UPDATE_FREQ = 10.0 # Hz _TIMERS: Dict[int, TcTimer] = {} # Store timers, keyed by interval in ms def __init__(self, *args, **kwargs): """ Note: these methods do not affect when or how a value is _written_ to the ADS pool. :param args: :param kwargs: See list below - kwargs are passed along to `connect_symbol` too :kwargs: * `symbol`: ``Symbol`` to link to (i.e. to read from and/or write to) * `format`: Formatter symbol, e.g. '%.1f' or '%d' or callable ('%.3f' by default, ignored when not relevant) Callable must have a single argument * `event_type`: Possible values are EVENT_* constants (default: ``DEFAULT_EVENT_TYPE``) * `update_freq`: Frequency (Hz) for timed update (for EVENT_TIMER only, default: ``DEFAULT_UPDATE_FREQ``) * `greyed`: When true, the widget is visibly disabled When false, the widget is shown normally even when disconnected (default: `true`) """ self._symbol: Optional[Symbol] = kwargs.pop("symbol", None) # Included None fallback self.value_format: Union[str, Callable[[Any], str]] = kwargs.pop( "format", "%.3f" ) self.event_type = kwargs.pop("event_type", self.DEFAULT_EVENT_TYPE) self.update_freq = kwargs.pop("update_freq", self.DEFAULT_UPDATE_FREQ) self.greyed = kwargs.pop("greyed", True) self._handles: Optional[Tuple[int, int]] = None # Handles to the notification specific for this widget self._skip_event: bool = False # If true, QWidget events should not result in # a change of the ADS symbol self._last_update: Optional[float] = None # Timestamp of the last successful # twincat_send(), used to prevent an event loop self._timer: Optional[TcTimer] = None # Reference to the linked TcTimer self._last_value: Optional[Any] = None # For timed event loop if self._symbol: self.connect_symbol(self._symbol, **kwargs) # Connect already if passed
[docs] def connect_symbol(self, new_symbol: Optional[Symbol] = None, **kwargs) -> bool: """Connect a symbol (copy of symbol is left as property). By default a device callback is created with an on-change event from TwinCAT. Old callbacks are deleted first. Pass None to only clear callbacks. The notification handles are stored locally. Extend (= override but call the parent first) this method to configure more of the widget, useful if e.g. widget callbacks depend on the symbol. :param new_symbol: Symbol to link to (set None to only clear the previous) :param kwargs: See list below - Keyword arguments are passed along as device notification settings too :return: `True` if new symbol was connected :kwargs: * `event_type`: See :class:`TcElement` * `update_freq`: See :class:`TcElement` """ if self._symbol is not None: if self._handles is not None: self._symbol.del_device_notification(self._handles) # In case previous callback existed, clear it self._handles = None if self._timer is not None: self._timer.remove_element(self) if self._timer.get_number_of_widgets() == 0: del self._TIMERS[self._timer.interval] self._timer = None # Clear reference self._symbol = new_symbol if "event_type" in kwargs: self.event_type = kwargs.pop("event_type") if "update_freq" in kwargs: self.update_freq = kwargs.pop("update_freq") if new_symbol is None: return False if self.event_type == self.EVENT_NOTIFICATION: self._handles = self._symbol.add_device_notification( self.twincat_receive_wrapper, **kwargs ) # It seems a notification is always fired on creation, so we don't # need to call it now return self._handles is not None elif self.event_type == self.EVENT_TIMER: m_sec = int(1000 / self.update_freq) if m_sec not in self._TIMERS: plc = ( self._symbol[0]._plc if isinstance(self._symbol, list) else self._symbol._plc ) self._TIMERS[m_sec] = self.make_timer(plc, m_sec) self._timer = self._TIMERS[m_sec] # Keep local reference self._timer.add_element(self) return True elif self.event_type == self.EVENT_NONE: return True # Nothing to be done ValueError("Unrecognized event type: " + self.event_type)
[docs] @staticmethod def make_timer(plc, interval): """Create timer instance - to be extended by a different implementation""" return TcTimer(plc, interval)
[docs] def twincat_receive_wrapper(self, value): """Intermediate twincat_receive callback to prevent event loops.""" # If incoming value equals old buffered value if value == self._symbol.value and self._last_update is not None: elapsed_ms = (time.time() - self._last_update) * 1000 if elapsed_ms < 50: # If within 50 ms of the last update, discard this notification # This is typically a callback after a twincat_send() return self.twincat_receive(value)
[docs] def on_mass_timeout(self): """Callback for the event timer. This assumes the remote read was already performed! """ # We use the buffered _symbol.value (assume it was updated externally) # This is slightly risky, because the buffered value could be changed by # something else too new_val = self._symbol._value if new_val != self._last_value: # Trigger incoming-value callback self.twincat_receive(new_val) self._last_value = new_val
[docs] @abstractmethod def twincat_receive(self, value): """Callback attached to the TwinCAT symbol. Note: changing a state of a widget (e.g. checkbox being checked through `setChecked(True)`) will typically fire the on-change events again. So be careful to prevent an event loop when updating a widget based on a remote change: a change could result in a state change, which could result in a remote change, etc. :param value: New remote value """
[docs] def twincat_send(self, value: Any): """Set value in symbol (and send to TwinCAT). Method is safe: if symbol is not connected, nothing will happen. """ self._last_update = time.time() # Current floating point timestamp if self._symbol is not None: # Safe for unconnected symbols self._symbol.set(value)
[docs] def format(self, value: Any) -> str: """ "Use the stored formatting to created a formatted text. In case the format specifier is a string and the new value is a list, element-wise string formatting will be concatenated automatically. """ if value is None: return "NaN" if isinstance(self.value_format, str): if isinstance(value, list): elements = [self.value_format % item for item in value] return ", ".join(elements) return self.value_format % value if callable(self.value_format): return self.value_format(value) raise NotImplementedError( "The format `{}` could not be processed".format(self.value_format) )
def __del__(self): """Destructor.""" if self._symbol is not None: try: # Element is about to become extinct, so clear callbacks self._symbol.clear_device_notifications() except (ADSError, KeyError): pass # Quietly continue, nothing we could do now if self._timer is not None: self._timer.remove_element(self)
class RepeatingTimer(Timer): """Simple extension of the threading timer class, only it keeps firing""" def run(self) -> None: while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs) class TcTimer: """Timer object which can trigger an update for multiple TcElements. Uses reading by list to get multiple values using a single request. A single instance of this class should be made per update rate. Additional TcWidgets can then be registered to existing timer instances. """ def __init__(self, plc: Optional[TwincatConnection], interval: int): """ :param plc: Twincat connection object - a reference will be kept :param interval: Update delay in milliseconds """ super().__init__() self._plc = plc self._interval = interval self._is_active = False self._timer = self.make_timer() self.elements: Set[TcElement] = set() # List of linked widgets self.symbols: Set[Symbol] = set() # List of symbols of those widgets def make_timer(self) -> RepeatingTimer: """Extendable method to create the timer object""" timer = RepeatingTimer(0.001 * self._interval, self.on_timeout) return timer def start(self): """Start underlying timer""" self._timer.start() self._is_active = True def stop(self): """Stop the underlying timer""" self._timer.cancel() self._is_active = False @property def interval(self): return self._interval def on_error(self, error: ADSError): print("ADS Error:", error) return # Nothing we can do def on_timeout(self): """Callback for this timer. Will do combined update for all linked widgets. """ if self._plc is None: return # Nothing we could possible do try: # Perform combined read self._plc.read_list_of_symbols(list(self.symbols)) # This will update all the `_value` properties of the symbols except ADSError as err: self.stop() self.on_error(err) return # Abort this loop # Trigger updates for all linked widgets for element in self.elements: element.on_mass_timeout() def add_element(self, element: TcElement): """Register a new element to be updated by this timer. Starts timer if it wasn't running yet. :param element: New element """ self.elements.add(element) new_symbol = element._symbol # We will keep a ready array of symbols so we won't have to find them on # every timeout. This should be a little bit faster if isinstance(new_symbol, list): for sym in new_symbol: self.symbols.add(sym) elif new_symbol is not None: self.symbols.add(new_symbol) if not self._is_active: self.start() return len(self.elements) def remove_element(self, element: TcElement): """De-register a new widget from this timer. Will also stop timer if no widgets remain. :param element: Element to remove """ if element in self.elements: self.elements.remove(element) symbol = element._symbol # We will keep a ready array of symbols so we won't have to find them on # every timeout. This should be a little bit faster if isinstance(symbol, list): for sym in symbol: if sym in self.symbols: self.symbols.remove(sym) elif symbol is not None: if symbol in self.symbols: self.symbols.remove(symbol) if not self.elements: self.stop() # Stop timer if no widgets remain def get_number_of_widgets(self) -> int: """ :return: Number of linked widgets """ return len(self.elements)