"""
Here we define the abstract, platform-independent TwinCAT building blocks.
The `TcElement` class should be extended by a class from a concrete platform,
ideally through multiple inheritance.
"""
from typing import Optional, Any, Tuple, Union, Callable, Dict, Set
from threading import Timer
from abc import ABC, abstractmethod
import time
from pyads import ADSError
from twinpy.twincat.connection import TwincatConnection
from twinpy.twincat.symbols import Symbol
class TcTimer:  # Replaced by the real class defined further down in this module
    """Empty placeholder so `TcTimer` can be named before it is implemented."""
class TcElement(ABC):
    """Abstract class for TwinCAT element.

    Must be inherited into a new class for something meaningful.

    There are different event types, which determine how and when new remote
    values are retrieved:

    * `EVENT_NOTIFICATION`: An ADS notification is created, resulting in a
      callback on a remote value change. Suitable for rarely changing values or
      when a very quick response is needed.
      ADS notifications have some overhead. No more than 200 symbol
      notifications should exist at the same time.
    * `EVENT_TIMER`: New values are read at a fixed interval. Useful when
      remote values change often but no instant response is needed. This
      method has very little overhead.
    * `EVENT_NONE`: No attempts are made to update according to remote values.

    You can override the defaults for your entire project by changing the
    ``TcElement.DEFAULT_EVENT_TYPE`` and ``TcElement.DEFAULT_UPDATE_FREQ``
    class properties. Make sure to update these before any elements are
    created and they will have new standard values.

    :cvar DEFAULT_EVENT_TYPE: (default: EVENT_NOTIFICATION)
    :cvar DEFAULT_UPDATE_FREQ: (default: 10.0 Hz)
    """

    EVENT_NOTIFICATION = "notification"
    EVENT_TIMER = "timer"
    EVENT_NONE = "none"

    DEFAULT_EVENT_TYPE = EVENT_NOTIFICATION
    DEFAULT_UPDATE_FREQ = 10.0  # Hz

    # Timers shared between all elements, keyed by interval in ms. The string
    # annotation is a forward reference: the class is defined later in the module.
    _TIMERS: Dict[int, "TcTimer"] = {}

    def __init__(self, *args, **kwargs):
        """
        Note: these methods do not affect when or how a value is _written_ to the ADS
        pool.

        :param args: Not used by this class
        :param kwargs: See list below - kwargs are passed along to
            `connect_symbol` too

        :kwargs:
            * `symbol`: ``Symbol`` to link to
              (i.e. to read from and/or write to)
            * `format`: Formatter symbol, e.g. '%.1f' or '%d' or callable
              ('%.3f' by default, ignored when not relevant)
              Callable must have a single argument
            * `event_type`: Possible values are EVENT_* constants
              (default: ``DEFAULT_EVENT_TYPE``)
            * `update_freq`: Frequency (Hz) for timed update
              (for EVENT_TIMER only, default: ``DEFAULT_UPDATE_FREQ``)
            * `greyed`: When true, the widget is visibly disabled
              When false, the widget is shown normally even when
              disconnected (default: `true`)
        """
        # Every recognised option is popped, so only the remaining keyword
        # arguments are forwarded to `connect_symbol` below
        self._symbol: "Optional[Symbol]" = kwargs.pop("symbol", None)
        self.value_format: Union[str, Callable[[Any], str]] = kwargs.pop(
            "format", "%.3f"
        )
        self.event_type = kwargs.pop("event_type", self.DEFAULT_EVENT_TYPE)
        self.update_freq = kwargs.pop("update_freq", self.DEFAULT_UPDATE_FREQ)
        self.greyed = kwargs.pop("greyed", True)

        # Handles to the notification specific for this widget
        self._handles: Optional[Tuple[int, int]] = None
        # If true, QWidget events should not result in a change of the ADS symbol
        self._skip_event: bool = False
        # Timestamp of the last twincat_send(), used to prevent an event loop
        self._last_update: Optional[float] = None
        # Reference to the linked TcTimer
        self._timer: "Optional[TcTimer]" = None
        # Last value handed to twincat_receive() by the timed event loop
        self._last_value: Optional[Any] = None

        if self._symbol:
            # Connect already if passed
            self.connect_symbol(self._symbol, **kwargs)

    def connect_symbol(self, new_symbol: "Optional[Symbol]" = None, **kwargs) -> bool:
        """Connect a symbol (copy of symbol is left as property).

        By default a device callback is created with an on-change event
        from TwinCAT.
        Old callbacks are deleted first. Pass None to only clear callbacks.
        The notification handles are stored locally.

        Extend (= override but call the parent first) this method to
        configure more of the widget, useful if e.g. widget callbacks depend
        on the symbol.

        :param new_symbol: Symbol to link to (set None to only clear the
            previous)
        :param kwargs: See list below - Keyword arguments are passed along as
            device notification settings too
        :return: `True` if new symbol was connected
        :raises ValueError: When the event type is not one of the EVENT_*
            constants, or when a timed update is requested with a non-positive
            update frequency

        :kwargs:
            * `event_type`: See :class:`TcElement`
            * `update_freq`: See :class:`TcElement`
        """
        if self._symbol is not None:
            if self._handles is not None:
                # In case previous callback existed, clear it
                self._symbol.del_device_notification(self._handles)
                self._handles = None
            # Must happen while `_symbol` still holds the old symbol, because the
            # timer looks it up through this element
            self._detach_timer()

        self._symbol = new_symbol

        if "event_type" in kwargs:
            self.event_type = kwargs.pop("event_type")
        if "update_freq" in kwargs:
            self.update_freq = kwargs.pop("update_freq")

        if new_symbol is None:
            return False

        if self.event_type == self.EVENT_NOTIFICATION:
            self._handles = self._symbol.add_device_notification(
                self.twincat_receive_wrapper, **kwargs
            )
            # It seems a notification is always fired on creation, so we don't
            # need to call it now
            return self._handles is not None

        if self.event_type == self.EVENT_TIMER:
            if self.update_freq <= 0:
                # Zero would divide by zero below, a negative frequency would
                # give a negative timer interval
                raise ValueError(
                    f"Update frequency must be positive, got: {self.update_freq}"
                )
            m_sec = int(1000 / self.update_freq)
            if m_sec not in self._TIMERS:
                plc = (
                    self._symbol[0]._plc
                    if isinstance(self._symbol, list)
                    else self._symbol._plc
                )
                self._TIMERS[m_sec] = self.make_timer(plc, m_sec)
            self._timer = self._TIMERS[m_sec]  # Keep local reference
            self._timer.add_element(self)
            return True

        if self.event_type == self.EVENT_NONE:
            return True  # Nothing to be done

        raise ValueError(f"Unrecognized event type: {self.event_type}")

    def _detach_timer(self) -> None:
        """Unregister from the shared timer and forget a timer that became empty."""
        # getattr: this also runs from __del__, possibly without __init__ having run
        timer = getattr(self, "_timer", None)
        if timer is None:
            return
        timer.remove_element(self)
        if (
            timer.get_number_of_widgets() == 0
            and self._TIMERS.get(timer.interval) is timer
        ):
            # Only drop the registry entry when it really is this timer
            del self._TIMERS[timer.interval]
        self._timer = None  # Clear reference

    @staticmethod
    def make_timer(plc, interval):
        """Create timer instance - to be extended by a different implementation.

        :param plc: Connection object handed to the timer
        :param interval: Update delay in milliseconds
        """
        return TcTimer(plc, interval)

    def twincat_receive_wrapper(self, value):
        """Intermediate twincat_receive callback to prevent event loops.

        :param value: New remote value
        """
        # If incoming value equals old buffered value
        if value == self._symbol.value and self._last_update is not None:
            elapsed_ms = (time.time() - self._last_update) * 1000
            if elapsed_ms < 50:
                # If within 50 ms of the last update, discard this notification
                # This is typically a callback after a twincat_send()
                return
        self.twincat_receive(value)

    def on_mass_timeout(self):
        """Callback for the event timer.

        This assumes the remote read was already performed!
        """
        # We use the buffered _symbol._value (assume it was updated externally)
        # This is slightly risky, because the buffered value could be changed by
        # something else too
        new_val = self._symbol._value
        if new_val != self._last_value:
            # Trigger incoming-value callback
            self.twincat_receive(new_val)
        self._last_value = new_val

    @abstractmethod
    def twincat_receive(self, value):
        """Callback attached to the TwinCAT symbol.

        Note: changing a state of a widget (e.g. checkbox being checked through
        `setChecked(True)`) will typically fire the on-change events again. So be
        careful to prevent an event loop when updating a widget based on a remote
        change: a change could result in a state change, which could result in a
        remote change, etc.

        :param value: New remote value
        """

    def twincat_send(self, value: Any):
        """Set value in symbol (and send to TwinCAT).

        Method is safe: if symbol is not connected, nothing will happen.

        :param value: Value to write to the symbol
        """
        # The timestamp is refreshed even without a symbol
        self._last_update = time.time()  # Current floating point timestamp
        if self._symbol is not None:  # Safe for unconnected symbols
            self._symbol.set(value)

    def __del__(self):
        """Destructor."""
        # getattr: the attributes do not exist when __init__ was never reached
        symbol = getattr(self, "_symbol", None)
        if symbol is not None:
            try:
                # Element is about to become extinct, so clear callbacks
                symbol.clear_device_notifications()
            except (ADSError, KeyError):
                pass  # Quietly continue, nothing we could do now
        self._detach_timer()
