Source code for src.core.experiment

"""Experiment data and execution manager.

An `Experiment` is an object which keeps tracks of all the parts which compose
an actual measurement, including instruments, the actions performed on them, the
data returned by them, and real-time graphs of the data.
"""

import cPickle as pickle
import logging
import math
import numpy as np
import os
import threading
import time

from src import settings
from src.core import instrument as instmod
from src.core.action import (ActionContainer, ActionThread,
                             ActionLoopUntilInterrupt, PARAM_ID)
from src.core.errors import (InstrumentInUseError, GeneralExperimentError)
from src.core.graph import AbstractGraphManager
from src.tools.general import formatReSTHeading
import src.tools.parsing as parsing

log = logging.getLogger('transport')

MARK_CONSTANT = '@'
MARK_PARAMETER = '$'
MARK_COLUMN = '#'
SUB_CONSTANT = '@(%s)'
SUB_PARAMETER = '$(%s)'
SUB_COLUMN = '#(%s)'

_SUPPORTED_FUNCTIONS = {'abs': 'abs',
                        'sin': 'math.sin',
                        'cos': 'math.cos',
                        'tan': 'math.tan',
                        'arcsin': 'math.asin',
                        'arccos': 'math.acos',
                        'arctan': 'math.atan',
                        'arctan2': 'math.atan2',
                        'deg2rad': 'math.radians',
                        'rad2deg': 'math.degrees',
                        'sinh': 'math.sinh',
                        'cosh': 'math.cosh',
                        'tanh': 'math.tanh',
                        'arcsinh': 'math.asinh',
                        'arccosh': 'math.acosh',
                        'arctanh': 'math.atanh',
                        'pow': 'math.pow',
                        'sqrt': 'math.sqrt',
                        'exp': 'math.exp',
                        'log': 'math.log',
                        'log10': 'math.log10',
                        'ceil': 'math.ceil',
                        'floor': 'math.floor'}

_EXTS_DATA = settings.EXTS_DATA
_EXTS_PARAMETERS = settings.EXTS_PARAMETERS
_EXTS_IMAGE = settings.EXTS_IMAGE

