Source code for src.tools.stability
"""Classes to monitor stability in some value."""
from time import time
from src.tools.general import simpleLinearRegression
[docs]class StabilityTrend(object):
"""A class for tracking stability based on trend.
Parameters
----------
bufferSize : int
The number of data points of which to keep track.
tolerance : float
The maximum deviation from zero which the slope can have while still
being considered stable.
timeout : float
The maximum amount of time to monitor the system.
"""
def __init__(self, bufferSize, tolerance, timeout=None):
self._bufferSize = bufferSize
self._maxAllowed = abs(tolerance)
self._minAllowed = -1.0 * abs(tolerance)
if timeout is None:
self._timeout = float('inf')
else:
self._timeout = timeout
self._times = []
self._values = []
self._length = 0
self._max = None
self._min = None
self._slope = None
self._startTime = time()
[docs] def addPoint(self, value):
"""Add a point to the monitor.
Parameters
----------
value : float
The value to add to the monitor.
"""
newTime = time()
self._times.append(newTime)
self._values.append(value)
if self._length + 1 > self._bufferSize:
self._times.pop(0)
self._values.pop(0)
else:
self._length += 1
stats = simpleLinearRegression(self._times, self._values)
self._slope = stats[0]
self._min = stats[3]
self._max = stats[4]
[docs] def isStable(self):
"""Return whether the system is stable.
Returns
-------
bool
Whether the system is stable.
"""
return self._minAllowed < self._slope < self._maxAllowed
[docs] def isTimedOut(self):
"""Return whether the monitor has timed out.
Returns
-------
bool
Whether the system has passed the timeout.
"""
return time() - self._startTime > self._timeout
[docs] def isBufferFull(self):
"""Return whether the buffer is full.
Returns
-------
bool
Whether the monitor's buffer has been filled.
"""
return self._length >= self._bufferSize
[docs] def isFinished(self):
"""Return whether the wait for stability can end.
Returns
-------
bool
Whether the software can proceed, meaning either that the buffer
is full and the system is stable or that the monitor has timed out.
"""
return (self.isBufferFull() and self.isStable()) or self.isTimedOut()
[docs] def getTrend(self):
"""Return the slope of the line formed by the data.
Returns
-------
float
The slope of the line formed by the data.
"""
return self._slope
[docs]class StabilitySetpoint(object):
"""A class for tracking stability based on proximity to a setpoint.
Parameters
----------
bufferSize : int
The number of data points of which to keep track.
setpoint : float
The desired value for the system values.
tolerance : float
The maximum deviation from zero which the slope can have while still
being considered stable.
timeout : float
The maximum amount of time to monitor the system.
"""
def __init__(self, bufferSize, setpoint, tolerance, timeout=None):
self._bufferSize = bufferSize
self._maxAllowed = setpoint + abs(tolerance)
self._minAllowed = setpoint - abs(tolerance)
if timeout is None:
self._timeout = float('inf')
else:
self._timeout = timeout
self._times = []
self._values = []
self._length = 0
self._max = None
self._min = None
self._slope = None
self._startTime = time()
[docs] def addPoint(self, value):
"""Add a point to the monitor.
Parameters
----------
value : float
The value to add to the monitor.
"""
newTime = time()
self._times.append(newTime)
self._values.append(value)
if self._length + 1 > self._bufferSize:
self._times.pop(0)
self._values.pop(0)
else:
self._length += 1
stats = simpleLinearRegression(self._times, self._values)
self._slope = stats[0]
self._min = stats[3]
self._max = stats[4]
[docs] def isStable(self):
"""Return whether the system is stable.
Returns
-------
bool
Whether the system is stable.
"""
return self._minAllowed < self._min < self._max < self._maxAllowed
[docs] def isTimedOut(self):
"""Return whether the monitor has timed out.
Returns
-------
bool
Whether the system has passed the timeout.
"""
return time() - self._startTime > self._timeout
[docs] def isBufferFull(self):
"""Return whether the buffer is full.
Returns
-------
bool
Whether the monitor's buffer has been filled.
"""
return self._length >= self._bufferSize
[docs] def isFinished(self):
"""Return whether the wait for stability can end.
Returns
-------
bool
Whether the software can proceed, meaning either that the buffer
is full and the system is stable or that the monitor has timed out.
"""
return (self.isBufferFull() and self.isStable()) or self.isTimedOut()
[docs] def getTrend(self):
"""Return the slope of the line formed by the data.
Returns
-------
float
The slope of the line formed by the data.
"""
return self._slope
[docs]class StabilityTimer(object):
"""A timer for checking for stability.
Parameters
----------
duration : float
The time in seconds over which the values should be stable before
the timer asserts that it is finished.
stability : float
The largest separation between the maximum and minimum known values
for which the timer may consider itself to be stable.
timeout : float
The maximum time in seconds to wait for the values to become
stable.
"""
def __init__(self, duration, stability, timeout=None):
self._duration = duration
self._stability = stability
self._timeout = timeout
self._times = []
self._values = []
self._startTime = time()
self._max = float('inf')
self._min = float('-inf')
[docs] def addValue(self, newValue):
"""Associate a new value with the timer.
Parameters
----------
newValue : float
The value to add to the timer.
"""
self._times.append(time())
self._values.append(newValue)
if len(self._values) == 0:
self._max = newValue
self._min = newValue
elif newValue > self._max:
self._max = newValue
elif newValue < self._min:
self._min = newValue
if not self.isStable():
self._times = [self._times[-1]]
self._values = [self._values[-1]]
self._max = newValue
self._min = newValue
[docs] def isStable(self):
"""Return whether the values are stable.
Returns
-------
bool
Whether the values associated with the specified timer are
close enough to be considered stable.
"""
return self._max - self._min < self._stability
[docs] def getStableTime(self):
"""Return how long the timer has been stable.
Returns
-------
float
How long the values associated with the timer have been
close enough to be considered stable. If the values are
*not* stable, return -1.
"""
try:
return self._times[-1] - self._times[0]
except IndexError:
return -1
[docs] def isTimeoutElapsed(self):
"""Return whether the timer has timed out.
Returns
-------
bool
Whether the timer has timed out.
"""
if self._timeout is None:
return False
try:
return self._times[-1] - self._startTime >= self._timeout
except IndexError:
return False
[docs] def isFinished(self):
"""Return whether the desired conditions have been met.
Returns
-------
bool
Whether the desired conditions have been met. Return `True`
if either the timeout has elapsed or the value has remained
stable for the specified length of time. Otherwise, return
`False`.
"""
return self.getStableTime() >= self._duration or self.isTimeoutElapsed()
[docs] def getStats(self):
"""Return the maximum and minimum values and the slope of the line.
Note that whenever a new value is added to the timer, if the new
value is outside the stability range defined by previous values, the
arrays are cleared and the maximum and minimum values are both set to
the newly added value.
Returns
-------
float
The maximum known value.
float
The minimum known value.
float
The slope of the line formed by the known values and the times
at which the respective values were added, in units/s.
"""
slope = simpleLinearRegression(self._times, self._values)[0]
return (self._max, self._min, slope)
[docs]class BufferedStabilityTimer(object):
"""A timer for checking for stability, which keeps only new points.
Parameters
----------
stability : float
The largest separation between the maximum and minimum known values
for which the timer may consider itself to be stable.
bufferSize : int
The maximum number of points to keep at any given time.
timeout : float
The maximum time in seconds to wait for the values to become
stable.
"""
def __init__(self, stability, bufferSize, timeout=None):
self._stability = stability
self._bufferSize = bufferSize
self._timeout = timeout
self._times = []
self._values = []
self._startTime = time()
[docs] def addValue(self, newValue):
"""Associate a new value with the timer.
Parameters
----------
newValue : float
The value to add to the timer.
"""
self._times.append(time())
self._values.append(newValue)
if len(self._times) > self._bufferSize:
self._times.pop(0)
self._values.pop(0)
[docs] def isBufferFull(self):
"""Return whether the buffer is full.
Returns
-------
bool
Whether the buffer is full.
"""
return len(self._times) == self._bufferSize
[docs] def isStable(self):
"""Return whether the values are stable.
Returns
-------
bool
Whether the values associated with the specified timer are
close enough to be considered stable.
"""
return max(self._values) - min(self._values) < self._stability
[docs] def isTimeoutElapsed(self):
"""Return whether the timer has timed out.
Returns
-------
bool
Whether the timer has timed out.
"""
if self._timeout is None:
return False
try:
return self._times[-1] - self._startTime >= self._timeout
except IndexError:
return False
[docs] def isFinished(self):
"""Return whether the desired conditions have been met.
Returns
-------
bool
Whether the desired conditions have been met. Return `True`
if either the timeout has elapsed or the value has remained
stable for the specified length of time. Otherwise, return
`False`.
"""
return self.isBufferFull() or self.isTimeoutElapsed()
[docs] def getStats(self):
"""Return the maximum and minimum values and the slope of the line.
Note that whenever a new value is added to the timer, if the new
value is outside the stability range defined by previous values, the
arrays are cleared and the maximum and minimum values are both set to
the newly added value.
Returns
-------
float
The maximum known value.
float
The minimum known value.
float
The slope of the line formed by the known values and the times
at which the respective values were added, in units/s.
"""
slope = simpleLinearRegression(self._times, self._values)[0]
return (max(self._values), min(self._values), slope)