class RepeatingTimer(Timer):
    """Threading timer that calls its function after every interval, not just once."""

    def run(self) -> None:
        """Call the function once per elapsed interval until the timer is cancelled."""
        while True:
            # wait() returns True as soon as the `finished` event is set
            cancelled = self.finished.wait(self.interval)
            if cancelled:
                break
            self.function(*self.args, **self.kwargs)
class TcTimer:
    """Timer object which can trigger an update for multiple TcElements.

    Uses reading by list to get multiple values using a single request.

    A single instance of this class should be made per update rate. Additional
    TcWidgets can then be registered to existing timer instances.
    """

    def __init__(self, plc: "Optional[TwincatConnection]", interval: int):
        """
        :param plc: Twincat connection object - a reference will be kept
        :param interval: Update delay in milliseconds
        """
        super().__init__()
        self._plc = plc
        self._interval = interval
        self._is_active = False
        self._timer = self.make_timer()
        self.elements: "Set[TcElement]" = set()  # Set of linked widgets
        self.symbols: "Set[Symbol]" = set()  # Set of symbols of those widgets

    def make_timer(self) -> "RepeatingTimer":
        """Extendable method to create the timer object"""
        # The interval is kept in milliseconds, the threading timer wants seconds
        return RepeatingTimer(0.001 * self._interval, self.on_timeout)

    def start(self):
        """Start underlying timer"""
        self._timer.start()
        self._is_active = True

    def stop(self):
        """Stop the underlying timer and prepare a fresh one for the next start"""
        self._timer.cancel()
        self._is_active = False
        # A thread-based timer can only be started once, so a cancelled timer
        # object cannot be reused by a later `start()`
        self._timer = self.make_timer()

    @property
    def interval(self):
        """Update delay in milliseconds, as passed to the constructor"""
        return self._interval

    def on_error(self, error: "ADSError"):
        """Report an ADS error that occurred during the combined read.

        :param error: The error that was caught
        """
        print("ADS Error:", error)
        return  # Nothing we can do

    def on_timeout(self):
        """Callback for this timer.

        Will do combined update for all linked widgets.
        """
        if self._plc is None:
            return  # Nothing we could possibly do

        try:
            # Perform combined read
            self._plc.read_list_of_symbols(list(self.symbols))
            # This will update all the `_value` properties of the symbols
        except ADSError as err:
            self.stop()
            self.on_error(err)
            return  # Abort this loop

        # Trigger updates for all linked widgets. Iterate over a snapshot:
        # elements can be added or removed while this callback is running
        for element in tuple(self.elements):
            element.on_mass_timeout()

    @staticmethod
    def _symbols_of(element) -> list:
        """Return the symbols of an element as a list (empty when it has none)."""
        symbol = element._symbol
        if isinstance(symbol, list):
            return list(symbol)
        return [] if symbol is None else [symbol]

    def add_element(self, element: "TcElement"):
        """Register a new element to be updated by this timer.

        Starts timer if it wasn't running yet.

        :param element: New element
        :return: Number of elements linked after the addition
        """
        self.elements.add(element)

        # We will keep a ready set of symbols so we won't have to find them on
        # every timeout. This should be a little bit faster
        self.symbols.update(self._symbols_of(element))

        if not self._is_active:
            self.start()

        return len(self.elements)

    def remove_element(self, element: "TcElement"):
        """De-register a widget from this timer.

        Will also stop timer if no widgets remain.

        :param element: Element to remove
        """
        if element in self.elements:
            self.elements.remove(element)

            # A symbol may be shared with elements that are still registered,
            # only drop symbols that nobody else needs
            still_used = {
                sym for other in self.elements for sym in self._symbols_of(other)
            }
            for sym in self._symbols_of(element):
                if sym not in still_used:
                    self.symbols.discard(sym)

        if not self.elements:
            self.stop()  # Stop timer if no widgets remain

    def get_number_of_widgets(self) -> int:
        """
        :return: Number of linked widgets
        """
        return len(self.elements)