[docs]class Experiment(object): """The main driver class of an experiment. The `Experiment` class is responsible for managing instruments; the actions those instruments should perform; and the collection, storage, saving, and graphing of data. """ def __init__(self): log.info('Creating an experiment...') # The database of constants defined before the experiment is run. All # values will be coerced to floats. self._constants = {} # The list of instruments in the experiment, initialized to contain # the two built-in instruments. self._instruments = [instmod.System(self), instmod.Postprocessor(self)] # A tree structure for containing the experiment's action sequence. self._sequence = ActionContainer(self, None, 'root', None) # A dictionary of parameters self._parameters = {} # A list for storing parameter data before the file has been opened. self._parameterBuffer = [] # A dictionary for column data. They keys are the names of the columns, # and the values are sub-dictionaries with the following keys: # curr : str # The most recently acquired value, already formatted. # colindex : int # The position in the **ordered** array where the data from this # column should go. # graphx : list of `Graph` # The list of graphs which should receive x-values whenever this # column is updated. # graphy : list of `Graph` # The list of graphs which should receive y-values whenever this # column is updated. # graphadd : list of `Graph` # The list of graphs which should produce next plots whenever # this column is updated. self._columns = {} # An array containing column names in the order in which they should be # written to the file. self._columnArray = [] self._lastColumnIndex = -1 # The experiment's list of `Graph` objects. self._graphs = [] # A buffer for holding data while a simultaneous block is running. self._tempBuffer = None # The basic file name, without an extension, for forming the filenames # for the actual filenames, and the extension to be used for the data # files. self._filenameBase = '' self._extension = '.xdat' # Files to which data will be written. self._dataFile = None self._parameterFile = None self._graphFile = None # List of tuples containing the paths of all files which have been # opened during the experiment. self._allFiles = [] # Flags describing the status of the experiment. In order, they are # [running, paused, interrupted]. The third is for stopping the # indefinite-loop type actions. self._status = [False, False, False] # A StatusMonitor for displaying experiment history to the user. self._statusMonitor = None # Lists of commands to execute before and after the experiment. They # are set by whatever creates the experiment, and their primary purpose # is to update indicators in a GUI. self._preSequenceActions = None self._postSequenceActions = None # Lists of commands to execute before and after an interruptable loop. # They are set by whatever creates the experiment. self._loopEnterCommands = None self._loopExitCommands = None # A list of ActionPostprocessor objects which will be executed at the # end of the experiment (during the post-sequence). self._postprocessorActions = [] # A subclass of `AbstractGraphManager` for managing the threads for # the graphs, an instance of that class, and the frame which should # be the parent for the graphing frames. self._graphManagerClass = AbstractGraphManager self._graphManager = None self._parentFrame = None #-------------------------------------------------------- Instrument control
[docs] def getInstrumentStrings(self): """Return the names of the instruments in this experiment. Returns ------- list of str A list of strings representing the names of the `Instrument` objects which used in this experiment. """ return [instrument.getName() for instrument in self._instruments]
[docs] def addInstrument(self, instrument): """Add an instrument to this experiment. Parameters ---------- instrument : Instrument The `Instrument` object which should be added to the experiment. """ self._instruments.append(instrument)
[docs] def removeInstrument(self, instrument): """Remove an instrument from this experiment. Parameters ---------- instrument : Instrument The instrument which should be removed from the experiment. """ uses = self._checkForInstrumentUsage(instrument) if len(uses) == 0: self._instruments.remove(instrument) else: raise InstrumentInUseError(uses)
[docs] def getInstrument(self, index): """Return the instrument at the specified index. Parameters ---------- index : int The index of the `Instrument` which should be returned. Returns ------- Instrument The instrument located at the specified position. """ return self._instruments[index]
[docs] def getEqualEnoughInstruments(self, action): """Return a list of instruments which implement the given action. Sometimes, the user may want to change the instrument which carries out a particular action. However, in order to change the instrument, it is necessary that the new instrument should have the *ability* to carry out the action. This method returns a list of dictionaries with information about the instruments (and their respective actions) which could potentially be used as a replacement for the instrument associated with the specified action. Parameters ---------- action : Action The `Action` instance whose `Instrument` the user wants to replace. Returns ------- list of dict A list of dictionaries. Each dictionary represents information about one `Action` which could replace `action`. The elements of the dictionary are the following: instrument_index The index of the instrument in the experiment's list. instrument The actual `Instrument` object. instrument_name The name of the instrument. action_index The index of the `Action` object which can replace `action`. The index corresponds to the location in the list of actions returned by the instrument's `getActions` method. action An `ActionSpec` representing the `Action` which can replace the specified `action`. Notes ----- Two `Action` objects are said to be "equal enough" if they have the same description and the same parameters. Two `Parameter` objects are said to be the same if they are identical in all *immutable* aspects (that is, they have the same name, the same description, and the same format string). """ result = [] for index, inst in enumerate(self._instruments): equalEnough = inst.getEqualEnoughAction(action) if equalEnough is not None: result.append({'instrument_index': index, 'instrument': inst, 'instrument_name': inst.getName(), 'action_index': equalEnough[0], 'action': equalEnough[1]}) return result #------------------------------------------------------ Constants management
[docs] def setConstant(self, name, value): """Set the value of a constant, creating it if necessary. Parameters ---------- name : str The name of the constant whose value should be set. value : float The new value for the constant. """ self._constants[name] = value
[docs] def removeConstant(self, name): """Delete a defined constant. Parameters ---------- name : str The name of the constant which should be deleted. """ del self._constants[name]
[docs] def getConstant(self, name): """Return the value of the named constant. Parameters ---------- name : str The name of the constant whose value is desired. Returns ------- float The value of the requested constant, or `None` if the constant does not exist. """ try: return self._constants[name] except KeyError: return None
[docs] def getAllConstants(self): """Return a copy of the constants dictionary. Returns ------- dict A dictionary of constants in which the keys (str) are the names of the constants, and the values (float) are the values associated with the relevant constant. """ return self._constants.copy()
[docs] def renameConstant(self, oldName, newName): """Rename the specified constant. Rename a constant and update all expressions in conditional loops and calculations to reflect the name change. Parameters ---------- oldName : str The name of the constant to whose name should be changed. newName : str The new name of the constant. """ self._constants[newName] = self._constants.pop(oldName) self._updateEvaluations(oldName, 'constant', newName, 'constant')
[docs] def addDefaultConstants(self): """Add pi and the natural logarithm base to the constant dictionary.""" self._constants['pi'] = math.pi self._constants['e'] = math.e #-------------------------------------------------------- Data storage setup
[docs] def handleStorageBins(self, oldName, oldType, newName, newType): """Automatically handle the addition, removal, or renaming of a bin. If the new name is `None` or '', delete the old bin. If the old name is `None` or '', create the new bin. If the old name and the new name are are both non-trivial, the old name is changed to the new name, and the types are switched if necessary. Parameters ---------- oldName : str The name of the column or parameter to be removed. If none is to be removed, this should be either `None` or an empty string. oldType : str The type ("column" or "parameter") of the bin to be removed. If a bin is to be created and none removed, this value is ignored. newName : str The name of the column or parameter to be created. If none is to be created, this should be either `None` or an empty string. newType : str The type ("column" or "parameter") of the bin to be created. If none is to be created, this value is ignored. """ if oldName is None: oldName = '' if newName is None: newName = '' oldName = oldName.strip() newName = newName.strip() if oldName == newName and oldType == newType: return if oldName == '': self._addStorageBin(newName, newType) log.debug('Adding %s %s', newType, newName) return if newName == '': self._deleteStorageBin(oldName, oldType) log.debug('Removing %s %s', oldType, oldName) return self._renameStorageBin(oldName, oldType, newName, newType) log.debug('Replacing %s %s with %s %s', oldType, oldName, newType, newName) log.debug('New columns are: ' + str(self._columns.keys()))
def _addStorageBin(self, name, binType): """Create a column or parameter. If the bin to create is a column, and a missing column of the same name is the reason a graph is disabled, bind the new column to the graph and reenable it. Parameters ---------- name : str The name of the bin to create. binType : str The type of the bin to create. It may be either 'parameter' or 'column'. If `binType` is not recognized, it defaults to 'column'. """ name = name.strip() if binType == 'parameter': if name not in self._parameters: self._parameters[name] = 0 return True else: if name not in self._columns: self._columns[name] = {'curr' : '', 'colindex' : None, 'graphx' : [], 'graphy' : [], 'graphadd' : []} for graph in self._graphs: if not graph.isEnabled(): reenable = True graphColumns = graph.getColumns() for graphColumn in graphColumns: if (graphColumn not in self._columns and graphColumn is not None): reenable = False if reenable: graph.setEnabled(True) if name == graphColumns[0]: self._columns[name]['graphx'].append(graph) if name == graphColumns[1]: self._columns[name]['graphy'].append(graph) if name == graphColumns[2]: self._columns[name]['graphadd'].append(graph) log.info('Reenabling graph %s.', graph.getTitle()) return True return False def _renameStorageBin(self, oldName, oldType, newName, newType): """Rename a storage bin. Rename a storage bin, and then update all calculations, conditional expressions, and graphs to reflect the change in name. If after renaming one instance of `oldName` there are no `Action` instances referring to `oldName`, then delete all data bins associated with `oldName`. If an attempt is made to change a column to a parameter and if that column is bound to a graph, disable the graph. Parameters ---------- oldName : str The name of the bin to rename. oldType : str The type of the bin to rename, which may be either 'parameter' or 'column'. newName : str The new name for the bin. newType : str The new type for the bin, which may be either 'parameter' or 'column'. """ if oldType == 'column': graphs = (self._columns[oldName]['graphx'], self._columns[oldName]['graphy'], self._columns[oldName]['graphadd']) self._addStorageBin(newName, newType) if self._deleteStorageBin(oldName, oldType, False): self._updateEvaluations(oldName, oldType, newName, newType) if oldType == 'column': if newType == 'column': for graph in graphs[0]: graph.updateColumnsIfNecessary(oldName, newName) for graph in graphs[1]: graph.updateColumnsIfNecessary(oldName, newName) for graph in graphs[2]: graph.updateColumnsIfNecessary(oldName, newName) self._columns[newName]['graphx'] = graphs[0] self._columns[newName]['graphy'] = graphs[1] self._columns[newName]['graphadd'] = graphs[2] if newType == 'parameter': for graph in graphs[0] + graphs[1] + graphs[2]: graph.setEnabled(False) log.warn(('Graph %s is being disabled because ' 'it requires data from the column %s, ' 'which is being changed to a parameter.'), graph.getTitle(), oldName) def _deleteStorageBin(self, name, binType, checkGraphs=True): """Delete the specified storage bin. Delete the specified storage bin if and only if only one action in the sequence refers to the bin at the time the command is issued. If a column bound to a graph is deleted, then disable the bound graph, unless this behavior is overridden by setting `checkGraphs` to `False`. Parameters ---------- name : str The name of the storage bin to remove. binType : str The type of the storage bin to remove. Allowed values are "column" and "parameter". checkGraphs : bool Whether to check the graph list to see whether any of the graphs require the column which is about to be deleted. If this flag is set to `True` and a graph does require the column, that graph will be disabled. Returns ------- bool Whether the bin was actually deleted. """ graphs = [] if checkGraphs: graphs = (self._columns[name]['graphx'] + self._columns[name]['graphy'] + self._columns[name]['graphadd']) num = self._scanSequenceForStorageName(name, binType) if num < 1: if binType == 'parameter' and name in self._parameters: del self._parameters[name] elif binType == 'column' and name in self._columns: for graph in graphs: graph.setEnabled(False) log.warn(('Graph %s is being disabled because it ' 'receives data from column %s, which ' 'is being deleted.'), graph.getTitle(), name) del self._columns[name] return True return False def _prepareColumns(self): """Order the columns. Automatically scan through the list of actions to put the columns into the appropriate order, so that the order in the data file reflects the order in which the actions are executed. After this is run, each element in the column dictionary should contain an index which points to a position in the column array. The column array is filled in with the names of the columns in the order in which they will be written to the data file. """ num = [0] def numberColumns(act): """Number the columns used by the specified action.""" if act is None: return cols = act.getInputColumns() + act.getOutputColumns() for col in cols: if col not in self._columnArray and col in self._columns: self._columns[col]['colindex'] = num[0] self._columnArray.append(col) if __debug__: log.debug('Numbering column %s to %d.', col, num[0]) num[0] += 1 self._traverse(numberColumns)
[docs] def getStorageBinNames(self): """Get the names of all data storage bins. Returns ------- list of str A list of the names of all constants in the experiment. list of str A list of the names of all columns in the experiment. list of str A list of the names of all parameters in the experiment. """ return (list(self._constants.keys()), list(self._columns.keys()), list(self._parameters.keys()))
[docs] def getStorageBinNamesString(self): """Return a formatted string representing the storage bin names. Returns ------- str A three-line string, where the first line is a list of constants, the second is a list of columns, and the third is a list of parameters. """ return ('constants: %s\ncolumns: %s\nparameters: %s' % self.getStorageBinNames())
[docs] def getColumnDetails(self): """Return a formatted string containing a list of column details. Returns ------- str A string containing details about the current status of each data column. """ allElements = [] for column in self._columns: data = self._columns[column] elements = ['Column %s ----------------' % column, 'Current: %s' % data['curr'], 'Index: %s' % data['colindex'], 'Graph X: %s' % ['[' + g.getTitle() + ']' for g in data['graphx']], 'Graph Y: %s' % ['[' + g.getTitle() + ']' for g in data['graphy']], 'Graph Add: %s' % ['[' + g.getTitle() + ']' for g in data['graphadd']]] allElements.append('\n'.join(elements)) return '\n'.join(allElements) #------------------------------------------------------- Actual data storage
[docs] def activateTemporaryBuffer(self): """Create a temporary buffer for simultaneous blocks. Create a temporary buffer, with the same keys as the main column dictionary, for storing data while running a simultaneous block. This alleviates the danger of getting goofy garbage in the data file due to, e.g., race conditions. The principal problem this averts is writing data to the file prematurely, which would happen if the actions return in the wrong order. """ self._tempBuffer = dict.fromkeys(self._columns) for column in self._columns: self._tempBuffer[column] = []
[docs] def deactivateTemporaryBuffer(self): """Save the data from the temporary buffer, then destroy the buffer. Take the data stored in the temporary buffer while running the simultaneous block. If the same amount of data has been added for each column, add them to the appropriate column bins. Otherwise, save them as a separate file in the same folder as the data. Then destroy the temporary buffer. """ tempBuffer = self._tempBuffer self._tempBuffer = None sameSize = True length = None toKeep = {} indices = [] for name, column in tempBuffer.iteritems(): columnLength = len(column) if columnLength > 0: toKeep[name] = column indices.append((name, self._columns[name]['colindex'])) if length is None: length = columnLength elif length != columnLength: sameSize = False if sameSize: sortedColumns = sorted(indices, key=lambda colname: colname[1]) for index in range(length): for column in sortedColumns: name = column[0] value = tempBuffer[name][index] self.saveData('column', name, value) else: log.error('Unable to merge simultaneous block columns. ' 'The data is being saved to a separate file for you ' 'to sort out.') with open(self._filenameBase + '_simblock.txt', 'a') as simFile: simFile.write('-------------') for name, column in toKeep.iteritems(): simFile.write(name) simFile.write(str(column))
[docs] def saveData(self, binType, binName, value): """Add data returned from actions to the appropriate dictionary. Take the input from the execute commands bound to an `Action` object and add it to the data column dictionary, the parameter dictionary, or the temporary buffer as appropriate. Parameters ---------- binType : str Where the data should go (either "parameter" or "column"). binName : str The name of the column or parameter. value : str The **formatted** value to be saved to the bin specified by the above parameters. """ if binType is None or binType == '' or binName == '': return if binType == 'parameter': self._parameters[binName] = value try: self._parameterFile.write('%s: %s\n' % (binName, value)) except (AttributeError, IOError): log.warn(('Cannot write %s: %s. Perhaps the parameter file ' + 'is not open.'), binName, value) self._parameterBuffer.append('%s: %s\n' % (binName, value)) elif self._tempBuffer is not None: self._tempBuffer[binName].append(value) else: cols = self._columns columnIndex = cols[binName]['colindex'] if columnIndex <= self._lastColumnIndex: self._dataFile.write('\t'.join([cols[index]['curr'] for index in self._columnArray]) + '\n') self._lastColumnIndex = columnIndex currcol = cols[binName] currcol['curr'] = value for graph in currcol['graphx']: graph.addX(value) for graph in currcol['graphy']: graph.addY(value) for graph in currcol['graphadd']: graph.flagNewPlot()
[docs] def checkExpression(self, expr): """Check the syntactic validity of the supplied expression. Substitute numeric values for columns, constants, and parameters and attempt to evaluate the given expression to determine whether the syntax of the expression is correct. Parameters ---------- expr : str The expression to evaluate in string form. Returns ------- bool `True` if the expression's syntax is valid, or `False` otherwise. """ try: for name in self._constants: key = SUB_CONSTANT % name expr = expr.replace(key, str(self._constants[name])) for name in self._columns: key = SUB_COLUMN % name expr = expr.replace(key, str(np.random.rand())) for name in self._parameters: key = SUB_PARAMETER % name expr = expr.replace(key, str(np.random.rand())) for name in _SUPPORTED_FUNCTIONS: newName = _SUPPORTED_FUNCTIONS[name] expr = expr.replace(name + '(', newName + '(') expr = expr.replace(name + ' (', newName + '(') float(eval(expr)) return True except SyntaxError: return False
[docs] def evaluateExpression(self, expr, conditional=False): """Evaluate a given expression. Substitute all defined constants and the most recent values of all columns and parameters into `expr`, evaluate it, and return the outcome. This is mainly used by the `Calculate` action from the `System` class. Parameters ---------- expr : str A string giving the expression which should be evaluated. conditional : bool Whether the expression should evaluate to a boolean (`True` or `False`). The default is `False`, meaning that the expression should evaluate to a number. Returns ------- float or bool If `conditional` is `True`, the boolean to which the supplied expression evaluates when known data have been substituted. Otherwise, the **number** to which the supplied expression evaluates. """ try: for name in self._constants: key = SUB_CONSTANT % name expr = expr.replace(key, str(self._constants[name])) if self._tempBuffer is not None: for name in self._tempBuffer: if len(self._tempBuffer[name]) > 0: key = SUB_COLUMN % name expr = expr.replace(key, self._tempBuffer[name][-1]) for name in self._columns: key = SUB_COLUMN % name expr = expr.replace(key, self._columns[name]['curr']) for name in self._parameters: key = SUB_PARAMETER % name expr = expr.replace(key, self._parameters[name]) for name in _SUPPORTED_FUNCTIONS: newName = _SUPPORTED_FUNCTIONS[name] expr = expr.replace(name + '(', newName + '(') expr = expr.replace(name + ' (', newName + '(') ans = eval(expr) if conditional: if isinstance(ans, bool): return ans else: return float(ans) except (TypeError, ValueError, SyntaxError), err: if conditional: log.error('Cannot evaluate conditional [%s]. ' 'Returning False.\n>>>>%s', expr, err) return False log.error('Cannot evaluate expression [%s]. Returning NaN\n>>>>%s.', expr, err) return float('nan') #----------------------------------------------------------- Action sequence
[docs] def getActionRoot(self): """Return the root of the action tree. Returns ------- ActionContainer The `ActionContainer` object which serves as the root of the sequence tree. """ return self._sequence
[docs] def addPostprocessorAction(self, action): """Add a Postprocessor action to be executed after the experiment. Parameters ---------- action : ActionPostprocessor An action to execute at the end of the experiment. """ self._postprocessorActions.append(action)
def _checkSequenceForErrors(self): """Check the sequence for problems. Returns ------- list of tuple of str A list of tuples, where each tuple contains two elements. The first is either 'warning' or 'error', depending on the severity of the problem, and the second is a message giving more detail about the problem. """ fileOpen = [False] answer = [] definedColumns = [] definedParameters = [] def checkAuxActions(action): """Check each action for problems.""" if not action.isEnabled(): return name = action.getName() inputProperties = action.getInputProperties() outputProperties = action.getOutputProperties() if action.getName() == 'set_file': value = inputProperties[0]['value'] if not os.path.exists(os.path.normpath(value)): answer.append('Folder "%s" does not exist.' % value) fileOpen[0] = True elif name == 'calculate' or name == 'loop_while': if name == 'calculate': expression = inputProperties[0]['value'] else: expression = action.getExpression() allData = (self._constants.keys(), self._columns.keys(), self._parameters.keys()) definedData = (definedColumns, definedParameters) answer.extend(_checkExpressionForErrors(expression, allData, definedData)) if not self.checkExpression(expression): answer.append(('error', 'Syntax error in expression [%s].' % expression)) newBins = _getCreatedBins(inputProperties, outputProperties) definedColumns.extend(newBins[0]) definedParameters.extend(newBins[1]) if not fileOpen[0] and len(definedColumns) > 0: answer.append(('error', 'Writing to columns before ' + 'a file is opened: ' + str(definedColumns))) self._traverse(checkAuxActions) for item in self._graphs: toDisable = False for column in item.getColumns(): if column is not None and column not in definedColumns: toDisable = True if toDisable: item.setEnabled(False) if not item.isEnabled(): answer.append(('warning', 'Graph [%s] is disabled.' % item.getTitle())) return answer def _checkForInstrumentUsage(self, instrument): """Return a list of actions which use the specified instrument. Parameters ---------- instrument : Instrument The `Instrument` object whose usage is to be determined. Returns ------- list of str A list of strings representing the actions which rely on `instrument`. """ usageData = [instrument, []] def checkAux(action): """Helper function to check a single instrument.""" if action.getInstrument() is usageData[0]: usageData[1].append(str(action)) self._traverse(checkAux) return usageData[1] def _scanSequenceForStorageName(self, name, binType): """Count occurrences of the specified bin in the action sequence. Search through the action tree for references to the bin named `name` of type `binType`, and return the total number of matches. Parameters ---------- name : str The name of the bin for which to search. binType : str The type of the bin named `name`. May be either 'column' or 'parameter'. Returns ------- int The number of times the bin of type `binType` and name `name` occurs in the action sequence tree. """ name = name.strip() if binType == 'parameter': name = PARAM_ID + name count = [0] def counter(action): """Helper function to count occurrences in each action.""" colnames = action.getInputColumns() + action.getOutputColumns() for colname in colnames: if colname == name: count[0] += 1 self._traverse(counter) return count[0] def _updateEvaluations(self, oldName, oldType, newName, newType): """Change evaluations to reflect bin name changes. Run through the actions defined in the experiment. Update any calculation or conditional which refers to the bin `oldName` to refer to the bin `newName` instead. Parameters ---------- oldName : str The old name of the data bin. oldType : str The type of the old bin, which may be 'constant', 'column', or 'parameter'. newName : str The type name of the data bin. newType : str The type of the new bin, which may be 'constant', 'column', or 'parameter'. """ if oldType == 'constant': mOld = SUB_CONSTANT % oldName elif oldType == 'parameter': mOld = SUB_PARAMETER % oldName else: mOld = SUB_COLUMN % oldName if newType == 'constant': mNew = SUB_CONSTANT % newName elif newType == 'parameter': mNew = SUB_PARAMETER % newName else: mNew = SUB_COLUMN % newName columnData = [mOld, mNew] def updateName(act): """Helper to update the name in a single conditional.""" if (act.getDescription() == 'Calculate' or act.getDescription() == 'Conditional interrupt'): act.replaceStringInInput(0, columnData[0], columnData[1]) self._traverse(updateName) #------------------------------------------------------------ Graphs control
[docs] def getGraphStrings(self): """Return a list of the names of the experiment's graphs. Returns ------- list of str A list of strings, where each string is the name of a graph in the experiment. """ return [graph.getTitle() for graph in self._graphs]
[docs] def getGraphStringsAndStates(self): """Return the names of graphs and whether each graph is enabled. Returns ------- str The title of the graph. bool Whether the graph is enabled. """ return [(graph.getTitle(), graph.isEnabled()) for graph in self._graphs]
[docs] def getGraph(self, index): """Return the graph at the specified index. Parameters ---------- index : int The position of the desired graph in the experiment's list. Returns ------- Graph The graph at the specified position. """ return self._graphs[index]
[docs] def addGraph(self, graph): """Add a graph to the experiment. Store `graph` in the list, and bind it to the columns which will supply its data. Parameters ---------- graph : Graph A `Graph` object to add to the experiment. """ cols = graph.getColumns() self._graphs.append(graph) self._columns[cols[0]]['graphx'].append(graph) self._columns[cols[1]]['graphy'].append(graph) if cols[2] is not None: self._columns[cols[2]]['graphadd'].append(graph)
[docs] def removeGraph(self, graph): """Remove a graph from the experiment First, remove `graph` from all columns which reference it. Then delete it from the list of graphs. Parameters ---------- graph : Graph The `Graph` object to remove from the list. """ for col in self._columns: if graph in self._columns[col]['graphx']: self._columns[col]['graphx'].remove(graph) if graph in self._columns[col]['graphy']: self._columns[col]['graphy'].remove(graph) if graph in self._columns[col]['graphadd']: self._columns[col]['graphadd'].remove(graph) self._graphs.remove(graph)
[docs] def updateGraphColumns(self, graph): """Fix the column dictionary so that the graph is in the right slots. Remove the graph from the list (which includes unbinding it from the old columns), and then add it back, automatically placing the graph in the slots for the newly-specified columns. Parameters ---------- graph : Graph The `Graph` object whose associated columns have changed. """ self.removeGraph(graph) self.addGraph(graph)
def _updateGraphLabels(self, oldLabel, newLabel): """Update the axes' labels to reflect column name changes. Parameters ---------- oldLabel : str The name of the column from which the graph previously received data. newLabel : str The name of the column from which the graph should now receive data. """ for graph in self._graphs: graph.updateColumnsIfNecessary(oldLabel, newLabel) #-------------------------------------------------------------- File control
[docs] def setFilenames(self, basePath): """Set the filenames and open the files. Use the supplied `baseName` to generate names for the data and parameter files, as well as the graph image file if applicable, by appending the appropriate extensions. Then open the files. Write the column headers to the data file and dump the parameter buffer into the parameter file. Important: All folders in the path to the files must already exist. Parameters ---------- basePath : str The path where the data files should be stored. It should include all folders as well as a filename. The extension may be left off, but the only extensions that will be understood are ".xdat" (the default), ".txt" and ".dat". """ if self._dataFile is not None: self._closeFiles() found = False for ext in _EXTS_DATA: dotext = '.' + ext if basePath.endswith(dotext): self._extension = ext self._filenameBase = basePath[:-len(dotext)] found = True if not found: self._filenameBase = basePath self._extension = _EXTS_DATA[0] # Open the data file and write the headers. filename = self._filenameBase + '.' + self._extension log.info('Opening data file: ' + filename) self._dataFile = open(filename, 'w', 0) headers = [None] * len(self._columnArray) for columnName in self._columns: headers[self._columns[columnName]['colindex']] = columnName self._dataFile.write('\t'.join(headers) + '\n') # Open the parameter file and dump the parameter buffer. filename = self._filenameBase + '.' + _EXTS_PARAMETERS[0] log.info('Opening parameter file: ' + filename) self._parameterFile = open(filename, 'w', 0) for line in self._parameterBuffer: self._parameterFile.write(line + '\n') self._parameterBuffer = []
def _closeFiles(self): """Close the files. First, write the last row of data to the file (since writing to the file is normally determined by collisions, the last line will never be written without this). Then close the data and parameter files if they are open. Then attempt to save any graphs. """ if self._dataFile is not None: self._dataFile.write('\t'.join([self._columns[name]['curr'] for name in self._columnArray]) + '\n') self._dataFile.close() self._dataFile = None if self._parameterFile is not None: self._parameterFile.close() self._parameterFile = None try: self._graphManager.saveGraphs(self._filenameBase + '.' + _EXTS_IMAGE[0]) self._allFiles.append((self._filenameBase + '.' + self._extension, self._filenameBase + '.' + _EXTS_PARAMETERS[0], self._filenameBase + '.' + _EXTS_IMAGE[0])) except (ValueError, NotImplementedError, AttributeError), err: self._allFiles.append((self._filenameBase + '.' + self._extension, self._filenameBase + '.' + _EXTS_PARAMETERS[0], None)) log.error('Problem with graph manager. Graphs cannot be saved.' + '\n>>>>%s', err)
[docs] def getFiles(self): """Return a list of all files used so far in the experiment. Returns ------- list of tuple of str A list of tuples. Each tuple represents one set of files opened and contains three strings. The first is a data file, the second is a parameter file, and the third is a graph file (the third can be `None` if no graph file was saved). """ return list(self._allFiles) #------------------------------------------ Interaction with user interfaces
[docs] def setInteractionParameters(self, **kwargs): """Set how the experiment should communicate with the outside. These parameters can be set only using keyword arguments. Any parameter which takes a list can be disabled by setting it to an empty list. Any parameter which takes something else can be disabled by setting it to `None`. Parameters ---------- parentFrame : Frame A graphical frame which will be passed as the parent for the graph frames, if applicable. graphManagerClass : GraphManager A class (not an instance) to use for managing the graph threads, if applicable. graphManager : GraphManager A GraphManager **object** for managing the graph threads. If this is set, it will be preferred over creating a graphManagerClass. preSequenceCommands : list of Command A list of `Command` objects which should be executed immediately before the experiment begins to run. postSequenceCommands : list of Command A list of `Command` objects which should be executed immediately after the experiment has finished. statusMonitor : statusMonitor The new `StatusMonitor` object for the instruments to display information to the user. loopEnterCommands : list of Command The `Command` objects to execute before any other actions when the sequence enters an interruptable loop. loopExitCommands : list of Command The `Command` objects to execute after any other actions when an interruptable loop has been interrupted. """ if 'parentFrame' in kwargs: self._parentFrame = kwargs['parentFrame'] if 'graphManager' in kwargs: self._graphManager = kwargs['graphManager'] if 'graphManagerClass' in kwargs: self._graphManagerClass = kwargs['graphManagerClass'] if 'preSequenceCommands' in kwargs: self._preSequenceActions = kwargs['preSequenceCommands'] if 'postSequenceCommands' in kwargs: self._postSequenceActions = kwargs['postSequenceCommands'] if 'statusMonitor' in kwargs: self._statusMonitor = kwargs['statusMonitor'] if 'loopEnterCommands' in kwargs: self._loopEnterCommands = kwargs['loopEnterCommands'] if 'loopExitCommands' in kwargs: self._loopExitCommands = kwargs['loopExitCommands'] #------------------------------------------------------ Experiment execution
[docs] def run(self, errorCheck=True): """Perform the pre-sequence actions and begin the experiment. Parameters ---------- errorCheck : bool Whether to check for errors before beginning execution. The default is `True`. Raises ------ GeneralExperimentError An exception containing a list of warnings and errors about problems with the experiment. This is only raised if `errorCheck` is `True`. """ if not self._status[0]: if errorCheck: errors = self._checkSequenceForErrors() if len(errors) > 0: log.error('Errors detected.') for error in errors: log.error('Severity: [%s]. Message: [%s]', *error) raise GeneralExperimentError(errors) else: log.warn('Running experiment without checking for errors.') self._preSequence() mainThread = ExecutionThread(self) mainThread.start() else: log.error('Cannot run: the sequence is already running.')
[docs] def pause(self): """Pause the experiment.""" log.info('Pausing the experiment.') self._status[1] = True
[docs] def resume(self): """Unpause the experiment.""" log.info('Resuming the experiment.') self._status[1] = False
[docs] def abort(self): """Stop the experiment and run the post-sequence actions.""" if self._status[0]: self._status[0] = False time.sleep(1) self._postSequence()
[docs] def interruptLoop(self): """Interrupt a running loop.""" self._status[2] = True
[docs] def isRunning(self): """Return whether the experiment is running. Returns ------- Whether the experiment is running. """ return self._status[0]
[docs] def isPaused(self): """Return whether the experiment has been paused. Returns ------- bool Whether the experiment is paused. """ return self._status[1]
[docs] def isInterrupted(self): """Return whether the next loop should be interrupted. Returns ------- bool Whether a loop has been manually interrupted. """ if self._status[2]: self._status[2] = False return True return False
def _preSequence(self): """Prepare the experiment to execute.""" log.info('Starting pre-sequence...') # Set the "running" flag to true. self._status[0] = True # Execute any pre-sequence actions specified by the user. if self._preSequenceActions is not None: try: for action in self._preSequenceActions: action.execute() except TypeError: log.error('Invalid pre-sequence action list') # Initialize the instruments. self._parameterBuffer.append(formatReSTHeading('Instruments', 0)) for instrument in self._instruments: instrument.initialize() if self._statusMonitor is not None: instrument.setStatusMonitor(self._statusMonitor) self._parameterBuffer.append(instrument.getInformation()) self._parameterBuffer.append('') # Write the defined data to the parameter buffer. self._parameterBuffer.append(formatReSTHeading('Constants', 0)) for constant in self._constants.items(): self._parameterBuffer.append('%s : %f' % constant) self._parameterBuffer.append('') self._parameterBuffer.append(formatReSTHeading('Sequence', 0)) self._parameterBuffer.append(self._sequence.getTreeString(0)) self._parameterBuffer.append('') self._parameterBuffer.append(formatReSTHeading('Parameters', 0)) # Prepare all actions for execution (including ensuring that they # refer to this experiment). self._sequence.setExperiment(self) def applyActionCommands(action): """Set the pre- and post-loop actions for interruptable loops.""" if isinstance(action, ActionLoopUntilInterrupt): action.setLoopCommands(self._loopEnterCommands, self._loopExitCommands) self._traverse(applyActionCommands) if self._statusMonitor is not None: self._sequence.setStatusMonitor(self._statusMonitor) self._sequence.prepareToExecute() # Prepare columns to store data (i.e. number them). self._prepareColumns() # Prepare and start the graph manager. try: if self._graphManagerClass is None: pass elif self._graphManager is None: self._graphManager = self._graphManagerClass(self._parentFrame) if self._graphManager is not None: enabledGraphs = [gph for gph in self._graphs if gph.isEnabled()] self._graphManager.setGraphs(enabledGraphs) self._graphManager.start() time.sleep(1) except (ValueError, NotImplementedError, TypeError), err: log.error('Problem with graph manager...proceeding without ' + 'graphs.\n>>>>%s', err) log.info('Pre-sequence finished.') def _postSequence(self): """Free resources and clear stored data.""" log.info('Starting post-sequence...') # Finalize all instruments for instrument in self._instruments: instrument.finalize() # Abort the graph manager and clear graph data try: self._graphManager.abort() except (AttributeError, TypeError, ValueError, NotImplementedError): log.error('Problem with graph manager...cannot abort.') time.sleep(1) for graph in self._graphs: graph.clear() # Close files (and save graphs, if applicable) self._closeFiles() # Execute postprocessorAction objects for action in self._postprocessorActions: action.executeReal() self._postprocessorActions = [] self._allFiles = [] # Delete the convenience attributes in the `Action` objects self._sequence.cleanupAfterExecution() # Empty the column and parameter information storage. for colname in self._columns: col = self._columns[colname] col['curr'] = '' col['colindex'] = None self._columnArray = [] # Reset the running flag to False self._status[0] = False # Execute user-defined post-sequence actions if self._postSequenceActions is not None: try: for action in self._postSequenceActions: action.execute() except TypeError, err: log.error('Invalid post-sequence action list.\n>>>>%s', err) # Clear the status monitor if self._statusMonitor is not None: self._statusMonitor.clear() log.info('Post-sequence finished.') #------------------------------------------------------------ Helper methods def _traverse(self, func): """Scan the action sequence, applying a function to each element.""" def traverseAux(node, func): """Helper for traverse. Actually does the work.""" if node.allowsChildren(): children = node.getChildren() for child in children: func(child) traverseAux(child, func) func(self._sequence) traverseAux(self._sequence, func) #---------------------------------------------------------- Data persistence def __getstate__(self): """Return a dictionary of the defining properties of the experiment. Returns ------- dict The full dictionary of the class except that those elements are incompatible with `pickle` have been removed. """ odict = self.__dict__.copy() odict['_parentFrame'] = None odict['_graphManager'] = None odict['_graphManagerClass'] = None odict['_preSequenceActions'] = None odict['_postSequenceActions'] = None odict['_loopEnterCommands'] = None odict['_loopExitCommands'] = None odict['_status'] = [False, False, False] odict['_statusMonitor'] = None odict['_postprocessorActions'] = [] return odict def __setstate__(self, dictionary): """Set the dictionary defining the properties of the experiment.""" self.__dict__.update(dictionary)
[docs] def getXML(self): """Return an XML string representing the experiment. Returns ------- str A string containing XML data representing all of the data related to an experiment, possibly useful as an alternative to pickle. """ ans = ['<experiment>'] # Process the constants ans.append(' <constants>') tempformat = ' <constant name="%s" value="%s" />' ans.extend([tempformat % x for x in self._constants.items()]) ans.append(' </constants>') # Process the instruments ans.append(' <instruments>') ans.extend([x.getXML(4) for x in self._instruments]) ans.append(' </instruments>') # Process the actions ans.append(' <sequence>') ans.append(self._sequence.getXML(4)) ans.append(' </sequence>') # Process the graphs ans.append(' <graphs>') ans.extend([x.getXML(4) for x in self._graphs]) ans.append(' </graphs>') ans.append('</experiment>') return '\n'.join(ans)
@classmethod
[docs] def open(cls, filename): """Open an experiment from a file.""" with open(filename, 'rb') as inputFile: expt = pickle.load(inputFile) return expt
@classmethod
[docs] def save(cls, experiment, filename): """Save an experiment to a file.""" with open(filename, 'wb') as outputFile: pickle.dump(experiment, outputFile) #--------------------------------------------------------------- ExecutionThread
[docs]class ExecutionThread(threading.Thread): """Helper class for executing the sequence without blocking the program.""" def __init__(self, experiment): super(ExecutionThread, self).__init__() self.experiment = experiment self.name = 'Experiment Thread Starter'
[docs] def run(self): """Begin executing the main sequence.""" log.info('Main sequence started.') actionThread = ActionThread(self.experiment.getActionRoot(), True) actionThread.name = 'Main Experiment Thread' actionThread.start() actionThread.join() log.info('Main sequence finished.') self.experiment.abort() #-------------------------------------------------------------- Helper functions
def _checkExpressionForErrors(expression, allProperties, definedProperties): """Return errors related to undefined data bins. Parameters ---------- expression : str The expression to check for errors. allProperties : tuple of list of str A tuple of lists. The first list contains the names of all constants defined in the experiment. The second contains the names of all columns, and the third contains the names of all parameters. definedProperties : tuple of list of str A tuple of lists. The first list contains the names of the columns which have been defined by the time the expression is to be evaluated, and the second contains the names of parameters defined up to that same point. Returns ------- list of tuple of str A list of tuples, where each tuple contains two elements. The first is either 'warning' or 'error', depending on the severity of the problem, and the second is a message giving more detail about the problem. """ answer = [] constants, columns, parameters = allProperties definedColumns, definedParameters = definedProperties needConst, needCol, needParam = parsing.extractNames(expression) for item in needConst: if item not in constants: answer.append(('error', 'Undefined constant in ' + 'expression [%s]: [%s]' % (expression, item))) for item in needCol: if item not in columns: answer.append(('error', 'Undefined column in ' + 'expression [%s]: [%s].' % (expression, item))) elif item not in definedColumns: answer.append(('warning', 'Column referenced before ' + 'assignment in expression ' + '[%s]: [%s]. Zero will be used.' % (expression, item))) for item in needParam: if item not in parameters: answer.append(('error', 'Undefined parameter in ' + 'expression [%s]: [%s].' % (expression, item))) elif item not in definedParameters: answer.append(('warning', 'Parameter referenced ' + 'before assignment in expression ' + '[%s]: [%s]. Zero will be used.' % (expression, item))) return answer def _getCreatedBins(inputProperties, outputProperties): """Get the parameters and columns defined by an action. Parameters ---------- inputProperties : list of dict A list of dictionaries of the form returned by the `Action` class's `getInputProperties()` method. outputProperties : list of dict A list of dictionaries of the form returned by the `Action` class's `getOutputProperties()` method. Returns ------- tuple of list of str A tuple consisting of two lists, the first of which names all the columns created by the relevant action, and the second names all the parameters. """ definedColumns = [] definedParameters = [] for item in inputProperties: name = item['column'] if name.startswith(MARK_PARAMETER): parameterName = name[len(MARK_PARAMETER):] if parameterName not in definedParameters: definedParameters.append(parameterName) elif len(name) > 0 and name not in definedColumns: definedColumns.append(name) for item in outputProperties: name = item['column'] if name.startswith(MARK_PARAMETER): parameterName = name[len(MARK_PARAMETER):] if parameterName not in definedParameters: definedParameters.append(parameterName) elif len(name) > 0 and name not in definedColumns: definedColumns.append(name) return (definedColumns, definedParameters)